diff options
author | Peter Wu <peter@lekensteyn.nl> | 2014-05-02 00:01:19 +0200 |
---|---|---|
committer | Peter Wu <peter@lekensteyn.nl> | 2014-05-02 00:01:19 +0200 |
commit | d18699ca5673d2ac81c70e3ff367dd332f5de34d (patch) | |
tree | 7ba8bb218eb0b032426060da62c5ba998f98d938 | |
parent | d68b9ac03cf2dbaf55d1e848c9df0a4b5957fc4e (diff) | |
download | TwitterDataAnalytics-d18699ca5673d2ac81c70e3ff367dd332f5de34d.tar.gz |
Avoid hang during close
Apparently the socket operations are blocking in a way such that even
interrupt() cannot stop it. Close the socket (which will generate an
IOException masked as JSONException) to immediately close the socket.
Also do not leak a socket if an error occurred in the Worker
constructor (when a Stream connection is set up).
-rw-r--r-- | src/io/StreamImpl.java | 33 |
1 files changed, 29 insertions, 4 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 426d1fa..27b07c2 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -7,6 +7,7 @@ import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -246,10 +247,25 @@ public class StreamImpl implements Stream { private final HttpURLConnection connection; private volatile boolean running = true; private final BlockingQueue<JSONObject> receivedObjects; + private InputStream inputStream; Worker(String keywords) throws IOException { this.keywords = keywords; - this.connection = connect(keywords); + try { + this.connection = connect(keywords); + this.inputStream = this.connection.getInputStream(); + } catch (IOException ex) { + if (this.inputStream != null) { + try { + this.inputStream.close(); + } catch (IOException ioex) { + } + } + if (this.connection != null) { + this.connection.disconnect(); + } + throw ex; + } this.receivedObjects = new LinkedBlockingQueue<>(); } @@ -303,10 +319,9 @@ public class StreamImpl implements Stream { @Override public void run() { - InputStream is = null; + InputStream is = inputStream; IOException run_error = null; try { - is = connection.getInputStream(); parseMainLoop(wrapGzip(is)); } catch (IOException ex) { run_error = ex; @@ -334,6 +349,13 @@ public class StreamImpl implements Stream { public void stopWorker() { /* inform the worker to stop as soon as possible */ running = false; + try { + inputStream.close(); + } catch (IOException ex) { + getLogger().warning("Error while closing stream: " + + ex.getMessage()); + } + connection.disconnect(); } public boolean isRunning() { @@ -356,7 +378,10 @@ public class StreamImpl implements Stream { throw new IOException("Got unexpected type from stream"); } } catch (JSONException ex) { - throw new IOException(ex); + // ignore IO errors for a stop request ("Socket closed") + if (running) { + throw new IOException(ex); + } } } } |