summaryrefslogtreecommitdiff
path: root/src/main/Analyzor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/Analyzor.java')
-rw-r--r--src/main/Analyzor.java81
1 files changed, 50 insertions, 31 deletions
diff --git a/src/main/Analyzor.java b/src/main/Analyzor.java
index 2f6e4b5..9e26c97 100644
--- a/src/main/Analyzor.java
+++ b/src/main/Analyzor.java
@@ -1,6 +1,6 @@
package main;
-import analysis.BrandChecker;
+import database.BrandAnalyzerInserter;
import database.BrandAnalyzerQueue;
import database.NamedPreparedStatement;
import database.QueryUtils;
@@ -16,11 +16,17 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Scanner;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* The sentiment analysis class that rates tweets based on a unigram and bigram
@@ -257,6 +263,7 @@ public class Analyzor {
*/
public void getBrands(String queryText, boolean reset) throws SQLException {
PreparedStatement statement;
+ ResultSet tweetResults;
// make a connection to the database and execute the query
if (reset) {
System.out.println("Cleaning old entries of mentionsbrand.");
@@ -266,38 +273,50 @@ public class Analyzor {
System.out.println("Obtaining all selected entries in tweet.");
if (queryText.isEmpty()) {
- query("SELECT tweetid, text FROM tweet");
- } else {
- query(queryText);
+ queryText = "SELECT tweetid, text FROM tweet";
}
- System.out.println("Query finished.");
-
- NamedPreparedStatement insertBrand = new NamedPreparedStatement(connection, QueryUtils.insertBrand);
-
- int brandCount = 0;
- int count = 0;
- long timestamp = System.currentTimeMillis();
- BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(data);
- BrandAnalyzerQueue.Result result;
- new Thread(analyzer).start();
- while ((result = analyzer.next()) != null) {
- for (String brand : result.brands) {
- QueryUtils.setInsertBrandParams(insertBrand, result.tweetid, brand);
- insertBrand.executeUpdate();
- }
-
- brandCount += result.brands.size();
- count++;
- if (count % 10000 == 0) {
- System.err.println("Processed " + count + " tweets, inserted "
- + brandCount + " in " + ((System.currentTimeMillis() - timestamp) / 1000) + " sec");
+ // allow the use of cursors for less memory usage
+ connection.setAutoCommit(false);
+ statement = connection.prepareStatement(queryText);
+ statement.setFetchSize(1000); // fetch groups of rows to save memory
+ tweetResults = statement.executeQuery();
+
+ BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults);
+ BrandAnalyzerInserter inserter = new BrandAnalyzerInserter(connection, analyzer);
+
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
+ // producer
+ scheduler.submit(analyzer);
+ // consumer which writes to the database
+ Future<Integer> consumerTask = scheduler.submit(inserter);
+ // status reporter for consumer
+ int statusInterval = 5; // TODO: do not hardcode
+ scheduler.scheduleAtFixedRate(new Watcher(inserter, statusInterval),
+ statusInterval, statusInterval, TimeUnit.SECONDS);
+ // now wait for completion of the main thread
+ long startTime = System.currentTimeMillis();
+ try {
+ while (true) {
+ try {
+ consumerTask.get();
+ break;
+ } catch (InterruptedException ex) {
+ Logger.getLogger(Analyzor.class.getName())
+ .info("Interrupted while waiting for consumer");
+ }
}
+ } catch (ExecutionException ex) {
+ Logger.getLogger(Analyzor.class.getName())
+ .log(Level.SEVERE, "Consumer failed", ex);
}
-
- System.err.println("Processed " + count + " tweets in "
- + ((System.currentTimeMillis() - timestamp) / 1000) + " sec");
- System.err.println("Finished getBrands, processed " + count
- + " number of tweets, added " + brandCount + " brands or no.");
+ // cancel status thread
+ scheduler.shutdown();
+ // end, cleanup
+ connection.setAutoCommit(true);
+ tweetResults.close();
+
+ System.err.println("Processed " + inserter.getCount() + " tweets in "
+ + ((System.currentTimeMillis() - startTime) / 1000) + " sec");
}
//gets the amount of users that tweet about a brand in a timezone