summaryrefslogtreecommitdiff
path: root/src/main/Main.java
blob: e076d32af2d13e4deb8b226187a445d8f260e9fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package main;

import com.google.gson.JsonParseException;
import com.google.gson.JsonSyntaxException;
import data.Tweet;
import database.ConnectionBuilder;
import io.FileTweetReader;
import io.ITweetReader;
import io.TweetReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The main class.
 */
public class Main implements Callable<Boolean> {

    /**
     * The main method of the application.
     *
     * @param args the global arguments to pass to the program.
     */
    public static void main(String[] args) {
        Main main;
        try {
            main = new Main(args);
        } catch (IllegalArgumentException ex) {
            System.err.println(ex.getMessage());
            System.exit(1);
            return;
        }

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        // the main IO thread
        Future<Boolean> mainTask = scheduler.submit(main);

        // the status thread
        if (main.statusInterval > 0) {
            scheduler.scheduleAtFixedRate(new Watcher(main, main.statusInterval),
                    main.statusInterval, main.statusInterval, TimeUnit.SECONDS);
        }
        // now wait for completion of the main thread
        try {
            while (true) {
                try {
                    if (mainTask.get()) {
                        System.out.println("Import successful.");
                    } else {
                        System.out.println("Import (partially) failed.");
                    }
                    break;
                } catch (InterruptedException ex) {
                    Logger.getLogger(Main.class.getName())
                            .info("Interrupted while waiting for main");
                }
            }
        } catch (ExecutionException ex) {
            Logger.getLogger(Main.class.getName())
                    .log(Level.SEVERE, "Main failed", ex);
        }
        // cancel status thread
        scheduler.shutdown();
    }

    private String m_filename;
    private final ConnectionBuilder cb;
    /**
     * Whether the database should be contacted or not.
     */
    private boolean skipDb;
    private Integer statusInterval;

    public Main(String[] args) {
        // default connection properties
        cb = new ConnectionBuilder()
                .setServerName("localhost")
                .setUsername("twitter")
                .setPassword("2IOC02")
                .setDbName("twitter");
        skipDb = false;
        statusInterval = 2;

        /* parse the global options. */
        parseGlobalOptions(args);
    }

    /**
     * The current tweet number that is being processed.
     */
    private volatile int tweetNo;

    private boolean printTweets(ITweetReader reader) throws IOException {
        Tweet tweet;
        tweetNo = 1;
        while ((tweet = reader.getTweet()) != null) {
            System.out.println("/*" + tweetNo++ + "*/ " + tweet);
        }
        return true;
    }

    private boolean tweetsToDb(ITweetReader reader) throws IOException {
        Tweet tweet = null;
        tweetNo = 1;
        try (Connection connection = cb.create()) {
            /* create the object that fills the database */
            DataFiller filler = new DataFiller(connection);
            while ((tweet = reader.getTweet()) != null) {
                filler.processTweet(tweet);
                ++tweetNo;
            }
            return true;
        } catch (JsonParseException ex) {
            if (tweet != null) {
                System.err.println("Faulty tweet " + tweetNo + ": " + tweet);
            }
            Logger.getLogger(Main.class.getName()).log(Level.SEVERE,
                    "Tweet read error", ex);
        } catch (SQLException ex) {
            if (tweet != null) {
                System.err.println("Faulty tweet " + tweetNo + ": " + tweet);
            }
            Logger.getLogger(Main.class.getName()).log(Level.SEVERE,
                    "DB error", ex);
        }
        return false;
    }

    public int getProcessedTweets() {
        return tweetNo;
    }

    @Override
    public Boolean call() {
        ITweetReader reader = null;
        boolean success = false;
        try {
            if (m_filename == null) {
                reader = new TweetReader(System.in);
            } else {
                reader = new FileTweetReader(m_filename);
            }
            if (skipDb) {
                success = printTweets(reader);
            } else {
                success = tweetsToDb(reader);
            }
        } catch (JsonSyntaxException ex) {
            System.err.println("Got an invalid tweet: " + ex);
        } catch (IOException ex) {
            System.err.println("Cannot open tweets: " + ex);
        } finally {
            if (reader != null) {
                reader.close();
            }
        }

        return success;
    }

    private void parseGlobalOptions(String[] args)
            throws IllegalArgumentException {
        /* parse global options */
        for (int i = 0; i < args.length; i++) {
            if ("--help".equals(args[i])) {
                printHelp();
                System.exit(0);
            } else if ("--dbhost".equals(args[i])) {
                cb.setServerName(getArg(args, ++i, "--dbhost"));
            } else if ("--dbuser".equals(args[i])) {
                cb.setUsername(getArg(args, ++i, "--dbuser"));
            } else if ("--dbpass".equals(args[i])) {
                cb.setPassword(getArg(args, ++i, "--dbpass"));
            } else if ("--dbport".equals(args[i])) {
                cb.setPort(Integer.valueOf(getArg(args, ++i, "--dbport")));
            } else if ("--dbname".equals(args[i])) {
                cb.setDbName(getArg(args, ++i, "--dbname"));
            } else if ("--skipdb".equals(args[i])) {
                skipDb = true;
            } else if ("--status".equals(args[i])) {
                statusInterval = Integer.valueOf(getArg(args, ++i, "--status"));
            } else if (args[i].startsWith("-")) {
                throw new IllegalArgumentException("Invalid option: " + args[i]);
            } else {
                /* This should be the filename */
                m_filename = args[i];
            }
        }
    }

    /**
     * Read an extra option for a command.
     */
    private String getArg(String[] params, int index, String name) {
        if (index >= params.length) {
            System.err.println("Missing argument for parameter " + name);
            System.exit(1);
        }
        return params[index];
    }

    /**
     * Print some useful help messages.
     */
    private void printHelp() {
        for (String line : HELP) {
            System.out.println(line);
        }
    }

    private final static String[] HELP = {
        "Usage: java -jar DataFiller.jar [options] [tweets-file]",
        "",
        "Global options:",
        "   --help        Print this help text.",
        "   --dbhost HOST Database host (defaults to 'localhost')",
        "   --dbuser USER Database username (defaults to 'postgres')",
        "   --dbpass PASS Database password (defaults to '2IOC02')",
        "   --dbport PORT Database port (defaults to 5432)",
        "   --dbname NAME Database name (defaults to 'Twitter')",
        "   --skipdb      Do not contact the database at all, just print data.",
        "   --status SECS The interval in which import status information",
        "                 should be printed, zero disables it (defaults to 2)",
        "",
        "If no tweets file is given, data will be read from standard input."
    };
}