diff options
author | Peter Wu <peter@lekensteyn.nl> | 2014-05-07 11:02:26 +0200 |
---|---|---|
committer | Peter Wu <peter@lekensteyn.nl> | 2014-05-07 11:02:26 +0200 |
commit | d56af15b3e343930f6674868c1d9be8a48f002ff (patch) | |
tree | 05d92dbff120785a8b83475f2f238cf8ae786f3e | |
parent | 30ed4171fa09923d9b3c0365a3f1a4bc7a1658b0 (diff) | |
download | TwitterDataAnalytics-d56af15b3e343930f6674868c1d9be8a48f002ff.tar.gz |
Allow commit to disconnect for empty keywords, detect dead connection
-rw-r--r-- | src/io/StreamImpl.java | 12 | ||||
-rw-r--r-- | src/main/TweetShell.java | 15 |
2 files changed, 21 insertions, 6 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 5dd8981..749be1c 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -132,16 +132,15 @@ public class StreamImpl implements Stream { public void commit() throws IOException { String keywordsStr = StringUtils.join(keywords, ","); /* do not reconnect if a connection already exists for the keywords */ - if (workerContainer != null + if (isValid() && workerContainer.getWorker().getKeywords().equals(keywordsStr)) { return; } + disconnect(); // a query is required. if (keywords.isEmpty()) { return; } - /* connect (or reconnect) after setting new keywords */ - disconnect(); Worker worker = new Worker(keywordsStr); workerContainer = new WorkerContainer(worker); workerContainer.start(); @@ -218,7 +217,7 @@ public class StreamImpl implements Stream { } public boolean isValid() { - return ioThread.isAlive(); + return ioThread.isAlive() && worker.isRunning(); } } @@ -355,6 +354,9 @@ public class StreamImpl implements Stream { running = false; IOUtils.closeQuietly(is); connection.disconnect(); + // mark end of stream, running must be false + receivedObjects.offer(endOfStream); + // now that the worker is about to die, notify listener if (run_error != null) { // synchronize just in case the exception listener gets // modified while hell breaks lose. @@ -364,8 +366,6 @@ public class StreamImpl implements Stream { } } } - // mark end of stream, running must be false - receivedObjects.offer(endOfStream); } } diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java index 2b3b078..608a15a 100644 --- a/src/main/TweetShell.java +++ b/src/main/TweetShell.java @@ -6,6 +6,7 @@ import io.OAuthRequester; import io.StreamImpl; import java.io.Closeable; import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.Map; import java.util.NoSuchElementException; @@ -68,6 +69,20 @@ public class TweetShell implements TwitterApi.PinSupplier { @Override public void exceptionGenerated(Exception ex) { System.err.println("Stream closed due to " + ex); + /* Disabled because it deadlocks in WorkerContainer.finish + // to solve that if necessary, start commit in a new thread + if (ex instanceof SocketTimeoutException + || (ex instanceof IOException + && ex.getMessage().endsWith("Read timed out"))) { + try { + System.err.println("Trying to reconnect."); + getStream().commit(); + } catch (IOException ioex) { + System.err.println("Failed to re-connect: " + + ioex.getMessage()); + } + } + */ } } |