summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-05-01 18:54:19 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-05-01 18:54:19 +0200
commit4fba2f23915a0566b521307de8abe2ee13f4cdad (patch)
treebd604fa576ddf2b7cab2c9036741fff4137eaaed
parent70bde08fb9201e85c16ff1bc6130fc0453b4d54e (diff)
downloadTwitterDataAnalytics-4fba2f23915a0566b521307de8abe2ee13f4cdad.tar.gz
New thread for notifying of new objects
* Use volatile boolean instead of AtomicBoolean. There will only be one one writer. * Store received objects in a queue instead of printing them. * Rename thread to clarify its I/O purpose. * Create new Thread for notifying observers of new tweets and users.
-rw-r--r--src/io/StreamImpl.java108
1 files changed, 95 insertions, 13 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index cba9fdd..fadd335 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -9,7 +9,8 @@ import java.net.URL;
import java.net.URLEncoder;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import mining.Stream;
import oauth.signpost.exception.OAuthException;
@@ -38,6 +39,7 @@ public class StreamImpl implements Stream {
* Used for thread-safe modifications.
*/
private final Object listenerSync = new Object();
+ private final Object resultListenerSync = new Object();
/**
* The target that is interested in received tweets.
*/
@@ -67,7 +69,10 @@ public class StreamImpl implements Stream {
* @param resultListener
*/
public void setResultListener(ResultListener resultListener) {
- this.resultListener = resultListener;
+ // sync to avoid breakage when the poller thread sends a notification
+ synchronized (resultListenerSync) {
+ this.resultListener = resultListener;
+ }
}
public ResultListener getResultListener() {
@@ -126,10 +131,15 @@ public class StreamImpl implements Stream {
}
}
+ /**
+ * Groups an activated stream connection.
+ */
private class WorkerContainer {
private final Worker worker;
- private final Thread thread;
+ private final Poller poller;
+ private final Thread ioThread;
+ private final Thread pollerThread;
/**
* Holds a thread for the worker.
@@ -138,18 +148,27 @@ public class StreamImpl implements Stream {
*/
public WorkerContainer(Worker worker) {
this.worker = worker;
- thread = new Thread(worker);
+ this.poller = new Poller(worker);
+ ioThread = new Thread(worker);
+ // the poller is closely coupled with the I/O thread. If the I/O
+ // thread dies, no new messages will be added to the queue.
+ pollerThread = new Thread(poller);
}
public void start() {
- thread.start();
+ ioThread.start();
+ pollerThread.start();
}
public void finish() {
worker.stopWorker();
- while (thread.isAlive()) {
+ while (ioThread.isAlive()) {
try {
- thread.join();
+ ioThread.join();
+ // maybe the poller is stuck waiting for a new object, wake
+ // it up.
+ pollerThread.interrupt();
+ pollerThread.join();
} catch (InterruptedException ex) {
Logger.getLogger(getClass().getName())
.warning("Interrupted while waiting for stream finish");
@@ -162,7 +181,47 @@ public class StreamImpl implements Stream {
}
public boolean isValid() {
- return thread.isAlive();
+ return ioThread.isAlive();
+ }
+ }
+
+ private class Poller implements Runnable {
+
+ private final Worker worker;
+
+ private Poller(Worker worker) {
+ this.worker = worker;
+ }
+
+ @Override
+ public void run() {
+ // keep waiting for objects if the worker is alive, or fetch objects
+ // if the worker has any old ones left.
+ while (worker.isRunning() || worker.hasObjects()) {
+ try {
+ JSONObject obj = worker.getObject();
+ processObject(obj);
+ } catch (InterruptedException ex) {
+ // interrupted, probably signalled to stop?
+ }
+ }
+ }
+
+ private void processObject(JSONObject obj) {
+ try {
+ JSONObject user = obj.getJSONObject("user");
+ resultListener.profileGenerated(user);
+ } catch (JSONException ex) {
+ // should not happen because the worker inserts tweets (which
+ // assumes that a tweet has a user member).
+ Logger.getLogger(getClass().getName())
+ .severe("Expected a user in a tweet!");
+ }
+ synchronized (resultListenerSync) {
+ if (resultListener != null) {
+ resultListener.tweetGenerated(obj);
+ }
+ }
}
}
@@ -170,12 +229,13 @@ public class StreamImpl implements Stream {
private final String keywords;
private final HttpURLConnection connection;
- private final AtomicBoolean running;
+ private volatile boolean running = true;
+ private final BlockingQueue<JSONObject> receivedObjects;
Worker(String keywords) throws IOException {
this.keywords = keywords;
this.connection = connect(keywords);
- this.running = new AtomicBoolean(true);
+ this.receivedObjects = new LinkedBlockingQueue<>();
}
/**
@@ -258,7 +318,11 @@ public class StreamImpl implements Stream {
public void stopWorker() {
/* inform the worker to stop as soon as possible */
- running.set(false);
+ running = false;
+ }
+
+ public boolean isRunning() {
+ return running;
}
private void parseMainLoop(InputStream is) throws IOException {
@@ -267,7 +331,7 @@ public class StreamImpl implements Stream {
InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8);
BufferedReader reader = new BufferedReader(isr);
JSONTokener jsonTokener = new JSONTokener(reader);
- while (running.get()) {
+ while (running) {
try {
Object obj = jsonTokener.nextValue();
if (obj instanceof JSONObject) {
@@ -282,8 +346,26 @@ public class StreamImpl implements Stream {
}
}
+ /**
+ * Detect tweets and queue them for processing.
+ *
+ * @param obj an object received at the stream.
+ */
private void processReceivedObject(JSONObject obj) {
- System.out.println(obj);
+ // assume that tweets always have a user field
+ if (obj.has("user")) {
+ receivedObjects.offer(obj);
+ } else {
+ getLogger().warning("Received unknown object: " + obj);
+ }
+ }
+
+ public boolean hasObjects() {
+ return !receivedObjects.isEmpty();
+ }
+
+ public JSONObject getObject() throws InterruptedException {
+ return receivedObjects.take();
}
private Logger getLogger() {