From d56af15b3e343930f6674868c1d9be8a48f002ff Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Wed, 7 May 2014 11:02:26 +0200 Subject: Allow commit to disconnect for empty keywords, detect dead connection --- src/io/StreamImpl.java | 12 ++++++------ src/main/TweetShell.java | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 5dd8981..749be1c 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -132,16 +132,15 @@ public class StreamImpl implements Stream { public void commit() throws IOException { String keywordsStr = StringUtils.join(keywords, ","); /* do not reconnect if a connection already exists for the keywords */ - if (workerContainer != null + if (isValid() && workerContainer.getWorker().getKeywords().equals(keywordsStr)) { return; } + disconnect(); // a query is required. if (keywords.isEmpty()) { return; } - /* connect (or reconnect) after setting new keywords */ - disconnect(); Worker worker = new Worker(keywordsStr); workerContainer = new WorkerContainer(worker); workerContainer.start(); @@ -218,7 +217,7 @@ public class StreamImpl implements Stream { } public boolean isValid() { - return ioThread.isAlive(); + return ioThread.isAlive() && worker.isRunning(); } } @@ -355,6 +354,9 @@ public class StreamImpl implements Stream { running = false; IOUtils.closeQuietly(is); connection.disconnect(); + // mark end of stream, running must be false + receivedObjects.offer(endOfStream); + // now that the worker is about to die, notify listener if (run_error != null) { // synchronize just in case the exception listener gets // modified while hell breaks lose. @@ -364,8 +366,6 @@ public class StreamImpl implements Stream { } } } - // mark end of stream, running must be false - receivedObjects.offer(endOfStream); } } diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java index 2b3b078..608a15a 100644 --- a/src/main/TweetShell.java +++ b/src/main/TweetShell.java @@ -6,6 +6,7 @@ import io.OAuthRequester; import io.StreamImpl; import java.io.Closeable; import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.Map; import java.util.NoSuchElementException; @@ -68,6 +69,20 @@ public class TweetShell implements TwitterApi.PinSupplier { @Override public void exceptionGenerated(Exception ex) { System.err.println("Stream closed due to " + ex); + /* Disabled because it deadlocks in WorkerContainer.finish + // to solve that if necessary, start commit in a new thread + if (ex instanceof SocketTimeoutException + || (ex instanceof IOException + && ex.getMessage().endsWith("Read timed out"))) { + try { + System.err.println("Trying to reconnect."); + getStream().commit(); + } catch (IOException ioex) { + System.err.println("Failed to re-connect: " + + ioex.getMessage()); + } + } + */ } } -- cgit v1.2.1