diff options
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r-- | src/io/StreamImpl.java | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 5dd8981..749be1c 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -132,16 +132,15 @@ public class StreamImpl implements Stream { public void commit() throws IOException { String keywordsStr = StringUtils.join(keywords, ","); /* do not reconnect if a connection already exists for the keywords */ - if (workerContainer != null + if (isValid() && workerContainer.getWorker().getKeywords().equals(keywordsStr)) { return; } + disconnect(); // a query is required. if (keywords.isEmpty()) { return; } - /* connect (or reconnect) after setting new keywords */ - disconnect(); Worker worker = new Worker(keywordsStr); workerContainer = new WorkerContainer(worker); workerContainer.start(); @@ -218,7 +217,7 @@ public class StreamImpl implements Stream { } public boolean isValid() { - return ioThread.isAlive(); + return ioThread.isAlive() && worker.isRunning(); } } @@ -355,6 +354,9 @@ public class StreamImpl implements Stream { running = false; IOUtils.closeQuietly(is); connection.disconnect(); + // mark end of stream, running must be false + receivedObjects.offer(endOfStream); + // now that the worker is about to die, notify listener if (run_error != null) { // synchronize just in case the exception listener gets // modified while hell breaks lose. @@ -364,8 +366,6 @@ public class StreamImpl implements Stream { } } } - // mark end of stream, running must be false - receivedObjects.offer(endOfStream); } } |