summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-05-07 10:36:05 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-05-07 10:36:05 +0200
commit655369064fc63d781f819002918179c8b9d30c4b (patch)
tree1f636ae00fd8148d6e27ffa2af394062968693ee
parentb9b2e9458ddfd6108685e198104d8c82b3ddc247 (diff)
downloadTwitterDataAnalytics-655369064fc63d781f819002918179c8b9d30c4b.tar.gz
Fix deadlock in Poller if worker thread stops/dies
-rw-r--r--src/io/StreamImpl.java10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index 84d9d83..61cdf59 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -237,6 +237,11 @@ public class StreamImpl implements Stream {
while (worker.isRunning() || worker.hasObjects()) {
try {
JSONObject obj = worker.getObject();
+ if (obj == worker.endOfStream) {
+ // end of objects marker
+ assert !worker.isRunning();
+ break;
+ }
processObject(obj);
} catch (InterruptedException ex) {
// interrupted, probably signalled to stop?
@@ -269,6 +274,7 @@ public class StreamImpl implements Stream {
private volatile boolean running = true;
private final BlockingQueue<JSONObject> receivedObjects;
private InputStream inputStream;
+ private final JSONObject endOfStream;
Worker(String keywords) throws IOException {
this.keywords = keywords;
@@ -283,6 +289,7 @@ public class StreamImpl implements Stream {
throw ex;
}
this.receivedObjects = new LinkedBlockingQueue<>();
+ this.endOfStream = new JSONObject();
}
/**
@@ -342,6 +349,7 @@ public class StreamImpl implements Stream {
} catch (IOException ex) {
run_error = ex;
} finally {
+ running = false;
IOUtils.closeQuietly(is);
connection.disconnect();
if (run_error != null) {
@@ -353,6 +361,8 @@ public class StreamImpl implements Stream {
}
}
}
+ // mark end of stream, running must be false
+ receivedObjects.offer(endOfStream);
}
}