diff options
author | Peter Wu <peter@lekensteyn.nl> | 2014-05-07 10:36:05 +0200 |
---|---|---|
committer | Peter Wu <peter@lekensteyn.nl> | 2014-05-07 10:36:05 +0200 |
commit | 655369064fc63d781f819002918179c8b9d30c4b (patch) | |
tree | 1f636ae00fd8148d6e27ffa2af394062968693ee /src | |
parent | b9b2e9458ddfd6108685e198104d8c82b3ddc247 (diff) | |
download | TwitterDataAnalytics-655369064fc63d781f819002918179c8b9d30c4b.tar.gz |
Fix deadlock in Poller if worker thread stops/dies
Diffstat (limited to 'src')
-rw-r--r-- | src/io/StreamImpl.java | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 84d9d83..61cdf59 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -237,6 +237,11 @@ public class StreamImpl implements Stream { while (worker.isRunning() || worker.hasObjects()) { try { JSONObject obj = worker.getObject(); + if (obj == worker.endOfStream) { + // end of objects marker + assert !worker.isRunning(); + break; + } processObject(obj); } catch (InterruptedException ex) { // interrupted, probably signalled to stop? @@ -269,6 +274,7 @@ public class StreamImpl implements Stream { private volatile boolean running = true; private final BlockingQueue<JSONObject> receivedObjects; private InputStream inputStream; + private final JSONObject endOfStream; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -283,6 +289,7 @@ public class StreamImpl implements Stream { throw ex; } this.receivedObjects = new LinkedBlockingQueue<>(); + this.endOfStream = new JSONObject(); } /** @@ -342,6 +349,7 @@ public class StreamImpl implements Stream { } catch (IOException ex) { run_error = ex; } finally { + running = false; IOUtils.closeQuietly(is); connection.disconnect(); if (run_error != null) { @@ -353,6 +361,8 @@ public class StreamImpl implements Stream { } } } + // mark end of stream, running must be false + receivedObjects.offer(endOfStream); } } |