package database;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import main.Watcher;

/**
 * Inserts queued brand analyses into the database.
 *
 * <p>Results are pulled from a {@link BrandAnalyzerQueue} until it yields
 * {@code null}; every (tweet, brand) pair of a result is written to the
 * {@code mentionsbrand} table unless an identical row already exists.
 *
 * @author Peter Wu
 */
public class BrandAnalyzerInserter
        implements Callable<Integer>, Watcher.Progressable {

    /** Number of processed analyses between two intermediate commits. */
    private static final int COMMIT_INTERVAL = 1000;

    private final Connection connection;
    private final BrandAnalyzerQueue analyzer;
    private final NamedPreparedStatement insertBrand;

    /**
     * Number of analysis results processed so far. Only {@link #runInserter()}
     * writes it; it is volatile so that {@link #getCount()} observes recent
     * values (presumably when polled from another thread - confirm against
     * the Watcher implementation).
     */
    private volatile int count;

    /**
     * Prepares the insert statement and switches the connection to manual
     * commit mode.
     *
     * @param connection database connection used for the inserts; it is
     *                   committed and closed by {@link #call()}
     * @param analyzer   queue supplying the analysis results to insert
     * @throws SQLException if the statement cannot be prepared or auto-commit
     *                      cannot be disabled
     */
    public BrandAnalyzerInserter(Connection connection,
            BrandAnalyzerQueue analyzer) throws SQLException {
        this.connection = connection;
        this.analyzer = analyzer;
        this.insertBrand = new NamedPreparedStatement(connection,
                "INSERT INTO mentionsbrand"
                + " SELECT :tweetid AS tweetid, :brand AS brand"
                + " WHERE NOT EXISTS (SELECT 1 FROM mentionsbrand WHERE"
                + " tweetid=:tweetid AND brand=:brand)");
        connection.setAutoCommit(false);
    }

    /**
     * Drains the analyzer queue into the database, then commits the pending
     * work and closes the connection.
     *
     * @return the number of analysis results processed
     * @throws SQLException if inserting, committing or closing fails
     */
    @Override
    public Integer call() throws SQLException {
        try {
            runInserter();
        } finally {
            try {
                // Commit whatever was inserted, even when the loop failed.
                connection.commit();
            } finally {
                // Always release the connection, even if the commit throws.
                connection.close();
            }
        }
        return count;
    }

    /**
     * Processes results until the analyzer returns {@code null}, inserting
     * one row per brand of each result and committing periodically.
     *
     * @throws SQLException if an insert or an intermediate commit fails
     */
    private void runInserter() throws SQLException {
        BrandAnalyzerQueue.Result result;
        while ((result = analyzer.next()) != null) {
            for (String brand : result.brands) {
                QueryUtils.setInsertBrandParams(insertBrand,
                        result.tweetid, brand);
                insertBrand.executeUpdate();
            }
            count++;
            // Save results every COMMIT_INTERVAL analyses.
            if (count % COMMIT_INTERVAL == 0) {
                connection.commit();
            }
        }
    }

    /**
     * Returns the number of analysis results processed so far.
     *
     * @return the current value of the progress counter
     */
    @Override
    public int getCount() {
        return count;
    }
}