diff options
author | Peter Wu <peter@lekensteyn.nl> | 2014-06-11 13:23:47 +0200 |
---|---|---|
committer | Peter Wu <peter@lekensteyn.nl> | 2014-06-11 13:23:47 +0200 |
commit | 072c59d7dc3389e84f715ac46033ab43a4971db6 (patch) | |
tree | c88f03378c77e244c7736e65984693822836f6f4 /src | |
parent | 8a7d2e7fbd2b9b5a2a365431f000c5c1a3d6efe5 (diff) | |
download | Goldfarmer-072c59d7dc3389e84f715ac46033ab43a4971db6.tar.gz |
getBrands: add periodic status reporting, save memory
Prevent an out-of-memory error caused by buffering the full result set. This
is done by disabling auto-commit (starting a transaction) and setting
a smaller fetch size.
Watcher is based on my work in Datafiller.
Diffstat (limited to 'src')
-rw-r--r-- | src/database/BrandAnalyzerInserter.java | 48 | ||||
-rw-r--r-- | src/database/BrandAnalyzerQueue.java | 1 | ||||
-rw-r--r-- | src/main/Analyzor.java | 81 | ||||
-rw-r--r-- | src/main/Watcher.java | 59 |
4 files changed, 158 insertions, 31 deletions
diff --git a/src/database/BrandAnalyzerInserter.java b/src/database/BrandAnalyzerInserter.java new file mode 100644 index 0000000..f7ed62c --- /dev/null +++ b/src/database/BrandAnalyzerInserter.java @@ -0,0 +1,48 @@ +package database; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Callable; +import main.Watcher; + +/** + * Inserts queued brand analyses into the database. + * + * @author Peter Wu + */ +public class BrandAnalyzerInserter implements Callable<Integer>, + Watcher.Progressable { + + private final BrandAnalyzerQueue analyzer; + private final NamedPreparedStatement insertBrand; + private volatile int count; + + public BrandAnalyzerInserter(Connection connection, + BrandAnalyzerQueue analyzer) throws SQLException { + this.analyzer = analyzer; + this.insertBrand = new NamedPreparedStatement(connection, + "INSERT INTO mentionsbrand" + + " SELECT :tweetid AS tweetid, :brand AS brand" + + " WHERE NOT EXISTS (SELECT 1 FROM mentionsbrand WHERE" + + " tweetid=:tweetid AND brand=:brand)"); + } + + @Override + public Integer call() throws SQLException { + BrandAnalyzerQueue.Result result; + while ((result = analyzer.next()) != null) { + for (String brand : result.brands) { + QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand); + insertBrand.executeUpdate(); + } + + count++; + } + return count; + } + + @Override + public int getCount() { + return count; + } +} diff --git a/src/database/BrandAnalyzerQueue.java b/src/database/BrandAnalyzerQueue.java index d4e4029..21d0290 100644 --- a/src/database/BrandAnalyzerQueue.java +++ b/src/database/BrandAnalyzerQueue.java @@ -10,6 +10,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** + * Retrieves tweets from a statement and queues the brand analysis. 
* * @author Peter Wu */ diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java index 2f6e4b5..9e26c97 100644 --- a/src/main/Analyzor.java +++ b/src/main/Analyzor.java @@ -1,6 +1,6 @@ package main; -import analysis.BrandChecker; +import database.BrandAnalyzerInserter; import database.BrandAnalyzerQueue; import database.NamedPreparedStatement; import database.QueryUtils; @@ -16,11 +16,17 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.List; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.Scanner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** * The sentiment analysis class that rates tweets based on a unigram and bigram @@ -257,6 +263,7 @@ public class Analyzor { */ public void getBrands(String queryText, boolean reset) throws SQLException { PreparedStatement statement; + ResultSet tweetResults; // make a connection to the database and execute the query if (reset) { System.out.println("Cleaning old entries of mentionsbrand."); @@ -266,38 +273,50 @@ public class Analyzor { System.out.println("Obtaining all selected entries in tweet."); if (queryText.isEmpty()) { - query("SELECT tweetid, text FROM tweet"); - } else { - query(queryText); + queryText = "SELECT tweetid, text FROM tweet"; } - System.out.println("Query finished."); - - NamedPreparedStatement insertBrand = new NamedPreparedStatement(connection, QueryUtils.insertBrand); - - int brandCount = 0; - int count = 0; - long timestamp = System.currentTimeMillis(); - BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data); - BrandAnalyzerQueue.Result result; - new Thread(analyzer).start(); - while ((result = analyzer.next()) != 
null) { - for (String brand : result.brands) { - QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand); - insertBrand.executeUpdate(); - } - - brandCount += result.brands.size(); - count++; - if (count % 10000 == 0) { - System.err.println("Processed " + count + " tweets, inserted " - + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); + // allow the use of cursors for less memory usage + connection.setAutoCommit(false); + statement = connection.prepareStatement(queryText); + statement.setFetchSize(1000); // fetch groups of rows to save memory + tweetResults = statement.executeQuery(); + + BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults); + BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connection, analyzer); + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); + // producer + scheduler.submit(analyzer); + // consumer which writes to the database + Future<Integer> consumerTask = scheduler.submit(inserter); + // status reporter for consumer + int statusInterval = 5; // TODO: do not hardcode + scheduler.scheduleAtFixedRate(new Watcher(inserter, statusInterval), + statusInterval, statusInterval, TimeUnit.SECONDS); + // now wait for completion of the main thread + long startTime = System.currentTimeMillis(); + try { + while (true) { + try { + consumerTask.get(); + break; + } catch (InterruptedException ex) { + Logger.getLogger(Analyzor.class.getName()) + .info("Interrupted while waiting for consumer"); + } } + } catch (ExecutionException ex) { + Logger.getLogger(Analyzor.class.getName()) + .log(Level.SEVERE, "Consumer failed", ex); } - - System.err.println("Processed " + count + " tweets in " - + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); - System.err.println("Finished getBrands, processed " + count - + " number of tweets, added " + brandCount + " brands or no."); + // cancel status thread + scheduler.shutdown(); + // end, cleanup + 
connection.setAutoCommit(true); + tweetResults.close(); + + System.err.println("Processed " + inserter.getCount() + " tweets in " + + ((System.currentTimeMillis() - startTime) / 1000) + " sec"); } //gets the amount of users that tweet about a brand in a timezone diff --git a/src/main/Watcher.java b/src/main/Watcher.java new file mode 100644 index 0000000..3f1db47 --- /dev/null +++ b/src/main/Watcher.java @@ -0,0 +1,59 @@ +package main; + +import org.joda.time.LocalDateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** + * Prints some progress. + * + * @author Peter Wu + */ +public class Watcher implements Runnable { + + private final Progressable main; + /** + * Interval in seconds in which the status information is updated. + */ + private final int interval; + private final DateTimeFormatter df; + private int elapsed_seconds = 0; + private int previousCount = 0; + + public Watcher(Progressable main, int interval) { + this.main = main; + this.interval = interval; + df = DateTimeFormat.forPattern("YYYY-MM-dd HH:mm:ss"); + } + + @Override + public void run() { + int newCount = main.getCount(); + if (newCount < 1) { + // database connection is being established? Whatever is the case, + // no status information should be printed. 
+ return; + } + int objects_per_min = (newCount - previousCount) * 60 / interval; + int objects_per_sec = objects_per_min / 60; + elapsed_seconds += interval; + // objects per min based on total number of objects and elapsed time + int total_objects_per_min = newCount * 60 / elapsed_seconds; + String datetime = df.print(new LocalDateTime()); + // store new objects count for next sample + previousCount = newCount; + System.err.println(datetime + " Processed objects: " + newCount + + " (total: " + total_objects_per_min + " objects/min)" + + " (last interval: " + objects_per_sec + " objects/sec, " + + objects_per_min + " objects/min)"); + } + + public interface Progressable { + + /** + * @return The progress of the program or 0 if it was not started yet. + * It must be a monotonically increasing value. + */ + public int getCount(); + } +} |