summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-05-07 11:02:26 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-05-07 11:02:26 +0200
commitd56af15b3e343930f6674868c1d9be8a48f002ff (patch)
tree05d92dbff120785a8b83475f2f238cf8ae786f3e
parent30ed4171fa09923d9b3c0365a3f1a4bc7a1658b0 (diff)
downloadTwitterDataAnalytics-d56af15b3e343930f6674868c1d9be8a48f002ff.tar.gz
Allow commit to disconnect for empty keywords, detect dead connection
-rw-r--r--src/io/StreamImpl.java12
-rw-r--r--src/main/TweetShell.java15
2 files changed, 21 insertions, 6 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index 5dd8981..749be1c 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -132,16 +132,15 @@ public class StreamImpl implements Stream {
public void commit() throws IOException {
String keywordsStr = StringUtils.join(keywords, ",");
/* do not reconnect if a connection already exists for the keywords */
- if (workerContainer != null
+ if (isValid()
&& workerContainer.getWorker().getKeywords().equals(keywordsStr)) {
return;
}
+ disconnect();
// a query is required.
if (keywords.isEmpty()) {
return;
}
- /* connect (or reconnect) after setting new keywords */
- disconnect();
Worker worker = new Worker(keywordsStr);
workerContainer = new WorkerContainer(worker);
workerContainer.start();
@@ -218,7 +217,7 @@ public class StreamImpl implements Stream {
}
public boolean isValid() {
- return ioThread.isAlive();
+ return ioThread.isAlive() && worker.isRunning();
}
}
@@ -355,6 +354,9 @@ public class StreamImpl implements Stream {
running = false;
IOUtils.closeQuietly(is);
connection.disconnect();
+ // mark end of stream, running must be false
+ receivedObjects.offer(endOfStream);
+ // now that the worker is about to die, notify listener
if (run_error != null) {
// synchronize just in case the exception listener gets
// modified while hell breaks lose.
@@ -364,8 +366,6 @@ public class StreamImpl implements Stream {
}
}
}
- // mark end of stream, running must be false
- receivedObjects.offer(endOfStream);
}
}
diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java
index 2b3b078..608a15a 100644
--- a/src/main/TweetShell.java
+++ b/src/main/TweetShell.java
@@ -6,6 +6,7 @@ import io.OAuthRequester;
import io.StreamImpl;
import java.io.Closeable;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -68,6 +69,20 @@ public class TweetShell implements TwitterApi.PinSupplier {
@Override
public void exceptionGenerated(Exception ex) {
System.err.println("Stream closed due to " + ex);
+ /* Disabled because it deadlocks in WorkerContainer.finish
+ // to solve that if necessary, start commit in a new thread
+ if (ex instanceof SocketTimeoutException
+ || (ex instanceof IOException
+ && ex.getMessage().endsWith("Read timed out"))) {
+ try {
+ System.err.println("Trying to reconnect.");
+ getStream().commit();
+ } catch (IOException ioex) {
+ System.err.println("Failed to re-connect: "
+ + ioex.getMessage());
+ }
+ }
+ */
}
}