summaryrefslogtreecommitdiff
path: root/src/io/StreamImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r--src/io/StreamImpl.java28
1 files changed, 26 insertions, 2 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index 6d6ba64..9a9b249 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -12,6 +12,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import mining.Stream;
+import oauth.signpost.exception.OAuthException;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
import org.json.JSONException;
@@ -41,6 +42,10 @@ public class StreamImpl implements Stream {
* The target that is interesting in receiving changes.
*/
private final ExceptionListener exceptionListener;
+ /**
+ * Used for signing messages.
+ */
+ private final OAuthRequester oauth;
/**
* Holds an instance that represents an active worker that contains a thread
@@ -48,9 +53,11 @@ public class StreamImpl implements Stream {
*/
private WorkerContainer workerContainer;
- public StreamImpl(ResultListener resultListener, ExceptionListener el) {
+ public StreamImpl(ResultListener resultListener, ExceptionListener el,
+ OAuthRequester oauth) {
this.resultListener = resultListener;
this.exceptionListener = el;
+ this.oauth = oauth;
}
@Override
@@ -84,6 +91,15 @@ public class StreamImpl implements Stream {
}
@Override
+ public void poll() {
+ // HACK: change this, strengten contract
+ try {
+ workerContainer.thread.join();
+ } catch (InterruptedException ex) {
+ }
+ }
+
+ @Override
public void close() {
disconnect();
}
@@ -143,7 +159,7 @@ public class StreamImpl implements Stream {
private final String keywords;
private final HttpURLConnection connection;
- private AtomicBoolean running;
+ private final AtomicBoolean running;
Worker(String keywords) throws IOException {
this.keywords = keywords;
@@ -170,6 +186,11 @@ public class StreamImpl implements Stream {
// connect and send request
conn.setDoOutput(true);
conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8));
+ try {
+ oauth.sign(conn);
+ } catch (OAuthException ex) {
+ throw new IOException("Unable to sign request", ex);
+ }
int respCode = conn.getResponseCode();
if (respCode != 200) {
getLogger().severe("Response code " + respCode);
@@ -218,6 +239,8 @@ public class StreamImpl implements Stream {
}
private void parseMainLoop(InputStream is) throws IOException {
+ // Note: one message per CRLF-terminated line
+ // See https://dev.twitter.com/docs/streaming-apis/messages
InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8);
BufferedReader reader = new BufferedReader(isr);
JSONTokener jsonTokener = new JSONTokener(reader);
@@ -237,6 +260,7 @@ public class StreamImpl implements Stream {
}
private void processReceivedObject(JSONObject obj) {
+ System.out.println(obj);
}
private Logger getLogger() {