From 1f5952ccc4f13875608b1270809ea51904b988d5 Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Thu, 1 May 2014 01:42:21 +0200 Subject: [WIP] Initial hacky TweetShell In TESTING! DefaultOAuthConsumer does not play nice with POST requests... need to fix that somehow. --- src/io/StreamImpl.java | 28 +++++- src/main/Main.java | 9 ++ src/main/TweetShell.java | 217 +++++++++++++++++++++++++++++++++++++++++++++++ src/mining/Stream.java | 5 ++ 4 files changed, 257 insertions(+), 2 deletions(-) create mode 100644 src/main/TweetShell.java diff --git a/src/io/StreamImpl.java b/src/io/StreamImpl.java index 6d6ba64..9a9b249 100644 --- a/src/io/StreamImpl.java +++ b/src/io/StreamImpl.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import mining.Stream; +import oauth.signpost.exception.OAuthException; import org.apache.commons.io.Charsets; import org.apache.commons.lang.StringUtils; import org.json.JSONException; @@ -41,6 +42,10 @@ public class StreamImpl implements Stream { * The target that is interesting in receiving changes. */ private final ExceptionListener exceptionListener; + /** + * Used for signing messages. + */ + private final OAuthRequester oauth; /** * Holds an instance that represents an active worker that contains a thread @@ -48,9 +53,11 @@ public class StreamImpl implements Stream { */ private WorkerContainer workerContainer; - public StreamImpl(ResultListener resultListener, ExceptionListener el) { + public StreamImpl(ResultListener resultListener, ExceptionListener el, + OAuthRequester oauth) { this.resultListener = resultListener; this.exceptionListener = el; + this.oauth = oauth; } @Override @@ -83,6 +90,15 @@ public class StreamImpl implements Stream { return workerContainer != null && workerContainer.isValid(); } + @Override + public void poll() { + // HACK: change this, strengten contract + try { + workerContainer.thread.join(); + } catch (InterruptedException ex) { + } + } + @Override public void close() { disconnect(); @@ -143,7 +159,7 @@ public class StreamImpl implements Stream { private final String keywords; private final HttpURLConnection connection; - private AtomicBoolean running; + private final AtomicBoolean running; Worker(String keywords) throws IOException { this.keywords = keywords; @@ -170,6 +186,11 @@ public class StreamImpl implements Stream { // connect and send request conn.setDoOutput(true); conn.getOutputStream().write(postData.getBytes(Charsets.UTF_8)); + try { + oauth.sign(conn); + } catch (OAuthException ex) { + throw new IOException("Unable to sign request", ex); + } int respCode = conn.getResponseCode(); if (respCode != 200) { getLogger().severe("Response code " + respCode); @@ -218,6 +239,8 @@ public class StreamImpl implements Stream { } private void parseMainLoop(InputStream is) throws IOException { + // Note: one message per CRLF-terminated line + // See https://dev.twitter.com/docs/streaming-apis/messages InputStreamReader isr = new InputStreamReader(is, Charsets.UTF_8); BufferedReader reader = new BufferedReader(isr); JSONTokener jsonTokener = new JSONTokener(reader); @@ -237,6 +260,7 @@ public class StreamImpl implements Stream { } private void processReceivedObject(JSONObject obj) { + System.out.println(obj); } private Logger getLogger() { diff --git a/src/main/Main.java b/src/main/Main.java index 05429fb..f7a9203 100644 --- a/src/main/Main.java +++ b/src/main/Main.java @@ -87,6 +87,7 @@ public class Main { user, tweet, searchtweets, + shell, hack, help; @@ -164,6 +165,14 @@ public class Main { System.out.println(" " + cmd.name()); } break; + case shell: + TweetShell shell = new TweetShell(); + // pass any remaining parameters to the shell + if (params.length > 0) { + shell.execute(params); + } + shell.process_forever(); + break; default: throw new AssertionError(command.name()); } diff --git a/src/main/TweetShell.java b/src/main/TweetShell.java new file mode 100644 index 0000000..533939b --- /dev/null +++ b/src/main/TweetShell.java @@ -0,0 +1,217 @@ +package main; + +import io.OAuthRequester; +import io.StreamImpl; +import java.io.IOException; +import java.util.Arrays; +import java.util.NoSuchElementException; +import java.util.Scanner; +import mining.Stream; +import mining.TwitterApi; +import org.json.JSONObject; +import provider.ExceptionListener; +import provider.ResultListener; + +/** + * Provides an interactive shell where requests can be made and displayed. + */ +public class TweetShell implements TwitterApi.PinSupplier { + + private final Scanner scanner = new Scanner(System.in); + + public TweetShell() { + + } + private TwitterApi api_cached; + private Stream stream_cached; + + private TwitterApi getApi() throws IOException { + if (api_cached == null) { + api_cached = TwitterApi.getOAuth(this); + } + return api_cached; + } + + private Stream getStream() throws IOException { + if (stream_cached == null) { + OAuthRequester requester = (OAuthRequester) getApi().getRequester(); + StreamHandler handler = new StreamHandler(); + stream_cached = new StreamImpl(handler, handler, requester); + } + return stream_cached; + } + + private class StreamHandler implements ResultListener, ExceptionListener { + + @Override + public void exceptionGenerated(Exception ex) { + System.err.println("Stream closed due to " + ex); + } + + @Override + public void tweetGenerated(JSONObject obj) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void profileGenerated(JSONObject obj) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + } + + @Override + public String requestPin(String url) throws IOException { + System.err.println(url); + System.err.println("Please open the above URL and enter PIN:"); + return scanner.nextLine(); + } + + private void printPrompt() { + if (stream_cached == null) { + System.out.print("$ "); + } else { + System.out.print("€ "); + } + } + /** + * Processes commands from stdin until the exit command is received or EOF. + */ + public void process_forever() { + System.err.println("Entering interactive shell, type 'help' for help " + + "or 'exit' to leave."); + // print prompt for reading first command + printPrompt(); + while (scanner.hasNextLine()) { + String line = scanner.nextLine().trim(); + String[] args = line.split("\\s+", 2); + if (!args[0].isEmpty()) { + // non-empty command, let's see whether it makes sense? + if (!execute(args)) { + // requested to terminate + break; + } + } + // print prompt for reading next line + printPrompt(); + } + } + + /** + * Executes a command with optional parameters. + * + * @param args An array with the first argument containing the command with + * optional parameters in following arguments. + * @return true if more commands are allowed to be executed, false + * otherwise. + */ + public boolean execute(String[] args) { + try { + Command command = Command.fromString(args[0]); + String[] params = Arrays.copyOfRange(args, 1, args.length); + execute(command, params); + } catch (IllegalArgumentException ex) { + System.err.println(ex.getMessage()); + } catch (IOException ex) { + System.err.println("Command " + args[0] + " failed with " + ex); + ex.printStackTrace(); + } catch (NoSuchElementException ex) { + // thrown by the "exit" command to signal exit + return false; + } + // another satisfied customer, next! + return true; + } + + enum Command { + + add("Adds a keyword to search", 1), + del("Deletes a keyword from search", 1), + commit("Activate the stream or apply the stream keyword changes"), + close("Close the stream"), + test(""), + exit("Returns to shell"), + help("Get help"); + + private final String description; + private final int paramCount; + + Command(String description) { + this.description = description; + this.paramCount = 0; + } + + Command(String description, int paramCount) { + this.description = description; + this.paramCount = paramCount; + } + + public String getDescription() { + return description; + } + + public int getParamCount() { + return paramCount; + } + + public static Command fromString(String command) { + for (Command cmd : values()) { + if (cmd.name().equals(command)) { + return cmd; + } + } + throw new IllegalArgumentException("Unrecognized command. Hint: help"); + } + }; + + private final String[] HELP = new String[]{ + "Interactive TweetShell", + "", + "Available commands:" + }; + + private void execute(Command command, String[] params) throws IOException { + if (params.length < command.getParamCount()) { + throw new IllegalArgumentException("Expected " + + command.getParamCount() + " parameters, got only " + + params.length); + } + switch (command) { + case add: + getStream().watchKeyword(params[0]); + break; + case del: + getStream().unwatchKeyword(params[0]); + break; + case commit: + getStream().commit(); + break; + case close: + getStream().close(); + break; + case test: + getStream().poll(); + break; + case help: + for (String line : HELP) { + System.out.println(line); + } + for (Command cmd : Command.values()) { + System.out.printf(" %-10s", cmd.name()); + if (!cmd.getDescription().isEmpty()) { + System.out.print(" " + cmd.getDescription()); + } + if (cmd.getParamCount() == 1) { + System.out.print(" (1 arg)"); + } else if (cmd.getParamCount() > 1) { + System.out.printf(" (%d args)", cmd.getParamCount()); + } + System.out.println(); + } + break; + case exit: + throw new NoSuchElementException(); + default: + throw new AssertionError(command.name()); + } + } +} diff --git a/src/mining/Stream.java b/src/mining/Stream.java index 36fba1b..083a6c8 100644 --- a/src/mining/Stream.java +++ b/src/mining/Stream.java @@ -38,6 +38,11 @@ public interface Stream { */ public void close(); + /** + * Blocking call until a new event is available. + */ + public void poll(); + /** * Test whether the stream is ready for streaming * -- cgit v1.2.1