summary refs log tree commit diff
diff options
context:
space:
mode:
author Peter Wu <peter@lekensteyn.nl> 2014-04-30 20:08:28 +0200
committer Peter Wu <peter@lekensteyn.nl> 2014-04-30 20:08:28 +0200
commit c72c5c10047304ba6961211307a996c9e4be6994 (patch)
tree 4a98dfe838cdc8de8e35c261e8566a2431ab43a2
parent acae5d7b9ef4640c1ee5217d09c83143b16b1b2c (diff)
download TwitterDataAnalytics-c72c5c10047304ba6961211307a996c9e4be6994.tar.gz
Use wrapped gzipinputstream from twitter
-rw-r--r-- src/io/StreamImpl.java 4
-rw-r--r-- src/support/StreamingGZIPInputStream.java 36
2 files changed, 38 insertions, 2 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index b6be055..6d6ba64 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -11,7 +11,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
-import java.util.zip.GZIPInputStream;
import mining.Stream;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
@@ -20,6 +19,7 @@ import org.json.JSONObject;
import org.json.JSONTokener;
import provider.ExceptionListener;
import provider.ResultListener;
+import support.StreamingGZIPInputStream;
/**
* Provides access to Twitter stream data. Data is processed in a separate
@@ -188,7 +188,7 @@ public class StreamImpl implements Stream {
*/
private InputStream wrapGzip(InputStream is) throws IOException {
if ("gzip".equals(connection.getContentEncoding())) {
- return new GZIPInputStream(is);
+ return new StreamingGZIPInputStream(is); // its available() defers to the underlying stream
}
return is;
}
diff --git a/src/support/StreamingGZIPInputStream.java b/src/support/StreamingGZIPInputStream.java
new file mode 100644
index 0000000..036df5f
--- /dev/null
+++ b/src/support/StreamingGZIPInputStream.java
@@ -0,0 +1,36 @@
+package support;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * GZIP input stream for streaming sources: it reports availability based on
+ * the wrapped input stream rather than on GZIPInputStream. Adapted from
+ * https://dev.twitter.com/docs/streaming-apis/processing.
+ */
+public class StreamingGZIPInputStream extends GZIPInputStream {
+ /** The stream passed to the superclass; consulted by {@link #available()}. */
+ private final InputStream wrapped;
+ /** Creates a stream over {@code is} and keeps a reference to it. */
+ public StreamingGZIPInputStream(InputStream is) throws IOException {
+ super(is);
+ wrapped = is;
+ }
+
+ /**
+ * Overrides the behavior of GZIPInputStream, which assumes that all data is
+ * available; that is not true for streaming. We instead rely on the
+ * underlying stream to tell us how much data is available.
+ *
+ * Programs should not count on this method to return the actual number of
+ * bytes that could be read without blocking.
+ *
+ * @return whatever the wrapped InputStream returns.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public int available() throws IOException {
+ return wrapped.available();
+ }
+}