package main;

import database.BrandAnalyzerInserter;
import database.BrandAnalyzerQueue;
import database.ConnectionBuilder;
import database.NamedPreparedStatement;
import database.QueryUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The sentiment analysis class that rates tweets based on a unigram and bigram
 * set of weights.
 */
public class Analyzor {

    /**
     * The map that matches single words to their weights.
     */
    private final HashMap<String, Double> unimap = new HashMap<>();

    /**
     * The map that matches word pairs to their weights.
     */
    private final HashMap<String, Double> bimap = new HashMap<>();

    /**
     * The results of the most recent query, as set by query().
     */
    private ResultSet data;

    /**
     * The persistent connection to the database.
     */
    private final Connection connection;

    private final ConnectionBuilder connBuilder;

    /**
     * @param connBuilder A builder for new database connections.
     * @throws java.sql.SQLException On failure to set up a new connection.
     */
    public Analyzor(ConnectionBuilder connBuilder) throws SQLException {
        // instantiate a default connection
        this.connection = connBuilder.create();
        this.connBuilder = connBuilder;
    }

    /**
     * Reads the unigram and bigram lexica.
     *
     * @throws FileNotFoundException If a lexicon file is missing.
     */
    public void readLexicon() throws FileNotFoundException {
        if (!unimap.isEmpty()) {
            // data has already been read
            return;
        }
        System.err.println("Trying to read lexicons...");
        // A unigram line has the format (WS = whitespace):
        //     word rating ??? ??
        // A bigram line has two WS-separated words instead of one.
        try (Scanner uniScanner = new Scanner(new File("unigrams-pmilexicon.txt"));
                Scanner biScanner = new Scanner(new File("bigrams-pmilexicon.txt"))) {
            // fill the map of unigrams
            while (uniScanner.hasNext()) {
                String word = uniScanner.next();
                Double weight = Double.valueOf(uniScanner.next());
                unimap.put(word.toLowerCase(), weight);
                if (uniScanner.hasNextLine()) {
                    uniScanner.nextLine(); // skip the trailing fields
                }
            }
            // fill the map of bigrams
            while (biScanner.hasNext()) {
                String words = biScanner.next() + " " + biScanner.next();
                bimap.put(words.toLowerCase(), Double.valueOf(biScanner.next()));
                if (biScanner.hasNextLine()) {
                    biScanner.nextLine(); // skip the trailing fields
                }
            }
        }
        System.err.println("Lexicons are read.");
    }
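    // For illustration, lexicon lines look roughly like this (word(s), a
    // sentiment rating, then trailing columns that the nextLine() calls above
    // skip; these values are made up):
    //
    //     good        1.855   431   192
    //     not good   -3.107    19    56
    //
    // Only the word(s) and the rating end up in the maps.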
    /**
     * Executes a query that the analyzer can analyze.
     *
     * @param query The query string to execute.
     * @throws SQLException When the database connection isn't available.
     */
    public void query(String query) throws SQLException {
        PreparedStatement statement;
        // make a connection to the database and execute the query
        statement = connection.prepareStatement(query);
        data = statement.executeQuery();
    }

    /**
     * Runs a sentiment analysis and fills the database with the output.
     *
     * @param query The SQL text for the query.
     * @throws SQLException When the database connection isn't available.
     * @throws IOException When reading a lexicon fails.
     */
    public void sentimentAnalysis(String query) throws SQLException, IOException {
        query(query);
        // read the lexicons
        readLexicon();
        // check that there is data to analyze
        if (data == null) {
            System.err.println("data is empty, try querying first");
            return;
        }
        Double value;
        String text;
        // for all tuples
        while (data.next()) {
            // get the tweet text to analyze
            text = data.getString("text");
            text = splitPunctToWords(text);
            text = text.toLowerCase(); // lowercase to match the lexicon keys
            String[] words = text.split("\\s+"); // text split into separate words
            double positiverate = 0; // positive rating

            // rate the text with unigrams
            for (String word : words) {
                value = unimap.get(word);
                if (value != null) {
                    positiverate += value;
                }
            }
            // rate the text with bigrams
            for (int i = 0; i < words.length - 1; i++) {
                String pair = words[i] + " " + words[i + 1];
                value = bimap.get(pair);
                if (value != null) {
                    positiverate += value;
                }
            }
            // insert the rating into the database
            NamedPreparedStatement m_insertRating;
            m_insertRating = new NamedPreparedStatement(connection, QueryUtils.insertRating);
            QueryUtils.setInsertParams(m_insertRating, data.getLong("tweetid"),
                    data.getString("brand"), (int) (positiverate * 10));
            m_insertRating.executeUpdate();
            // don't print the rate
            //System.out.println(text + ": " + (int) (positiverate * 10));
        }
    }
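    // Worked example of the scoring above, with made-up weights: for the text
    // "not good at all", if unimap contains {"not": -1.5, "good": 1.9} and
    // bimap contains {"not good": -3.1}, the unigram pass contributes
    // -1.5 + 1.9 = 0.4 and the bigram pass -3.1, so positiverate = -2.7 and
    // the stored rating is (int) (-2.7 * 10) = -27.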
    /**
     * Makes a word cloud of the results of some query.
     *
     * @param query The SQL text for a query.
     * @throws SQLException When the database connection isn't available.
     * @throws FileNotFoundException If the output file cannot be created.
     * @throws UnsupportedEncodingException If UTF-8 is unsupported.
     */
    public void makeWordCloud(String query) throws SQLException,
            FileNotFoundException, UnsupportedEncodingException {
        query(query);
        // check that there is data to analyze
        if (data == null) {
            System.err.println("data is empty, try querying first");
            return;
        }
        String text;
        String brand;
        String[] words;
        HashMap<String, HashMap<String, Integer>> wordcloud = new HashMap<>();

        while (data.next()) {
            // get the brand
            brand = data.getString("brand");
            // make a hashmap for each brand
            if (!wordcloud.containsKey(brand)) {
                wordcloud.put(brand, new HashMap<String, Integer>());
            }
            // get the text
            text = data.getString("text");
            // remove punctuation, convert to lowercase and split into words
            text = removePunct(text);
            text = text.toLowerCase();
            words = text.split("\\s+");

            for (String word : words) {
                // if it is empty, a space or a dash, skip it
                if (word.equals("") || word.equals(" ") || word.equals("-")) {
                    continue;
                }
                // if the word is already in the map, increment the amount;
                // otherwise make a new entry with amount = 1
                if (wordcloud.get(brand).containsKey(word)) {
                    wordcloud.get(brand).put(word, wordcloud.get(brand).get(word) + 1);
                } else {
                    wordcloud.get(brand).put(word, 1);
                }
            }
        }
        // print the words and their frequencies to a CSV file
        ssiMapToCSV(wordcloud, "wordcloud.csv", "brand,word,count");
    }

    // generates a CSV for Disco from the query
    public void disco(String query) throws SQLException, FileNotFoundException,
            UnsupportedEncodingException {
        // do the query
        query(query);
        PrintWriter writer = new PrintWriter("output.csv", "UTF-8");
        ResultSetMetaData meta = data.getMetaData();
        int columns = meta.getColumnCount();
        // print the header row
        for (int i = 1; i < columns; i++) {
            writer.print(meta.getColumnLabel(i) + ", ");
        }
        writer.println(meta.getColumnLabel(columns));
        // print the values
        while (data.next()) {
            for (int i = 1; i < columns; i++) {
                if (data.getObject(i) == null) {
                    writer.print(", ");
                } else {
                    writer.print(data.getObject(i).toString()
                            .replaceAll("[,\n]", " ") + ", ");
                }
            }
            if (data.getObject(columns) == null) {
                writer.println("0");
            } else {
                writer.println(data.getObject(columns).toString().replace(",", " "));
            }
        }
        writer.close();
    }

    /**
     * @return The number of rows that would be returned by the query.
     */
    private int getQuerySize(String query) throws SQLException {
        PreparedStatement statement;
        String countQuery = "WITH q AS (" + query + ") SELECT COUNT(*) FROM q";
        statement = connection.prepareStatement(countQuery);
        ResultSet result = statement.executeQuery();
        result.next();
        int count = result.getInt("count");
        result.close();
        return count;
    }
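    // For example, getQuerySize("SELECT tweetid FROM tweet") executes
    //     WITH q AS (SELECT tweetid FROM tweet) SELECT COUNT(*) FROM q
    // and reads the single "count" column of the result (the lower-case
    // label matches PostgreSQL's default name for COUNT(*)).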
    /**
     * Obtains the brands of selected tweet texts.
     *
     * @param queryText The rows to select.
     * @param reset Whether to reset mentionsbrand.
     * @throws SQLException If the query is unsuccessful.
     */
    public void getBrands(String queryText, boolean reset) throws SQLException {
        PreparedStatement statement;
        ResultSet tweetResults;

        // make a connection to the database and execute the query
        if (reset) {
            System.out.println("Cleaning old entries of mentionsbrand.");
            statement = connection.prepareStatement("delete from mentionsbrand");
            statement.executeUpdate();
        }
        System.out.println("Obtaining all selected entries in tweet.");
        if (queryText.isEmpty()) {
            // select all tweets which are not yet classified
            queryText = "SELECT tweetid, text FROM tweet t WHERE NOT EXISTS"
                    + " (SELECT 1 FROM mentionsbrand b WHERE b.tweetid=t.tweetid)";
        }
        // print the expected count; this increases the initial startup time
        // (5 s for 10M results), but gives an accurate ETA estimate
        System.out.println("Expected query size: " + getQuerySize(queryText));

        // allow the use of cursors for less memory usage
        connection.setAutoCommit(false);
        statement = connection.prepareStatement(queryText);
        statement.setFetchSize(1000); // fetch groups of rows to save memory
        tweetResults = statement.executeQuery();

        BrandAnalyzerQueue analyzer = new BrandAnalyzerQueue(tweetResults);
        final BrandAnalyzerInserter[] inserters = new BrandAnalyzerInserter[4];
        for (int i = 0; i < inserters.length; i++) {
            // create a new connection for each worker
            inserters[i] = new BrandAnalyzerInserter(connBuilder.create(), analyzer);
        }

        ScheduledExecutorService scheduler;
        // executor with threads for the producer, status reporter and consumers
        scheduler = Executors.newScheduledThreadPool(2 + inserters.length);
        // producer
        scheduler.submit(analyzer);
        // consumers which write to the database
        List<Future<?>> consumerTasks = new ArrayList<>();
        for (BrandAnalyzerInserter inserter : inserters) {
            Future<?> consumerTask = scheduler.submit(inserter);
            consumerTasks.add(consumerTask);
        }
        // status reporter for the consumers
        Watcher.Progressable progress = new Watcher.Progressable() {
            @Override
            public int getCount() {
                int count = 0;
                for (Watcher.Progressable p : inserters) {
                    count += p.getCount();
                }
                return count;
            }
        };
        int statusInterval = 5; // TODO: do not hardcode
        scheduler.scheduleAtFixedRate(new Watcher(progress, statusInterval),
                statusInterval, statusInterval, TimeUnit.SECONDS);

        // now wait for completion of the consumers
        long startTime = System.currentTimeMillis();
        try {
            while (true) {
                try {
                    for (Future<?> consumerTask : consumerTasks) {
                        consumerTask.get();
                    }
                    break;
                } catch (InterruptedException ex) {
                    Logger.getLogger(Analyzor.class.getName())
                            .info("Interrupted while waiting for consumer");
                }
            }
        } catch (ExecutionException ex) {
            Logger.getLogger(Analyzor.class.getName())
                    .log(Level.SEVERE, "Consumer failed", ex);
        }
        // cancel the status thread
        scheduler.shutdown();

        // end, cleanup
        connection.setAutoCommit(true);
        tweetResults.close();
        System.err.println("Processed " + progress.getCount() + " tweets in "
                + ((System.currentTimeMillis() - startTime) / 1000) + " sec");
    }

    // counts the rows (tweets) about a brand per timezone and
    // writes a CSV file with rows: timezone, brand, amount
    public void timezone(String query) throws SQLException,
            FileNotFoundException, UnsupportedEncodingException {
        query(query);
        // read the timezone normalization file
        InputStream inFile = new FileInputStream("timezone.txt");
        Scanner readFile = new Scanner(inFile);
        HashMap<String, String> toTimezone = new HashMap<>();
        while (readFile.hasNextLine()) {
            String[] parts = readFile.nextLine().split(",");
            if (parts.length > 1) {
                toTimezone.put(parts[0], parts[1]);
            }
        }
        // hashmap of timezone -> brand -> amount
        HashMap<String, HashMap<String, Integer>> timeMap = new HashMap<>();
        String timezone;
        String brand;

        while (data.next()) {
            timezone = data.getString("timezone");
            if (toTimezone.containsKey(timezone)) {
                timezone = toTimezone.get(timezone);
            } else {
                timezone = "other";
            }
            brand = data.getString("brand");

            // if the timezone is already in the map
            if (timeMap.containsKey(timezone)) {
                // if the brand for that timezone is already in the map,
                // increment the amount, else make a new entry with amount = 1
                if (timeMap.get(timezone).containsKey(brand)) {
                    timeMap.get(timezone).put(brand, timeMap.get(timezone).get(brand) + 1);
                } else {
                    timeMap.get(timezone).put(brand, 1);
                }
            } else {
                // make a new hashmap for this timezone and fill it with the
                // brand and the amount
                timeMap.put(timezone, new HashMap<String, Integer>());
                timeMap.get(timezone).put(brand, 1);
            }
        }
        // write the CSV from the map
        ssiMapToCSV(timeMap, "timezone.csv", "timezone,brand,count");
    }
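    // For illustration, timezone.txt maps raw Twitter timezone names to
    // normalized ones, one comma-separated pair per line (these example
    // values are made up):
    //
    //     Amsterdam,europe
    //     Pacific Time (US & Canada),america
    //
    // Timezones that do not appear in the file are grouped under "other".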
    // computes the positivity of the tweets about a brand and
    // writes a CSV file for posnegVisualizer
    void posNeg(String query) throws SQLException, FileNotFoundException,
            UnsupportedEncodingException {
        query(query);
        String brand;
        int rating;
        int ratingInterval;
        int intervalSize = 10;
        // hashmap of brand -> ratingInterval -> amount
        HashMap<String, HashMap<Integer, Integer>> posnegMap = new HashMap<>();
        /*
         The rating interval is given by an integer: the tweet's sentiment
         value divided by the interval size, rounded down. This puts all data
         in boxes for the histogram.
         */
        while (data.next()) {
            brand = data.getString("brand");
            rating = data.getInt("rating");
            // ratingInterval is an integer divisible by intervalSize;
            // a rating within ratingInterval +- 0.5 * intervalSize belongs
            // in that interval
            ratingInterval = (rating + (int) (0.5 * intervalSize)) / intervalSize * intervalSize;

            // if the brand is already in the map
            if (posnegMap.containsKey(brand)) {
                // if the interval for that brand is already in the map,
                // increment the amount, else make a new entry with amount = 1
                if (posnegMap.get(brand).containsKey(ratingInterval)) {
                    posnegMap.get(brand).put(ratingInterval,
                            posnegMap.get(brand).get(ratingInterval) + 1);
                } else {
                    posnegMap.get(brand).put(ratingInterval, 1);
                }
            } else {
                // make a new hashmap for this brand and fill it with the
                // interval and the amount
                posnegMap.put(brand, new HashMap<Integer, Integer>());
                posnegMap.get(brand).put(ratingInterval, 1);
            }
        }
        siiMapToCSV(posnegMap, "posneg.csv", "brand,ratingInterval,count");
    }
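    // Worked example of the bucketing above, with intervalSize = 10: a rating
    // of 17 maps to (17 + 5) / 10 * 10 = 20, and a rating of 12 maps to
    // (12 + 5) / 10 * 10 = 10. Note that Java's integer division truncates
    // toward zero, so negative ratings bucket slightly asymmetrically
    // (e.g. -17 maps to -10, not -20).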
    /*
     Makes a CSV for Disco of a process of news spreading.

     The query should be as follows:
     - it should be a union of the following query twice, once with
       TYPE = retweet, once with TYPE = reply
     - pick two tables of tweet (t1 and t2) and one of TYPEof
     - t1.tweetid = TYPEof.TYPEonid and t2.tweetid = TYPEof.TYPEid
     - t1.tweetid should be named thetweetid
     - t2.tweetid should be named TYPEid
     - t1.timestamp should be named maintime
     - t2.timestamp should be named othertime
     - t1.userid should be named mainuserid
     - t2.userid should be named otheruser

     So the resulting table should be:
     thetweetid, maintime, mainuserid, replyid, retweetid, othertime, otheruser

     Note that exactly one of replyid and retweetid has to be null and the
     other a long, for each row. How to do this:
     http://stackoverflow.com/questions/2309943/unioning-two-tables-with-different-number-of-columns

     The CSV will contain:
     tweetid of the tweet replied/retweeted on, reply/retweet, timestamp,
     tweetid of the reply/retweet, userid
     which corresponds to:
     caseID, activity, timestamp, resource, resource
     */
    void newsSpread(String query) throws SQLException, FileNotFoundException,
            UnsupportedEncodingException {
        query(query);
        long maintweetID;
        long replyID;
        long retweetID;
        // tweetID -> set of replyIDs
        HashMap<Long, HashSet<Long>> hasReplies = new HashMap<>();
        // tweetID -> set of retweetIDs
        HashMap<Long, HashSet<Long>> hasRetweets = new HashMap<>();
        // tweetID -> its timestamp
        HashMap<Long, Timestamp> timestamp = new HashMap<>();
        // tweetID -> its userID
        HashMap<Long, Long> user = new HashMap<>();

        while (data.next()) {
            maintweetID = data.getLong("thetweetid");
            replyID = data.getLong("replyid");
            retweetID = data.getLong("retweetid");
            // put these in the corresponding maps;
            // note that exactly one of the two if statements below will hold

            // if the replyID is not null
            if (replyID != 0) {
                // if this tweetID has no set yet, make one
                if (hasReplies.get(maintweetID) == null) {
                    hasReplies.put(maintweetID, new HashSet<Long>());
                }
                // add the replyID to the tweetID
                hasReplies.get(maintweetID).add(replyID);
                // store the time of the tweet
                timestamp.put(replyID, data.getTimestamp("othertime"));
                // store the user of the tweet
                user.put(replyID, data.getLong("otheruser"));
            }
            // if the retweetID is not null
            if (retweetID != 0) {
                // if this tweetID has no set yet, make one
                if (hasRetweets.get(maintweetID) == null) {
                    hasRetweets.put(maintweetID, new HashSet<Long>());
                }
                // add the retweetID to the tweetID
                hasRetweets.get(maintweetID).add(retweetID);
                // store the time of the tweet
                timestamp.put(retweetID, data.getTimestamp("othertime"));
                // store the user of the tweet
                user.put(retweetID, data.getLong("otheruser"));
            }
        }
        // now use this data to make a CSV for Disco
        PrintWriter writer = new PrintWriter("newsSpread.csv", "UTF-8");
        // print the header line
        writer.println("caseID,activity,timestamp,tweet,user");
        // print all replies
        for (Long tweetid : hasReplies.keySet()) {
            for (Long replyid : hasReplies.get(tweetid)) {
                writer.println(tweetid + ", reply, " + timestamp.get(replyid)
                        + ", " + replyid + ", " + user.get(replyid));
            }
        }
        // print all retweets
        for (Long tweetid : hasRetweets.keySet()) {
            for (Long retweetid : hasRetweets.get(tweetid)) {
                writer.println(tweetid + ", retweet, " + timestamp.get(retweetid)
                        + ", " + retweetid + ", " + user.get(retweetid));
            }
        }
        writer.close();
    }

    void categorize(String file) throws FileNotFoundException,
            UnsupportedEncodingException {
        // read the division into categories
        InputStream inFile = new FileInputStream("categories.txt");
        Scanner readFile = new Scanner(inFile);
        HashMap<String, String> toCategory = new HashMap<>();
        while (readFile.hasNextLine()) {
            String line = readFile.nextLine();
            if (line.split(",").length > 1) {
                for (String element : line.split(",")[1].split(" ")) {
                    toCategory.put(element, line.split(",")[0]);
                }
            }
        }
        // read the CSV
        Scanner sc = new Scanner(new File(file));
        PrintWriter writer = new PrintWriter("categorised.csv", "UTF-8");
        // copy the header line
        writer.println(sc.nextLine());

        String line;
        String[] values;
        boolean printed;
        HashSet<String> used;
        // for every line
        while (sc.hasNextLine()) {
            // get the values (and so the word)
            line = sc.nextLine();
            values = line.split(",");
            printed = false;

            // substring matches; each category is used at most once
            used = new HashSet<>();
            for (String key : toCategory.keySet()) {
                if (values[0].contains(key) && !used.contains(toCategory.get(key))) {
                    used.add(toCategory.get(key));
                    // copy the values so the original word is kept for
                    // further matching
                    String[] newValues = values.clone();
                    newValues[0] = toCategory.get(key);
                    // print it
                    writer.println(csvLine(newValues));
                    printed = true;
                }
            }
            // exact word match
            if (toCategory.containsKey("#" + values[0])) {
                values[0] = toCategory.get("#" + values[0]);
            }
            // print the line if no substring match was printed
            if (!printed) {
                writer.println(csvLine(values));
            }
        }
        writer.close();
    }

    String csvLine(String[] values) {
        // join the values with commas
        String result = "";
        for (int i = 0; i < values.length; i++) {
            result += values[i];
            if (i < values.length - 1) {
                result += ",";
            }
        }
        return result;
    }

    // inserts spaces around punctuation so the text can be split on
    // whitespace; also removes URLs
    private String splitPunctToWords(String text) {
        text = text.replaceAll("https?://\\S*", "");
        text = text.replaceAll("[!?):;\"']", " $0");
        text = text.replaceAll("[.,-](\\s|$)", " $0");
        text = text.replaceAll("\\s[(\"']", "$0 ");
        return text;
    }

    // removes punctuation, URLs and mentions
    private String removePunct(String text) {
        text = text.replaceAll("https?://\\S*", " ");
        text = text.replaceAll("@\\S*", " ");
        text = text.replaceAll("[^a-zA-Z0-9#_-]", " ");
        return text;
    }
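    // For illustration (made-up input, whitespace shown approximately):
    //   splitPunctToWords("Great service, @KLM!")
    //     -> "Great service , @KLM !"
    //   removePunct("Great service, @KLM! #win http://t.co/x")
    //     -> "Great service    #win  "
    // Mentions, URLs and punctuation become spaces; the extra whitespace
    // disappears when callers split on \s+.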
    // prints a hashmap into a CSV for an HTML application;
    // HashMap<String, HashMap<String, Integer>> becomes key1,key2,value rows
    void ssiMapToCSV(HashMap<String, HashMap<String, Integer>> map,
            String fileName, String firstLine)
            throws FileNotFoundException, UnsupportedEncodingException {
        PrintWriter writer = new PrintWriter(fileName, "UTF-8");
        writer.println(firstLine);
        // loop over the outer keys (brands)
        for (Entry<String, HashMap<String, Integer>> en : map.entrySet()) {
            // loop over the inner keys (words)
            for (Entry<String, Integer> e : en.getValue().entrySet()) {
                writer.println(en.getKey() + "," + e.getKey() + "," + e.getValue());
            }
        }
        writer.close();
        System.out.println("CSV file made, please put it next to the HTML file and run it");
    }

    // the same as ssiMapToCSV, but for HashMap<String, HashMap<Integer, Integer>>
    void siiMapToCSV(HashMap<String, HashMap<Integer, Integer>> map,
            String fileName, String firstLine)
            throws FileNotFoundException, UnsupportedEncodingException {
        PrintWriter writer = new PrintWriter(fileName, "UTF-8");
        writer.println(firstLine);
        // loop over the outer keys (brands)
        for (Entry<String, HashMap<Integer, Integer>> en : map.entrySet()) {
            // loop over the inner keys (rating intervals)
            for (Entry<Integer, Integer> e : en.getValue().entrySet()) {
                writer.println(en.getKey() + "," + e.getKey() + "," + e.getValue());
            }
        }
        writer.close();
        System.out.println("CSV file made, please put it next to the HTML file and run it");
    }
}
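// A minimal usage sketch (hypothetical wiring; the ConnectionBuilder setup
// and the exact queries are assumptions based on the table and column names
// used in this class):
//
//     Analyzor analyzor = new Analyzor(connBuilder);
//     analyzor.getBrands("", false); // classify unclassified tweets by brand
//     analyzor.sentimentAnalysis(
//             "SELECT t.tweetid, b.brand, t.text FROM tweet t"
//             + " JOIN mentionsbrand b ON b.tweetid = t.tweetid");
//     analyzor.makeWordCloud(
//             "SELECT b.brand, t.text FROM tweet t"
//             + " JOIN mentionsbrand b ON b.tweetid = t.tweetid");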