package io;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.Charsets;
import org.json.JSONException;
import org.json.JSONObject;
import provider.ResultListener;

/**
 * This class writes the JSON objects for tweets and users to (separate) files.
 *
 * <p>
 * Every object is written as one line of UTF-8 encoded JSON. On construction
 * the ids that are already present in both files are loaded, so that an object
 * whose "id" has been stored before is not written a second time.
 *
 * <p>
 * The id sets are plain {@link HashSet}s and no synchronization is performed
 * by this class.
 *
 * @author Maurice Laveaux
 */
public class DataWriter implements ResultListener {

    /**
     * The writer for the tweet stream.
     */
    private final OutputStream m_tweetWriter;

    /**
     * The writer for the profile stream.
     */
    private final OutputStream m_profileWriter;

    /**
     * The buffer of tweet ids that already exist.
     */
    private final Set<Long> m_tweetIdSet;

    /**
     * The buffer of profile ids that already exist.
     */
    private final Set<Long> m_profileIdSet;

    public static final String CFG_PROFILE_FILENAME = "profiles-filename";
    public static final String CFG_TWEETS_FILENAME = "tweets-filename";

    /**
     * Opens a stream to every single file that data will be streamed to.
     *
     * <p>
     * For each file the existing ids are read first and the output stream is
     * opened afterwards; the profile file is handled before the tweet file.
     * Note that {@link #getFileReader(String)} and
     * {@link #getFileWriter(String)} are invoked from this constructor, so
     * overriding implementations must not rely on subclass state.
     *
     * @param profilesName The file to write the profiles to.
     * @param tweetsName The file to write the tweets to.
     * @throws java.io.IOException if the files cannot be read or written.
     */
    public DataWriter(final String profilesName, final String tweetsName)
            throws IOException {
        final Set<Long> profileIds = readIds(profilesName);
        final OutputStream profileWriter = getFileWriter(profilesName);

        final Set<Long> tweetIds;
        final OutputStream tweetWriter;
        try {
            tweetIds = readIds(tweetsName);
            tweetWriter = getFileWriter(tweetsName);
        } catch (IOException | RuntimeException ex) {
            // The profile stream is already open at this point; release it
            // because no DataWriter instance will exist to close it later.
            try {
                profileWriter.close();
            } catch (IOException closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }

        m_profileIdSet = profileIds;
        m_profileWriter = profileWriter;
        m_tweetIdSet = tweetIds;
        m_tweetWriter = tweetWriter;
    }

    /**
     * Given a filename, return a suitable input stream.
     *
     * <p>
     * Despite the parameter name, this is used for the profiles file as well
     * as for the tweets file.
     *
     * @param tweetsName File name.
     * @return A stream from which JSON objects can be read (one per line).
     * @throws IOException if the stream cannot be opened; a
     * {@link FileNotFoundException} is treated by this class as "no ids stored
     * yet".
     */
    protected InputStream getFileReader(String tweetsName) throws IOException {
        return new FileInputStream(tweetsName);
    }

    /**
     * Given a filename, return a suitable output stream.
     *
     * <p>
     * Despite the parameter name, this is used for the profiles file as well
     * as for the tweets file. The default implementation opens the file in
     * append mode.
     *
     * @param tweetsName File name.
     * @return A stream to which JSON objects can be written (one per line).
     * @throws IOException if the stream cannot be opened.
     */
    protected OutputStream getFileWriter(String tweetsName) throws IOException {
        return new FileOutputStream(tweetsName, true);
    }

    /**
     * Closes both output streams.
     *
     * <p>
     * Each stream is closed independently: a failure to close the tweet
     * stream is logged and does not prevent the profile stream from being
     * closed.
     */
    public void close() {
        closeAndLog(m_tweetWriter);
        closeAndLog(m_profileWriter);
    }

    /**
     * Closes a single stream and logs (instead of propagating) any failure.
     *
     * @param stream The stream to close.
     */
    private void closeAndLog(OutputStream stream) {
        try {
            stream.close();
        } catch (IOException ex) {
            getLogger().log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void profileGenerated(JSONObject obj) {
        writeObject(obj, m_profileWriter, m_profileIdSet);
    }

    @Override
    public void tweetGenerated(JSONObject obj) {
        writeObject(obj, m_tweetWriter, m_tweetIdSet);
    }

    /**
     * Reads an existing tweet or profile file and collects the ids of the
     * objects stored in it.
     *
     * <p>
     * The file is decoded as UTF-8, matching the encoding used when writing.
     * Blank lines are skipped. Parsing stops at the first line that is not a
     * JSON object with a numeric "id"; a warning is logged and the ids read up
     * to that point are returned.
     *
     * @param filename The file to parse.
     * @return The set of ids, may be empty if the file does not exist.
     * @throws IOException if the file exists but cannot be opened or read.
     */
    private Set<Long> readIds(String filename) throws IOException {
        Set<Long> idSet = new HashSet<>();
        try (InputStream is = getFileReader(filename);
                Scanner reader = new Scanner(is, Charsets.UTF_8.name())) {
            // parse each line into a JSONObject, read the id and add it to
            // the set of ids.
            while (reader.hasNextLine()) {
                String line = reader.nextLine().trim();
                if (line.isEmpty()) {
                    // a blank line is not a JSON object; do not let it abort
                    // the processing of the remaining lines.
                    continue;
                }
                JSONObject obj = new JSONObject(line);
                idSet.add(obj.getLong("id"));
            }
            // Scanner hides read errors and merely stops iterating; surface
            // them so a truncated read is not mistaken for the end of file.
            IOException readError = reader.ioException();
            if (readError != null) {
                throw readError;
            }
        } catch (FileNotFoundException ex) {
            // ignore, file will be created if necessary.
        } catch (JSONException ex) {
            getLogger().log(Level.WARNING, filename
                    + ": File is only partially processed", ex);
        }
        return idSet;
    }

    /**
     * Writes the JSONObject to a writer and updates the idSet.
     *
     * <p>
     * Nothing is written when the object has no numeric "id" or when its id
     * is already contained in the id set. The id is only added to the set
     * after the object has been written successfully. Failures are logged
     * and not propagated.
     *
     * @param obj The object to write.
     * @param output The stream to write objects to.
     * @param idSet The id set to add the obj id to.
     */
    private void writeObject(JSONObject obj, OutputStream output,
            Set<Long> idSet) {
        final long id;
        try {
            id = obj.getLong("id");
        } catch (JSONException ex) {
            getLogger().log(Level.WARNING, "ID not found?!", ex);
            return;
        }

        if (idSet.contains(id)) {
            // already stored, avoid duplicates in the file.
            return;
        }

        // JSONObject.toString() may return null when serialization fails
        // (see the org.json documentation); writing the text "null" would
        // add a line that cannot be parsed when the file is read back.
        String serialized = obj.toString();
        if (serialized == null) {
            getLogger().log(Level.WARNING,
                    "Cannot serialize object with id {0}", id);
            return;
        }

        // Write a single object as one line into the file.
        try {
            output.write((serialized + "\n").getBytes(Charsets.UTF_8));
            idSet.add(id);
        } catch (IOException ex) {
            getLogger().log(Level.WARNING, "Cannot write to file", ex);
        }
    }

    private Logger getLogger() {
        return Logger.getLogger(getClass().getName());
    }
}