summaryrefslogtreecommitdiff
path: root/src/io/StreamImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r--src/io/StreamImpl.java40
1 files changed, 23 insertions, 17 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index a222bd8..b2f6b0c 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -1,5 +1,9 @@
package io;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -13,15 +17,13 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
import java.util.logging.Logger;
import mining.Stream;
import oauth.signpost.exception.OAuthException;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
import provider.ExceptionListener;
import provider.ResultListener;
import support.StreamingGZIPInputStream;
@@ -236,7 +238,7 @@ public class StreamImpl implements Stream {
// if the worker has any old ones left.
while (worker.isRunning() || worker.hasObjects()) {
try {
- JSONObject obj = worker.getObject();
+ JsonObject obj = worker.getObject();
if (obj == worker.endOfStream) {
// end of objects marker
assert !worker.isRunning();
@@ -249,7 +251,7 @@ public class StreamImpl implements Stream {
}
}
- private void processObject(JSONObject obj) {
+ private void processObject(JsonObject obj) {
synchronized (resultListenerSync) {
if (resultListener != null) {
resultListener.tweetGenerated(obj);
@@ -263,9 +265,9 @@ public class StreamImpl implements Stream {
private final String keywords;
private final HttpURLConnection connection;
private volatile boolean running = true;
- private final BlockingQueue<JSONObject> receivedObjects;
+ private final BlockingQueue<JsonObject> receivedObjects;
private InputStream inputStream;
- private final JSONObject endOfStream;
+ private final JsonObject endOfStream;
Worker(String keywords) throws IOException {
this.keywords = keywords;
@@ -280,7 +282,7 @@ public class StreamImpl implements Stream {
throw ex;
}
this.receivedObjects = new LinkedBlockingQueue<>();
- this.endOfStream = new JSONObject();
+ this.endOfStream = new JsonObject();
}
/**
@@ -382,17 +384,21 @@ public class StreamImpl implements Stream {
// See https://dev.twitter.com/docs/streaming-apis/messages
InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8);
BufferedReader reader = new BufferedReader(isr);
- JSONTokener jsonTokener = new JSONTokener(reader);
+ JsonParser parser = new JsonParser();
while (running) {
try {
- Object obj = jsonTokener.nextValue();
- if (obj instanceof JSONObject) {
- processReceivedObject((JSONObject) obj);
+ String line = reader.readLine();
+ if (line.isEmpty()) {
+ continue;
+ }
+ JsonElement elem = parser.parse(line);
+ if (elem.isJsonObject()) {
+ processReceivedObject(elem.getAsJsonObject());
} else {
- getLogger().severe("Got unexpected object: " + obj);
+ getLogger().log(Level.SEVERE, "Got unexpected object: {0}", elem);
throw new IOException("Got unexpected type from stream");
}
- } catch (JSONException ex) {
+ } catch (JsonParseException ex) {
// ignore IO errors for a stop request ("Socket closed")
if (running) {
throw new IOException(ex);
@@ -406,12 +412,12 @@ public class StreamImpl implements Stream {
*
* @param obj an object received at the stream.
*/
- private void processReceivedObject(JSONObject obj) {
+ private void processReceivedObject(JsonObject obj) {
// assume that tweets always have a user field
if (obj.has("user")) {
receivedObjects.offer(obj);
} else {
- getLogger().warning("Received unknown object: " + obj);
+ getLogger().log(Level.WARNING, "Received unknown object: {0}", obj);
}
}
@@ -419,7 +425,7 @@ public class StreamImpl implements Stream {
return !receivedObjects.isEmpty();
}
- public JSONObject getObject() throws InterruptedException {
+ public JsonObject getObject() throws InterruptedException {
return receivedObjects.take();
}