package io;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.json.JSONException;
import org.json.JSONObject;
import provider.ResultListener;

/**
 * This class writes the JSON objects for tweets to files, one object per
 * line. The numeric {@code "id"} of every stored object is remembered so that
 * an object with an already-seen id is not written a second time.
 *
 * @author Maurice Laveaux
 */
public class DataWriter implements ResultListener, Closeable, Flushable {

    /**
     * The store that tweets are read from and appended to.
     */
    private final Store m_tweet;

    /**
     * The ids of the tweets that already exist in the store.
     */
    private final Set<Long> m_tweetIdSet;

    /**
     * Not referenced inside this class; presumably the configuration key
     * under which callers look up the tweets file name (confirm at the call
     * sites).
     */
    public final static String CFG_TWEETS_FILENAME = "tweets-filename";

    /**
     * Creates an instance, specifying the file to store tweets.
     *
     * <p>Note: the store is obtained through the overridable
     * {@link #getStore(String)}, so an overriding implementation runs before
     * the subclass constructor body has executed.
     *
     * @param tweetsName The file to write the tweets to.
     */
    public DataWriter(final String tweetsName) {
        m_tweet = getStore(tweetsName);
        m_tweetIdSet = new HashSet<>();
    }

    /**
     * Loads the ids of the tweets that are already stored and then opens the
     * store for writing.
     *
     * @throws java.io.IOException if the existing data cannot be read or
     * parsed, or if the store cannot be opened for writing. The store is
     * closed before the exception is propagated.
     */
    public void open() throws IOException {
        try {
            readIds(m_tweetIdSet, m_tweet);
            // open after reading input to prevent simultaneous r/w access
            m_tweet.open();
        } catch (IOException ex) {
            m_tweet.close();
            throw ex;
        }
    }

    /**
     * For a given filename, get an instance that provides I/O.
     *
     * @param filename File to read/write.
     * @return A Store instance providing read and write access.
     */
    protected Store getStore(String filename) {
        return new SimpleFileStore(filename);
    }

    /**
     * Closes the underlying store.
     */
    @Override
    public void close() {
        m_tweet.close();
    }

    /**
     * Stores a tweet unless a tweet with the same id was stored before.
     * Failures are logged and not propagated to the caller.
     *
     * @param obj The JSON object describing the tweet.
     */
    @Override
    public void tweetGenerated(JSONObject obj) {
        try {
            // ensure that the file is open
            m_tweet.open();
            writeObject(obj, m_tweet.getOutputStream(), m_tweetIdSet);
        } catch (IOException ex) {
            getLogger().log(Level.SEVERE, "Cannot save tweet", ex);
        }
    }

    /**
     * Read tweet IDs from a store to avoid storing duplicate tweets later.
     * Every line of the input is parsed as a JSON object whose {@code "id"}
     * is added to the given set. A missing file is not an error; the set is
     * simply left untouched in that case.
     *
     * @param idSet The set that receives the ids found in the store.
     * @param store The store whose input stream provides JSON objects, one
     * per line.
     * @throws IOException if reading fails or a line is not a JSON object
     * with a numeric id; the original exception is attached as the cause.
     */
    private void readIds(Set<Long> idSet, Store store) throws IOException {
        InputStream is = null;
        String line = null;
        long lineno = 1;
        try {
            is = store.getInputStream();
            // Decode with the same charset that writeObject encodes with;
            // relying on the platform default would corrupt non-ASCII text
            // on systems whose default charset is not UTF-8.
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(is, Charsets.UTF_8));
            // parse each line into a JSONObject, read the id and add it to
            // the set of ids.
            while ((line = reader.readLine()) != null) {
                JSONObject obj = new JSONObject(line);
                long id = obj.getLong("id");
                idSet.add(id);
                lineno++;
            }
        } catch (FileNotFoundException ex) {
            // ignore, file will be created if necessary.
        } catch (JSONException | IOException ex) {
            if (line != null) {
                getLogger().log(Level.INFO, "Last line: " + line);
            }
            getLogger().log(Level.WARNING, store.getFileName()
                    + ": error occurred in file at line " + lineno, ex);
            throw new IOException(ex);
        } finally {
            // closing the underlying stream also releases the reader
            IOUtils.closeQuietly(is);
        }
    }

    /**
     * Writes the JSONObject to a stream and updates the idSet. The object is
     * skipped when its id is already contained in the set. The id is only
     * recorded after the write succeeded; failures are logged.
     *
     * @param obj The object to write.
     * @param output The stream to write objects to.
     * @param idSet The id set to add the obj id to.
     */
    private void writeObject(JSONObject obj, OutputStream output,
            Set<Long> idSet) {
        try {
            long id = obj.getLong("id");
            if (!idSet.contains(id)) {
                try {
                    output.write((obj.toString() + "\n")
                            .getBytes(Charsets.UTF_8));
                    idSet.add(id);
                } catch (IOException ex) {
                    getLogger().log(Level.WARNING, "Cannot write to file", ex);
                }
            }
        } catch (JSONException ex) {
            getLogger().log(Level.WARNING, "ID not found?!", ex);
        }
    }

    /**
     * @return The logger named after the runtime class of this instance.
     */
    private Logger getLogger() {
        return Logger.getLogger(getClass().getName());
    }

    /**
     * Flushes the writable stream of the store, if one is open.
     *
     * @throws IOException if flushing the stream fails.
     */
    @Override
    public void flush() throws IOException {
        m_tweet.flush();
    }

    /**
     * Provides read access to previously stored objects and write access for
     * new ones, both identified by a single file name.
     */
    abstract class Store {

        private final String filename;

        /**
         * The writable stream, or {@code null} while no stream is open.
         */
        protected OutputStream os;

        Store(String filename) {
            this.filename = filename;
        }

        /**
         * @return The file name associated with this store.
         */
        public String getFileName() {
            return filename;
        }

        /**
         * Opens the writable stream (if not open yet), must be called before
         * writing a stream.
         *
         * @throws IOException
         */
        abstract public void open() throws IOException;

        /**
         * Closes the writable stream if one is open. A failure to close is
         * logged and not propagated.
         */
        public void close() {
            if (os == null) {
                return;
            }
            try {
                os.close();
            } catch (IOException ex) {
                Logger.getLogger(getClass().getName()).log(Level.WARNING,
                        "Could not close " + getFileName(), ex);
            } finally {
                // Drop the reference even if close() failed; otherwise a
                // later open() would be a no-op and writes would keep going
                // to the broken stream.
                os = null;
            }
        }

        /**
         * Returns a previously opened writable file stream.
         *
         * @return A stream to which JSON objects can be written (one per
         * line). If no writable stream is open, {@code null} is returned.
         */
        public OutputStream getOutputStream() {
            return os;
        }

        /**
         * Opens a new input stream for the file.
         *
         * @return A stream from which JSON objects can be read (one per
         * line).
         * @throws IOException if no input file can be retrieved.
         */
        abstract public InputStream getInputStream() throws IOException;

        /**
         * Flushes the writable stream; does nothing when no stream is open.
         *
         * @throws IOException if flushing the stream fails.
         */
        private void flush() throws IOException {
            if (os != null) {
                os.flush();
            }
        }
    }

    /**
     * A store backed by a plain file: reads from the start of the file and
     * writes by appending to its end.
     */
    class SimpleFileStore extends Store {

        SimpleFileStore(String filename) {
            super(filename);
        }

        /**
         * Opens the file in append mode unless a stream is already open.
         *
         * @throws IOException if the file cannot be opened for writing.
         */
        @Override
        public void open() throws IOException {
            if (os == null) {
                os = new FileOutputStream(getFileName(), true);
            }
        }

        /**
         * @return A new stream reading the file from its beginning.
         * @throws FileNotFoundException if the file cannot be opened for
         * reading.
         */
        @Override
        public InputStream getInputStream() throws FileNotFoundException {
            return new FileInputStream(getFileName());
        }
    }
}