summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-05-01 01:42:21 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-05-01 01:42:21 +0200
commit1f5952ccc4f13875608b1270809ea51904b988d5 (patch)
treeafe5db5aadab012670a1450af76877578d2af6a6
parent7c2d2b66a21b9a0889e78cf5ff0982245c47bb69 (diff)
downloadTwitterDataAnalytics-1f5952ccc4f13875608b1270809ea51904b988d5.tar.gz
[WIP] Initial hacky TweetShell
In TESTING! DefaultOAuthConsumer does not play nice with POST requests... need to fix that somehow.
-rw-r--r--src/io/StreamImpl.java28
-rw-r--r--src/main/Main.java9
-rw-r--r--src/main/TweetShell.java217
-rw-r--r--src/mining/Stream.java5
4 files changed, 257 insertions, 2 deletions
diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java
index 6d6ba64..9a9b249 100644
--- a/src/io/StreamImpl.java
+++ b/src/io/StreamImpl.java
@@ -12,6 +12,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import mining.Stream;
+import oauth.signpost.exception.OAuthException;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
import org.json.JSONException;
@@ -41,6 +42,10 @@ public class StreamImpl implements Stream {
* The target that is interesting in receiving changes.
*/
private final ExceptionListener exceptionListener;
+ /**
+ * Used for signing messages.
+ */
+ private final OAuthRequester oauth;
/**
* Holds an instance that represents an active worker that contains a thread
@@ -48,9 +53,11 @@ public class StreamImpl implements Stream {
*/
private WorkerContainer workerContainer;
- public StreamImpl(ResultListener resultListener, ExceptionListener el) {
+ public StreamImpl(ResultListener resultListener, ExceptionListener el,
+ OAuthRequester oauth) {
this.resultListener = resultListener;
this.exceptionListener = el;
+ this.oauth = oauth;
}
@Override
@@ -84,6 +91,15 @@ public class StreamImpl implements Stream {
}
@Override
+ public void poll() {
+ // HACK: change this, strengten contract
+ try {
+ workerContainer.thread.join();
+ } catch (InterruptedException ex) {
+ }
+ }
+
+ @Override
public void close() {
disconnect();
}
@@ -143,7 +159,7 @@ public class StreamImpl implements Stream {
private final String keywords;
private final HttpURLConnection connection;
- private AtomicBoolean running;
+ private final AtomicBoolean running;
Worker(String keywords) throws IOException {
this.keywords = keywords;
@@ -170,6 +186,11 @@ public class StreamImpl implements Stream {
// connect and send request
conn.setDoOutput(true);
conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8));
+ try {
+ oauth.sign(conn);
+ } catch (OAuthException ex) {
+ throw new IOException("Unable to sign request", ex);
+ }
int respCode = conn.getResponseCode();
if (respCode != 200) {
getLogger().severe("Response code " + respCode);
@@ -218,6 +239,8 @@ public class StreamImpl implements Stream {
}
private void parseMainLoop(InputStream is) throws IOException {
+ // Note: one message per CRLF-terminated line
+ // See https://dev.twitter.com/docs/streaming-apis/messages
InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8);
BufferedReader reader = new BufferedReader(isr);
JSONTokener jsonTokener = new JSONTokener(reader);
@@ -237,6 +260,7 @@ public class StreamImpl implements Stream {
}
private void processReceivedObject(JSONObject obj) {
+ System.out.println(obj);
}
private Logger getLogger() {
diff --git a/src/main/Main.java b/src/main/Main.java
index 05429fb..f7a9203 100644
--- a/src/main/Main.java
+++ b/src/main/Main.java
@@ -87,6 +87,7 @@ public class Main {
user,
tweet,
searchtweets,
+ shell,
hack,
help;
@@ -164,6 +165,14 @@ public class Main {
System.out.println(" " + cmd.name());
}
break;
+ case shell:
+ TweetShell shell = new TweetShell();
+ // pass any remaining parameters to the shell
+ if (params.length > 0) {
+ shell.execute(params);
+ }
+ shell.process_forever();
+ break;
default:
throw new AssertionError(command.name());
}
diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java
new file mode 100644
index 0000000..533939b
--- /dev/null
+++ b/src/main/TweetShell.java
@@ -0,0 +1,217 @@
+package main;
+
+import io.OAuthRequester;
+import io.StreamImpl;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.Scanner;
+import mining.Stream;
+import mining.TwitterApi;
+import org.json.JSONObject;
+import provider.ExceptionListener;
+import provider.ResultListener;
+
+/**
+ * Provides an interactive shell where requests can be made and displayed.
+ */
+public class TweetShell implements TwitterApi.PinSupplier {
+
+ private final Scanner scanner = new Scanner(System.in);
+
+ public TweetShell() {
+
+ }
+ private TwitterApi api_cached;
+ private Stream stream_cached;
+
+ private TwitterApi getApi() throws IOException {
+ if (api_cached == null) {
+ api_cached = TwitterApi.getOAuth(this);
+ }
+ return api_cached;
+ }
+
+ private Stream getStream() throws IOException {
+ if (stream_cached == null) {
+ OAuthRequester requester = (OAuthRequester) getApi().getRequester();
+ StreamHandler handler = new StreamHandler();
+ stream_cached = new StreamImpl(handler, handler, requester);
+ }
+ return stream_cached;
+ }
+
+ private class StreamHandler implements ResultListener, ExceptionListener {
+
+ @Override
+ public void exceptionGenerated(Exception ex) {
+ System.err.println("Stream closed due to " + ex);
+ }
+
+ @Override
+ public void tweetGenerated(JSONObject obj) {
+ throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ }
+
+ @Override
+ public void profileGenerated(JSONObject obj) {
+ throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ }
+ }
+
+ @Override
+ public String requestPin(String url) throws IOException {
+ System.err.println(url);
+ System.err.println("Please open the above URL and enter PIN:");
+ return scanner.nextLine();
+ }
+
+ private void printPrompt() {
+ if (stream_cached == null) {
+ System.out.print("$ ");
+ } else {
+ System.out.print("€ ");
+ }
+ }
+ /**
+ * Processes commands from stdin until the exit command is received or EOF.
+ */
+ public void process_forever() {
+ System.err.println("Entering interactive shell, type 'help' for help "
+ + "or 'exit' to leave.");
+ // print prompt for reading first command
+ printPrompt();
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine().trim();
+ String[] args = line.split("\\s+", 2);
+ if (!args[0].isEmpty()) {
+ // non-empty command, let's see whether it makes sense?
+ if (!execute(args)) {
+ // requested to terminate
+ break;
+ }
+ }
+ // print prompt for reading next line
+ printPrompt();
+ }
+ }
+
+ /**
+ * Executes a command with optional parameters.
+ *
+ * @param args An array with the first argument containing the command with
+ * optional parameters in following arguments.
+ * @return true if more commands are allowed to be executed, false
+ * otherwise.
+ */
+ public boolean execute(String[] args) {
+ try {
+ Command command = Command.fromString(args[0]);
+ String[] params = Arrays.copyOfRange(args, 1, args.length);
+ execute(command, params);
+ } catch (IllegalArgumentException ex) {
+ System.err.println(ex.getMessage());
+ } catch (IOException ex) {
+ System.err.println("Command " + args[0] + " failed with " + ex);
+ ex.printStackTrace();
+ } catch (NoSuchElementException ex) {
+ // thrown by the "exit" command to signal exit
+ return false;
+ }
+ // another satisfied customer, next!
+ return true;
+ }
+
+ enum Command {
+
+ add("Adds a keyword to search", 1),
+ del("Deletes a keyword from search", 1),
+ commit("Activate the stream or apply the stream keyword changes"),
+ close("Close the stream"),
+ test(""),
+ exit("Returns to shell"),
+ help("Get help");
+
+ private final String description;
+ private final int paramCount;
+
+ Command(String description) {
+ this.description = description;
+ this.paramCount = 0;
+ }
+
+ Command(String description, int paramCount) {
+ this.description = description;
+ this.paramCount = paramCount;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public int getParamCount() {
+ return paramCount;
+ }
+
+ public static Command fromString(String command) {
+ for (Command cmd : values()) {
+ if (cmd.name().equals(command)) {
+ return cmd;
+ }
+ }
+ throw new IllegalArgumentException("Unrecognized command. Hint: help");
+ }
+ };
+
+ private final String[] HELP = new String[]{
+ "Interactive TweetShell",
+ "",
+ "Available commands:"
+ };
+
+ private void execute(Command command, String[] params) throws IOException {
+ if (params.length < command.getParamCount()) {
+ throw new IllegalArgumentException("Expected "
+ + command.getParamCount() + " parameters, got only "
+ + params.length);
+ }
+ switch (command) {
+ case add:
+ getStream().watchKeyword(params[0]);
+ break;
+ case del:
+ getStream().unwatchKeyword(params[0]);
+ break;
+ case commit:
+ getStream().commit();
+ break;
+ case close:
+ getStream().close();
+ break;
+ case test:
+ getStream().poll();
+ break;
+ case help:
+ for (String line : HELP) {
+ System.out.println(line);
+ }
+ for (Command cmd : Command.values()) {
+ System.out.printf(" %-10s", cmd.name());
+ if (!cmd.getDescription().isEmpty()) {
+ System.out.print(" " + cmd.getDescription());
+ }
+ if (cmd.getParamCount() == 1) {
+ System.out.print(" (1 arg)");
+ } else if (cmd.getParamCount() > 1) {
+ System.out.printf(" (%d args)", cmd.getParamCount());
+ }
+ System.out.println();
+ }
+ break;
+ case exit:
+ throw new NoSuchElementException();
+ default:
+ throw new AssertionError(command.name());
+ }
+ }
+}
diff --git a/src/mining/Stream.java b/src/mining/Stream.java
index 36fba1b..083a6c8 100644
--- a/src/mining/Stream.java
+++ b/src/mining/Stream.java
@@ -39,6 +39,11 @@ public interface Stream {
public void close();
/**
+ * Blocking call until a new event is available.
+ */
+ public void poll();
+
+ /**
* Test whether the stream is ready for streaming
*
* @return true if connection can be made, false otherwise.