summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Peter Wu <peter@lekensteyn.nl> 2014-05-11 11:58:48 +0200
committer Peter Wu <peter@lekensteyn.nl> 2014-05-11 11:58:48 +0200
commit 16d71f8894963b7495ca0e57d50b91142ed3b506 (patch)
tree 54a96310000c7922148ebc70930356d812ffca85
parent 3705d583a0cdc40f945ac9962a2414fcb699fad6 (diff)
download Datafiller-16d71f8894963b7495ca0e57d50b91142ed3b506.tar.gz
Report status while importing (add --status option)
* Add status reporting to know how many tweets are already imported.
* Remove spam from DataFiller when no brand is detected.
-rw-r--r-- src/main/DataFiller.java | 4
-rw-r--r-- src/main/Main.java | 76
-rw-r--r-- src/main/Watcher.java | 45
3 files changed, 112 insertions, 13 deletions
diff --git a/src/main/DataFiller.java b/src/main/DataFiller.java
index f62bbac..55bc273 100644
--- a/src/main/DataFiller.java
+++ b/src/main/DataFiller.java
@@ -118,10 +118,6 @@ public class DataFiller {
}
// TODO: WTF IS THIS PILE OF SHIT?!
- if (result.isEmpty()) {
- result.add("geen");
- System.out.println(text);
- }
return result;
}
}
diff --git a/src/main/Main.java b/src/main/Main.java
index 809d6b0..e076d32 100644
--- a/src/main/Main.java
+++ b/src/main/Main.java
@@ -10,13 +10,19 @@ import io.TweetReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The main class.
*/
-public class Main {
+public class Main implements Callable<Boolean> {
/**
* The main method of the application.
@@ -32,7 +38,37 @@ public class Main {
System.exit(1);
return;
}
- main.run();
+
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
+ // the main IO thread
+ Future<Boolean> mainTask = scheduler.submit(main);
+
+ // the status thread
+ if (main.statusInterval > 0) {
+ scheduler.scheduleAtFixedRate(new Watcher(main, main.statusInterval),
+ main.statusInterval, main.statusInterval, TimeUnit.SECONDS);
+ }
+ // now wait for completion of the main thread
+ try {
+ while (true) {
+ try {
+ if (mainTask.get()) {
+ System.out.println("Import successful.");
+ } else {
+ System.out.println("Import (partially) failed.");
+ }
+ break;
+ } catch (InterruptedException ex) {
+ Logger.getLogger(Main.class.getName())
+ .info("Interrupted while waiting for main");
+ }
+ }
+ } catch (ExecutionException ex) {
+ Logger.getLogger(Main.class.getName())
+ .log(Level.SEVERE, "Main failed", ex);
+ }
+ // cancel status thread
+ scheduler.shutdown();
}
private String m_filename;
@@ -41,6 +77,7 @@ public class Main {
* Whether the database should be contacted or not.
*/
private boolean skipDb;
+ private Integer statusInterval;
public Main(String[] args) {
// default connection properties
@@ -50,22 +87,29 @@ public class Main {
.setPassword("2IOC02")
.setDbName("twitter");
skipDb = false;
+ statusInterval = 2;
/* parse the global options. */
parseGlobalOptions(args);
}
- private void printTweets(ITweetReader reader) throws IOException {
+ /**
+ * The current tweet number that is being processed.
+ */
+ private volatile int tweetNo;
+
+ private boolean printTweets(ITweetReader reader) throws IOException {
Tweet tweet;
- long tweetNo = 1;
+ tweetNo = 1;
while ((tweet = reader.getTweet()) != null) {
System.out.println("/*" + tweetNo++ + "*/ " + tweet);
}
+ return true;
}
- private void tweetsToDb(ITweetReader reader) throws IOException {
+ private boolean tweetsToDb(ITweetReader reader) throws IOException {
Tweet tweet = null;
- int tweetNo = 1;
+ tweetNo = 1;
try (Connection connection = cb.create()) {
/* create the object that fills the database */
DataFiller filler = new DataFiller(connection);
@@ -73,6 +117,7 @@ public class Main {
filler.processTweet(tweet);
++tweetNo;
}
+ return true;
} catch (JsonParseException ex) {
if (tweet != null) {
System.err.println("Faulty tweet " + tweetNo + ": " + tweet);
@@ -86,10 +131,17 @@ public class Main {
Logger.getLogger(Main.class.getName()).log(Level.SEVERE,
"DB error", ex);
}
+ return false;
}
- public void run() {
+ public int getProcessedTweets() {
+ return tweetNo;
+ }
+
+ @Override
+ public Boolean call() {
ITweetReader reader = null;
+ boolean success = false;
try {
if (m_filename == null) {
reader = new TweetReader(System.in);
@@ -97,9 +149,9 @@ public class Main {
reader = new FileTweetReader(m_filename);
}
if (skipDb) {
- printTweets(reader);
+ success = printTweets(reader);
} else {
- tweetsToDb(reader);
+ success = tweetsToDb(reader);
}
} catch (JsonSyntaxException ex) {
System.err.println("Got an invalid tweet: " + ex);
@@ -110,6 +162,8 @@ public class Main {
reader.close();
}
}
+
+ return success;
}
private void parseGlobalOptions(String[] args)
@@ -131,6 +185,8 @@ public class Main {
cb.setDbName(getArg(args, ++i, "--dbname"));
} else if ("--skipdb".equals(args[i])) {
skipDb = true;
+ } else if ("--status".equals(args[i])) {
+ statusInterval = Integer.valueOf(getArg(args, ++i, "--status"));
} else if (args[i].startsWith("-")) {
throw new IllegalArgumentException("Invalid option: " + args[i]);
} else {
@@ -171,6 +227,8 @@ public class Main {
" --dbport PORT Database port (defaults to 5432)",
" --dbname NAME Database name (defaults to 'Twitter')",
" --skipdb Do not contact the database at all, just print data.",
+ " --status SECS The interval in which import status information",
+ " should be printed, zero disables it (defaults to 2)",
"",
"If no tweets file is given, data will be read from standard input."
};
diff --git a/src/main/Watcher.java b/src/main/Watcher.java
new file mode 100644
index 0000000..bb790f0
--- /dev/null
+++ b/src/main/Watcher.java
@@ -0,0 +1,45 @@
+package main;
+
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * While the DataFiller is working, get some status information.
+ *
+ * @author Peter Wu
+ */
+class Watcher implements Runnable {
+
+ private final Main main;
+ /**
+ * Interval in seconds in which the status information is updated.
+ */
+ private final int interval;
+ private final DateTimeFormatter df;
+ private int elapsed_seconds = 0;
+ private int previousCount = 0;
+
+ public Watcher(Main main, int interval) {
+ this.main = main;
+ this.interval = interval;
+ df = DateTimeFormat.forPattern("YYYY-MM-dd HH:mm:ss");
+ }
+
+ @Override
+ public void run() {
+ int newCount = main.getProcessedTweets();
+ int tweets_per_min = (newCount - previousCount) * 60 / interval;
+ int tweets_per_sec = tweets_per_min / 60;
+ elapsed_seconds += interval;
+ // tweets per min based on total number of tweets and elapsed time
+ int total_tweets_per_min = newCount * 60 / elapsed_seconds;
+ String datetime = df.print(new LocalDateTime());
+ // store new tweets count for next sample
+ previousCount = newCount;
+ System.err.println(datetime + " Processed tweets: " + newCount
+ + " (total: " + total_tweets_per_min + " tweet/min)"
+ + " (last interval: " + tweets_per_sec + " tweets/sec, "
+ + tweets_per_min + " tweets/min)");
+ }
+}