diff options
author | Maurice Laveaux <m.laveaux@student.tue.nl> | 2014-05-22 15:41:54 +0200 |
---|---|---|
committer | Maurice Laveaux <m.laveaux@student.tue.nl> | 2014-05-22 15:41:54 +0200 |
commit | ad6bb6d0d2da264656c5de0e51297997374a41d6 (patch) | |
tree | 90b988b6e63a60fdf60a7ed467f2130c31c619ad /src/io/StreamImpl.java | |
parent | 2b2a47a0086eab52b09e5425e75cafb488e51074 (diff) | |
download | TwitterDataAnalytics-ad6bb6d0d2da264656c5de0e51297997374a41d6.tar.gz |
Refactored JSON to GSON's implementation.
* Changed JSONException to JsonParseException.
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r-- | src/io/StreamImpl.java | 40 |
1 files changed, 23 insertions, 17 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index a222bd8..b2f6b0c 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -1,5 +1,9 @@ package io; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -13,15 +17,13 @@ import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; import java.util.logging.Logger; import mining.Stream; import oauth.signpost.exception.OAuthException; import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; import provider.ExceptionListener; import provider.ResultListener; import support.StreamingGZIPInputStream; @@ -236,7 +238,7 @@ public class StreamImpl implements Stream { // if the worker has any old ones left. while (worker.isRunning() || worker.hasObjects()) { try { - JSONObject obj = worker.getObject(); + JsonObject obj = worker.getObject(); if (obj == worker.endOfStream) { // end of objects marker assert !worker.isRunning(); @@ -249,7 +251,7 @@ public class StreamImpl implements Stream { } } - private void processObject(JSONObject obj) { + private void processObject(JsonObject obj) { synchronized (resultListenerSync) { if (resultListener != null) { resultListener.tweetGenerated(obj); @@ -263,9 +265,9 @@ public class StreamImpl implements Stream { private final String keywords; private final HttpURLConnection connection; private volatile boolean running = true; - private final BlockingQueue<JSONObject> receivedObjects; + private final BlockingQueue<JsonObject> receivedObjects; private InputStream inputStream; - private final JSONObject endOfStream; + private final JsonObject endOfStream; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -280,7 +282,7 @@ public class StreamImpl implements Stream { throw ex; } this.receivedObjects = new LinkedBlockingQueue<>(); - this.endOfStream = new JSONObject(); + this.endOfStream = new JsonObject(); } /** @@ -382,17 +384,21 @@ public class StreamImpl implements Stream { // See https://dev.twitter.com/docs/streaming-apis/messages InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8); BufferedReader reader = new BufferedReader(isr); - JSONTokener jsonTokener = new JSONTokener(reader); + JsonParser parser = new JsonParser(); while (running) { try { - Object obj = jsonTokener.nextValue(); - if (obj instanceof JSONObject) { - processReceivedObject((JSONObject) obj); + String line = reader.readLine(); + if (line.isEmpty()) { + continue; + } + JsonElement elem = parser.parse(line); + if (elem.isJsonObject()) { + processReceivedObject(elem.getAsJsonObject()); } else { - getLogger().severe("Got unexpected object: " + obj); + getLogger().log(Level.SEVERE, "Got unexpected object: {0}", elem); throw new IOException("Got unexpected type from stream"); } - } catch (JSONException ex) { + } catch (JsonParseException ex) { // ignore IO errors for a stop request ("Socket closed") if (running) { throw new IOException(ex); @@ -406,12 +412,12 @@ public class StreamImpl implements Stream { * * @param obj an object received at the stream. */ - private void processReceivedObject(JSONObject obj) { + private void processReceivedObject(JsonObject obj) { // assume that tweets always have a user field if (obj.has("user")) { receivedObjects.offer(obj); } else { - getLogger().warning("Received unknown object: " + obj); + getLogger().log(Level.WARNING, "Received unknown object: {0}", obj); } } @@ -419,7 +425,7 @@ public class StreamImpl implements Stream { return !receivedObjects.isEmpty(); } - public JSONObject getObject() throws InterruptedException { + public JsonObject getObject() throws InterruptedException { return receivedObjects.take(); } |