diff options
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r-- | src/io/StreamImpl.java | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 6d6ba64..9a9b249 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import mining.Stream; +import oauth.signpost.exception.OAuthException; import org.apache.commons.io.Charsets; import org.apache.commons.lang.StringUtils; import org.json.JSONException; @@ -41,6 +42,10 @@ public class StreamImpl implements Stream { * The target that is interesting in receiving changes. */ private final ExceptionListener exceptionListener; + /** + * Used for signing messages. + */ + private final OAuthRequester oauth; /** * Holds an instance that represents an active worker that contains a thread @@ -48,9 +53,11 @@ public class StreamImpl implements Stream { */ private WorkerContainer workerContainer; - public StreamImpl(ResultListener resultListener, ExceptionListener el) { + public StreamImpl(ResultListener resultListener, ExceptionListener el, + OAuthRequester oauth) { this.resultListener = resultListener; this.exceptionListener = el; + this.oauth = oauth; } @Override @@ -84,6 +91,15 @@ public class StreamImpl implements Stream { } @Override + public void poll() { + // HACK: change this, strengten contract + try { + workerContainer.thread.join(); + } catch (InterruptedException ex) { + } + } + + @Override public void close() { disconnect(); } @@ -143,7 +159,7 @@ public class StreamImpl implements Stream { private final String keywords; private final HttpURLConnection connection; - private AtomicBoolean running; + private final AtomicBoolean running; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -170,6 +186,11 @@ public class StreamImpl implements Stream { // connect and send request conn.setDoOutput(true); conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8)); + try { + oauth.sign(conn); + } catch (OAuthException ex) { + throw new IOException("Unable to sign request", ex); + } int respCode = conn.getResponseCode(); if (respCode != 200) { getLogger().severe("Response code " + respCode); @@ -218,6 +239,8 @@ public class StreamImpl implements Stream { } private void parseMainLoop(InputStream is) throws IOException { + // Note: one message per CRLF-terminated line + // See https://dev.twitter.com/docs/streaming-apis/messages InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8); BufferedReader reader = new BufferedReader(isr); JSONTokener jsonTokener = new JSONTokener(reader); @@ -237,6 +260,7 @@ public class StreamImpl implements Stream { } private void processReceivedObject(JSONObject obj) { + System.out.println(obj); } private Logger getLogger() { |