summaryrefslogtreecommitdiff
path: root/src/io/SearchImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/SearchImpl.java')
-rw-r--r--src/io/SearchImpl.java321
1 files changed, 321 insertions, 0 deletions
diff --git a/src/io/SearchImpl.java b/src/io/SearchImpl.java
new file mode 100644
index 0000000..c08675b
--- /dev/null
+++ b/src/io/SearchImpl.java
@@ -0,0 +1,321 @@
+package io;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import java.io.IOException;
+import static java.lang.Thread.sleep;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import mining.Search;
+import mining.TwitterApi;
+import mining.TwitterApi.Builder;
+import provider.ExceptionListener;
+import provider.ResultListener;
+
+/**
+ * Implementation of the search with rate limit handling and multiple threads
+ *
+ * @author Maurice Laveaux
+ */
+public class SearchImpl implements Search {
+
+ private static final String USER_TIMELINE_RESOURCE = "statuses/user_timeline";
+
+ /**
+ * The result listener to report results to
+ */
+ private ResultListener resultListener = null;
+
+ /**
+ * Dummy object to synchronize the resultListener;
+ */
+ private final Object resultListenerSync = new Object();
+
+ /**
+ * The list of user searches are done on search.
+ */
+ private final BlockingQueue<Request> requestQueue;
+
+ /**
+ * The thread container that handles both io and request thread.
+ */
+ private ThreadContainer thread_cached;
+
+ /**
+ * The twitter api to execute requests.
+ */
+ private final TwitterApi oauth;
+
+ public SearchImpl(TwitterApi oauth) {
+ this.oauth = oauth;
+
+ requestQueue = new LinkedBlockingQueue();
+
+ //load("queue_cache");
+ }
+
+ @Override
+ public void search(String screenName, int numberOfTweets) {
+ if (numberOfTweets > 3200) {
+ numberOfTweets = 3200;
+ System.out.println("Changed number of tweets to max 3200.");
+ }
+
+ requestQueue.offer(new Request(screenName, numberOfTweets));
+
+ if (thread_cached == null) {
+ thread_cached = new ThreadContainer();
+ thread_cached.start();
+ }
+ }
+
+ @Override
+ public void setResultListener(ResultListener listener) {
+ synchronized (resultListenerSync) {
+ resultListener = listener;
+ }
+ }
+
+ @Override
+ public void save(String filename) {
+
+ }
+
+ @Override
+ public void load(String filename) {
+
+ }
+
+ @Override
+ public void setExceptionListener(ExceptionListener listener) {
+
+ }
+
+ @Override
+ public void close() {
+ if (thread_cached != null) {
+ thread_cached.stop();
+ }
+ }
+
+ private class Request {
+
+ public Request(String screenName, int numberOfTweets) {
+ this.screenName = screenName;
+ this.numberOfTweets = numberOfTweets;
+
+ sinceId = 0;
+ maxId = Long.MAX_VALUE;
+ }
+
+ public String screenName;
+ public long sinceId;
+ public long maxId;
+ public int numberOfTweets;
+ }
+
+ /**
+ * The container that creates both threads.
+ */
+ private class ThreadContainer {
+
+ private final Thread ioThread;
+ private final Poller poller;
+
+ private final Thread workerThread;
+ private final Worker worker;
+
+ /**
+ * The poller methods.
+ */
+ public ThreadContainer() {
+ worker = new Worker();
+ poller = new Poller(worker);
+
+ ioThread = new Thread(poller);
+ workerThread = new Thread(worker);
+ }
+
+ public void start() {
+ ioThread.start();
+ workerThread.start();
+ }
+
+ public void stop() {
+ worker.stop();
+ while (workerThread.isAlive()) {
+ try {
+ workerThread.join();
+ ioThread.join();
+ save("search_cached.txt");
+ } catch (InterruptedException ex) {
+ Logger.getLogger(getClass().getName())
+ .warning("Interrupted while waiting for search saving.");
+ }
+ }
+ }
+ }
+
+ /**
+ * The thread that handles all the queries and the items in the queue.
+ */
+ private class Worker implements Runnable {
+
+ /**
+ * The json arrays received from the requests.
+ */
+ private final BlockingQueue<JsonObject> receivedObjects;
+
+ private boolean running = true;
+
+ public Worker() {
+ receivedObjects = new LinkedBlockingQueue();
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public boolean hasObjects() {
+ return !receivedObjects.isEmpty();
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ /**
+ * @return The head of the received object queue.
+ */
+ public JsonObject getObject() throws InterruptedException {
+ return receivedObjects.take();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ if (!requestQueue.isEmpty()) {
+ try {
+ Request request = requestQueue.take();
+ executeRequest(request);
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+ }
+
+ private void executeRequest(Request request) {
+ try {
+ Builder url = oauth.build(USER_TIMELINE_RESOURCE)
+ .param("screen_name", request.screenName);
+
+ int count = 200;
+ if (count >= request.numberOfTweets) {
+ count = request.numberOfTweets;
+ }
+ request.numberOfTweets -= count;
+
+ url = url.param("count", Integer.toString(count));
+ if (request.maxId > 0) {
+ url = url.param("max_id", Long.toString(request.maxId - 1));
+ }
+
+ try {
+ Response result = url.request();
+
+ // Obtain the lowest id.
+ JsonArray array = result.getResp().getAsJsonArray();
+ for (int i = 0; i < array.size(); i++) {
+ JsonObject object = array.get(i).getAsJsonObject();
+
+ JsonElement id = object.get("id");
+ if (id == null) {
+ getLogger().log(Level.WARNING, "Search {0}, id was not valid.", object);
+ continue;
+ }
+ if (id.getAsLong() <= request.maxId) {
+ request.maxId = object.get("id").getAsLong();
+ }
+
+ receivedObjects.offer(object);
+ }
+
+ if (request.numberOfTweets > 0) {
+ requestQueue.offer(request);
+ }
+
+ if (result.getRateLimitRemaining() == 0) {
+ waitTil(result.getRateLimitReset());
+ }
+ } catch (RateLimitException ex) {
+ requestQueue.offer(request);
+ waitTil(ex.getResetTime());
+ }
+
+ } catch (IOException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+
+ private void waitTil(long seconds) {
+ try {
+ long interval = seconds * 1000 - System.currentTimeMillis() + 2;
+ System.out.println("Rate limited, waiting " + interval + "seconds");
+ sleep(interval);
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+
+ private Logger getLogger() {
+ return Logger.getLogger(SearchImpl.class.getName());
+ }
+ }
+
+ /**
+ * Polls the worker thread for leftover JSONobjects to push as results.
+ */
+ private class Poller implements Runnable {
+
+ private final Worker worker;
+
+ private Poller(Worker worker) {
+ this.worker = worker;
+ }
+
+ @Override
+ public void run() {
+ // keep waiting for objects if the worker is alive, or fetch objects
+ // if the worker has any old ones left.
+ while (worker.isRunning() || worker.hasObjects()) {
+ try {
+ processObject(worker.getObject());
+ } catch (InterruptedException | ClassCastException ex) {
+ getLogger().log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+
+ /**
+ * Distributes a new JSONObject to the result listeners.
+ *
+ * @param obj
+ */
+ private void processObject(JsonObject obj) {
+ synchronized (resultListenerSync) {
+ if (resultListener != null) {
+ resultListener.tweetGenerated(obj);
+ }
+ }
+ }
+
+ private Logger getLogger() {
+ return Logger.getLogger(SearchImpl.class.getName());
+ }
+ }
+}