From 1da00321db0aa8c412c3ff1dff5734962ee49240 Mon Sep 17 00:00:00 2001 From: Maurice Laveaux Date: Thu, 22 May 2014 16:00:56 +0200 Subject: Implemented basic search functionality. * Usage, search . --- src/io/SearchImpl.java | 321 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 321 insertions(+) create mode 100644 src/io/SearchImpl.java (limited to 'src/io') diff --git a/src/io/SearchImpl.java b/src/io/SearchImpl.java new file mode 100644 index 0000000..c08675b --- /dev/null +++ b/src/io/SearchImpl.java @@ -0,0 +1,321 @@ +package io; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import java.io.IOException; +import static java.lang.Thread.sleep; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; +import mining.Search; +import mining.TwitterApi; +import mining.TwitterApi.Builder; +import provider.ExceptionListener; +import provider.ResultListener; + +/** + * Implementation of the search with rate limit handling and multiple threads + * + * @author Maurice Laveaux + */ +public class SearchImpl implements Search { + + private static final String USER_TIMELINE_RESOURCE = "statuses/user_timeline"; + + /** + * The result listener to report results to + */ + private ResultListener resultListener = null; + + /** + * Dummy object to synchronize the resultListener; + */ + private final Object resultListenerSync = new Object(); + + /** + * The list of user searches are done on search. + */ + private final BlockingQueue requestQueue; + + /** + * The thread container that handles both io and request thread. + */ + private ThreadContainer thread_cached; + + /** + * The twitter api to execute requests. + */ + private final TwitterApi oauth; + + public SearchImpl(TwitterApi oauth) { + this.oauth = oauth; + + requestQueue = new LinkedBlockingQueue(); + + //load("queue_cache"); + } + + @Override + public void search(String screenName, int numberOfTweets) { + if (numberOfTweets > 3200) { + numberOfTweets = 3200; + System.out.println("Changed number of tweets to max 3200."); + } + + requestQueue.offer(new Request(screenName, numberOfTweets)); + + if (thread_cached == null) { + thread_cached = new ThreadContainer(); + thread_cached.start(); + } + } + + @Override + public void setResultListener(ResultListener listener) { + synchronized (resultListenerSync) { + resultListener = listener; + } + } + + @Override + public void save(String filename) { + + } + + @Override + public void load(String filename) { + + } + + @Override + public void setExceptionListener(ExceptionListener listener) { + + } + + @Override + public void close() { + if (thread_cached != null) { + thread_cached.stop(); + } + } + + private class Request { + + public Request(String screenName, int numberOfTweets) { + this.screenName = screenName; + this.numberOfTweets = numberOfTweets; + + sinceId = 0; + maxId = Long.MAX_VALUE; + } + + public String screenName; + public long sinceId; + public long maxId; + public int numberOfTweets; + } + + /** + * The container that creates both threads. + */ + private class ThreadContainer { + + private final Thread ioThread; + private final Poller poller; + + private final Thread workerThread; + private final Worker worker; + + /** + * The poller methods. + */ + public ThreadContainer() { + worker = new Worker(); + poller = new Poller(worker); + + ioThread = new Thread(poller); + workerThread = new Thread(worker); + } + + public void start() { + ioThread.start(); + workerThread.start(); + } + + public void stop() { + worker.stop(); + while (workerThread.isAlive()) { + try { + workerThread.join(); + ioThread.join(); + save("search_cached.txt"); + } catch (InterruptedException ex) { + Logger.getLogger(getClass().getName()) + .warning("Interrupted while waiting for search saving."); + } + } + } + } + + /** + * The thread that handles all the queries and the items in the queue. + */ + private class Worker implements Runnable { + + /** + * The json arrays received from the requests. + */ + private final BlockingQueue receivedObjects; + + private boolean running = true; + + public Worker() { + receivedObjects = new LinkedBlockingQueue(); + } + + public boolean isRunning() { + return running; + } + + public boolean hasObjects() { + return !receivedObjects.isEmpty(); + } + + public void stop() { + running = false; + } + + /** + * @return The head of the received object queue. + */ + public JsonObject getObject() throws InterruptedException { + return receivedObjects.take(); + } + + @Override + public void run() { + while (true) { + if (!requestQueue.isEmpty()) { + try { + Request request = requestQueue.take(); + executeRequest(request); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + } + } + + private void executeRequest(Request request) { + try { + Builder url = oauth.build(USER_TIMELINE_RESOURCE) + .param("screen_name", request.screenName); + + int count = 200; + if (count >= request.numberOfTweets) { + count = request.numberOfTweets; + } + request.numberOfTweets -= count; + + url = url.param("count", Integer.toString(count)); + if (request.maxId > 0) { + url = url.param("max_id", Long.toString(request.maxId - 1)); + } + + try { + Response result = url.request(); + + // Obtain the lowest id. + JsonArray array = result.getResp().getAsJsonArray(); + for (int i = 0; i < array.size(); i++) { + JsonObject object = array.get(i).getAsJsonObject(); + + JsonElement id = object.get("id"); + if (id == null) { + getLogger().log(Level.WARNING, "Search {0}, id was not valid.", object); + continue; + } + if (id.getAsLong() <= request.maxId) { + request.maxId = object.get("id").getAsLong(); + } + + receivedObjects.offer(object); + } + + if (request.numberOfTweets > 0) { + requestQueue.offer(request); + } + + if (result.getRateLimitRemaining() == 0) { + waitTil(result.getRateLimitReset()); + } + } catch (RateLimitException ex) { + requestQueue.offer(request); + waitTil(ex.getResetTime()); + } + + } catch (IOException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + + private void waitTil(long seconds) { + try { + long interval = seconds * 1000 - System.currentTimeMillis() + 2; + System.out.println("Rate limited, waiting " + interval + "seconds"); + sleep(interval); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + + private Logger getLogger() { + return Logger.getLogger(SearchImpl.class.getName()); + } + } + + /** + * Polls the worker thread for leftover JSONobjects to push as results. + */ + private class Poller implements Runnable { + + private final Worker worker; + + private Poller(Worker worker) { + this.worker = worker; + } + + @Override + public void run() { + // keep waiting for objects if the worker is alive, or fetch objects + // if the worker has any old ones left. + while (worker.isRunning() || worker.hasObjects()) { + try { + processObject(worker.getObject()); + } catch (InterruptedException | ClassCastException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + } + + /** + * Distributes a new JSONObject to the result listeners. + * + * @param obj + */ + private void processObject(JsonObject obj) { + synchronized (resultListenerSync) { + if (resultListener != null) { + resultListener.tweetGenerated(obj); + } + } + } + + private Logger getLogger() { + return Logger.getLogger(SearchImpl.class.getName()); + } + } +} -- cgit v1.2.1