From 1da00321db0aa8c412c3ff1dff5734962ee49240 Mon Sep 17 00:00:00 2001 From: Maurice Laveaux Date: Thu, 22 May 2014 16:00:56 +0200 Subject: Implemented basic search functionality. * Usage, search . --- src/io/SearchImpl.java | 321 +++++++++++++++++++++++++++++++++++++++++++++++ src/main/TweetShell.java | 64 ++++++++-- 2 files changed, 372 insertions(+), 13 deletions(-) create mode 100644 src/io/SearchImpl.java diff --git a/src/io/SearchImpl.java b/src/io/SearchImpl.java new file mode 100644 index 0000000..c08675b --- /dev/null +++ b/src/io/SearchImpl.java @@ -0,0 +1,321 @@ +package io; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import java.io.IOException; +import static java.lang.Thread.sleep; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; +import mining.Search; +import mining.TwitterApi; +import mining.TwitterApi.Builder; +import provider.ExceptionListener; +import provider.ResultListener; + +/** + * Implementation of the search with rate limit handling and multiple threads + * + * @author Maurice Laveaux + */ +public class SearchImpl implements Search { + + private static final String USER_TIMELINE_RESOURCE = "statuses/user_timeline"; + + /** + * The result listener to report results to + */ + private ResultListener resultListener = null; + + /** + * Dummy object to synchronize the resultListener; + */ + private final Object resultListenerSync = new Object(); + + /** + * The list of user searches are done on search. + */ + private final BlockingQueue requestQueue; + + /** + * The thread container that handles both io and request thread. + */ + private ThreadContainer thread_cached; + + /** + * The twitter api to execute requests. + */ + private final TwitterApi oauth; + + public SearchImpl(TwitterApi oauth) { + this.oauth = oauth; + + requestQueue = new LinkedBlockingQueue(); + + //load("queue_cache"); + } + + @Override + public void search(String screenName, int numberOfTweets) { + if (numberOfTweets > 3200) { + numberOfTweets = 3200; + System.out.println("Changed number of tweets to max 3200."); + } + + requestQueue.offer(new Request(screenName, numberOfTweets)); + + if (thread_cached == null) { + thread_cached = new ThreadContainer(); + thread_cached.start(); + } + } + + @Override + public void setResultListener(ResultListener listener) { + synchronized (resultListenerSync) { + resultListener = listener; + } + } + + @Override + public void save(String filename) { + + } + + @Override + public void load(String filename) { + + } + + @Override + public void setExceptionListener(ExceptionListener listener) { + + } + + @Override + public void close() { + if (thread_cached != null) { + thread_cached.stop(); + } + } + + private class Request { + + public Request(String screenName, int numberOfTweets) { + this.screenName = screenName; + this.numberOfTweets = numberOfTweets; + + sinceId = 0; + maxId = Long.MAX_VALUE; + } + + public String screenName; + public long sinceId; + public long maxId; + public int numberOfTweets; + } + + /** + * The container that creates both threads. + */ + private class ThreadContainer { + + private final Thread ioThread; + private final Poller poller; + + private final Thread workerThread; + private final Worker worker; + + /** + * The poller methods. + */ + public ThreadContainer() { + worker = new Worker(); + poller = new Poller(worker); + + ioThread = new Thread(poller); + workerThread = new Thread(worker); + } + + public void start() { + ioThread.start(); + workerThread.start(); + } + + public void stop() { + worker.stop(); + while (workerThread.isAlive()) { + try { + workerThread.join(); + ioThread.join(); + save("search_cached.txt"); + } catch (InterruptedException ex) { + Logger.getLogger(getClass().getName()) + .warning("Interrupted while waiting for search saving."); + } + } + } + } + + /** + * The thread that handles all the queries and the items in the queue. + */ + private class Worker implements Runnable { + + /** + * The json arrays received from the requests. + */ + private final BlockingQueue receivedObjects; + + private boolean running = true; + + public Worker() { + receivedObjects = new LinkedBlockingQueue(); + } + + public boolean isRunning() { + return running; + } + + public boolean hasObjects() { + return !receivedObjects.isEmpty(); + } + + public void stop() { + running = false; + } + + /** + * @return The head of the received object queue. + */ + public JsonObject getObject() throws InterruptedException { + return receivedObjects.take(); + } + + @Override + public void run() { + while (true) { + if (!requestQueue.isEmpty()) { + try { + Request request = requestQueue.take(); + executeRequest(request); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + } + } + + private void executeRequest(Request request) { + try { + Builder url = oauth.build(USER_TIMELINE_RESOURCE) + .param("screen_name", request.screenName); + + int count = 200; + if (count >= request.numberOfTweets) { + count = request.numberOfTweets; + } + request.numberOfTweets -= count; + + url = url.param("count", Integer.toString(count)); + if (request.maxId > 0) { + url = url.param("max_id", Long.toString(request.maxId - 1)); + } + + try { + Response result = url.request(); + + // Obtain the lowest id. + JsonArray array = result.getResp().getAsJsonArray(); + for (int i = 0; i < array.size(); i++) { + JsonObject object = array.get(i).getAsJsonObject(); + + JsonElement id = object.get("id"); + if (id == null) { + getLogger().log(Level.WARNING, "Search {0}, id was not valid.", object); + continue; + } + if (id.getAsLong() <= request.maxId) { + request.maxId = object.get("id").getAsLong(); + } + + receivedObjects.offer(object); + } + + if (request.numberOfTweets > 0) { + requestQueue.offer(request); + } + + if (result.getRateLimitRemaining() == 0) { + waitTil(result.getRateLimitReset()); + } + } catch (RateLimitException ex) { + requestQueue.offer(request); + waitTil(ex.getResetTime()); + } + + } catch (IOException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + + private void waitTil(long seconds) { + try { + long interval = seconds * 1000 - System.currentTimeMillis() + 2; + System.out.println("Rate limited, waiting " + interval + "seconds"); + sleep(interval); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + + private Logger getLogger() { + return Logger.getLogger(SearchImpl.class.getName()); + } + } + + /** + * Polls the worker thread for leftover JSONobjects to push as results. + */ + private class Poller implements Runnable { + + private final Worker worker; + + private Poller(Worker worker) { + this.worker = worker; + } + + @Override + public void run() { + // keep waiting for objects if the worker is alive, or fetch objects + // if the worker has any old ones left. + while (worker.isRunning() || worker.hasObjects()) { + try { + processObject(worker.getObject()); + } catch (InterruptedException | ClassCastException ex) { + getLogger().log(Level.SEVERE, null, ex); + } + } + } + + /** + * Distributes a new JSONObject to the result listeners. + * + * @param obj + */ + private void processObject(JsonObject obj) { + synchronized (resultListenerSync) { + if (resultListener != null) { + resultListener.tweetGenerated(obj); + } + } + } + + private Logger getLogger() { + return Logger.getLogger(SearchImpl.class.getName()); + } + } +} diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java index 956f060..583b3a4 100644 --- a/src/main/TweetShell.java +++ b/src/main/TweetShell.java @@ -39,6 +39,8 @@ public class TweetShell implements TwitterApi.PinSupplier { private Stream stream_cached; private final CompositeResultListener resultListeners; + private Search search_cached; + /** * Whether to convert uncompressed tweet files (e.g. "tweets.txt") to the * compressed files ("tweets.txt.gz"). @@ -80,6 +82,15 @@ public class TweetShell implements TwitterApi.PinSupplier { return stream_cached; } + private Search getSearch() throws IOException { + if (search_cached == null) { + search_cached = new SearchImpl(getApi()); + search_cached.setExceptionListener(new StreamExceptionHandler()); + search_cached.setResultListener(resultListeners); + } + return search_cached; + } + private class StreamExceptionHandler implements ExceptionListener { @Override @@ -159,7 +170,7 @@ public class TweetShell implements TwitterApi.PinSupplier { } public boolean execute(String cmd) { - String[] args = cmd.trim().split("\\s+", 2); + String[] args = cmd.trim().split("\\s+", 3); if (!args[0].isEmpty()) { // non-empty command, let's see whether it makes sense? return execute(args); @@ -191,6 +202,7 @@ public class TweetShell implements TwitterApi.PinSupplier { } // another satisfied customer, next! return true; + } enum Command { @@ -199,6 +211,7 @@ public class TweetShell implements TwitterApi.PinSupplier { del("Deletes a keyword from search", 1), keywords("Display currently active keywords"), commit("Activate the stream or apply the stream keyword changes"), + search("Searches for the tweets of some user", 2), status("Show some status and statistics"), flush("Writes any pending buffers"), close("Close the stream"), @@ -297,23 +310,37 @@ public class TweetShell implements TwitterApi.PinSupplier { getStream().commit(); } break; + case search: + Search search = getSearch(); + search.search(params[0], Integer.parseInt(params[1])); + break; case status: TweetCounter tc; - tc = (TweetCounter) resultListeners.findListener(TweetCounter.class); - if (stream_cached != null && getStream().isValid()) { + tc + = (TweetCounter) resultListeners.findListener(TweetCounter.class + ); + if (stream_cached + != null && getStream() + .isValid()) { System.out.println("Streaming is active."); } else { System.out.println("Streaming is inactive."); } DateFormat df = SimpleDateFormat.getDateTimeInstance(); String start_date = df.format(tc.getStartDate()); - System.out.println("Started at " + start_date); - System.out.println("Elapsed: " + tc.getActiveTime()); - System.out.println("Received tweets in session: " + tc.getTweetCount()); - System.out.println("Unique users: " + tc.getUsers().size()); + + System.out.println( + "Started at " + start_date); + System.out.println( + "Elapsed: " + tc.getActiveTime()); + System.out.println( + "Received tweets in session: " + tc.getTweetCount()); + System.out.println( + "Unique users: " + tc.getUsers().size()); break; case flush: - if (stream_cached != null) { + if (stream_cached + != null) { resultListeners.flush(); } break; @@ -324,7 +351,8 @@ public class TweetShell implements TwitterApi.PinSupplier { for (String line : HELP) { System.out.println(line); } - for (Command cmd : Command.values()) { + for (Command cmd + : Command.values()) { System.out.printf(" %-10s", cmd.name()); if (!cmd.getDescription().isEmpty()) { System.out.print(" " + cmd.getDescription()); @@ -339,9 +367,11 @@ public class TweetShell implements TwitterApi.PinSupplier { break; case exit: safeClose(); + throw new NoSuchElementException(); case target: - if (params.length > 0) { + if (params.length + > 0) { // due to limitations of shell (does not handle escapes), // do a manualy split as we cannot have spaces in our args. if (params.length == 1) { @@ -384,12 +414,20 @@ public class TweetShell implements TwitterApi.PinSupplier { */ private ClassEnabledTracker getPossibleTargets() { Map> targets = new TreeMap<>(); - targets.put("file", DataWriter.class); - targets.put("cfile", CompressableDataWriter.class); - targets.put("shell", StreamHandler.class); + targets + .put("file", DataWriter.class + ); + targets.put( + "cfile", CompressableDataWriter.class + ); + targets.put( + "shell", StreamHandler.class + ); ClassEnabledTracker targetFoo = new ClassEnabledTracker<>(targets); + targetFoo.disableAll(); + targetFoo.enableClasses(resultListeners.getRegistered()); return targetFoo; } -- cgit v1.2.1