package io; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParseException; import com.google.gson.JsonParser; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; import mining.Stream; import oauth.signpost.exception.OAuthException; import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import provider.ExceptionListener; import provider.ResultListener; import support.StreamingGZIPInputStream; /** * Provides access to Twitter stream data. Data is processed in a separate * thread and consumers are notified via callbacks. * * @author Peter Wu */ public class StreamImpl implements Stream { private final static String STREAM_URL = "https://stream.twitter.com/1.1/statuses/filter.json"; private final Set keywords = new HashSet<>(); /** * Used for thread-safe modifications. */ private final Object listenerSync = new Object(); private final Object resultListenerSync = new Object(); /** * The target that is interested in received tweets. */ private ResultListener resultListener; /** * The target that is interesting in receiving changes. */ private ExceptionListener exceptionListener; /** * Used for signing messages. */ private final OAuthRequester oauth; /** * Holds an instance that represents an active worker that contains a thread * to watch for new tweets. */ private WorkerContainer workerContainer; public StreamImpl(OAuthRequester oauth) { this.oauth = oauth; } /** * Sets the listener for new tweets. * * @param resultListener */ public void setResultListener(ResultListener resultListener) { // sync to avoid breakage when the poller thread sends a notification synchronized (resultListenerSync) { this.resultListener = resultListener; } } public ResultListener getResultListener() { return resultListener; } public void setExceptionListener(ExceptionListener exceptionListener) { synchronized (listenerSync) { this.exceptionListener = exceptionListener; } } private Set splitKeywords(String rawKeywords) { Set filteredKeywords = new HashSet<>(); List keywordsList = Arrays.asList(rawKeywords.split(",")); for (String keyword : keywordsList) { keyword = keyword.trim(); if (!keyword.isEmpty()) { filteredKeywords.add(keyword); } } return filteredKeywords; } @Override public void watchKeyword(String rawKeywords) { for (String keyword : splitKeywords(rawKeywords)) { keywords.add(keyword); } } @Override public void unwatchKeyword(String rawKeywords) { for (String keyword : splitKeywords(rawKeywords)) { keywords.remove(keyword); } } @Override public Set getKeywords(boolean active) { HashSet retKeywords = new HashSet<>(); if (active) { // return keywords from the active connection if (workerContainer != null) { String keywordsStr = workerContainer.getWorker().getKeywords(); retKeywords.addAll(Arrays.asList(keywordsStr.split(","))); } } else { retKeywords.addAll(keywords); } return retKeywords; } @Override public void commit() throws IOException { String keywordsStr = StringUtils.join(keywords, ","); /* do not reconnect if a connection already exists for the keywords */ if (isValid() && workerContainer.getWorker().getKeywords().equals(keywordsStr)) { return; } disconnect(); // a query is required. if (keywords.isEmpty()) { return; } Worker worker = new Worker(keywordsStr); workerContainer = new WorkerContainer(worker); workerContainer.start(); } @Override public boolean isValid() { return workerContainer != null && workerContainer.isValid(); } @Override public void close() { disconnect(); } /** * Stops the worker if any. */ private void disconnect() { // wait for the worker thread to stop while (workerContainer != null) { workerContainer.finish(); workerContainer = null; } } /** * Groups an activated stream connection. */ private class WorkerContainer { private final Worker worker; private final Poller poller; private final Thread ioThread; private final Thread pollerThread; /** * Holds a thread for the worker. * * @param worker Worker that should be processed in a separate thread. */ public WorkerContainer(Worker worker) { this.worker = worker; this.poller = new Poller(worker); ioThread = new Thread(worker); // the poller is closely coupled with the I/O thread. If the I/O // thread dies, no new messages will be added to the queue. pollerThread = new Thread(poller); } public void start() { ioThread.start(); pollerThread.start(); } public void finish() { worker.stopWorker(); while (ioThread.isAlive()) { try { ioThread.join(); // maybe the poller is stuck waiting for a new object, wake // it up. pollerThread.interrupt(); pollerThread.join(); } catch (InterruptedException ex) { Logger.getLogger(getClass().getName()) .warning("Interrupted while waiting for stream finish"); } } } public Worker getWorker() { return worker; } public boolean isValid() { return ioThread.isAlive() && worker.isRunning(); } } private class Poller implements Runnable { private final Worker worker; private Poller(Worker worker) { this.worker = worker; } @Override public void run() { // keep waiting for objects if the worker is alive, or fetch objects // if the worker has any old ones left. while (worker.isRunning() || worker.hasObjects()) { try { JsonObject obj = worker.getObject(); if (obj == worker.endOfStream) { // end of objects marker assert !worker.isRunning(); break; } processObject(obj); } catch (InterruptedException ex) { // interrupted, probably signalled to stop? } } } private void processObject(JsonObject obj) { synchronized (resultListenerSync) { if (resultListener != null) { resultListener.tweetGenerated(obj); } } } } private class Worker implements Runnable { private final String keywords; private final HttpURLConnection connection; private volatile boolean running = true; private final BlockingQueue receivedObjects; private InputStream inputStream; private final JsonObject endOfStream; Worker(String keywords) throws IOException { this.keywords = keywords; try { this.connection = connect(keywords); this.inputStream = this.connection.getInputStream(); } catch (IOException ex) { IOUtils.closeQuietly(this.inputStream); if (this.connection != null) { this.connection.disconnect(); } throw ex; } this.receivedObjects = new LinkedBlockingQueue<>(); this.endOfStream = new JsonObject(); } /** * @return The search keywords associated with this worker. */ public String getKeywords() { return keywords; } private HttpURLConnection connect(String keywords) throws IOException { String postData = "track=" + URLEncoder.encode(keywords, "UTF-8"); postData += "&language=en"; HttpURLConnection conn; conn = (HttpURLConnection) new URL(STREAM_URL).openConnection(); // timeout of 90 seconds is recommended at // https://dev.twitter.com/docs/streaming-apis/connecting#Stalls conn.setReadTimeout(90000); conn.setRequestMethod("POST"); // set request headers conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); conn.addRequestProperty("Accept-Encoding", "gzip"); // connect and send request conn.setDoOutput(true); try { oauth.sign(conn, postData); } catch (OAuthException ex) { throw new IOException("Unable to sign request", ex); } conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8)); int respCode = conn.getResponseCode(); if (respCode != 200) { getLogger().severe("Response code " + respCode); conn.disconnect(); throw new IOException("Unexpected stream response " + respCode); } return conn; } /** * Wraps an inputstream as gzip if possible. * * @param is The raw input stream. * @return An inputstream that outputs decoded data. * @throws IOException */ private InputStream wrapGzip(InputStream is) throws IOException { if ("gzip".equals(connection.getContentEncoding())) { return new StreamingGZIPInputStream(is); } return is; } @Override public void run() { InputStream is = inputStream; IOException run_error = null; try { parseMainLoop(wrapGzip(is)); } catch (IOException ex) { run_error = ex; } finally { running = false; IOUtils.closeQuietly(is); connection.disconnect(); // mark end of stream, running must be false receivedObjects.offer(endOfStream); // now that the worker is about to die, notify listener if (run_error != null) { // synchronize just in case the exception listener gets // modified while hell breaks lose. synchronized (listenerSync) { if (exceptionListener != null) { exceptionListener.exceptionGenerated(run_error); } } } } } public void stopWorker() { /* inform the worker to stop as soon as possible */ running = false; try { inputStream.close(); } catch (IOException ex) { getLogger().warning("Error while closing stream: " + ex.getMessage()); } connection.disconnect(); } public boolean isRunning() { return running; } private void parseMainLoop(InputStream is) throws IOException { // Note: one message per CRLF-terminated line // See https://dev.twitter.com/docs/streaming-apis/messages InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8); BufferedReader reader = new BufferedReader(isr); JsonParser parser = new JsonParser(); while (running) { try { String line = reader.readLine(); if (line.isEmpty()) { continue; } JsonElement elem = parser.parse(line); if (elem.isJsonObject()) { processReceivedObject(elem.getAsJsonObject()); } else { getLogger().log(Level.SEVERE, "Got unexpected object: {0}", elem); throw new IOException("Got unexpected type from stream"); } } catch (JsonParseException ex) { // ignore IO errors for a stop request ("Socket closed") if (running) { throw new IOException(ex); } } } } /** * Detect tweets and queue them for processing. * * @param obj an object received at the stream. */ private void processReceivedObject(JsonObject obj) { // assume that tweets always have a user field if (obj.has("user")) { receivedObjects.offer(obj); } else { getLogger().log(Level.WARNING, "Received unknown object: {0}", obj); } } public boolean hasObjects() { return !receivedObjects.isEmpty(); } public JsonObject getObject() throws InterruptedException { return receivedObjects.take(); } private Logger getLogger() { return Logger.getLogger(getClass().getName()); } } }