diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/database/BrandAnalyzerQueue.java | 88 | ||||
-rw-r--r-- | src/main/Analyzor.java | 31 |
2 files changed, 102 insertions, 17 deletions
diff --git a/src/database/BrandAnalyzerQueue.java b/src/database/BrandAnalyzerQueue.java new file mode 100644 index 0000000..d4e4029 --- /dev/null +++ b/src/database/BrandAnalyzerQueue.java @@ -0,0 +1,88 @@ +package database; + +import analysis.BrandChecker; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * @author Peter Wu + */ +public class BrandAnalyzerQueue implements Runnable { + + private final BrandChecker checker; + private final ResultSet data; + private final BlockingQueue<Result> queue; + private volatile boolean last = false; + + public BrandAnalyzerQueue(ResultSet data) { + this.checker = new BrandChecker("brandonlyrules.txt"); + this.data = data; + this.queue = new ArrayBlockingQueue<>(1000); + } + + private Logger getLogger() { + return Logger.getLogger(BrandAnalyzerQueue.class.getName()); + } + + @Override + public void run() { + try { + fillQueue(); + } catch (SQLException ex) { + getLogger().log(Level.SEVERE, "Horrible! Database error", ex); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, "Interrupted!", ex); + } + try { + last = true; + queue.put(new Result(-1, null)); + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, "Failed to insert suicide pill!"); + } + } + + private void fillQueue() throws SQLException, InterruptedException { + while (data.next()) { + List<String> brands = checker.getBrands(data.getString("text")); + // if there is no brand, add a dummy so we know it got checked + if (brands.isEmpty()) { + brands.add("no"); + } + long tweetid = data.getLong("tweetid"); + Result result = new Result(tweetid, brands); + queue.put(result); + } + } + + public Result next() { + Result result = null; + try { + if (!last) { + result = queue.take(); + if (result.brands == null) { + result = null; + } + } + } catch (InterruptedException ex) { + getLogger().log(Level.SEVERE, "Interrupted!", ex); + } + return result; + } + + public static class Result { + + public final long tweetid; + public final List<String> brands; + + public Result(long tweetid, List<String> brands) { + this.tweetid = tweetid; + this.brands = brands; + } + } +} diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java index 5a201be..810fc4d 100644 --- a/src/main/Analyzor.java +++ b/src/main/Analyzor.java @@ -1,6 +1,7 @@ package main; import analysis.BrandChecker; +import database.BrandAnalyzerQueue; import database.NamedPreparedStatement; import database.QueryUtils; import java.io.File; @@ -255,8 +256,6 @@ public class Analyzor { * @throws SQLException If the query is unsuccesfull. */ public void getBrands(String queryText, boolean reset) throws SQLException { - BrandChecker checker = new BrandChecker("brandonlyrules.txt"); - PreparedStatement statement; // make a connection to the database and execute the query if (reset) { @@ -278,29 +277,27 @@ public class Analyzor { int brandCount = 0; int count = 0; long timestamp = System.currentTimeMillis(); - - while (data.next()) { - List<String> brands = checker.getBrands(data.getString("text")); - if (brands.isEmpty()) { - brandCount++; - QueryUtils.setInsertBrandParams(insertBrand, data.getLong("tweetid"), "no"); + BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data); + BrandAnalyzerQueue.Result result; + new Thread(analyzer).start(); + while ((result = analyzer.next()) != null) { + for (String brand : result.brands) { + QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand); insertBrand.executeUpdate(); - } else { - brandCount += brands.size(); - for (String brand : brands) { - QueryUtils.setInsertBrandParams(insertBrand, data.getLong("tweetid"), brand); - insertBrand.executeUpdate(); - } } + brandCount += result.brands.size(); count++; if (count % 10000 == 0) { - System.out.println("Processed " + count + " tweets in " + (System.currentTimeMillis() - timestamp) + " ms"); + System.err.println("Processed " + count + " tweets, inserted " + + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); } } - System.out.println("Processed " + count + " tweets in " + (System.currentTimeMillis() - timestamp) + " ms"); - System.out.println("Finished getBrands, processed " + count + " number of tweets, added " + brandCount + " brands or no."); + System.err.println("Processed " + count + " tweets in " + + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); + System.err.println("Finished getBrands, processed " + count + + " number of tweets, added " + brandCount + " brands or no."); } //gets the amount of users that tweet about a brand in a timezone |