summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-06-11 13:46:17 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-06-11 13:46:17 +0200
commit761c4164faac35fc34ade62ca7654573f2c07835 (patch)
treec5bc92ce4a6a581da389e0f9bd2a07b9c9e5e384
parent36055c51726264c43110c2225ba9bdc2fdcce5f2 (diff)
downloadGoldfarmer-761c4164faac35fc34ade62ca7654573f2c07835.tar.gz
Commit results early to avoid data loss on abort
-rw-r--r--src/database/BrandAnalyzerInserter.java17
-rw-r--r--src/main/Analyzor.java13
-rw-r--r--src/main/FarmShell.java3
3 files changed, 26 insertions, 7 deletions
diff --git a/src/database/BrandAnalyzerInserter.java b/src/database/BrandAnalyzerInserter.java
index f7ed62c..d42dac2 100644
--- a/src/database/BrandAnalyzerInserter.java
+++ b/src/database/BrandAnalyzerInserter.java
@@ -13,22 +13,34 @@ import main.Watcher;
public class BrandAnalyzerInserter implements Callable<Integer>,
Watcher.Progressable {
+ private final Connection connection;
private final BrandAnalyzerQueue analyzer;
private final NamedPreparedStatement insertBrand;
private volatile int count;
public BrandAnalyzerInserter(Connection connection,
BrandAnalyzerQueue analyzer) throws SQLException {
+ this.connection = connection;
this.analyzer = analyzer;
this.insertBrand = new NamedPreparedStatement(connection,
"INSERT INTO mentionsbrand"
+ " SELECT :tweetid AS tweetid, :brand AS brand"
+ " WHERE NOT EXISTS (SELECT 1 FROM mentionsbrand WHERE"
+ " tweetid=:tweetid AND brand=:brand)");
+ connection.setAutoCommit(false);
}
@Override
public Integer call() throws SQLException {
+ try {
+ runInserter();
+ } finally {
+ connection.close();
+ }
+ return count;
+ }
+
+ private void runInserter() throws SQLException {
BrandAnalyzerQueue.Result result;
while ((result = analyzer.next()) != null) {
for (String brand : result.brands) {
@@ -37,8 +49,11 @@ public class BrandAnalyzerInserter implements Callable<Integer>,
}
count++;
+ // save results every 1000 analyses
+ if (count % 1000 == 0) {
+ connection.commit();
+ }
}
- return count;
}
@Override
diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java
index 2589e66..b2aad4d 100644
--- a/src/main/Analyzor.java
+++ b/src/main/Analyzor.java
@@ -2,6 +2,7 @@ package main;
import database.BrandAnalyzerInserter;
import database.BrandAnalyzerQueue;
+import database.ConnectionBuilder;
import database.NamedPreparedStatement;
import database.QueryUtils;
import java.io.File;
@@ -53,12 +54,16 @@ public class Analyzor {
* The persistent connection to the database.
*/
private final Connection connection;
+ private final ConnectionBuilder connBuilder;
/**
- * @param connection An open connection to the database.
+ * @param connBuilder A builder for new database connections.
+ * @throws java.sql.SQLException On failure to set up a new connection.
*/
- public Analyzor(Connection connection) {
- this.connection = connection;
+ public Analyzor(ConnectionBuilder connBuilder) throws SQLException {
+ // instantiate a default connection
+ this.connection = connBuilder.create();
+ this.connBuilder = connBuilder;
}
/**
@@ -301,7 +306,7 @@ public class Analyzor {
tweetResults = statement.executeQuery();
BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults);
- BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connection, analyzer);
+ BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connBuilder.create(), analyzer);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// producer
diff --git a/src/main/FarmShell.java b/src/main/FarmShell.java
index 525d342..d624a71 100644
--- a/src/main/FarmShell.java
+++ b/src/main/FarmShell.java
@@ -32,8 +32,7 @@ public class FarmShell {
private Analyzor getAnalyzor() throws SQLException {
if (cached_analyzor == null) {
- Connection dbCon = dbConnectionBuilder.create();
- cached_analyzor = new Analyzor(dbCon);
+ cached_analyzor = new Analyzor(dbConnectionBuilder);
}
return cached_analyzor;
}