summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-04-30 19:34:40 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-04-30 19:35:25 +0200
commite6e30ea585131df43ca1d47ac5cd4b1730b02200 (patch)
tree71edfa30e45d9d6219e8fb72420537b45e5d7b95 /src
parent8441cb20f57644442d6e792da59d7e9c9c50b1d8 (diff)
downloadTwitterDataAnalytics-e6e30ea585131df43ca1d47ac5cd4b1730b02200.tar.gz
[WIP] Stream processor
Diffstat (limited to 'src')
-rw-r--r--src/io/OAuthRequester.java20
-rw-r--r--src/io/StreamImpl.java209
-rw-r--r--src/mining/Stream.java6
-rw-r--r--src/mining/StreamListener.java18
4 files changed, 235 insertions, 18 deletions
diff --git a/src/io/OAuthRequester.java b/src/io/OAuthRequester.java
index b4ac481..77c41c0 100644
--- a/src/io/OAuthRequester.java
+++ b/src/io/OAuthRequester.java
@@ -1,12 +1,17 @@
package io;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.URLConnection;
import oauth.signpost.OAuth;
import oauth.signpost.OAuthConsumer;
import oauth.signpost.basic.DefaultOAuthConsumer;
import oauth.signpost.basic.DefaultOAuthProvider;
+import oauth.signpost.basic.HttpURLConnectionRequestAdapter;
+import oauth.signpost.exception.OAuthCommunicationException;
import oauth.signpost.exception.OAuthException;
+import oauth.signpost.exception.OAuthExpectationFailedException;
+import oauth.signpost.exception.OAuthMessageSignerException;
import org.json.JSONObject;
import support.ConsumerKeySecret;
import support.OAuthAccessTokenSecret;
@@ -92,6 +97,21 @@ public class OAuthRequester extends AbstractRequester {
}
}
+ /**
+ * Signs a HTTP request.
+ *
+ * @param conn An open HTTP connection with data ready to be sent.
+ * @throws OAuthMessageSignerException
+ * @throws OAuthExpectationFailedException
+ * @throws OAuthCommunicationException
+ */
+ public void sign(HttpURLConnection conn) throws OAuthMessageSignerException,
+ OAuthExpectationFailedException, OAuthCommunicationException {
+ HttpURLConnectionRequestAdapter request;
+ request = new HttpURLConnectionRequestAdapter(conn);
+ consumer.sign(request);
+ }
+
@Override
protected void preconnect(URLConnection conn) throws IOException {
try {
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
new file mode 100644
index 0000000..259caa6
--- /dev/null
+++ b/src/io/StreamImpl.java
@@ -0,0 +1,209 @@
+package io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+import mining.Stream;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.lang.StringUtils;
+import provider.ExceptionListener;
+import provider.ResultListener;
+
+/**
+ * Provides access to Twitter stream data. Data is processed in a separate
+ * thread and consumers are notified via callbacks.
+ *
+ * @author Peter Wu
+ */
+public class StreamImpl implements Stream {
+
+ private final static String STREAM_URL
+ = "https://stream.twitter.com/1.1/statuses/filter.json";
+
+ private final Set<String> keywords = new HashSet<>();
+ /**
+ * The target that is interested in received tweets.
+ */
+ private final ResultListener resultListener;
+ /**
+ * The target that is interesting in receiving changes.
+ */
+ private final ExceptionListener exceptionListener;
+
+ /**
+ * Holds an instance that represents an active worker that contains a thread
+ * to watch for new tweets.
+ */
+ private WorkerContainer workerContainer;
+
+ public StreamImpl(ResultListener resultListener, ExceptionListener el) {
+ this.resultListener = resultListener;
+ this.exceptionListener = el;
+ }
+
+ @Override
+ public void watchKeyword(String keyword) {
+ keywords.add(keyword);
+ }
+
+ @Override
+ public void unwatchKeyword(String keyword) {
+ keywords.remove(keyword);
+ }
+
+ @Override
+ public void commit() throws IOException {
+ String keywordsStr = StringUtils.join(keywords, ",");
+ /* do not reconnect if a connection already exists for the keywords */
+ if (workerContainer != null
+ && workerContainer.getWorker().getKeywords().equals(keywordsStr)) {
+ return;
+ }
+ /* connect (or reconnect) after setting new keywords */
+ disconnect();
+ Worker worker = new Worker(keywordsStr);
+ workerContainer = new WorkerContainer(worker);
+ workerContainer.start();
+ }
+
+ @Override
+ public boolean isValid() {
+ return workerContainer != null && workerContainer.isValid();
+ }
+
+ @Override
+ public void close() {
+ disconnect();
+ }
+
+ /**
+ * Stops the worker if any.
+ */
+ private void disconnect() {
+ // wait for the worker thread to stop
+ while (workerContainer != null) {
+ workerContainer.finish();
+ workerContainer = null;
+ }
+ }
+
+ private class WorkerContainer {
+
+ private final Worker worker;
+ private final Thread thread;
+
+ /**
+ * Holds a thread for the worker.
+ *
+ * @param worker Worker that should be processed in a separate thread.
+ */
+ public WorkerContainer(Worker worker) {
+ this.worker = worker;
+ thread = new Thread(worker);
+ }
+
+ public void start() {
+ thread.start();
+ }
+
+ public void finish() {
+ worker.stopWorker();
+ while (thread.isAlive()) {
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ Logger.getLogger(getClass().getName())
+ .warning("Interrupted while waiting for stream finish");
+ }
+ }
+ }
+
+ public Worker getWorker() {
+ return worker;
+ }
+
+ public boolean isValid() {
+ return thread.isAlive();
+ }
+ }
+
+ private class Worker implements Runnable {
+
+ private final String keywords;
+ private final HttpURLConnection connection;
+ private AtomicBoolean running;
+
+ Worker(String keywords) throws IOException {
+ this.keywords = keywords;
+ this.connection = connect(keywords);
+ this.running = new AtomicBoolean(true);
+ }
+
+ /**
+ * @return The search keywords associated with this worker.
+ */
+ public String getKeywords() {
+ return keywords;
+ }
+
+ private HttpURLConnection connect(String keywords) throws IOException {
+ String postData = "follow=" + URLEncoder.encode(keywords, "UTF-8");
+ HttpURLConnection conn;
+ conn = (HttpURLConnection) new URL(STREAM_URL).openConnection();
+ conn.setRequestMethod("POST");
+ // set request headers
+ conn.addRequestProperty("Content-Type",
+ "application/x-www-form-urlencoded; charset=UTF-8");
+ // connect and send request
+ conn.setDoOutput(true);
+ conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8));
+ int respCode = conn.getResponseCode();
+ if (respCode != 200) {
+ getLogger().severe("Response code " + respCode);
+ conn.disconnect();
+ throw new IOException("Unexpected stream response " + respCode);
+ }
+ return conn;
+ }
+
+ @Override
+ public void run() {
+ InputStream is = null;
+ try {
+ is = connection.getInputStream();
+ parseMainLoop(is);
+ } catch (IOException ex) {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException ioex) {
+ /* probably already closed? */
+ }
+ }
+ connection.disconnect();
+ exceptionListener.exceptionGenerated(ex);
+ }
+ }
+
+ public void stopWorker() {
+ /* inform the worker to stop as soon as possible */
+ running.set(false);
+ }
+
+ private void parseMainLoop(InputStream is) throws IOException {
+ while (running.get()) {
+ /* TODO: read responses and parse JSON */
+ }
+ }
+ }
+
+ private Logger getLogger() {
+ return Logger.getLogger(getClass().getName());
+ }
+}
diff --git a/src/mining/Stream.java b/src/mining/Stream.java
index 456a405..36fba1b 100644
--- a/src/mining/Stream.java
+++ b/src/mining/Stream.java
@@ -33,6 +33,12 @@ public interface Stream {
public void commit() throws IOException;
/**
+ * Stops the stream. After this method has been called, no more messages
+ * will be processed.
+ */
+ public void close();
+
+ /**
* Test whether the stream is ready for streaming
*
* @return true if connection can be made, false otherwise.
diff --git a/src/mining/StreamListener.java b/src/mining/StreamListener.java
deleted file mode 100644
index 3b163eb..0000000
--- a/src/mining/StreamListener.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package mining;
-
-import org.json.JSONObject;
-
-/**
- * A callback provider to listen from streams.
- *
- * @author Maurice Laveaux
- */
-public interface StreamListener {
-
- /**
- * This method is called by the stream when new objects are streamed in.
- *
- * @param data The JSON data returned.
- */
- public void notify(JSONObject data);
-}