From 655369064fc63d781f819002918179c8b9d30c4b Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Wed, 7 May 2014 10:36:05 +0200 Subject: Fix deadlock in Poller if worker thread stops/dies --- src/io/StreamImpl.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 84d9d83..61cdf59 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -237,6 +237,11 @@ public class StreamImpl implements Stream { while (worker.isRunning() || worker.hasObjects()) { try { JSONObject obj = worker.getObject(); + if (obj == worker.endOfStream) { + // end of objects marker + assert !worker.isRunning(); + break; + } processObject(obj); } catch (InterruptedException ex) { // interrupted, probably signalled to stop? @@ -269,6 +274,7 @@ public class StreamImpl implements Stream { private volatile boolean running = true; private final BlockingQueue receivedObjects; private InputStream inputStream; + private final JSONObject endOfStream; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -283,6 +289,7 @@ public class StreamImpl implements Stream { throw ex; } this.receivedObjects = new LinkedBlockingQueue<>(); + this.endOfStream = new JSONObject(); } /** @@ -342,6 +349,7 @@ public class StreamImpl implements Stream { } catch (IOException ex) { run_error = ex; } finally { + running = false; IOUtils.closeQuietly(is); connection.disconnect(); if (run_error != null) { @@ -353,6 +361,8 @@ public class StreamImpl implements Stream { } } } + // mark end of stream, running must be false + receivedObjects.offer(endOfStream); } } -- cgit v1.2.1