package main;

import io.CompressableDataWriter;
import io.DataWriter;
import io.OAuthRequester;
import io.StreamImpl;
import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import mining.Stream;
import mining.TwitterApi;
import org.apache.commons.io.IOUtils;
import org.json.JSONException;
import org.json.JSONObject;
import provider.CompositeResultListener;
import provider.ExceptionListener;
import provider.ResultListener;
import support.ClassEnabledTracker;
import utils.Configuration;

/**
 * Provides an interactive shell where requests can be made and displayed.
 */
public class TweetShell implements TwitterApi.PinSupplier {

    /**
     * Whether to convert uncompressed tweet files (e.g. "tweets.txt") to the
     * compressed files ("tweets.txt.gz").
     */
    public final static String CFG_CONVERT_UNCOMPRESSED = "convert-uncompressed";

    /** Header lines printed by the 'help' command before the command list. */
    private static final String[] HELP = new String[]{
        "Interactive TweetShell",
        "",
        "Available commands:"
    };

    private final Scanner scanner = new Scanner(System.in);
    private final CompositeResultListener resultListeners;
    /** Lazily created by {@link #getApi()}. */
    private TwitterApi apiCached;
    /** Lazily created by {@link #getStream()}; null until first needed. */
    private Stream streamCached;

    /**
     * Creates a shell with a tweet counter and a console printer registered as
     * result listeners, and installs a JVM shutdown hook that closes the
     * stream and the listeners.
     */
    public TweetShell() {
        resultListeners = new CompositeResultListener();
        // by default, store something that counts responses
        resultListeners.register(new TweetCounter());
        // and something that prints tweets to console.
        resultListeners.register(new StreamHandler());
        // see getPossibleTargets() for more listener types.

        // register shutdown listener to prevent corruption on SIGINT
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                safeClose();
            }
        }));
    }

    /**
     * @return the API object, created through {@code TwitterApi.getOAuth} on
     * first use with this shell acting as PIN supplier.
     * @throws IOException if the API object cannot be created.
     */
    private TwitterApi getApi() throws IOException {
        if (apiCached == null) {
            apiCached = TwitterApi.getOAuth(this);
        }
        return apiCached;
    }

    /**
     * @return the stream, created on first use and wired to the exception
     * handler and the registered result listeners.
     * @throws IOException if the API object cannot be obtained.
     */
    private Stream getStream() throws IOException {
        if (streamCached == null) {
            OAuthRequester requester = (OAuthRequester) getApi().getRequester();
            StreamImpl stream = new StreamImpl(requester);
            stream.setExceptionListener(new StreamExceptionHandler());
            stream.setResultListener(resultListeners);
            // only cache the stream once it is fully configured
            streamCached = stream;
        }
        return streamCached;
    }

    /** Reports exceptions passed to it by the stream on stderr. */
    private static class StreamExceptionHandler implements ExceptionListener {

        @Override
        public void exceptionGenerated(Exception ex) {
            System.err.println("Stream closed due to " + ex);
        }
    }

    /** Prints the "text" field of every received tweet to stdout. */
    private static class StreamHandler implements ResultListener {

        private static final Logger LOGGER
                = Logger.getLogger(StreamHandler.class.getName());

        @Override
        public void tweetGenerated(JSONObject obj) {
            try {
                System.out.println("Got tweet: " + obj.getString("text"));
            } catch (JSONException ex) {
                LOGGER.log(Level.SEVERE, "Failed to parse tweet", ex);
            }
        }
    }

    /**
     * Prints the given URL on stderr and reads the PIN from stdin.
     *
     * @param url the URL to show to the user.
     * @return the line entered by the user.
     * @throws IOException if stdin reaches end-of-input before a line is read.
     */
    @Override
    public String requestPin(String url) throws IOException {
        System.err.println(url);
        System.err.println("Please open the above URL and enter PIN:");
        // Report EOF as an I/O failure; an unchecked NoSuchElementException
        // from nextLine() would otherwise escape through the caller.
        if (!scanner.hasNextLine()) {
            throw new IOException("No PIN entered: end of input reached");
        }
        return scanner.nextLine();
    }

    /** Prints a prompt whose symbol reflects the state of the stream. */
    private void printPrompt() {
        if (streamCached == null) {
            // "dollars are worthless"
            System.out.print("$ ");
        } else if (streamCached.isValid()) {
            // "we make money now by receiving tweets"
            System.out.print("€ ");
        } else {
            // "we previously made money, but not anymore"
            System.out.print("ƒ ");
        }
    }

    /**
     * Processes commands from stdin until the exit command is received or EOF.
     */
    public void process_forever() {
        System.err.println("Entering interactive shell, type 'help' for help "
                + "or 'exit' to leave. '.' repeats the previous interactive "
                + "command.");
        // print prompt for reading first command
        printPrompt();
        String lastLine = "";
        while (scanner.hasNextLine()) {
            String line = scanner.nextLine().trim();
            // repeat last command
            if (line.equals(".")) {
                line = lastLine;
            }
            if (!execute(line)) {
                // requested to terminate
                break;
            }
            if (!line.isEmpty()) {
                lastLine = line;
            }
            // print prompt for reading next line
            printPrompt();
        }
        // prevent corrupted compressed files when exiting without a command
        safeClose();
    }

    /**
     * Executes a single command line. The line is split on the first run of
     * whitespace into a command name and one parameter string.
     *
     * @param cmd the command line; an empty or blank line is a no-op.
     * @return true if more commands are allowed to be executed, false
     * otherwise.
     */
    public boolean execute(String cmd) {
        String[] args = cmd.trim().split("\\s+", 2);
        if (!args[0].isEmpty()) {
            // non-empty command, let's see whether it makes sense?
            return execute(args);
        }
        return true;
    }

    /**
     * Executes a command with optional parameters.
     *
     * @param args An array with the first argument containing the command with
     * optional parameters in following arguments.
     * @return true if more commands are allowed to be executed, false
     * otherwise.
     */
    public boolean execute(String[] args) {
        try {
            Command command = Command.fromString(args[0]);
            String[] params = Arrays.copyOfRange(args, 1, args.length);
            return execute(command, params);
        } catch (IllegalArgumentException ex) {
            System.err.println(ex.getMessage());
        } catch (IOException ex) {
            System.err.println("Command " + args[0] + " failed with " + ex);
            ex.printStackTrace();
        }
        // another satisfied customer, next!
        return true;
    }

    /** Commands understood by the shell; the constant name is the keyword. */
    enum Command {

        add("Adds a keyword to search", 1),
        del("Deletes a keyword from search", 1),
        keywords("Display currently active keywords"),
        commit("Activate the stream or apply the stream keyword changes"),
        status("Show some status and statistics"),
        close("Close the stream"),
        exit("Returns to shell"),
        help("Get help"),
        target("Show available and enabled targets (if no args are given). "
                + "If one or more args are given, change the output targets. "
                + "A '-' or '+' prefix only enables/disables that target. "
                + "With no prefix, all other targets get disabled.");

        private final String description;
        private final int paramCount;

        Command(String description) {
            this.description = description;
            this.paramCount = 0;
        }

        Command(String description, int paramCount) {
            this.description = description;
            this.paramCount = paramCount;
        }

        public String getDescription() {
            return description;
        }

        /** @return the minimum number of parameters the command needs. */
        public int getParamCount() {
            return paramCount;
        }

        /**
         * Looks up a command by its exact (case-sensitive) name.
         *
         * @param command the keyword typed by the user.
         * @return the matching command.
         * @throws IllegalArgumentException if no command has that name.
         */
        public static Command fromString(String command) {
            for (Command cmd : values()) {
                if (cmd.name().equals(command)) {
                    return cmd;
                }
            }
            throw new IllegalArgumentException("Unrecognized command. Hint: help");
        }
    }

    /**
     * Dispatches a parsed command.
     *
     * @param command the command to run.
     * @param params the parameters that followed the command keyword.
     * @return false if the shell should terminate (the 'exit' command), true
     * otherwise.
     * @throws IllegalArgumentException if fewer parameters are given than the
     * command requires.
     * @throws IOException if the command fails while using the stream.
     */
    private boolean execute(Command command, String[] params) throws IOException {
        if (params.length < command.getParamCount()) {
            throw new IllegalArgumentException("Expected "
                    + command.getParamCount() + " parameters, got only "
                    + params.length);
        }
        switch (command) {
            case add:
                getStream().watchKeyword(params[0]);
                return true;
            case del:
                getStream().unwatchKeyword(params[0]);
                return true;
            case keywords:
                printKeywords();
                return true;
            case commit:
                commitKeywords();
                return true;
            case status:
                printStatus();
                return true;
            case close:
                safeClose();
                return true;
            case help:
                printHelp();
                return true;
            case exit:
                safeClose();
                return false;
            case target:
                handleTargets(params);
                return true;
            default:
                throw new AssertionError(command.name());
        }
    }

    /**
     * Lists the active keywords and the queued changes, and hints at 'commit'
     * only when there is at least one keyword to add or remove.
     */
    private void printKeywords() throws IOException {
        // Work on copies so the sets handed out by the stream are never
        // modified by this listing.
        Set<String> active = new LinkedHashSet<>(getStream().getKeywords(true));
        Set<String> queued = new LinkedHashSet<>(getStream().getKeywords(false));
        if (active.isEmpty() && queued.isEmpty()) {
            System.out.println("No keywords found. Add some with 'add'");
        } else {
            System.out.println("Keywords:");
        }
        boolean pendingChanges = false;
        for (String keyword : active) {
            System.out.print(" \"" + keyword + "\"");
            if (!queued.contains(keyword)) {
                System.out.println(" (will be removed)");
                pendingChanges = true;
            } else {
                System.out.println();
            }
        }
        for (String keyword : queued) {
            if (active.contains(keyword)) {
                // already listed above
                continue;
            }
            System.out.println(" \"" + keyword + "\" (will be added)");
            pendingChanges = true;
        }
        if (pendingChanges) {
            System.out.println("To apply changes, run 'commit'");
        }
    }

    /**
     * Commits the queued keywords, unless none are queued, in which case an
     * explanation is printed instead.
     */
    private void commitKeywords() throws IOException {
        if (!getStream().getKeywords(false).isEmpty()) {
            getStream().commit();
        } else if (getStream().getKeywords(true).isEmpty()) {
            System.err.println("Add at least one keyword with 'add'");
        } else {
            System.err.println("No keywords are available, the "
                    + "stream will stay open. Stop streaming with "
                    + "the 'close' command or add new keywords "
                    + "with 'add'.");
        }
    }

    /** Prints whether streaming is active plus the tweet counter statistics. */
    private void printStatus() throws IOException {
        TweetCounter tc
                = (TweetCounter) resultListeners.findListener(TweetCounter.class);
        if (streamCached != null && getStream().isValid()) {
            System.out.println("Streaming is active.");
        } else {
            System.out.println("Streaming is inactive.");
        }
        DateFormat df = DateFormat.getDateTimeInstance();
        String startDate = df.format(tc.getStartDate());
        System.out.println("Started at " + startDate);
        System.out.println("Elapsed: " + tc.getActiveTime());
        System.out.println("Received tweets in session: " + tc.getTweetCount());
        System.out.println("Unique users: " + tc.getUsers().size());
    }

    /** Prints the help header followed by one line per command. */
    private void printHelp() {
        for (String line : HELP) {
            System.out.println(line);
        }
        for (Command cmd : Command.values()) {
            System.out.printf(" %-10s", cmd.name());
            if (!cmd.getDescription().isEmpty()) {
                System.out.print(" " + cmd.getDescription());
            }
            if (cmd.getParamCount() == 1) {
                System.out.print(" (1 arg)");
            } else if (cmd.getParamCount() > 1) {
                System.out.printf(" (%d args)", cmd.getParamCount());
            }
            System.out.println();
        }
    }

    /**
     * Implements the 'target' command: reconfigures the targets when
     * parameters are given, otherwise lists them.
     */
    private void handleTargets(String[] params) {
        if (params.length > 0) {
            // due to limitations of shell (does not handle escapes),
            // do a manual split as we cannot have spaces in our args.
            if (params.length == 1) {
                params = params[0].split("\\s+");
            }
            configureTargets(params);
            return;
        }
        ClassEnabledTracker<ResultListener> targets = getPossibleTargets();
        Set<String> enabledTargets = targets.getEnabled();
        System.out.println("Possible targets (* = enabled):");
        for (String name : targets.getNames()) {
            System.out.print(" " + name);
            if (enabledTargets.contains(name)) {
                System.out.print("*");
            }
        }
        System.out.println();
    }

    /**
     * Closes the stream (if one was created) and then the result listeners.
     * Invoked by the 'close' and 'exit' commands, at the end of
     * {@link #process_forever()} and from the shutdown hook.
     */
    private void safeClose() {
        if (streamCached != null) {
            try {
                getStream().close();
            } catch (IOException ex) {
                // should not happen because getStream exists
                System.err.println("getStream().close() -- " + ex.getMessage());
            }
        }
        resultListeners.close();
    }

    /**
     * @return All targets that can be disabled or enabled.
     */
    private ClassEnabledTracker<ResultListener> getPossibleTargets() {
        // NOTE(review): the type arguments here were reconstructed from usage;
        // confirm them against the declaration of ClassEnabledTracker.
        Map<String, Class<? extends ResultListener>> targets = new TreeMap<>();
        targets.put("file", DataWriter.class);
        targets.put("cfile", CompressableDataWriter.class);
        targets.put("shell", StreamHandler.class);
        ClassEnabledTracker<ResultListener> tracker
                = new ClassEnabledTracker<>(targets);
        tracker.disableAll();
        tracker.enableClasses(resultListeners.getRegistered());
        return tracker;
    }

    /**
     * Process enable and disable target parameters.
     *
     * @param params target names, each optionally prefixed with '-' (disable)
     * or '+' (enable); a bare name disables all other targets.
     */
    private void configureTargets(String[] params) {
        ClassEnabledTracker<ResultListener> targets = getPossibleTargets();
        for (String type : params) {
            boolean remove = type.startsWith("-");
            boolean add = type.startsWith("+");
            // whether to remove, add targets or restrict to one
            String name = (remove || add) ? type.substring(1) : type;
            if (!targets.has(name)) {
                System.err.println("Unrecognized target: " + name);
                continue;
            }
            // queue for enable or disable
            if (remove) {
                targets.disable(name);
            } else if (add) {
                targets.enable(name);
            } else {
                targets.disableAll();
                targets.enable(name);
            }
        }
        // disable first so that switching between file and cfile works
        for (String name : targets.getDisabled()) {
            if (disableTarget(targets.getClassByName(name))) {
                System.err.println("Disabled " + name);
            }
        }
        for (String name : targets.getEnabled()) {
            if (enableTarget(targets.getClassByName(name))) {
                System.err.println("Enabled " + name);
            }
        }
    }

    /**
     * Registers a listener of the given type unless one is already registered.
     *
     * @param rlCls the listener class to enable.
     * @return true if a new listener was registered, false otherwise.
     */
    private boolean enableTarget(Class<? extends ResultListener> rlCls) {
        // don't add it again if already activated
        if (resultListeners.findListener(rlCls) != null) {
            return false;
        }
        if (DataWriter.class.isAssignableFrom(rlCls)) {
            return enableFileTarget(rlCls);
        }
        if (rlCls == StreamHandler.class) {
            resultListeners.register(new StreamHandler());
            return true;
        }
        // unknown listener type: nothing was registered
        return false;
    }

    /** Opens and registers a plain or compressed tweet file writer. */
    private boolean enableFileTarget(Class<? extends ResultListener> rlCls) {
        Configuration config = Configuration.getConfig();
        if (resultListeners.findListener(CompressableDataWriter.class) != null
                || resultListeners.findListener(DataWriter.class) != null) {
            System.err.println("Cannot enable both file and cfile.");
            return false;
        }
        String tweetsFilename = config.getProperty(DataWriter.CFG_TWEETS_FILENAME);
        try {
            DataWriter dw;
            if (CompressableDataWriter.class.isAssignableFrom(rlCls)) {
                // compressed stream, convert by default (removing orig)
                boolean convertUncompressed = Boolean.parseBoolean(
                        config.getProperty(CFG_CONVERT_UNCOMPRESSED, "true"));
                dw = new CompressableDataWriter(tweetsFilename, convertUncompressed);
            } else {
                dw = new DataWriter(tweetsFilename);
            }
            dw.open();
            resultListeners.register(dw);
        } catch (IOException ex) {
            System.err.println("Could not open file for storing tweets:");
            System.err.println(ex.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Unregisters the listener of the given type, closing it when it is
     * {@link Closeable}.
     *
     * @param rlCls the listener class to disable.
     * @return true if a listener was removed, false if none was registered.
     */
    private boolean disableTarget(Class<? extends ResultListener> rlCls) {
        ResultListener oldListener = resultListeners.findListener(rlCls);
        // no need for action if not activated
        if (oldListener == null) {
            return false;
        }
        resultListeners.unregister(oldListener);
        // do we need to cleanup something?
        if (oldListener instanceof Closeable) {
            IOUtils.closeQuietly((Closeable) oldListener);
        }
        return true;
    }
}