package io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import mining.Stream;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import provider.ExceptionListener;
import provider.ResultListener;

/**
 * Provides access to Twitter stream data. Data is processed in a separate
 * thread and consumers are notified via callbacks.
 * <p>
 * Keywords are collected with {@link #watchKeyword(String)} and
 * {@link #unwatchKeyword(String)}; they only take effect once
 * {@link #commit()} is called, which (re)connects to the stream endpoint.
 * <p>
 * This class performs no synchronization of its own on the keyword set or on
 * the active worker reference.
 *
 * @author Peter Wu
 */
public class StreamImpl implements Stream {

    private final static String STREAM_URL
            = "https://stream.twitter.com/1.1/statuses/filter.json";

    /**
     * Keywords that should be watched after the next {@link #commit()}.
     */
    private final Set<String> keywords = new HashSet<>();

    /**
     * The target that is interested in received tweets.
     */
    private final ResultListener resultListener;

    /**
     * The target that is interested in receiving exceptions raised by the
     * worker thread.
     */
    private final ExceptionListener exceptionListener;

    /**
     * Holds an instance that represents an active worker that contains a thread
     * to watch for new tweets. {@code null} if no worker was started yet or if
     * the stream was disconnected.
     */
    private WorkerContainer workerContainer;

    /**
     * @param resultListener Target that is interested in received tweets.
     * @param el Target that is notified when the worker fails with an
     * {@link IOException}.
     */
    public StreamImpl(ResultListener resultListener, ExceptionListener el) {
        this.resultListener = resultListener;
        this.exceptionListener = el;
    }

    @Override
    public void watchKeyword(String keyword) {
        keywords.add(keyword);
    }

    @Override
    public void unwatchKeyword(String keyword) {
        keywords.remove(keyword);
    }

    /**
     * Applies the current keyword set by connecting (or reconnecting) to the
     * stream. Nothing happens if a live worker already serves exactly the same
     * keyword string.
     *
     * @throws IOException if the connection cannot be established or the
     * server does not answer with status 200.
     */
    @Override
    public void commit() throws IOException {
        String keywordsStr = StringUtils.join(keywords, ",");
        /* do not reconnect if a live connection already exists for the
         * keywords; a worker whose thread has died must be replaced even if
         * the keywords did not change, otherwise recovery is impossible */
        if (isValid()
                && workerContainer.getWorker().getKeywords().equals(keywordsStr)) {
            return;
        }
        /* connect (or reconnect) after setting new keywords */
        disconnect();
        Worker worker = new Worker(keywordsStr);
        workerContainer = new WorkerContainer(worker);
        workerContainer.start();
    }

    /**
     * @return {@code true} if a worker exists and its thread is still alive.
     */
    @Override
    public boolean isValid() {
        return workerContainer != null && workerContainer.isValid();
    }

    @Override
    public void close() {
        disconnect();
    }

    /**
     * Stops the worker if any and forgets about it.
     */
    private void disconnect() {
        if (workerContainer != null) {
            // waits for the worker thread to stop
            workerContainer.finish();
            workerContainer = null;
        }
    }

    /**
     * Couples a {@link Worker} with the thread that executes it.
     */
    private class WorkerContainer {

        private final Worker worker;
        private final Thread thread;

        /**
         * Holds a thread for the worker.
         *
         * @param worker Worker that should be processed in a separate thread.
         */
        public WorkerContainer(Worker worker) {
            this.worker = worker;
            thread = new Thread(worker);
        }

        public void start() {
            thread.start();
        }

        /**
         * Asks the worker to stop and blocks until its thread has terminated.
         * An interrupt received while waiting does not abort the wait; the
         * interrupt status is restored before returning so callers can still
         * observe it.
         */
        public void finish() {
            worker.stopWorker();
            if (Thread.currentThread() == thread) {
                /* invoked from the worker thread itself (e.g. from within a
                 * listener callback): joining our own thread would never
                 * return, the stop request above is all that can be done */
                return;
            }
            boolean interrupted = false;
            while (thread.isAlive()) {
                try {
                    thread.join();
                } catch (InterruptedException ex) {
                    interrupted = true;
                    Logger.getLogger(getClass().getName())
                            .warning("Interrupted while waiting for stream finish");
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        public Worker getWorker() {
            return worker;
        }

        public boolean isValid() {
            return thread.isAlive();
        }
    }

    /**
     * Reads JSON objects from an established stream connection until it is
     * asked to stop or an I/O error occurs. The connection is opened in the
     * constructor, i.e. on the thread that creates the worker.
     */
    private class Worker implements Runnable {

        private final String keywords;
        private final HttpURLConnection connection;
        private final AtomicBoolean running;

        /**
         * @param keywords Comma-separated keywords to request from the stream.
         * @throws IOException if connecting fails.
         */
        Worker(String keywords) throws IOException {
            this.keywords = keywords;
            this.connection = connect(keywords);
            this.running = new AtomicBoolean(true);
        }

        /**
         * @return The search keywords associated with this worker.
         */
        public String getKeywords() {
            return keywords;
        }

        /**
         * Opens the stream connection and sends the filter request.
         *
         * @param keywords Comma-separated keywords to send.
         * @return A connection whose response code was 200.
         * @throws IOException if the request cannot be sent or the response
         * code is not 200. The connection is disconnected in that case.
         */
        private HttpURLConnection connect(String keywords) throws IOException {
            // NOTE(review): the watched keywords are posted as the "follow"
            // parameter; confirm against the streaming API documentation
            // whether "track" was intended.
            String postData = "follow=" + URLEncoder.encode(keywords, "UTF-8");
            HttpURLConnection conn;
            conn = (HttpURLConnection) new URL(STREAM_URL).openConnection();
            boolean established = false;
            try {
                conn.setRequestMethod("POST");
                // set request headers
                conn.addRequestProperty("Content-Type",
                        "application/x-www-form-urlencoded; charset=UTF-8");
                conn.addRequestProperty("Accept-Encoding", "gzip");

                // connect and send request
                conn.setDoOutput(true);
                try (OutputStream out = conn.getOutputStream()) {
                    out.write(postData.getBytes(Charsets.UTF_8));
                }

                int respCode = conn.getResponseCode();
                if (respCode != 200) {
                    getLogger().severe("Response code " + respCode);
                    throw new IOException("Unexpected stream response " + respCode);
                }
                established = true;
                return conn;
            } finally {
                // release the connection on every failure path, not only on
                // an unexpected response code
                if (!established) {
                    conn.disconnect();
                }
            }
        }

        /**
         * Wraps an inputstream as gzip if possible.
         *
         * @param is The raw input stream.
         * @return An inputstream that outputs decoded data.
         * @throws IOException if the gzip header cannot be read.
         */
        private InputStream wrapGzip(InputStream is) throws IOException {
            if ("gzip".equals(connection.getContentEncoding())) {
                return new GZIPInputStream(is);
            }
            return is;
        }

        /**
         * Processes the stream until stopped or until an error occurs. The
         * stream and connection are always released when this method ends;
         * an {@link IOException} is reported to the exception listener
         * afterwards.
         */
        @Override
        public void run() {
            InputStream raw = null;
            InputStream decoded = null;
            IOException failure = null;
            try {
                raw = connection.getInputStream();
                decoded = wrapGzip(raw);
                parseMainLoop(decoded);
            } catch (IOException ex) {
                failure = ex;
            } finally {
                // closing the outermost stream also closes the raw one; fall
                // back to the raw stream if wrapping failed
                closeQuietly(decoded != null ? decoded : raw);
                connection.disconnect();
            }
            if (failure != null) {
                exceptionListener.exceptionGenerated(failure);
            }
        }

        /**
         * Closes a stream, ignoring failures.
         *
         * @param stream Stream to close, may be {@code null}.
         */
        private void closeQuietly(InputStream stream) {
            if (stream == null) {
                return;
            }
            try {
                stream.close();
            } catch (IOException ignored) {
                /* probably already closed? */
            }
        }

        /**
         * Requests the worker to stop. The flag is only checked between two
         * reads, so a worker that is blocked waiting for the next object stops
         * after that read returns.
         */
        public void stopWorker() {
            /* inform the worker to stop as soon as possible */
            running.set(false);
        }

        /**
         * Reads JSON values from the stream for as long as the worker is
         * running and hands every JSON object to
         * {@link #processReceivedObject(JSONObject)}.
         *
         * @param is Decoded input stream. It is not closed by this method.
         * @throws IOException if a value is not a JSON object or if the
         * tokener reports a {@link JSONException} (which is set as cause).
         */
        private void parseMainLoop(InputStream is) throws IOException {
            InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8);
            BufferedReader reader = new BufferedReader(isr);
            JSONTokener jsonTokener = new JSONTokener(reader);
            while (running.get()) {
                try {
                    Object obj = jsonTokener.nextValue();
                    if (obj instanceof JSONObject) {
                        processReceivedObject((JSONObject) obj);
                    } else {
                        getLogger().severe("Got unexpected object: " + obj);
                        throw new IOException("Got unexpected type from stream");
                    }
                } catch (JSONException ex) {
                    throw new IOException(ex);
                }
            }
        }

        /**
         * Handles a single object received from the stream.
         *
         * @param obj The received JSON object.
         */
        private void processReceivedObject(JSONObject obj) {
            // TODO: currently a no-op, the result listener is not notified yet
        }

        private Logger getLogger() {
            return Logger.getLogger(getClass().getName());
        }
    }
}