From 16d71f8894963b7495ca0e57d50b91142ed3b506 Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Sun, 11 May 2014 11:58:48 +0200 Subject: Report status while importing (add --status option) * Add status reporting to know how many tweets are already imported. * Remove spam from DataFiller when no brand is detected --- src/main/DataFiller.java | 4 --- src/main/Main.java | 76 ++++++++++++++++++++++++++++++++++++++++++------ src/main/Watcher.java | 45 ++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 13 deletions(-) create mode 100644 src/main/Watcher.java diff --git a/src/main/DataFiller.java b/src/main/DataFiller.java index f62bbac..55bc273 100644 --- a/src/main/DataFiller.java +++ b/src/main/DataFiller.java @@ -118,10 +118,6 @@ public class DataFiller { } // TODO: WTF IS THIS PILE OF SHIT?! - if (result.isEmpty()) { - result.add("geen"); - System.out.println(text); - } return result; } } diff --git a/src/main/Main.java b/src/main/Main.java index 809d6b0..e076d32 100644 --- a/src/main/Main.java +++ b/src/main/Main.java @@ -10,13 +10,19 @@ import io.TweetReader; import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * The main class. */ -public class Main { +public class Main implements Callable { /** * The main method of the application. @@ -32,7 +38,37 @@ public class Main { System.exit(1); return; } - main.run(); + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); + // the main IO thread + Future mainTask = scheduler.submit(main); + + // the status thread + if (main.statusInterval > 0) { + scheduler.scheduleAtFixedRate(new Watcher(main, main.statusInterval), + main.statusInterval, main.statusInterval, TimeUnit.SECONDS); + } + // now wait for completion of the main thread + try { + while (true) { + try { + if (mainTask.get()) { + System.out.println("Import successful."); + } else { + System.out.println("Import (partially) failed."); + } + break; + } catch (InterruptedException ex) { + Logger.getLogger(Main.class.getName()) + .info("Interrupted while waiting for main"); + } + } + } catch (ExecutionException ex) { + Logger.getLogger(Main.class.getName()) + .log(Level.SEVERE, "Main failed", ex); + } + // cancel status thread + scheduler.shutdown(); } private String m_filename; @@ -41,6 +77,7 @@ public class Main { * Whether the database should be contacted or not. */ private boolean skipDb; + private Integer statusInterval; public Main(String[] args) { // default connection properties @@ -50,22 +87,29 @@ public class Main { .setPassword("2IOC02") .setDbName("twitter"); skipDb = false; + statusInterval = 2; /* parse the global options. */ parseGlobalOptions(args); } - private void printTweets(ITweetReader reader) throws IOException { + /** + * The current tweet number that is being processed. + */ + private volatile int tweetNo; + + private boolean printTweets(ITweetReader reader) throws IOException { Tweet tweet; - long tweetNo = 1; + tweetNo = 1; while ((tweet = reader.getTweet()) != null) { System.out.println("/*" + tweetNo++ + "*/ " + tweet); } + return true; } - private void tweetsToDb(ITweetReader reader) throws IOException { + private boolean tweetsToDb(ITweetReader reader) throws IOException { Tweet tweet = null; - int tweetNo = 1; + tweetNo = 1; try (Connection connection = cb.create()) { /* create the object that fills the database */ DataFiller filler = new DataFiller(connection); @@ -73,6 +117,7 @@ public class Main { filler.processTweet(tweet); ++tweetNo; } + return true; } catch (JsonParseException ex) { if (tweet != null) { System.err.println("Faulty tweet " + tweetNo + ": " + tweet); @@ -86,10 +131,17 @@ public class Main { Logger.getLogger(Main.class.getName()).log(Level.SEVERE, "DB error", ex); } + return false; } - public void run() { + public int getProcessedTweets() { + return tweetNo; + } + + @Override + public Boolean call() { ITweetReader reader = null; + boolean success = false; try { if (m_filename == null) { reader = new TweetReader(System.in); @@ -97,9 +149,9 @@ public class Main { reader = new FileTweetReader(m_filename); } if (skipDb) { - printTweets(reader); + success = printTweets(reader); } else { - tweetsToDb(reader); + success = tweetsToDb(reader); } } catch (JsonSyntaxException ex) { System.err.println("Got an invalid tweet: " + ex); @@ -110,6 +162,8 @@ public class Main { reader.close(); } } + + return success; } private void parseGlobalOptions(String[] args) @@ -131,6 +185,8 @@ public class Main { cb.setDbName(getArg(args, ++i, "--dbname")); } else if ("--skipdb".equals(args[i])) { skipDb = true; + } else if ("--status".equals(args[i])) { + statusInterval = Integer.valueOf(getArg(args, ++i, "--status")); } else if (args[i].startsWith("-")) { throw new IllegalArgumentException("Invalid option: " + args[i]); } else { @@ -171,6 +227,8 @@ public class Main { " --dbport PORT Database port (defaults to 5432)", " --dbname NAME Database name (defaults to 'Twitter')", " --skipdb Do not contact the database at all, just print data.", + " --status SECS The interval in which import status information", + " should be printed, zero disables it (defaults to 2)", "", "If no tweets file is given, data will be read from standard input." }; diff --git a/src/main/Watcher.java b/src/main/Watcher.java new file mode 100644 index 0000000..bb790f0 --- /dev/null +++ b/src/main/Watcher.java @@ -0,0 +1,45 @@ +package main; + +import org.joda.time.LocalDateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** + * While the DataFiller is working, get some status information. + * + * @author Peter Wu + */ +class Watcher implements Runnable { + + private final Main main; + /** + * Interval in seconds in which the status information is updated. + */ + private final int interval; + private final DateTimeFormatter df; + private int elapsed_seconds = 0; + private int previousCount = 0; + + public Watcher(Main main, int interval) { + this.main = main; + this.interval = interval; + df = DateTimeFormat.forPattern("YYYY-MM-dd HH:mm:ss"); + } + + @Override + public void run() { + int newCount = main.getProcessedTweets(); + int tweets_per_min = (newCount - previousCount) * 60 / interval; + int tweets_per_sec = tweets_per_min / 60; + elapsed_seconds += interval; + // tweets per min based on total number of tweets and elapsed time + int total_tweets_per_min = newCount * 60 / elapsed_seconds; + String datetime = df.print(new LocalDateTime()); + // store new tweets count for next sample + previousCount = newCount; + System.err.println(datetime + " Processed tweets: " + newCount + + " (total: " + total_tweets_per_min + " tweet/min)" + + " (last interval: " + tweets_per_sec + " tweets/sec, " + + tweets_per_min + " tweets/min)"); + } +} -- cgit v1.2.1