summary | refs | log | tree | commit | diff
path: root/src/Chapter2/streamingapi/StreamingApiExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/Chapter2/streamingapi/StreamingApiExample.java')
-rw-r--r-- src/Chapter2/streamingapi/StreamingApiExample.java 372
1 files changed, 372 insertions, 0 deletions
diff --git a/src/Chapter2/streamingapi/StreamingApiExample.java b/src/Chapter2/streamingapi/StreamingApiExample.java
new file mode 100644
index 0000000..8abfff4
--- /dev/null
+++ b/src/Chapter2/streamingapi/StreamingApiExample.java
@@ -0,0 +1,372 @@
+/* TweetTracker. Copyright (c) Arizona Board of Regents on behalf of Arizona State University
+ * @author shamanth
+ */
+package Chapter2.streamingapi;
+
+import Chapter2.support.OAuthTokenSecret;
+import Chapter2.openauthentication.OAuthExample;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import oauth.signpost.OAuthConsumer;
+import oauth.signpost.commonshttp.CommonsHttpOAuthConsumer;
+import oauth.signpost.exception.OAuthCommunicationException;
+import oauth.signpost.exception.OAuthExpectationFailedException;
+import oauth.signpost.exception.OAuthMessageSignerException;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.params.CoreConnectionPNames;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import utils.OAuthUtils;
+
+public class StreamingApiExample
+{
+ OAuthTokenSecret OAuthToken; // access token and secret; assigned in LoadTwitterToken, read when signing the streaming request
+ final int RECORDS_TO_PROCESS = 1000; // number of buffered tweets that triggers writing a file in ProcessTwitterStream
+ final int MAX_GEOBOXES = 25; // geobox limit read by FilterGeoboxes
+ final int MAX_KEYWORDS = 400; // keyword limit read by FilterKeywords
+ final int MAX_USERS = 5000; // user id limit read by FilterUserids
+ HashSet<String> Keywords; // sent as the "track" request parameter
+ HashSet<String> Geoboxes; // sent as the "locations" request parameter
+ HashSet<String> Userids; // sent as the "follow" request parameter
+ final String CONFIG_FILE_PATH = "streaming/streaming.config"; // parameter file used by main when no argument is given
+ final String DEF_OUTPATH = "streaming/"; // output location used by main unless a valid directory argument is given
+
+ /**
+  * Populates {@code OAuthToken} with the access token and secret of a user.
+  * The credentials currently come from {@code OAuthExample.DEBUGUserAccessSecret()};
+  * the alternative lookup through an {@code OAuthExample} instance is left commented out.
+  */
+ public void LoadTwitterToken()
+ {
+     // Alternative, intentionally disabled:
+     //   OAuthExample oae = new OAuthExample();
+     //   OAuthToken = oae.GetUserAccessKeySecret();
+     this.OAuthToken = OAuthExample.DEBUGUserAccessSecret();
+ }
+
+ /**
+ * Creates a connection to the Streaming Filter API
+ * @param baseUrl the URL for Twitter Filter API
+ * @param outFilePath Location to place the exported file
+ */
+ public void CreateStreamingConnection(String baseUrl, String outFilePath)
+ {
+ HttpClient httpClient = new DefaultHttpClient();
+ httpClient.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, new Integer(90000));
+ //Step 1: Initialize OAuth Consumer
+ OAuthConsumer consumer = new CommonsHttpOAuthConsumer(OAuthUtils.CONSUMER_KEY,OAuthUtils.CONSUMER_SECRET);
+ consumer.setTokenWithSecret(OAuthToken.getAccessToken(),OAuthToken.getAccessSecret());
+ //Step 2: Create a new HTTP POST request and set parameters
+ HttpPost httppost = new HttpPost(baseUrl);
+ try {
+ httppost.setEntity(new UrlEncodedFormEntity(CreateRequestBody(), "UTF-8"));
+ } catch (UnsupportedEncodingException ex) {
+ ex.printStackTrace();
+ }
+ try {
+ //Step 3: Sign the request
+ consumer.sign(httppost);
+ } catch (OAuthMessageSignerException ex) {
+ ex.printStackTrace();
+ } catch (OAuthExpectationFailedException ex) {
+ ex.printStackTrace();
+ } catch (OAuthCommunicationException ex) {
+ ex.printStackTrace();
+ }
+ HttpResponse response;
+ InputStream is = null;
+ try {
+ //Step 4: Connect to the API
+ response = httpClient.execute(httppost);
+ if (response.getStatusLine().getStatusCode()!= HttpStatus.SC_OK)
+ {
+ throw new IOException("Got status " +response.getStatusLine().getStatusCode());
+ }
+ else
+ {
+ System.out.println(OAuthToken.getAccessToken()+ ": Processing from " + baseUrl);
+ HttpEntity entity = response.getEntity();
+ try {
+ is = entity.getContent();
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } catch (IllegalStateException ex) {
+ ex.printStackTrace();
+ }
+ //Step 5: Process the incoming Tweet Stream
+ this.ProcessTwitterStream(is, outFilePath);
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }finally {
+ // Abort the method, otherwise releaseConnection() will
+ // attempt to finish reading the never-ending response.
+ // These methods do not throw exceptions.
+ if(is!=null)
+ {
+ try {
+ is.close();
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+  * Processes a stream of tweets and writes them to files one tweet per line. Each tweet
+  * here is represented by a JSON document.
+  * <p>
+  * Tweets are buffered and written to a new file named {@code tweets_<millis>.json} inside
+  * {@code outFilePath} each time {@code RECORDS_TO_PROCESS} tweets have accumulated. When
+  * the stream is exhausted or can no longer be read, the tweets still in the buffer are
+  * written and the method returns. A document that fails to parse is reported and skipped.
+  * File I/O errors are printed and end processing; nothing is thrown to the caller.
+  *
+  * @param is input stream already connected to the streaming API; {@code null} is reported and ignored
+  * @param outFilePath directory to put the collected tweet files in
+  */
+ public void ProcessTwitterStream(InputStream is, String outFilePath)
+ {
+     if (is == null)
+     {
+         System.err.println("No tweet stream to process");
+         return;
+     }
+     List<JSONObject> rawtweets = new ArrayList<JSONObject>();
+     int nooftweetsuploaded = 0;
+     try {
+         JSONTokener jsonTokener = new JSONTokener(new InputStreamReader(is, "UTF-8"));
+         while (true) {
+             try {
+                 // nextClean() skips whitespace and returns 0 at end of input. Without this
+                 // check the JSONObject constructor would fail on every iteration forever
+                 // once the stream is exhausted.
+                 if (jsonTokener.nextClean() == 0) {
+                     break;
+                 }
+                 jsonTokener.back();
+                 rawtweets.add(new JSONObject(jsonTokener));
+             } catch (JSONException ex) {
+                 ex.printStackTrace();
+                 // JSONTokener reports read failures as a JSONException wrapping the
+                 // IOException; a broken connection cannot recover, so stop reading.
+                 if (ex.getCause() instanceof IOException) {
+                     break;
+                 }
+                 continue;
+             }
+             if (rawtweets.size() >= RECORDS_TO_PROCESS)
+             {
+                 nooftweetsuploaded += WriteTweetsToFile(rawtweets, outFilePath);
+                 System.out.println("Written "+nooftweetsuploaded+" records so far");
+             }
+         }
+         // Do not lose the partial batch collected before the stream ended.
+         if (!rawtweets.isEmpty())
+         {
+             nooftweetsuploaded += WriteTweetsToFile(rawtweets, outFilePath);
+             System.out.println("Written "+nooftweetsuploaded+" records so far");
+         }
+     } catch (IOException ex) {
+         ex.printStackTrace();
+     }
+ }
+
+ /**
+  * Writes the buffered tweets to a new timestamped file, one JSON document per line,
+  * and empties the buffer once the file has been written and closed.
+  *
+  * @param tweets buffered tweets; cleared on success
+  * @param outFilePath directory in which the file is created
+  * @return the number of tweets written
+  * @throws IOException if the file cannot be created or written
+  */
+ private int WriteTweetsToFile(List<JSONObject> tweets, String outFilePath) throws IOException
+ {
+     // File(parent, child) inserts the separator when outFilePath lacks a trailing one.
+     File outFile = new File(outFilePath, "tweets_" + System.currentTimeMillis() + ".json");
+     BufferedWriter bwrite = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), "UTF-8"));
+     try {
+         for (JSONObject jobj : tweets) {
+             bwrite.write(jobj.toString());
+             bwrite.newLine();
+         }
+     } finally {
+         // Close even when a write fails so the file handle is not leaked.
+         bwrite.close();
+     }
+     int written = tweets.size();
+     tweets.clear();
+     return written;
+ }
+
+ public static void main(String[] args)
+ {
+ StreamingApiExample sae = new StreamingApiExample();
+ sae.LoadTwitterToken();
+ //load parameters from a TSV file
+ String filename = sae.CONFIG_FILE_PATH;
+ String outfilepath = sae.DEF_OUTPATH;
+ if(args!=null)
+ {
+ if(args.length>0)
+ {
+ filename = args[0];
+ }
+ if(args.length>1)
+ {
+ File fl = new File(args[1]);
+ if(fl.exists()&&fl.isDirectory())
+ {
+ outfilepath = args[1];
+ }
+ }
+ }
+ sae.ReadParameters(filename);
+ sae.CreateStreamingConnection("https://stream.twitter.com/1.1/statuses/filter.json", outfilepath);
+ }
+
+ /**
+  * Reads the file and loads the parameters to be crawled.
+  * <p>
+  * Each non-empty line holds tab separated values. The first non-empty line supplies the
+  * keywords, the second the geoboxes and the third the user ids; later lines are ignored.
+  * Empty lines are skipped and do not count towards the line position, and empty values
+  * produced by consecutive tabs are dropped. I/O failures, including a missing file, are
+  * printed and leave the parameter sets with whatever was loaded up to that point.
+  *
+  * @param filename path of the parameter file, read as UTF-8
+  */
+ public void ReadParameters(String filename)
+ {
+     // Create the sets up front so they exist even when the file cannot be opened.
+     if(Userids==null)
+     {
+         Userids = new HashSet<String>();
+     }
+     if(Geoboxes==null)
+     {
+         Geoboxes = new HashSet<String>();
+     }
+     if(Keywords==null)
+     {
+         Keywords = new HashSet<String>();
+     }
+     BufferedReader br = null;
+     try {
+         br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8"));
+         String line;
+         int count = 1;
+         while((line = br.readLine())!=null)
+         {
+             if(line.isEmpty())
+             {
+                 continue;
+             }
+             HashSet<String> values = SplitTabSeparated(line);
+             if(count==1)
+             {
+                 FilterKeywords(values);
+             }
+             else if(count==2)
+             {
+                 FilterGeoboxes(values);
+             }
+             else if(count==3)
+             {
+                 FilterUserids(values);
+             }
+             count++;
+         }
+     } catch (IOException ex) {
+         ex.printStackTrace();
+     }
+     finally{
+         // br stays null when the file could not be opened; closing it would throw.
+         if(br!=null)
+         {
+             try {
+                 br.close();
+             } catch (IOException ex) {
+                 ex.printStackTrace();
+             }
+         }
+     }
+ }
+
+ /**
+  * Splits a line on tab characters and returns the distinct, non-empty values.
+  */
+ private HashSet<String> SplitTabSeparated(String line)
+ {
+     HashSet<String> values = new HashSet<String>();
+     for(String value:line.split("\t"))
+     {
+         if(!value.isEmpty())
+         {
+             values.add(value);
+         }
+     }
+     return values;
+ }
+
+ /**
+  * Adds the supplied user ids to {@code Userids} until it holds {@code MAX_USERS}
+  * entries; the remaining ids are ignored. The input set is unordered, so which ids are
+  * dropped when the limit is exceeded is unspecified.
+  *
+  * @param userids candidate user ids; {@code null} is ignored
+  */
+ private void FilterUserids(HashSet<String> userids)
+ {
+     if(userids==null)
+     {
+         return;
+     }
+     if(Userids==null)
+     {
+         Userids = new HashSet<String>();
+     }
+     for(String id:userids)
+     {
+         // Enforce the cap on the accumulated set, not just on this call's input.
+         if(Userids.size()>=MAX_USERS)
+         {
+             break;
+         }
+         Userids.add(id);
+     }
+ }
+
+ /**
+  * Adds the supplied geoboxes to {@code Geoboxes} until it holds {@code MAX_GEOBOXES}
+  * entries; the remaining boxes are ignored. The input set is unordered, so which boxes
+  * are dropped when the limit is exceeded is unspecified.
+  *
+  * @param geoboxes candidate geoboxes; {@code null} is ignored
+  */
+ private void FilterGeoboxes(HashSet<String> geoboxes)
+ {
+     if(geoboxes==null)
+     {
+         return;
+     }
+     if(Geoboxes==null)
+     {
+         Geoboxes = new HashSet<String>();
+     }
+     for(String box:geoboxes)
+     {
+         // Enforce the cap on the accumulated set, not just on this call's input.
+         if(Geoboxes.size()>=MAX_GEOBOXES)
+         {
+             break;
+         }
+         Geoboxes.add(box);
+     }
+ }
+ /**
+  * Keep only the maximum permitted number of parameters for a connection, ignoring the rest.
+  * Keywords are added to {@code Keywords} until it holds {@code MAX_KEYWORDS} entries. The
+  * input set is unordered, so which keywords are dropped when the limit is exceeded is
+  * unspecified.
+  * This can be extended to create multiple sets to be crawled by different threads.
+  *
+  * @param hashtags candidate keywords; {@code null} is ignored
+  */
+ private void FilterKeywords(HashSet<String> hashtags)
+ {
+     if(hashtags==null)
+     {
+         return;
+     }
+     if(Keywords==null)
+     {
+         Keywords = new HashSet<String>();
+     }
+     for(String tag:hashtags)
+     {
+         // Enforce the cap on the accumulated set, not just on this call's input.
+         if(Keywords.size()>=MAX_KEYWORDS)
+         {
+             break;
+         }
+         Keywords.add(tag);
+     }
+ }
+
+ /**
+  * Builds the form parameters of the filter request from the loaded parameter sets.
+  * A parameter is included only when its set is non-null and non-empty: "follow" from
+  * {@code Userids}, "locations" from {@code Geoboxes} and "track" from {@code Keywords},
+  * in that order. Every included set is also printed to standard output.
+  *
+  * @return the request parameters, possibly empty
+  */
+ private List<NameValuePair> CreateRequestBody()
+ {
+     List<NameValuePair> body = new ArrayList<NameValuePair>();
+     boolean hasUsers = Userids != null && !Userids.isEmpty();
+     if (hasUsers)
+     {
+         NameValuePair follow = CreateNameValuePair("follow", Userids);
+         body.add(follow);
+         System.out.println("userids = "+Userids);
+     }
+     boolean hasBoxes = Geoboxes != null && !Geoboxes.isEmpty();
+     if (hasBoxes)
+     {
+         NameValuePair locations = CreateNameValuePair("locations", Geoboxes);
+         body.add(locations);
+         System.out.println("locations = "+Geoboxes);
+     }
+     boolean hasKeywords = Keywords != null && !Keywords.isEmpty();
+     if (hasKeywords)
+     {
+         NameValuePair track = CreateNameValuePair("track", Keywords);
+         body.add(track);
+         System.out.println("keywords = "+Keywords);
+     }
+     return body;
+ }
+
+ /**
+  * Joins the items with commas, in the collection's iteration order, and pairs the
+  * result with the given parameter name.
+  *
+  * @param name parameter name
+  * @param items values to join; an empty collection yields an empty value
+  * @return the name paired with the comma separated values
+  */
+ private NameValuePair CreateNameValuePair(String name, Collection<String> items)
+ {
+     StringBuilder joined = new StringBuilder();
+     // Empty before the first item, a comma before every later one.
+     String separator = "";
+     for (String item : items) {
+         joined.append(separator).append(item);
+         separator = ",";
+     }
+     String value = joined.toString();
+     return new BasicNameValuePair(name, value);
+ }
+}