package io;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import static java.lang.Thread.sleep;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import mining.Search;
import mining.TwitterApi;
import mining.TwitterApi.Builder;
import provider.ExceptionListener;
import provider.ResultListener;

/**
 * Implementation of the search with rate limit handling and multiple threads.
 *
 * <p>Requests are queued by {@link #search(String, int)} and executed one at a
 * time by a worker thread. Every tweet object received is handed to a second
 * thread that forwards it to the registered {@link ResultListener}. Pending
 * requests are written to a cache file on {@link #close()} and read back by
 * the constructor.
 *
 * @author Maurice Laveaux
 */
public class SearchImpl implements Search {

    private static final String USER_TIMELINE_RESOURCE = "statuses/user_timeline";

    /** File used to persist pending requests between runs. */
    private static final String CACHE_FILE = "search_cache";

    /** Upper bound applied to the number of tweets of a single search. */
    private static final int MAX_TWEETS = 3200;

    /** Largest value sent as the "count" parameter of one API request. */
    private static final int MAX_TWEETS_PER_REQUEST = 200;

    /** How long the poller waits for a result before re-checking the worker. */
    private static final long POLL_TIMEOUT_MS = 200;

    private static final Logger LOGGER = Logger.getLogger(SearchImpl.class.getName());

    /**
     * The result listener to report results to.
     */
    private ResultListener resultListener = null;

    /**
     * Dummy object to synchronize the resultListener.
     */
    private final Object resultListenerSync = new Object();

    /**
     * The pending requests, consumed by the worker thread.
     */
    private final BlockingQueue<Request> requestQueue;

    /**
     * The thread container that handles both io and request thread.
     */
    private final ThreadContainer threadContainer;

    /**
     * The twitter api to execute requests.
     */
    private final TwitterApi oauth;

    /**
     * Creates the search, reloads requests left over in the cache file and
     * starts the worker and poller threads.
     *
     * @param oauth the api used to build and execute the requests.
     */
    public SearchImpl(TwitterApi oauth) {
        this.oauth = oauth;
        requestQueue = new LinkedBlockingQueue<>();

        load(CACHE_FILE);
        // Capture the count before the worker starts draining the queue,
        // otherwise the reported number may already be too low.
        int resumed = requestQueue.size();

        threadContainer = new ThreadContainer();
        threadContainer.start();

        if (resumed > 0) {
            System.out.println("Resumed " + resumed + " amount of requests");
        }
    }

    /**
     * Queues a search for the timeline of a user.
     *
     * @param screenName the screen name sent as the "screen_name" parameter.
     * @param numberOfTweets the number of tweets wanted; values above 3200
     *        are reduced to 3200.
     */
    @Override
    public void search(String screenName, int numberOfTweets) {
        if (numberOfTweets > MAX_TWEETS) {
            numberOfTweets = MAX_TWEETS;
            System.out.println("Changed number of tweets to max 3200.");
        }

        requestQueue.offer(new Request(screenName, numberOfTweets));
    }

    /**
     * Sets the listener that receives every tweet object.
     *
     * @param listener the new listener, may be {@code null} to discard results.
     */
    @Override
    public void setResultListener(ResultListener listener) {
        synchronized (resultListenerSync) {
            resultListener = listener;
        }
    }

    /**
     * Writes every request currently in the queue to a file, one per line.
     * An I/O failure is logged and not propagated.
     *
     * @param filename the file to (over)write.
     */
    @Override
    public final void save(String filename) {
        try (FileWriter writer = new FileWriter(filename)) {
            for (Request req : requestQueue) {
                writer.append(req.toString() + "\n");
            }
        } catch (IOException ex) {
            LOGGER.log(Level.SEVERE, "save error", ex);
        }
    }

    /**
     * Reads requests from a file written by {@link #save(String)} and adds
     * them to the queue. Blank or malformed lines are skipped with a warning;
     * a missing file or read failure is logged and not propagated.
     *
     * @param filename the file to read.
     */
    @Override
    public final void load(String filename) {
        try (Reader fileReader = new FileReader(filename);
                BufferedReader reader = new BufferedReader(fileReader)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                try {
                    requestQueue.offer(new Request(line));
                } catch (IllegalArgumentException ex) {
                    // NumberFormatException is an IllegalArgumentException too.
                    LOGGER.log(Level.WARNING, "Skipping malformed cache entry: " + line, ex);
                }
            }
        } catch (FileNotFoundException ex) {
            LOGGER.log(Level.WARNING,
                    "File named {0} did not exist will be created.", filename);
        } catch (IOException ex) {
            LOGGER.log(Level.SEVERE, "load error", ex);
        }
    }

    /**
     * Currently a no-op: the given listener is ignored.
     *
     * @param listener unused.
     */
    @Override
    public void setExceptionListener(ExceptionListener listener) {
    }

    /**
     * Stops both threads, waits for them to finish and then saves the
     * requests that are still pending to the cache file.
     */
    @Override
    public void close() {
        threadContainer.stop();
        save(CACHE_FILE);
    }

    /**
     * A (partially completed) timeline request. Serialized by
     * {@link #toString()} as {@code screenName:sinceId:maxId:numberOfTweets}.
     */
    private static final class Request {

        private static final int FIELD_COUNT = 4;

        final String screenName;
        /** Persisted with the request; not sent to the API by this class. */
        final long sinceId;
        /** Lowest tweet id seen so far; the next page is requested below it. */
        long maxId;
        /** Number of tweets still to be requested. */
        int numberOfTweets;

        Request(String screenName, int numberOfTweets) {
            this.screenName = screenName;
            this.numberOfTweets = numberOfTweets;
            this.sinceId = 0;
            this.maxId = Long.MAX_VALUE;
        }

        /**
         * Parses a line produced by {@link #toString()}.
         *
         * @param parse the serialized request.
         * @throws IllegalArgumentException if the line does not consist of
         *         four colon separated fields or a number cannot be parsed.
         */
        Request(String parse) {
            String[] args = parse.split(":", FIELD_COUNT);
            if (args.length != FIELD_COUNT) {
                throw new IllegalArgumentException(
                        "Expected " + FIELD_COUNT + " fields but found " + args.length);
            }

            this.screenName = args[0];
            this.sinceId = Long.parseLong(args[1]);
            this.maxId = Long.parseLong(args[2]);
            this.numberOfTweets = Integer.parseInt(args[3]);
        }

        @Override
        public String toString() {
            return screenName + ":" + Long.toString(sinceId) + ":"
                    + Long.toString(maxId) + ":" + Integer.toString(numberOfTweets);
        }
    }

    /**
     * The container that creates both threads.
     */
    private class ThreadContainer {

        private final Thread ioThread;
        private final Poller poller;

        private final Thread workerThread;
        private final Worker worker;

        public ThreadContainer() {
            worker = new Worker();
            poller = new Poller(worker);

            ioThread = new Thread(poller);
            workerThread = new Thread(worker);
        }

        public void start() {
            ioThread.start();
            workerThread.start();
        }

        /**
         * Asks the worker to stop and waits until both threads have ended.
         * The poller ends by itself once the worker has finished and all
         * received objects have been delivered. If the calling thread is
         * interrupted while waiting, the wait is resumed and the interrupt
         * status is restored before returning.
         */
        public void stop() {
            worker.stop();
            // Wake the worker when it is blocked on the queue or sleeping.
            workerThread.interrupt();

            boolean interrupted = false;
            while (true) {
                try {
                    workerThread.join();
                    ioThread.join();
                    break;
                } catch (InterruptedException ex) {
                    interrupted = true;
                    LOGGER.warning("Interrupted while waiting for search saving.");
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * The thread that handles all the queries and the items in the queue.
     */
    private class Worker implements Runnable {

        /**
         * The json objects received from the requests.
         */
        private final BlockingQueue<JsonObject> receivedObjects;

        private volatile boolean running = true;

        /** Set once {@link #run()} has ended; no objects are added afterwards. */
        private volatile boolean finished = false;

        public Worker() {
            receivedObjects = new LinkedBlockingQueue<>();
        }

        /**
         * @return true once the run loop has ended, for whatever reason.
         */
        public boolean isFinished() {
            return finished;
        }

        public boolean hasObjects() {
            return !receivedObjects.isEmpty();
        }

        public void stop() {
            running = false;
        }

        /**
         * Waits a bounded time for the next received object, so the caller
         * can periodically re-check whether the worker has finished.
         *
         * @return The head of the received object queue, or {@code null} when
         *         nothing arrived within the timeout.
         * @throws InterruptedException if interrupted while waiting.
         */
        public JsonObject getObject() throws InterruptedException {
            return receivedObjects.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Request request = requestQueue.take();
                    executeRequest(request);
                }
            } catch (InterruptedException ex) {
                if (running) {
                    LOGGER.log(Level.SEVERE, "Interrupted while active", ex);
                }
                Thread.currentThread().interrupt();
            } finally {
                // Also reached when an unchecked exception ends the thread,
                // so the poller never waits on a dead worker.
                finished = true;
            }
        }

        /**
         * Executes one page of a request, queues the received tweets and
         * re-queues the request when more tweets are wanted. Sleeps when the
         * response or a {@code RateLimitException} reports a waiting time.
         *
         * @param request the request to execute; its maxId and numberOfTweets
         *        are updated.
         * @throws InterruptedException if interrupted while waiting for the
         *         rate limit.
         */
        private void executeRequest(Request request) throws InterruptedException {
            try {
                long waittime = 0;

                Builder url = oauth.build(USER_TIMELINE_RESOURCE)
                        .param("screen_name", request.screenName);

                int count = Math.min(MAX_TWEETS_PER_REQUEST, request.numberOfTweets);
                url.param("count", Integer.toString(count));

                if (request.maxId > 0) {
                    url.param("max_id", Long.toString(request.maxId - 1));
                }

                try {
                    Response result = url.request();
                    JsonElement json = result.getJson();

                    if (json == null || !json.isJsonArray()) {
                        // getAsJsonArray() would throw and kill this thread;
                        // the request is dropped instead of retried forever.
                        LOGGER.log(Level.WARNING,
                                "Search for {0} returned an unexpected response, request dropped.",
                                request.screenName);
                    } else {
                        JsonArray array = json.getAsJsonArray();
                        storeTweets(request, array);

                        request.numberOfTweets -= count;

                        // An empty page means there is nothing older to
                        // fetch, so asking again only burns rate limit.
                        boolean exhausted = array.size() == 0;
                        if (!exhausted && request.numberOfTweets > 0) {
                            requestQueue.offer(request);
                        }
                    }

                    if (result.getRateLimitRemaining() == 0) {
                        waittime = result.getRateLimitRemainingTime();
                    }
                } catch (RateLimitException ex) {
                    requestQueue.offer(request);
                    waittime = ex.getRateLimitRemainingTime();
                }

                if (waittime > 0) {
                    System.out.println("Rate limited, waiting for " + waittime + " ms");
                    sleep(waittime);
                }
            } catch (IOException ex) {
                requestQueue.offer(request);
                LOGGER.log(Level.SEVERE, null, ex);
            }
        }

        /**
         * Queues every tweet of a page that has a primitive "id" and lowers
         * the maxId of the request to the lowest id seen.
         */
        private void storeTweets(Request request, JsonArray array) {
            for (JsonElement element : array) {
                JsonObject object = element.getAsJsonObject();

                JsonElement id = object.get("id");
                if (id == null || !id.isJsonPrimitive()) {
                    LOGGER.log(Level.WARNING, "Search {0}, id was not valid.", object);
                    continue;
                }

                long tweetId = id.getAsLong();
                if (tweetId <= request.maxId) {
                    request.maxId = tweetId;
                }

                receivedObjects.offer(object);
            }
        }
    }

    /**
     * Polls the worker thread for leftover JSONobjects to push as results.
     */
    private class Poller implements Runnable {

        private final Worker worker;

        private Poller(Worker worker) {
            this.worker = worker;
        }

        @Override
        public void run() {
            // Keep waiting for objects while the worker can still produce
            // them, then deliver whatever is left in its queue.
            while (!worker.isFinished() || worker.hasObjects()) {
                try {
                    JsonObject obj = worker.getObject();
                    if (obj != null) {
                        processObject(obj);
                    }
                } catch (InterruptedException ex) {
                    // requested to stop.
                    LOGGER.log(Level.SEVERE, null, ex);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Distributes a new JSONObject to the result listener, if one is set.
         *
         * @param obj the object to distribute.
         */
        private void processObject(JsonObject obj) {
            synchronized (resultListenerSync) {
                if (resultListener != null) {
                    resultListener.tweetGenerated(obj);
                }
            }
        }
    }
}