diff options
Diffstat (limited to 'src/io')
-rw-r--r-- | src/io/DataWriter.java | 42 | ||||
-rw-r--r-- | src/io/StreamImpl.java | 40 |
2 files changed, 46 insertions, 36 deletions
diff --git a/src/io/DataWriter.java b/src/io/DataWriter.java index c1e1ce9..f3d4006 100644 --- a/src/io/DataWriter.java +++ b/src/io/DataWriter.java @@ -1,5 +1,8 @@ package io; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; import java.io.BufferedReader; import java.io.Closeable; import java.io.FileInputStream; @@ -81,7 +84,7 @@ public class DataWriter implements ResultListener, Closeable, Flushable { } @Override - public void tweetGenerated(JSONObject obj) { + public void tweetGenerated(JsonObject obj) { try { // ensure that the file is open m_tweet.open(); @@ -105,19 +108,24 @@ public class DataWriter implements ResultListener, Closeable, Flushable { try { is = store.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + JsonParser jsonParser = new JsonParser(); // parse each line into a JSONObject, read the id and add it to // the set of ids. while ((line = reader.readLine()) != null) { - JSONObject obj = new JSONObject(line); - long id = obj.getLong("id"); - idSet.add(id); - lineno++; + try { + JsonObject obj = jsonParser.parse(line).getAsJsonObject(); + long id = obj.get("id").getAsLong(); + idSet.add(id); + lineno++; + } catch (JsonParseException ex) { + getLogger().log(Level.WARNING, "Tweet found without an id: {0}", ex); + } } } catch (FileNotFoundException ex) { // ignore, file will be created if necessary. - } catch (JSONException | IOException ex) { + } catch (IOException ex) { if (line != null) { - getLogger().log(Level.INFO, "Last line: " + line); + getLogger().log(Level.INFO, "Last line: {0}", line); } getLogger().log(Level.WARNING, store.getFileName() + ": error occurred in file at line " + lineno, ex); @@ -134,21 +142,17 @@ public class DataWriter implements ResultListener, Closeable, Flushable { * @param output The stream to write objects to. * @param idSet The id set to add the obj id to. */ - private void writeObject(JSONObject obj, OutputStream output, + private void writeObject(JsonObject obj, OutputStream output, Set<Long> idSet) { - try { - long id = obj.getLong("id"); + long id = obj.get("id").getAsLong(); - if (!idSet.contains(id)) { - try { - output.write((obj.toString() + "\n").getBytes(Charsets.UTF_8)); - idSet.add(id); - } catch (IOException ex) { - getLogger().log(Level.WARNING, "Cannot write to file", ex); - } + if (!idSet.contains(id)) { + try { + output.write((obj.toString() + "\n").getBytes(Charsets.UTF_8)); + idSet.add(id); + } catch (IOException ex) { + getLogger().log(Level.WARNING, "Cannot write to file", ex); } - } catch (JSONException ex) { - getLogger().log(Level.WARNING, "ID not found?!", ex); } } diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index a222bd8..b2f6b0c 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -1,5 +1,9 @@ package io; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -13,15 +17,13 @@ import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; import java.util.logging.Logger; import mining.Stream; import oauth.signpost.exception.OAuthException; import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; import provider.ExceptionListener; import provider.ResultListener; import support.StreamingGZIPInputStream; @@ -236,7 +238,7 @@ public class StreamImpl implements Stream { // if the worker has any old ones left. while (worker.isRunning() || worker.hasObjects()) { try { - JSONObject obj = worker.getObject(); + JsonObject obj = worker.getObject(); if (obj == worker.endOfStream) { // end of objects marker assert !worker.isRunning(); @@ -249,7 +251,7 @@ public class StreamImpl implements Stream { } } - private void processObject(JSONObject obj) { + private void processObject(JsonObject obj) { synchronized (resultListenerSync) { if (resultListener != null) { resultListener.tweetGenerated(obj); @@ -263,9 +265,9 @@ public class StreamImpl implements Stream { private final String keywords; private final HttpURLConnection connection; private volatile boolean running = true; - private final BlockingQueue<JSONObject> receivedObjects; + private final BlockingQueue<JsonObject> receivedObjects; private InputStream inputStream; - private final JSONObject endOfStream; + private final JsonObject endOfStream; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -280,7 +282,7 @@ public class StreamImpl implements Stream { throw ex; } this.receivedObjects = new LinkedBlockingQueue<>(); - this.endOfStream = new JSONObject(); + this.endOfStream = new JsonObject(); } /** @@ -382,17 +384,21 @@ public class StreamImpl implements Stream { // See https://dev.twitter.com/docs/streaming-apis/messages InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8); BufferedReader reader = new BufferedReader(isr); - JSONTokener jsonTokener = new JSONTokener(reader); + JsonParser parser = new JsonParser(); while (running) { try { - Object obj = jsonTokener.nextValue(); - if (obj instanceof JSONObject) { - processReceivedObject((JSONObject) obj); + String line = reader.readLine(); + if (line.isEmpty()) { + continue; + } + JsonElement elem = parser.parse(line); + if (elem.isJsonObject()) { + processReceivedObject(elem.getAsJsonObject()); } else { - getLogger().severe("Got unexpected object: " + obj); + getLogger().log(Level.SEVERE, "Got unexpected object: {0}", elem); throw new IOException("Got unexpected type from stream"); } - } catch (JSONException ex) { + } catch (JsonParseException ex) { // ignore IO errors for a stop request ("Socket closed") if (running) { throw new IOException(ex); @@ -406,12 +412,12 @@ public class StreamImpl implements Stream { * * @param obj an object received at the stream. */ - private void processReceivedObject(JSONObject obj) { + private void processReceivedObject(JsonObject obj) { // assume that tweets always have a user field if (obj.has("user")) { receivedObjects.offer(obj); } else { - getLogger().warning("Received unknown object: " + obj); + getLogger().log(Level.WARNING, "Received unknown object: {0}", obj); } } @@ -419,7 +425,7 @@ public class StreamImpl implements Stream { return !receivedObjects.isEmpty(); } - public JSONObject getObject() throws InterruptedException { + public JsonObject getObject() throws InterruptedException { return receivedObjects.take(); } |