diff options
Diffstat (limited to 'src/io/StreamImpl.java')
-rw-r--r-- | src/io/StreamImpl.java | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java new file mode 100644 index 0000000..259caa6 --- /dev/null +++ b/src/io/StreamImpl.java @@ -0,0 +1,209 @@ +package io; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import mining.Stream; +import org.apache.commons.io.Charsets; +import org.apache.commons.lang.StringUtils; +import provider.ExceptionListener; +import provider.ResultListener; + +/** + * Provides access to Twitter stream data. Data is processed in a separate + * thread and consumers are notified via callbacks. + * + * @author Peter Wu + */ +public class StreamImpl implements Stream { + + private final static String STREAM_URL + = "https://stream.twitter.com/1.1/statuses/filter.json"; + + private final Set<String> keywords = new HashSet<>(); + /** + * The target that is interested in received tweets. + */ + private final ResultListener resultListener; + /** + * The target that is interesting in receiving changes. + */ + private final ExceptionListener exceptionListener; + + /** + * Holds an instance that represents an active worker that contains a thread + * to watch for new tweets. + */ + private WorkerContainer workerContainer; + + public StreamImpl(ResultListener resultListener, ExceptionListener el) { + this.resultListener = resultListener; + this.exceptionListener = el; + } + + @Override + public void watchKeyword(String keyword) { + keywords.add(keyword); + } + + @Override + public void unwatchKeyword(String keyword) { + keywords.remove(keyword); + } + + @Override + public void commit() throws IOException { + String keywordsStr = StringUtils.join(keywords, ","); + /* do not reconnect if a connection already exists for the keywords */ + if (workerContainer != null + && workerContainer.getWorker().getKeywords().equals(keywordsStr)) { + return; + } + /* connect (or reconnect) after setting new keywords */ + disconnect(); + Worker worker = new Worker(keywordsStr); + workerContainer = new WorkerContainer(worker); + workerContainer.start(); + } + + @Override + public boolean isValid() { + return workerContainer != null && workerContainer.isValid(); + } + + @Override + public void close() { + disconnect(); + } + + /** + * Stops the worker if any. + */ + private void disconnect() { + // wait for the worker thread to stop + while (workerContainer != null) { + workerContainer.finish(); + workerContainer = null; + } + } + + private class WorkerContainer { + + private final Worker worker; + private final Thread thread; + + /** + * Holds a thread for the worker. + * + * @param worker Worker that should be processed in a separate thread. + */ + public WorkerContainer(Worker worker) { + this.worker = worker; + thread = new Thread(worker); + } + + public void start() { + thread.start(); + } + + public void finish() { + worker.stopWorker(); + while (thread.isAlive()) { + try { + thread.join(); + } catch (InterruptedException ex) { + Logger.getLogger(getClass().getName()) + .warning("Interrupted while waiting for stream finish"); + } + } + } + + public Worker getWorker() { + return worker; + } + + public boolean isValid() { + return thread.isAlive(); + } + } + + private class Worker implements Runnable { + + private final String keywords; + private final HttpURLConnection connection; + private AtomicBoolean running; + + Worker(String keywords) throws IOException { + this.keywords = keywords; + this.connection = connect(keywords); + this.running = new AtomicBoolean(true); + } + + /** + * @return The search keywords associated with this worker. + */ + public String getKeywords() { + return keywords; + } + + private HttpURLConnection connect(String keywords) throws IOException { + String postData = "follow=" + URLEncoder.encode(keywords, "UTF-8"); + HttpURLConnection conn; + conn = (HttpURLConnection) new URL(STREAM_URL).openConnection(); + conn.setRequestMethod("POST"); + // set request headers + conn.addRequestProperty("Content-Type", + "application/x-www-form-urlencoded; charset=UTF-8"); + // connect and send request + conn.setDoOutput(true); + conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8)); + int respCode = conn.getResponseCode(); + if (respCode != 200) { + getLogger().severe("Response code " + respCode); + conn.disconnect(); + throw new IOException("Unexpected stream response " + respCode); + } + return conn; + } + + @Override + public void run() { + InputStream is = null; + try { + is = connection.getInputStream(); + parseMainLoop(is); + } catch (IOException ex) { + if (is != null) { + try { + is.close(); + } catch (IOException ioex) { + /* probably already closed? */ + } + } + connection.disconnect(); + exceptionListener.exceptionGenerated(ex); + } + } + + public void stopWorker() { + /* inform the worker to stop as soon as possible */ + running.set(false); + } + + private void parseMainLoop(InputStream is) throws IOException { + while (running.get()) { + /* TODO: read responses and parse JSON */ + } + } + } + + private Logger getLogger() { + return Logger.getLogger(getClass().getName()); + } +} |