From 4fba2f23915a0566b521307de8abe2ee13f4cdad Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Thu, 1 May 2014 18:54:19 +0200 Subject: New thread for notifying of new objects * Use volatile boolean instead of AtomicBoolean. There will only be one one writer. * Store received objects in a queue instead of printing them. * Rename thread to clarify its I/O purpose. * Create new Thread for notifying observers of new tweets and users. --- src/io/StreamImpl.java | 108 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 13 deletions(-) diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index cba9fdd..fadd335 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -9,7 +9,8 @@ import java.net.URL; import java.net.URLEncoder; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; import mining.Stream; import oauth.signpost.exception.OAuthException; @@ -38,6 +39,7 @@ public class StreamImpl implements Stream { * Used for thread-safe modifications. */ private final Object listenerSync = new Object(); + private final Object resultListenerSync = new Object(); /** * The target that is interested in received tweets. */ @@ -67,7 +69,10 @@ public class StreamImpl implements Stream { * @param resultListener */ public void setResultListener(ResultListener resultListener) { - this.resultListener = resultListener; + // sync to avoid breakage when the poller thread sends a notification + synchronized (resultListenerSync) { + this.resultListener = resultListener; + } } public ResultListener getResultListener() { @@ -126,10 +131,15 @@ public class StreamImpl implements Stream { } } + /** + * Groups an activated stream connection. + */ private class WorkerContainer { private final Worker worker; - private final Thread thread; + private final Poller poller; + private final Thread ioThread; + private final Thread pollerThread; /** * Holds a thread for the worker. @@ -138,18 +148,27 @@ public class StreamImpl implements Stream { */ public WorkerContainer(Worker worker) { this.worker = worker; - thread = new Thread(worker); + this.poller = new Poller(worker); + ioThread = new Thread(worker); + // the poller is closely coupled with the I/O thread. If the I/O + // thread dies, no new messages will be added to the queue. + pollerThread = new Thread(poller); } public void start() { - thread.start(); + ioThread.start(); + pollerThread.start(); } public void finish() { worker.stopWorker(); - while (thread.isAlive()) { + while (ioThread.isAlive()) { try { - thread.join(); + ioThread.join(); + // maybe the poller is stuck waiting for a new object, wake + // it up. + pollerThread.interrupt(); + pollerThread.join(); } catch (InterruptedException ex) { Logger.getLogger(getClass().getName()) .warning("Interrupted while waiting for stream finish"); @@ -162,7 +181,47 @@ public class StreamImpl implements Stream { } public boolean isValid() { - return thread.isAlive(); + return ioThread.isAlive(); + } + } + + private class Poller implements Runnable { + + private final Worker worker; + + private Poller(Worker worker) { + this.worker = worker; + } + + @Override + public void run() { + // keep waiting for objects if the worker is alive, or fetch objects + // if the worker has any old ones left. + while (worker.isRunning() || worker.hasObjects()) { + try { + JSONObject obj = worker.getObject(); + processObject(obj); + } catch (InterruptedException ex) { + // interrupted, probably signalled to stop? + } + } + } + + private void processObject(JSONObject obj) { + try { + JSONObject user = obj.getJSONObject("user"); + resultListener.profileGenerated(user); + } catch (JSONException ex) { + // should not happen because the worker inserts tweets (which + // assumes that a tweet has a user member). + Logger.getLogger(getClass().getName()) + .severe("Expected a user in a tweet!"); + } + synchronized (resultListenerSync) { + if (resultListener != null) { + resultListener.tweetGenerated(obj); + } + } } } @@ -170,12 +229,13 @@ public class StreamImpl implements Stream { private final String keywords; private final HttpURLConnection connection; - private final AtomicBoolean running; + private volatile boolean running = true; + private final BlockingQueue receivedObjects; Worker(String keywords) throws IOException { this.keywords = keywords; this.connection = connect(keywords); - this.running = new AtomicBoolean(true); + this.receivedObjects = new LinkedBlockingQueue<>(); } /** @@ -258,7 +318,11 @@ public class StreamImpl implements Stream { public void stopWorker() { /* inform the worker to stop as soon as possible */ - running.set(false); + running = false; + } + + public boolean isRunning() { + return running; } private void parseMainLoop(InputStream is) throws IOException { @@ -267,7 +331,7 @@ public class StreamImpl implements Stream { InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8); BufferedReader reader = new BufferedReader(isr); JSONTokener jsonTokener = new JSONTokener(reader); - while (running.get()) { + while (running) { try { Object obj = jsonTokener.nextValue(); if (obj instanceof JSONObject) { @@ -282,8 +346,26 @@ public class StreamImpl implements Stream { } } + /** + * Detect tweets and queue them for processing. + * + * @param obj an object received at the stream. + */ private void processReceivedObject(JSONObject obj) { - System.out.println(obj); + // assume that tweets always have a user field + if (obj.has("user")) { + receivedObjects.offer(obj); + } else { + getLogger().warning("Received unknown object: " + obj); + } + } + + public boolean hasObjects() { + return !receivedObjects.isEmpty(); + } + + public JSONObject getObject() throws InterruptedException { + return receivedObjects.take(); } private Logger getLogger() { -- cgit v1.2.1