summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-05-02 00:01:19 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-05-02 00:01:19 +0200
commitd18699ca5673d2ac81c70e3ff367dd332f5de34d (patch)
tree7ba8bb218eb0b032426060da62c5ba998f98d938
parentd68b9ac03cf2dbaf55d1e848c9df0a4b5957fc4e (diff)
downloadTwitterDataAnalytics-d18699ca5673d2ac81c70e3ff367dd332f5de34d.tar.gz
Avoid hang during close
Apparently the socket operations are blocking in a way such that even interrupt() cannot stop it. Close the socket (which will generate an IOException masked as JSONException) to immediately close the socket. Also do not leak a socket if an error occurred in the Worker constructor (when a Stream connection is set up).
-rw-r--r--src/io/StreamImpl.java33
1 files changed, 29 insertions, 4 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index 426d1fa..27b07c2 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -7,6 +7,7 @@ import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -246,10 +247,25 @@ public class StreamImpl implements Stream {
private final HttpURLConnection connection;
private volatile boolean running = true;
private final BlockingQueue<JSONObject> receivedObjects;
+ private InputStream inputStream;
Worker(String keywords) throws IOException {
this.keywords = keywords;
- this.connection = connect(keywords);
+ try {
+ this.connection = connect(keywords);
+ this.inputStream = this.connection.getInputStream();
+ } catch (IOException ex) {
+ if (this.inputStream != null) {
+ try {
+ this.inputStream.close();
+ } catch (IOException ioex) {
+ }
+ }
+ if (this.connection != null) {
+ this.connection.disconnect();
+ }
+ throw ex;
+ }
this.receivedObjects = new LinkedBlockingQueue<>();
}
@@ -303,10 +319,9 @@ public class StreamImpl implements Stream {
@Override
public void run() {
- InputStream is = null;
+ InputStream is = inputStream;
IOException run_error = null;
try {
- is = connection.getInputStream();
parseMainLoop(wrapGzip(is));
} catch (IOException ex) {
run_error = ex;
@@ -334,6 +349,13 @@ public class StreamImpl implements Stream {
public void stopWorker() {
/* inform the worker to stop as soon as possible */
running = false;
+ try {
+ inputStream.close();
+ } catch (IOException ex) {
+ getLogger().warning("Error while closing stream: "
+ + ex.getMessage());
+ }
+ connection.disconnect();
}
public boolean isRunning() {
@@ -356,7 +378,10 @@ public class StreamImpl implements Stream {
throw new IOException("Got unexpected type from stream");
}
} catch (JSONException ex) {
- throw new IOException(ex);
+ // ignore IO errors for a stop request ("Socket closed")
+ if (running) {
+ throw new IOException(ex);
+ }
}
}
}