diff options
Diffstat (limited to 'src/main/Analyzor.java')
-rw-r--r-- | src/main/Analyzor.java | 81 |
1 files changed, 50 insertions, 31 deletions
diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java index 2f6e4b5..9e26c97 100644 --- a/src/main/Analyzor.java +++ b/src/main/Analyzor.java @@ -1,6 +1,6 @@ package main; -import analysis.BrandChecker; +import database.BrandAnalyzerInserter; import database.BrandAnalyzerQueue; import database.NamedPreparedStatement; import database.QueryUtils; @@ -16,11 +16,17 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.List; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.Scanner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** * The sentiment analysis class that rates tweets based on a unigram and bigram @@ -257,6 +263,7 @@ public class Analyzor { */ public void getBrands(String queryText, boolean reset) throws SQLException { PreparedStatement statement; + ResultSet tweetResults; // make a connection to the database and execute the query if (reset) { System.out.println("Cleaning old entries of mentionsbrand."); @@ -266,38 +273,50 @@ public class Analyzor { System.out.println("Obtaining all selected entries in tweet."); if (queryText.isEmpty()) { - query("SELECT tweetid, text FROM tweet"); - } else { - query(queryText); + queryText = "SELECT tweetid, text FROM tweet"; } - System.out.println("Query finished."); - - NamedPreparedStatement insertBrand = new NamedPreparedStatement(connection, QueryUtils.insertBrand); - - int brandCount = 0; - int count = 0; - long timestamp = System.currentTimeMillis(); - BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data); - BrandAnalyzerQueue.Result result; - new Thread(analyzer).start(); - while ((result = analyzer.next()) != null) { - for (String brand : result.brands) { - QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand); - insertBrand.executeUpdate(); - } - - brandCount += result.brands.size(); - count++; - if (count % 10000 == 0) { - System.err.println("Processed " + count + " tweets, inserted " - + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); + // allow the use of cursors for less memory usage + connection.setAutoCommit(false); + statement = connection.prepareStatement(queryText); + statement.setFetchSize(1000); // fetch groups of rows to save memory + tweetResults = statement.executeQuery(); + + BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults); + BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connection, analyzer); + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); + // producer + scheduler.submit(analyzer); + // consumer which writes to the database + Future<Integer> consumerTask = scheduler.submit(inserter); + // status reporter for consumer + int statusInterval = 5; // TODO: do not hardcode + scheduler.scheduleAtFixedRate(new Watcher(inserter, statusInterval), + statusInterval, statusInterval, TimeUnit.SECONDS); + // now wait for completion of the main thread + long startTime = System.currentTimeMillis(); + try { + while (true) { + try { + consumerTask.get(); + break; + } catch (InterruptedException ex) { + Logger.getLogger(Analyzor.class.getName()) + .info("Interrupted while waiting for consumer"); + } } + } catch (ExecutionException ex) { + Logger.getLogger(Analyzor.class.getName()) + .log(Level.SEVERE, "Consumer failed", ex); } - - System.err.println("Processed " + count + " tweets in " - + ((System.currentTimeMillis() - timestamp) / 1000) + " sec"); - System.err.println("Finished getBrands, processed " + count - + " number of tweets, added " + brandCount + " brands or no."); + // cancel status thread + scheduler.shutdown(); + // end, cleanup + connection.setAutoCommit(true); + tweetResults.close(); + + System.err.println("Processed " + inserter.getCount() + " tweets in " + + ((System.currentTimeMillis() - startTime) / 1000) + " sec"); } //gets the amount of users that tweet about a brand in a timezone |