diff options
author | Peter Wu <peter@lekensteyn.nl> | 2014-04-30 19:34:40 +0200 |
---|---|---|
committer | Peter Wu <peter@lekensteyn.nl> | 2014-04-30 19:35:25 +0200 |
commit | e6e30ea585131df43ca1d47ac5cd4b1730b02200 (patch) | |
tree | 71edfa30e45d9d6219e8fb72420537b45e5d7b95 /src | |
parent | 8441cb20f57644442d6e792da59d7e9c9c50b1d8 (diff) | |
download | TwitterDataAnalytics-e6e30ea585131df43ca1d47ac5cd4b1730b02200.tar.gz |
[WIP] Stream processor
Diffstat (limited to 'src')
-rw-r--r-- | src/io/OAuthRequester.java | 20 | ||||
-rw-r--r-- | src/io/StreamImpl.java | 209 | ||||
-rw-r--r-- | src/mining/Stream.java | 6 | ||||
-rw-r--r-- | src/mining/StreamListener.java | 18 |
4 files changed, 235 insertions, 18 deletions
diff --git a/src/io/OAuthRequester.java b/src/io/OAuthRequester.java index b4ac481..77c41c0 100644 --- a/src/io/OAuthRequester.java +++ b/src/io/OAuthRequester.java @@ -1,12 +1,17 @@ package io; import java.io.IOException; +import java.net.HttpURLConnection; import java.net.URLConnection; import oauth.signpost.OAuth; import oauth.signpost.OAuthConsumer; import oauth.signpost.basic.DefaultOAuthConsumer; import oauth.signpost.basic.DefaultOAuthProvider; +import oauth.signpost.basic.HttpURLConnectionRequestAdapter; +import oauth.signpost.exception.OAuthCommunicationException; import oauth.signpost.exception.OAuthException; +import oauth.signpost.exception.OAuthExpectationFailedException; +import oauth.signpost.exception.OAuthMessageSignerException; import org.json.JSONObject; import support.ConsumerKeySecret; import support.OAuthAccessTokenSecret; @@ -92,6 +97,21 @@ public class OAuthRequester extends AbstractRequester { } } + /** + * Signs a HTTP request. + * + * @param conn An open HTTP connection with data ready to be sent. + * @throws OAuthMessageSignerException + * @throws OAuthExpectationFailedException + * @throws OAuthCommunicationException + */ + public void sign(HttpURLConnection conn) throws OAuthMessageSignerException, + OAuthExpectationFailedException, OAuthCommunicationException { + HttpURLConnectionRequestAdapter request; + request = new HttpURLConnectionRequestAdapter(conn); + consumer.sign(request); + } + @Override protected void preconnect(URLConnection conn) throws IOException { try { diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java new file mode 100644 index 0000000..259caa6 --- /dev/null +++ b/src/io/StreamImpl.java @@ -0,0 +1,209 @@ +package io; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import mining.Stream; +import org.apache.commons.io.Charsets; +import org.apache.commons.lang.StringUtils; +import provider.ExceptionListener; +import provider.ResultListener; + +/** + * Provides access to Twitter stream data. Data is processed in a separate + * thread and consumers are notified via callbacks. + * + * @author Peter Wu + */ +public class StreamImpl implements Stream { + + private final static String STREAM_URL + = "https://stream.twitter.com/1.1/statuses/filter.json"; + + private final Set<String> keywords = new HashSet<>(); + /** + * The target that is interested in received tweets. + */ + private final ResultListener resultListener; + /** + * The target that is interesting in receiving changes. + */ + private final ExceptionListener exceptionListener; + + /** + * Holds an instance that represents an active worker that contains a thread + * to watch for new tweets. + */ + private WorkerContainer workerContainer; + + public StreamImpl(ResultListener resultListener, ExceptionListener el) { + this.resultListener = resultListener; + this.exceptionListener = el; + } + + @Override + public void watchKeyword(String keyword) { + keywords.add(keyword); + } + + @Override + public void unwatchKeyword(String keyword) { + keywords.remove(keyword); + } + + @Override + public void commit() throws IOException { + String keywordsStr = StringUtils.join(keywords, ","); + /* do not reconnect if a connection already exists for the keywords */ + if (workerContainer != null + && workerContainer.getWorker().getKeywords().equals(keywordsStr)) { + return; + } + /* connect (or reconnect) after setting new keywords */ + disconnect(); + Worker worker = new Worker(keywordsStr); + workerContainer = new WorkerContainer(worker); + workerContainer.start(); + } + + @Override + public boolean isValid() { + return workerContainer != null && workerContainer.isValid(); + } + + @Override + public void close() { + disconnect(); + } + + /** + * Stops the worker if any. + */ + private void disconnect() { + // wait for the worker thread to stop + while (workerContainer != null) { + workerContainer.finish(); + workerContainer = null; + } + } + + private class WorkerContainer { + + private final Worker worker; + private final Thread thread; + + /** + * Holds a thread for the worker. + * + * @param worker Worker that should be processed in a separate thread. + */ + public WorkerContainer(Worker worker) { + this.worker = worker; + thread = new Thread(worker); + } + + public void start() { + thread.start(); + } + + public void finish() { + worker.stopWorker(); + while (thread.isAlive()) { + try { + thread.join(); + } catch (InterruptedException ex) { + Logger.getLogger(getClass().getName()) + .warning("Interrupted while waiting for stream finish"); + } + } + } + + public Worker getWorker() { + return worker; + } + + public boolean isValid() { + return thread.isAlive(); + } + } + + private class Worker implements Runnable { + + private final String keywords; + private final HttpURLConnection connection; + private AtomicBoolean running; + + Worker(String keywords) throws IOException { + this.keywords = keywords; + this.connection = connect(keywords); + this.running = new AtomicBoolean(true); + } + + /** + * @return The search keywords associated with this worker. + */ + public String getKeywords() { + return keywords; + } + + private HttpURLConnection connect(String keywords) throws IOException { + String postData = "follow=" + URLEncoder.encode(keywords, "UTF-8"); + HttpURLConnection conn; + conn = (HttpURLConnection) new URL(STREAM_URL).openConnection(); + conn.setRequestMethod("POST"); + // set request headers + conn.addRequestProperty("Content-Type", + "application/x-www-form-urlencoded; charset=UTF-8"); + // connect and send request + conn.setDoOutput(true); + conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8)); + int respCode = conn.getResponseCode(); + if (respCode != 200) { + getLogger().severe("Response code " + respCode); + conn.disconnect(); + throw new IOException("Unexpected stream response " + respCode); + } + return conn; + } + + @Override + public void run() { + InputStream is = null; + try { + is = connection.getInputStream(); + parseMainLoop(is); + } catch (IOException ex) { + if (is != null) { + try { + is.close(); + } catch (IOException ioex) { + /* probably already closed? */ + } + } + connection.disconnect(); + exceptionListener.exceptionGenerated(ex); + } + } + + public void stopWorker() { + /* inform the worker to stop as soon as possible */ + running.set(false); + } + + private void parseMainLoop(InputStream is) throws IOException { + while (running.get()) { + /* TODO: read responses and parse JSON */ + } + } + } + + private Logger getLogger() { + return Logger.getLogger(getClass().getName()); + } +} diff --git a/src/mining/Stream.java b/src/mining/Stream.java index 456a405..36fba1b 100644 --- a/src/mining/Stream.java +++ b/src/mining/Stream.java @@ -33,6 +33,12 @@ public interface Stream { public void commit() throws IOException; /** + * Stops the stream. After this method has been called, no more messages + * will be processed. + */ + public void close(); + + /** * Test whether the stream is ready for streaming * * @return true if connection can be made, false otherwise. diff --git a/src/mining/StreamListener.java b/src/mining/StreamListener.java deleted file mode 100644 index 3b163eb..0000000 --- a/src/mining/StreamListener.java +++ /dev/null @@ -1,18 +0,0 @@ -package mining; - -import org.json.JSONObject; - -/** - * A callback provider to listen from streams. - * - * @author Maurice Laveaux - */ -public interface StreamListener { - - /** - * This method is called by the stream when new objects are streamed in. - * - * @param data The JSON data returned. - */ - public void notify(JSONObject data); -} |