From e14c671f3cbb56a08d765bd992e4cf774a0d1353 Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Wed, 4 Jun 2014 17:13:19 +0200 Subject: Producer / consumer for getBrands --- src/database/BrandAnalyzerQueue.java | 88 ++++++++++++++++++++++++++++++++++++ src/main/Analyzor.java | 31 ++++++------- 2 files changed, 102 insertions(+), 17 deletions(-) create mode 100644 src/database/BrandAnalyzerQueue.java diff --git a/src/database/BrandAnalyzerQueue.java b/src/database/BrandAnalyzerQueue.java new file mode 100644 index 0000000..d4e4029 --- /dev/null +++ b/src/database/BrandAnalyzerQueue.java @@ -0,0 +1,88 @@ +package database; + +import analysis.BrandChecker; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * @author Peter Wu + */ +public class BrandAnalyzerQueue implements Runnable { + + private final BrandChecker checker; + private final ResultSet data; + private final BlockingQueue<Result> queue; + private volatile boolean last = false; + + public BrandAnalyzerQueue(ResultSet data) { + this.checker = new BrandChecker("brandonlyrules.txt"); + this.data = data; + this.queue = new ArrayBlockingQueue<>(1000); + } + + private Logger getLogger() { + return Logger.getLogger(BrandAnalyzerQueue.class.getName()); + } + + @Override + public void run() { + try { + fillQueue(); + } catch (SQLException ex) { + getLogger().log(Level.SEVERE, "Horrible! 
Database error", ex); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, "Interrupted!", ex); + } + try { + last = true; + queue.put(new Result(-1, null)); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, "Failed to insert suicide pill!"); + } + } + + private void fillQueue() throws SQLException, InterruptedException { + while (data.next()) { + List<String> brands = checker.getBrands(data.getString("text")); + // if there is no brand, add a dummy so we know it got checked + if (brands.isEmpty()) { + brands.add("no"); + } + long tweetid = data.getLong("tweetid"); + Result result = new Result(tweetid, brands); + queue.put(result); + } + } + + public Result next() { + Result result = null; + try { + if (!last) { + result = queue.take(); + if (result.brands == null) { + result = null; + } + } + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, "Interrupted!", ex); + } + return result; + } + + public static class Result { + + public final long tweetid; + public final List<String> brands; + + public Result(long tweetid, List<String> brands) { + this.tweetid = tweetid; + this.brands = brands; + } + } +} diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java index 5a201be..810fc4d 100644 --- a/src/main/Analyzor.java +++ b/src/main/Analyzor.java @@ -1,6 +1,7 @@ package main; import analysis.BrandChecker; +import database.BrandAnalyzerQueue; import database.NamedPreparedStatement; import database.QueryUtils; import java.io.File; @@ -255,8 +256,6 @@ public class Analyzor { * @throws SQLException If the query is unsuccesfull. 
*/ public void getBrands(String queryText, boolean reset) throws SQLException { - BrandChecker checker = new BrandChecker("brandonlyrules.txt"); - PreparedStatement statement; // make a connection to the database and execute the query if (reset) { @@ -278,29 +277,27 @@ public class Analyzor { int brandCount = 0; int count = 0; long timestamp = System.currentTimeMillis(); - - while (data.next()) { - List<String> brands = checker.getBrands(data.getString("text")); - if (brands.isEmpty()) { - brandCount++; - QueryUtils.setInsertBrandParams(insertBrand, data.getLong("tweetid"), "no"); + BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data); + BrandAnalyzerQueue.Result result; + new Thread(analyzer).start(); + while ((result = analyzer.next()) != null) { + for (String brand : result.brands) { + QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand); insertBrand.executeUpdate(); - } else { - brandCount += brands.size(); - for (String brand : brands) { - QueryUtils.setInsertBrandParams(insertBrand, data.getLong("tweetid"), brand); - insertBrand.executeUpdate(); - } } + brandCount += result.brands.size(); count++; if (count % 10000 == 0) { - System.out.println("Processed " + count + " tweets in " + (System.currentTimeMillis() - timestamp) + " ms"); + System.err.println("Processed " + count + " tweets, inserted " + + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); } } - System.out.println("Processed " + count + " tweets in " + (System.currentTimeMillis() - timestamp) + " ms"); - System.out.println("Finished getBrands, processed " + count + " number of tweets, added " + brandCount + " brands or no."); + System.err.println("Processed " + count + " tweets in " + + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); + System.err.println("Finished getBrands, processed " + count + + " number of tweets, added " + brandCount + " brands or no."); } //gets the amount of users that tweet about a brand in a timezone -- cgit v1.2.1