From 77f99040f09f1ceec24feb97b16280675c514eaa Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Wed, 11 Jun 2014 14:25:27 +0200 Subject: Use more workers for consumers This improves the performance from 60k to about 95k tweets per second. --- src/main/Analyzor.java | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java index 6e6ca67..39eb899 100644 --- a/src/main/Analyzor.java +++ b/src/main/Analyzor.java @@ -17,8 +17,10 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map.Entry; import java.util.Scanner; import java.util.concurrent.ExecutionException; @@ -309,23 +311,46 @@ public class Analyzor { tweetResults = statement.executeQuery(); BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults); - BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connBuilder.create(), analyzer); - - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); + final BrandAnalyzerInserter[] inserters = new BrandAnalyzerInserter[2]; + for (int i = 0; i < inserters.length; i++) { + // create new connection for each worker + inserters[i] + = new BrandAnalyzerInserter(connBuilder.create(), analyzer); + }; + + ScheduledExecutorService scheduler; + // executor with threads for producer, status and consumers + scheduler = Executors.newScheduledThreadPool(2 + inserters.length); // producer scheduler.submit(analyzer); // consumer which writes to the database - Future consumerTask = scheduler.submit(inserter); - // status reporter for consumer + List> consumerTasks = new ArrayList<>(); + for (BrandAnalyzerInserter inserter : inserters) { + Future consumerTask = scheduler.submit(inserter); + consumerTasks.add(consumerTask); + } + // status reporter for consumers + Watcher.Progressable progress = new Watcher.Progressable() { + @Override + public int getCount() { + int count = 0; + for (Watcher.Progressable p : inserters) { + count += p.getCount(); + } + return count; + } + }; int statusInterval = 5; // TODO: do not hardcode - scheduler.scheduleAtFixedRate(new Watcher(inserter, statusInterval), + scheduler.scheduleAtFixedRate(new Watcher(progress, statusInterval), statusInterval, statusInterval, TimeUnit.SECONDS); // now wait for completion of the main thread long startTime = System.currentTimeMillis(); try { while (true) { try { - consumerTask.get(); + for (Future consumerTask : consumerTasks) { + consumerTask.get(); + } break; } catch (InterruptedException ex) { Logger.getLogger(Analyzor.class.getName()) @@ -342,7 +367,7 @@ public class Analyzor { connection.setAutoCommit(true); tweetResults.close(); - System.err.println("Processed " + inserter.getCount() + " tweets in " + System.err.println("Processed " + progress.getCount() + " tweets in " + ((System.currentTimeMillis() - startTime) / 1000) + " sec"); } -- cgit v1.2.1