summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaurice Laveaux <m.laveaux@student.tue.nl>2014-05-22 16:00:56 +0200
committerMaurice Laveaux <m.laveaux@student.tue.nl>2014-05-22 16:00:56 +0200
commit1da00321db0aa8c412c3ff1dff5734962ee49240 (patch)
tree3e1e28f984d96ec0df24fff84701059f1cf7ddc2
parent783f42b52875e09181a97e12b1602db34ce47a09 (diff)
downloadTwitterDataAnalytics-1da00321db0aa8c412c3ff1dff5734962ee49240.tar.gz
Implemented basic search functionality.
* Usage, search <screen_name> <amount_tweets>.
-rw-r--r--src/io/SearchImpl.java321
-rw-r--r--src/main/TweetShell.java64
2 files changed, 372 insertions, 13 deletions
diff --git a/src/io/SearchImpl.java b/src/io/SearchImpl.java
new file mode 100644
index 0000000..c08675b
--- /dev/null
+++ b/src/io/SearchImpl.java
@@ -0,0 +1,321 @@
+package io;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import java.io.IOException;
+import static java.lang.Thread.sleep;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import mining.Search;
+import mining.TwitterApi;
+import mining.TwitterApi.Builder;
+import provider.ExceptionListener;
+import provider.ResultListener;
+
+/**
+ * Implementation of the search with rate limit handling and multiple threads
+ *
+ * @author Maurice Laveaux
+ */
+public class SearchImpl implements Search {
+
+ private static final String USER_TIMELINE_RESOURCE = "statuses/user_timeline";
+
+ /**
+ * The result listener to report results to
+ */
+ private ResultListener resultListener = null;
+
+ /**
+ * Dummy object to synchronize the resultListener;
+ */
+ private final Object resultListenerSync = new Object();
+
+ /**
+ * The list of user searches are done on search.
+ */
+ private final BlockingQueue<Request> requestQueue;
+
+ /**
+ * The thread container that handles both io and request thread.
+ */
+ private ThreadContainer thread_cached;
+
+ /**
+ * The twitter api to execute requests.
+ */
+ private final TwitterApi oauth;
+
+ public SearchImpl(TwitterApi oauth) {
+ this.oauth = oauth;
+
+ requestQueue = new LinkedBlockingQueue();
+
+ //load("queue_cache");
+ }
+
+ @Override
+ public void search(String screenName, int numberOfTweets) {
+ if (numberOfTweets > 3200) {
+ numberOfTweets = 3200;
+ System.out.println("Changed number of tweets to max 3200.");
+ }
+
+ requestQueue.offer(new Request(screenName, numberOfTweets));
+
+ if (thread_cached == null) {
+ thread_cached = new ThreadContainer();
+ thread_cached.start();
+ }
+ }
+
+ @Override
+ public void setResultListener(ResultListener listener) {
+ synchronized (resultListenerSync) {
+ resultListener = listener;
+ }
+ }
+
+ @Override
+ public void save(String filename) {
+
+ }
+
+ @Override
+ public void load(String filename) {
+
+ }
+
+ @Override
+ public void setExceptionListener(ExceptionListener listener) {
+
+ }
+
+ @Override
+ public void close() {
+ if (thread_cached != null) {
+ thread_cached.stop();
+ }
+ }
+
+ private class Request {
+
+ public Request(String screenName, int numberOfTweets) {
+ this.screenName = screenName;
+ this.numberOfTweets = numberOfTweets;
+
+ sinceId = 0;
+ maxId = Long.MAX_VALUE;
+ }
+
+ public String screenName;
+ public long sinceId;
+ public long maxId;
+ public int numberOfTweets;
+ }
+
+ /**
+ * The container that creates both threads.
+ */
+ private class ThreadContainer {
+
+ private final Thread ioThread;
+ private final Poller poller;
+
+ private final Thread workerThread;
+ private final Worker worker;
+
+ /**
+ * The poller methods.
+ */
+ public ThreadContainer() {
+ worker = new Worker();
+ poller = new Poller(worker);
+
+ ioThread = new Thread(poller);
+ workerThread = new Thread(worker);
+ }
+
+ public void start() {
+ ioThread.start();
+ workerThread.start();
+ }
+
+ public void stop() {
+ worker.stop();
+ while (workerThread.isAlive()) {
+ try {
+ workerThread.join();
+ ioThread.join();
+ save("search_cached.txt");
+ } catch (InterruptedException ex) {
+ Logger.getLogger(getClass().getName())
+ .warning("Interrupted while waiting for search saving.");
+ }
+ }
+ }
+ }
+
+ /**
+ * The thread that handles all the queries and the items in the queue.
+ */
+ private class Worker implements Runnable {
+
+ /**
+ * The json arrays received from the requests.
+ */
+ private final BlockingQueue<JsonObject> receivedObjects;
+
+ private boolean running = true;
+
+ public Worker() {
+ receivedObjects = new LinkedBlockingQueue();
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public boolean hasObjects() {
+ return !receivedObjects.isEmpty();
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ /**
+ * @return The head of the received object queue.
+ */
+ public JsonObject getObject() throws InterruptedException {
+ return receivedObjects.take();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ if (!requestQueue.isEmpty()) {
+ try {
+ Request request = requestQueue.take();
+ executeRequest(request);
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+ }
+
+ private void executeRequest(Request request) {
+ try {
+ Builder url = oauth.build(USER_TIMELINE_RESOURCE)
+ .param("screen_name", request.screenName);
+
+ int count = 200;
+ if (count >= request.numberOfTweets) {
+ count = request.numberOfTweets;
+ }
+ request.numberOfTweets -= count;
+
+ url = url.param("count", Integer.toString(count));
+ if (request.maxId > 0) {
+ url = url.param("max_id", Long.toString(request.maxId - 1));
+ }
+
+ try {
+ Response result = url.request();
+
+ // Obtain the lowest id.
+ JsonArray array = result.getResp().getAsJsonArray();
+ for (int i = 0; i < array.size(); i++) {
+ JsonObject object = array.get(i).getAsJsonObject();
+
+ JsonElement id = object.get("id");
+ if (id == null) {
+ getLogger().log(Level.WARNING, "Search {0}, id was not valid.", object);
+ continue;
+ }
+ if (id.getAsLong() <= request.maxId) {
+ request.maxId = object.get("id").getAsLong();
+ }
+
+ receivedObjects.offer(object);
+ }
+
+ if (request.numberOfTweets > 0) {
+ requestQueue.offer(request);
+ }
+
+ if (result.getRateLimitRemaining() == 0) {
+ waitTil(result.getRateLimitReset());
+ }
+ } catch (RateLimitException ex) {
+ requestQueue.offer(request);
+ waitTil(ex.getResetTime());
+ }
+
+ } catch (IOException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+
+ private void waitTil(long seconds) {
+ try {
+ long interval = seconds * 1000 - System.currentTimeMillis() + 2;
+ System.out.println("Rate limited, waiting " + interval + "seconds");
+ sleep(interval);
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+
+ private Logger getLogger() {
+ return Logger.getLogger(SearchImpl.class.getName());
+ }
+ }
+
+ /**
+ * Polls the worker thread for leftover JSONobjects to push as results.
+ */
+ private class Poller implements Runnable {
+
+ private final Worker worker;
+
+ private Poller(Worker worker) {
+ this.worker = worker;
+ }
+
+ @Override
+ public void run() {
+ // keep waiting for objects if the worker is alive, or fetch objects
+ // if the worker has any old ones left.
+ while (worker.isRunning() || worker.hasObjects()) {
+ try {
+ processObject(worker.getObject());
+ } catch (InterruptedException | ClassCastException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+
+ /**
+ * Distributes a new JSONObject to the result listeners.
+ *
+ * @param obj
+ */
+ private void processObject(JsonObject obj) {
+ synchronized (resultListenerSync) {
+ if (resultListener != null) {
+ resultListener.tweetGenerated(obj);
+ }
+ }
+ }
+
+ private Logger getLogger() {
+ return Logger.getLogger(SearchImpl.class.getName());
+ }
+ }
+}
diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java
index 956f060..583b3a4 100644
--- a/src/main/TweetShell.java
+++ b/src/main/TweetShell.java
@@ -39,6 +39,8 @@ public class TweetShell implements TwitterApi.PinSupplier {
private Stream stream_cached;
private final CompositeResultListener resultListeners;
+ private Search search_cached;
+
/**
* Whether to convert uncompressed tweet files (e.g. "tweets.txt") to the
* compressed files ("tweets.txt.gz").
@@ -80,6 +82,15 @@ public class TweetShell implements TwitterApi.PinSupplier {
return stream_cached;
}
+ private Search getSearch() throws IOException {
+ if (search_cached == null) {
+ search_cached = new SearchImpl(getApi());
+ search_cached.setExceptionListener(new StreamExceptionHandler());
+ search_cached.setResultListener(resultListeners);
+ }
+ return search_cached;
+ }
+
private class StreamExceptionHandler implements ExceptionListener {
@Override
@@ -159,7 +170,7 @@ public class TweetShell implements TwitterApi.PinSupplier {
}
public boolean execute(String cmd) {
- String[] args = cmd.trim().split("\\s+", 2);
+ String[] args = cmd.trim().split("\\s+", 3);
if (!args[0].isEmpty()) {
// non-empty command, let's see whether it makes sense?
return execute(args);
@@ -191,6 +202,7 @@ public class TweetShell implements TwitterApi.PinSupplier {
}
// another satisfied customer, next!
return true;
+
}
enum Command {
@@ -199,6 +211,7 @@ public class TweetShell implements TwitterApi.PinSupplier {
del("Deletes a keyword from search", 1),
keywords("Display currently active keywords"),
commit("Activate the stream or apply the stream keyword changes"),
+ search("Searches for the tweets of some user", 2),
status("Show some status and statistics"),
flush("Writes any pending buffers"),
close("Close the stream"),
@@ -297,23 +310,37 @@ public class TweetShell implements TwitterApi.PinSupplier {
getStream().commit();
}
break;
+ case search:
+ Search search = getSearch();
+ search.search(params[0], Integer.parseInt(params[1]));
+ break;
case status:
TweetCounter tc;
- tc = (TweetCounter) resultListeners.findListener(TweetCounter.class);
- if (stream_cached != null && getStream().isValid()) {
+ tc
+ = (TweetCounter) resultListeners.findListener(TweetCounter.class
+ );
+ if (stream_cached
+ != null && getStream()
+ .isValid()) {
System.out.println("Streaming is active.");
} else {
System.out.println("Streaming is inactive.");
}
DateFormat df = SimpleDateFormat.getDateTimeInstance();
String start_date = df.format(tc.getStartDate());
- System.out.println("Started at " + start_date);
- System.out.println("Elapsed: " + tc.getActiveTime());
- System.out.println("Received tweets in session: " + tc.getTweetCount());
- System.out.println("Unique users: " + tc.getUsers().size());
+
+ System.out.println(
+ "Started at " + start_date);
+ System.out.println(
+ "Elapsed: " + tc.getActiveTime());
+ System.out.println(
+ "Received tweets in session: " + tc.getTweetCount());
+ System.out.println(
+ "Unique users: " + tc.getUsers().size());
break;
case flush:
- if (stream_cached != null) {
+ if (stream_cached
+ != null) {
resultListeners.flush();
}
break;
@@ -324,7 +351,8 @@ public class TweetShell implements TwitterApi.PinSupplier {
for (String line : HELP) {
System.out.println(line);
}
- for (Command cmd : Command.values()) {
+ for (Command cmd
+ : Command.values()) {
System.out.printf(" %-10s", cmd.name());
if (!cmd.getDescription().isEmpty()) {
System.out.print(" " + cmd.getDescription());
@@ -339,9 +367,11 @@ public class TweetShell implements TwitterApi.PinSupplier {
break;
case exit:
safeClose();
+
throw new NoSuchElementException();
case target:
- if (params.length > 0) {
+ if (params.length
+ > 0) {
// due to limitations of shell (does not handle escapes),
// do a manualy split as we cannot have spaces in our args.
if (params.length == 1) {
@@ -384,12 +414,20 @@ public class TweetShell implements TwitterApi.PinSupplier {
*/
private ClassEnabledTracker<ResultListener> getPossibleTargets() {
Map<String, Class<? extends ResultListener>> targets = new TreeMap<>();
- targets.put("file", DataWriter.class);
- targets.put("cfile", CompressableDataWriter.class);
- targets.put("shell", StreamHandler.class);
+ targets
+ .put("file", DataWriter.class
+ );
+ targets.put(
+ "cfile", CompressableDataWriter.class
+ );
+ targets.put(
+ "shell", StreamHandler.class
+ );
ClassEnabledTracker<ResultListener> targetFoo = new ClassEnabledTracker<>(targets);
+
targetFoo.disableAll();
+
targetFoo.enableClasses(resultListeners.getRegistered());
return targetFoo;
}