summaryrefslogtreecommitdiff
path: root/src/io/StreamImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r--src/io/StreamImpl.java12
1 files changed, 6 insertions, 6 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index 5dd8981..749be1c 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -132,16 +132,15 @@ public class StreamImpl implements Stream {
public void commit() throws IOException {
String keywordsStr = StringUtils.join(keywords, ",");
/* do not reconnect if a connection already exists for the keywords */
- if (workerContainer != null
+ if (isValid()
&& workerContainer.getWorker().getKeywords().equals(keywordsStr)) {
return;
}
+ disconnect();
// a query is required.
if (keywords.isEmpty()) {
return;
}
- /* connect (or reconnect) after setting new keywords */
- disconnect();
Worker worker = new Worker(keywordsStr);
workerContainer = new WorkerContainer(worker);
workerContainer.start();
@@ -218,7 +217,7 @@ public class StreamImpl implements Stream {
}
public boolean isValid() {
- return ioThread.isAlive();
+ return ioThread.isAlive() && worker.isRunning();
}
}
@@ -355,6 +354,9 @@ public class StreamImpl implements Stream {
running = false;
IOUtils.closeQuietly(is);
connection.disconnect();
+ // mark end of stream, running must be false
+ receivedObjects.offer(endOfStream);
+ // now that the worker is about to die, notify listener
if (run_error != null) {
// synchronize just in case the exception listener gets
// modified while hell breaks lose.
@@ -364,8 +366,6 @@ public class StreamImpl implements Stream {
}
}
}
- // mark end of stream, running must be false
- receivedObjects.offer(endOfStream);
}
}