summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-06-04 17:13:19 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-06-04 17:13:19 +0200
commite14c671f3cbb56a08d765bd992e4cf774a0d1353 (patch)
tree48294b4824105f2500a3529e10cc9272505bd428
parent6ed880b928ceaee3935562c2eb975ddaa49a8530 (diff)
downloadGoldfarmer-e14c671f3cbb56a08d765bd992e4cf774a0d1353.tar.gz
Producer / consumer for getBrands
-rw-r--r--src/database/BrandAnalyzerQueue.java88
-rw-r--r--src/main/Analyzor.java31
2 files changed, 102 insertions, 17 deletions
diff --git a/src/database/BrandAnalyzerQueue.java b/src/database/BrandAnalyzerQueue.java
new file mode 100644
index 0000000..d4e4029
--- /dev/null
+++ b/src/database/BrandAnalyzerQueue.java
@@ -0,0 +1,88 @@
+package database;
+
+import analysis.BrandChecker;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author Peter Wu
+ */
+public class BrandAnalyzerQueue implements Runnable {
+
+ private final BrandChecker checker;
+ private final ResultSet data;
+ private final BlockingQueue<Result> queue;
+ private volatile boolean last = false;
+
+ public BrandAnalyzerQueue(ResultSet data) {
+ this.checker = new BrandChecker("brandonlyrules.txt");
+ this.data = data;
+ this.queue = new ArrayBlockingQueue<>(1000);
+ }
+
+ private Logger getLogger() {
+ return Logger.getLogger(BrandAnalyzerQueue.class.getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ fillQueue();
+ } catch (SQLException ex) {
+ getLogger().log(Level.SEVERE, "Horrible! Database error", ex);
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, "Interrupted!", ex);
+ }
+ try {
+ last = true;
+ queue.put(new Result(-1, null));
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, "Failed to insert suicide pill!");
+ }
+ }
+
+ private void fillQueue() throws SQLException, InterruptedException {
+ while (data.next()) {
+ List<String> brands = checker.getBrands(data.getString("text"));
+ // if there is no brand, add a dummy so we know it got checked
+ if (brands.isEmpty()) {
+ brands.add("no");
+ }
+ long tweetid = data.getLong("tweetid");
+ Result result = new Result(tweetid, brands);
+ queue.put(result);
+ }
+ }
+
+ public Result next() {
+ Result result = null;
+ try {
+ if (!last) {
+ result = queue.take();
+ if (result.brands == null) {
+ result = null;
+ }
+ }
+ } catch (InterruptedException ex) {
+ getLogger().log(Level.SEVERE, "Interrupted!", ex);
+ }
+ return result;
+ }
+
+ public static class Result {
+
+ public final long tweetid;
+ public final List<String> brands;
+
+ public Result(long tweetid, List<String> brands) {
+ this.tweetid = tweetid;
+ this.brands = brands;
+ }
+ }
+}
diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java
index 5a201be..810fc4d 100644
--- a/src/main/Analyzor.java
+++ b/src/main/Analyzor.java
@@ -1,6 +1,7 @@
package main;
import analysis.BrandChecker;
+import database.BrandAnalyzerQueue;
import database.NamedPreparedStatement;
import database.QueryUtils;
import java.io.File;
@@ -255,8 +256,6 @@ public class Analyzor {
* @throws SQLException If the query is unsuccesfull.
*/
public void getBrands(String queryText, boolean reset) throws SQLException {
- BrandChecker checker = new BrandChecker("brandonlyrules.txt");
-
PreparedStatement statement;
// make a connection to the database and execute the query
if (reset) {
@@ -278,29 +277,27 @@ public class Analyzor {
int brandCount = 0;
int count = 0;
long timestamp = System.currentTimeMillis();
-
- while (data.next()) {
- List<String> brands = checker.getBrands(data.getString("text"));
- if (brands.isEmpty()) {
- brandCount++;
- QueryUtils.setInsertBrandParams(insertBrand, data.getLong("tweetid"), "no");
+ BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data);
+ BrandAnalyzerQueue.Result result;
+ new Thread(analyzer).start();
+ while ((result = analyzer.next()) != null) {
+ for (String brand : result.brands) {
+ QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand);
insertBrand.executeUpdate();
- } else {
- brandCount += brands.size();
- for (String brand : brands) {
- QueryUtils.setInsertBrandParams(insertBrand, data.getLong("tweetid"), brand);
- insertBrand.executeUpdate();
- }
}
+ brandCount += result.brands.size();
count++;
if (count % 10000 == 0) {
- System.out.println("Processed " + count + " tweets in " + (System.currentTimeMillis() - timestamp) + " ms");
+ System.err.println("Processed " + count + " tweets, inserted "
+ + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec");
}
}
- System.out.println("Processed " + count + " tweets in " + (System.currentTimeMillis() - timestamp) + " ms");
- System.out.println("Finished getBrands, processed " + count + " number of tweets, added " + brandCount + " brands or no.");
+ System.err.println("Processed " + count + " tweets in "
+ + ((System.currentTimeMillis() - timestamp) / 1000) + " sec");
+ System.err.println("Finished getBrands, processed " + count
+ + " number of tweets, added " + brandCount + " brands or no.");
}
//gets the amount of users that tweet about a brand in a timezone