summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Peter Wu <peter@lekensteyn.nl> 2014-06-11 14:25:27 +0200
committer: Peter Wu <peter@lekensteyn.nl> 2014-06-11 14:25:27 +0200
commit: 77f99040f09f1ceec24feb97b16280675c514eaa (patch)
tree: 0f142af9f94f9cf2cf888386abe102b693562a69
parent: 7b36e82bb8c521353bfa19ce4b3534bcd41fcb38 (diff)
download: Goldfarmer-77f99040f09f1ceec24feb97b16280675c514eaa.tar.gz
Use more workers for consumers
This improves the performance from 60k to about 95k tweets per second.
-rw-r--r-- src/main/Analyzor.java 41
1 files changed, 33 insertions, 8 deletions
diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java
index 6e6ca67..39eb899 100644
--- a/src/main/Analyzor.java
+++ b/src/main/Analyzor.java
@@ -17,8 +17,10 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
@@ -309,23 +311,46 @@ public class Analyzor {
tweetResults = statement.executeQuery();
BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults);
- BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connBuilder.create(), analyzer);
-
- ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
+ final BrandAnalyzerInserter[] inserters = new BrandAnalyzerInserter[2];
+ for (int i = 0; i < inserters.length; i++) {
+ // create new connection for each worker
+ inserters[i]
+ = new BrandAnalyzerInserter(connBuilder.create(), analyzer);
+ };
+
+ ScheduledExecutorService scheduler;
+ // executor with threads for producer, status and consumers
+ scheduler = Executors.newScheduledThreadPool(2 + inserters.length);
// producer
scheduler.submit(analyzer);
// consumer which writes to the database
- Future<Integer> consumerTask = scheduler.submit(inserter);
- // status reporter for consumer
+ List<Future<Integer>> consumerTasks = new ArrayList<>();
+ for (BrandAnalyzerInserter inserter : inserters) {
+ Future<Integer> consumerTask = scheduler.submit(inserter);
+ consumerTasks.add(consumerTask);
+ }
+ // status reporter for consumers
+ Watcher.Progressable progress = new Watcher.Progressable() {
+ @Override
+ public int getCount() {
+ int count = 0;
+ for (Watcher.Progressable p : inserters) {
+ count += p.getCount();
+ }
+ return count;
+ }
+ };
int statusInterval = 5; // TODO: do not hardcode
- scheduler.scheduleAtFixedRate(new Watcher(inserter, statusInterval),
+ scheduler.scheduleAtFixedRate(new Watcher(progress, statusInterval),
statusInterval, statusInterval, TimeUnit.SECONDS);
// now wait for completion of the main thread
long startTime = System.currentTimeMillis();
try {
while (true) {
try {
- consumerTask.get();
+ for (Future<Integer> consumerTask : consumerTasks) {
+ consumerTask.get();
+ }
break;
} catch (InterruptedException ex) {
Logger.getLogger(Analyzor.class.getName())
@@ -342,7 +367,7 @@ public class Analyzor {
connection.setAutoCommit(true);
tweetResults.close();
- System.err.println("Processed " + inserter.getCount() + " tweets in "
+ System.err.println("Processed " + progress.getCount() + " tweets in "
+ ((System.currentTimeMillis() - startTime) / 1000) + " sec");
}