From 761c4164faac35fc34ade62ca7654573f2c07835 Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Wed, 11 Jun 2014 13:46:17 +0200 Subject: Commit results early to avoid data loss on abort --- src/database/BrandAnalyzerInserter.java | 17 ++++++++++++++++- src/main/Analyzor.java | 13 +++++++++---- src/main/FarmShell.java | 3 +-- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/database/BrandAnalyzerInserter.java b/src/database/BrandAnalyzerInserter.java index f7ed62c..d42dac2 100644 --- a/src/database/BrandAnalyzerInserter.java +++ b/src/database/BrandAnalyzerInserter.java @@ -13,22 +13,34 @@ import main.Watcher; public class BrandAnalyzerInserter implements Callable, Watcher.Progressable { + private final Connection connection; private final BrandAnalyzerQueue analyzer; private final NamedPreparedStatement insertBrand; private volatile int count; public BrandAnalyzerInserter(Connection connection, BrandAnalyzerQueue analyzer) throws SQLException { + this.connection = connection; this.analyzer = analyzer; this.insertBrand = new NamedPreparedStatement(connection, "INSERT INTO mentionsbrand" + " SELECT :tweetid AS tweetid, :brand AS brand" + " WHERE NOT EXISTS (SELECT 1 FROM mentionsbrand WHERE" + " tweetid=:tweetid AND brand=:brand)"); + connection.setAutoCommit(false); } @Override public Integer call() throws SQLException { + try { + runInserter(); + } finally { + connection.close(); + } + return count; + } + + private void runInserter() throws SQLException { BrandAnalyzerQueue.Result result; while ((result = analyzer.next()) != null) { for (String brand : result.brands) { @@ -37,8 +49,11 @@ public class BrandAnalyzerInserter implements Callable, } count++; + // save results every 1000 analyses + if (count % 1000 == 0) { + connection.commit(); + } } - return count; } @Override diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java index 2589e66..b2aad4d 100644 --- a/src/main/Analyzor.java +++ b/src/main/Analyzor.java @@ -2,6 +2,7 @@ package main; import database.BrandAnalyzerInserter; import database.BrandAnalyzerQueue; +import database.ConnectionBuilder; import database.NamedPreparedStatement; import database.QueryUtils; import java.io.File; @@ -53,12 +54,16 @@ public class Analyzor { * The persistent connection to the database. */ private final Connection connection; + private final ConnectionBuilder connBuilder; /** - * @param connection An open connection to the database. + * @param connBuilder A builder for new database connections. + * @throws java.sql.SQLException On failure to set up a new connection. */ - public Analyzor(Connection connection) { - this.connection = connection; + public Analyzor(ConnectionBuilder connBuilder) throws SQLException { + // instantiate a default connection + this.connection = connBuilder.create(); + this.connBuilder = connBuilder; } /** @@ -301,7 +306,7 @@ public class Analyzor { tweetResults = statement.executeQuery(); BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults); - BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connection, analyzer); + BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connBuilder.create(), analyzer); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); // producer diff --git a/src/main/FarmShell.java b/src/main/FarmShell.java index 525d342..d624a71 100644 --- a/src/main/FarmShell.java +++ b/src/main/FarmShell.java @@ -32,8 +32,7 @@ public class FarmShell { private Analyzor getAnalyzor() throws SQLException { if (cached_analyzor == null) { - Connection dbCon = dbConnectionBuilder.create(); - cached_analyzor = new Analyzor(dbCon); + cached_analyzor = new Analyzor(dbConnectionBuilder); } return cached_analyzor; } -- cgit v1.2.1