diff options
-rw-r--r-- | src/io/StreamImpl.java | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 84d9d83..61cdf59 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -237,6 +237,11 @@ public class StreamImpl implements Stream { while (worker.isRunning() || worker.hasObjects()) { try { JSONObject obj = worker.getObject(); + if (obj == worker.endOfStream) { + // end of objects marker + assert !worker.isRunning(); + break; + } processObject(obj); } catch (InterruptedException ex) { // interrupted, probably signalled to stop? @@ -269,6 +274,7 @@ public class StreamImpl implements Stream { private volatile boolean running = true; private final BlockingQueue<JSONObject> receivedObjects; private InputStream inputStream; + private final JSONObject endOfStream; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -283,6 +289,7 @@ public class StreamImpl implements Stream { throw ex; } this.receivedObjects = new LinkedBlockingQueue<>(); + this.endOfStream = new JSONObject(); } /** @@ -342,6 +349,7 @@ public class StreamImpl implements Stream { } catch (IOException ex) { run_error = ex; } finally { + running = false; IOUtils.closeQuietly(is); connection.disconnect(); if (run_error != null) { @@ -353,6 +361,8 @@ public class StreamImpl implements Stream { } } } + // mark end of stream, running must be false + receivedObjects.offer(endOfStream); } } |