summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-06-11 13:23:47 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-06-11 13:23:47 +0200
commit072c59d7dc3389e84f715ac46033ab43a4971db6 (patch)
treec88f03378c77e244c7736e65984693822836f6f4 /src
parent8a7d2e7fbd2b9b5a2a365431f000c5c1a3d6efe5 (diff)
downloadGoldfarmer-072c59d7dc3389e84f715ac46033ab43a4971db6.tar.gz
getBrands: add periodic status reporting, save memory
Prevent out of memory error due to buffering the full resultset. This is done by disabling auto commit (starting a transaction) and setting a smaller fetch size. Watcher is based on my work in Datafiller.
Diffstat (limited to 'src')
-rw-r--r--src/database/BrandAnalyzerInserter.java48
-rw-r--r--src/database/BrandAnalyzerQueue.java1
-rw-r--r--src/main/Analyzor.java81
-rw-r--r--src/main/Watcher.java59
4 files changed, 158 insertions, 31 deletions
diff --git a/src/database/BrandAnalyzerInserter.java b/src/database/BrandAnalyzerInserter.java
new file mode 100644
index 0000000..f7ed62c
--- /dev/null
+++ b/src/database/BrandAnalyzerInserter.java
@@ -0,0 +1,48 @@
+package database;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Callable;
+import main.Watcher;
+
+/**
+ * Inserts queued brand analyses into the database.
+ *
+ * @author Peter Wu
+ */
+public class BrandAnalyzerInserter implements Callable<Integer>,
+ Watcher.Progressable {
+
+ private final BrandAnalyzerQueue analyzer;
+ private final NamedPreparedStatement insertBrand;
+ private volatile int count;
+
+ public BrandAnalyzerInserter(Connection connection,
+ BrandAnalyzerQueue analyzer) throws SQLException {
+ this.analyzer = analyzer;
+ this.insertBrand = new NamedPreparedStatement(connection,
+ "INSERT INTO mentionsbrand"
+ + " SELECT :tweetid AS tweetid, :brand AS brand"
+ + " WHERE NOT EXISTS (SELECT 1 FROM mentionsbrand WHERE"
+ + " tweetid=:tweetid AND brand=:brand)");
+ }
+
+ @Override
+ public Integer call() throws SQLException {
+ BrandAnalyzerQueue.Result result;
+ while ((result = analyzer.next()) != null) {
+ for (String brand : result.brands) {
+ QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand);
+ insertBrand.executeUpdate();
+ }
+
+ count++;
+ }
+ return count;
+ }
+
+ @Override
+ public int getCount() {
+ return count;
+ }
+}
diff --git a/src/database/BrandAnalyzerQueue.java b/src/database/BrandAnalyzerQueue.java
index d4e4029..21d0290 100644
--- a/src/database/BrandAnalyzerQueue.java
+++ b/src/database/BrandAnalyzerQueue.java
@@ -10,6 +10,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
+ * Retrieves tweets from a statement and queues the brand analysis.
*
* @author Peter Wu
*/
diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java
index 2f6e4b5..9e26c97 100644
--- a/src/main/Analyzor.java
+++ b/src/main/Analyzor.java
@@ -1,6 +1,6 @@
package main;
-import analysis.BrandChecker;
+import database.BrandAnalyzerInserter;
import database.BrandAnalyzerQueue;
import database.NamedPreparedStatement;
import database.QueryUtils;
@@ -16,11 +16,17 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Scanner;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* The sentiment analysis class that rates tweets based on a unigram and bigram
@@ -257,6 +263,7 @@ public class Analyzor {
*/
public void getBrands(String queryText, boolean reset) throws SQLException {
PreparedStatement statement;
+ ResultSet tweetResults;
// make a connection to the database and execute the query
if (reset) {
System.out.println("Cleaning old entries of mentionsbrand.");
@@ -266,38 +273,50 @@ public class Analyzor {
System.out.println("Obtaining all selected entries in tweet.");
if (queryText.isEmpty()) {
- query("SELECT tweetid, text FROM tweet");
- } else {
- query(queryText);
+ queryText = "SELECT tweetid, text FROM tweet";
}
- System.out.println("Query finished.");
-
- NamedPreparedStatement insertBrand = new NamedPreparedStatement(connection, QueryUtils.insertBrand);
-
- int brandCount = 0;
- int count = 0;
- long timestamp = System.currentTimeMillis();
- BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data);
- BrandAnalyzerQueue.Result result;
- new Thread(analyzer).start();
- while ((result = analyzer.next()) != null) {
- for (String brand : result.brands) {
- QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand);
- insertBrand.executeUpdate();
- }
-
- brandCount += result.brands.size();
- count++;
- if (count % 10000 == 0) {
- System.err.println("Processed " + count + " tweets, inserted "
- + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec");
+ // allow the use of cursors for less memory usage
+ connection.setAutoCommit(false);
+ statement = connection.prepareStatement(queryText);
+ statement.setFetchSize(1000); // fetch groups of rows to save memory
+ tweetResults = statement.executeQuery();
+
+ BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults);
+ BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connection, analyzer);
+
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
+ // producer
+ scheduler.submit(analyzer);
+ // consumer which writes to the database
+ Future<Integer> consumerTask = scheduler.submit(inserter);
+ // status reporter for consumer
+ int statusInterval = 5; // TODO: do not hardcode
+ scheduler.scheduleAtFixedRate(new Watcher(inserter, statusInterval),
+ statusInterval, statusInterval, TimeUnit.SECONDS);
+ // now wait for completion of the main thread
+ long startTime = System.currentTimeMillis();
+ try {
+ while (true) {
+ try {
+ consumerTask.get();
+ break;
+ } catch (InterruptedException ex) {
+ Logger.getLogger(Analyzor.class.getName())
+ .info("Interrupted while waiting for consumer");
+ }
}
+ } catch (ExecutionException ex) {
+ Logger.getLogger(Analyzor.class.getName())
+ .log(Level.SEVERE, "Consumer failed", ex);
}
-
- System.err.println("Processed " + count + " tweets in "
- + ((System.currentTimeMillis() - timestamp) / 1000) + " sec");
- System.err.println("Finished getBrands, processed " + count
- + " number of tweets, added " + brandCount + " brands or no.");
+ // cancel status thread
+ scheduler.shutdown();
+ // end, cleanup
+ connection.setAutoCommit(true);
+ tweetResults.close();
+
+ System.err.println("Processed " + inserter.getCount() + " tweets in "
+ + ((System.currentTimeMillis() - startTime) / 1000) + " sec");
}
//gets the amount of users that tweet about a brand in a timezone
diff --git a/src/main/Watcher.java b/src/main/Watcher.java
new file mode 100644
index 0000000..3f1db47
--- /dev/null
+++ b/src/main/Watcher.java
@@ -0,0 +1,59 @@
+package main;
+
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Prints some progress.
+ *
+ * @author Peter Wu
+ */
+public class Watcher implements Runnable {
+
+ private final Progressable main;
+ /**
+ * Interval in seconds in which the status information is updated.
+ */
+ private final int interval;
+ private final DateTimeFormatter df;
+ private int elapsed_seconds = 0;
+ private int previousCount = 0;
+
+ public Watcher(Progressable main, int interval) {
+ this.main = main;
+ this.interval = interval;
+ df = DateTimeFormat.forPattern("YYYY-MM-dd HH:mm:ss");
+ }
+
+ @Override
+ public void run() {
+ int newCount = main.getCount();
+ if (newCount < 1) {
+ // database connection is being established? Whatever is the case,
+ // no status information should be printed.
+ return;
+ }
+ int objects_per_min = (newCount - previousCount) * 60 / interval;
+ int objects_per_sec = objects_per_min / 60;
+ elapsed_seconds += interval;
+ // objects per min based on total number of objects and elapsed time
+ int total_objects_per_min = newCount * 60 / elapsed_seconds;
+ String datetime = df.print(new LocalDateTime());
+ // store new objects count for next sample
+ previousCount = newCount;
+ System.err.println(datetime + " Processed objects: " + newCount
+ + " (total: " + total_objects_per_min + " objects/min)"
+ + " (last interval: " + objects_per_sec + " objects/sec, "
+ + objects_per_min + " objects/min)");
+ }
+
+ public interface Progressable {
+
+ /**
+ * @return The progress of the program or 0 if it was not started yet.
+ * It must be a monotonically increasing value.
+ */
+ public int getCount();
+ }
+}