summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2014-05-22 17:36:36 +0200
committerPeter Wu <peter@lekensteyn.nl>2014-05-22 17:36:36 +0200
commitbfbfac24b457407278f37ec9b5a0d842bb801fd8 (patch)
tree0838cdba5a1253897392eca9eb62da6ad4c557c7
parent88996448351b561c772b9be296eb0db4ea01e840 (diff)
downloadTwitterDataAnalytics-bfbfac24b457407278f37ec9b5a0d842bb801fd8.tar.gz
Correctness fixes
* Throw interrupted exception when interrupted instead of swallowing it, otherwise shutdown will take a longer time. * Decrease number of tweets that must be requested after confirming that the request was successful. * Style: convert to foreach; white-space. * Properly terminate if stop is requested. * Make executeRequest take care of re-queing requests if needed.
-rw-r--r--src/io/RateLimitException.java15
-rw-r--r--src/io/Response.java8
-rw-r--r--src/io/SearchImpl.java64
3 files changed, 46 insertions, 41 deletions
diff --git a/src/io/RateLimitException.java b/src/io/RateLimitException.java
index 2bb2624..e196702 100644
--- a/src/io/RateLimitException.java
+++ b/src/io/RateLimitException.java
@@ -2,17 +2,26 @@ package io;
/**
* The exception that will be thrown when a ratelimit has been hit.
- *
+ *
* @author Maurice Laveaux
*/
public class RateLimitException extends Exception {
+
private final long rateLimitReset;
-
+
public RateLimitException(long resetTime) {
rateLimitReset = resetTime;
}
-
+
public long getResetTime() {
return rateLimitReset;
}
+
+ /**
+ * @return Time in milliseconds that should be waited for before a new
+ * similar request can be executed.
+ */
+ public long getRateLimitRemainingTime() {
+ return rateLimitReset * 1000 - System.currentTimeMillis();
+ }
}
diff --git a/src/io/Response.java b/src/io/Response.java
index 92b0f0c..e5ede8a 100644
--- a/src/io/Response.java
+++ b/src/io/Response.java
@@ -39,4 +39,12 @@ public class Response {
public int getRateLimitReset() {
return this.rateLimitReset;
}
+
+ /**
+ * @return Time in milliseconds that should be waited for before a new
+ * similar request can be executed.
+ */
+ public long getRateLimitRemainingTime() {
+ return rateLimitReset * 1000 - System.currentTimeMillis();
+ }
}
diff --git a/src/io/SearchImpl.java b/src/io/SearchImpl.java
index 5bceadc..520a37c 100644
--- a/src/io/SearchImpl.java
+++ b/src/io/SearchImpl.java
@@ -119,7 +119,7 @@ public class SearchImpl implements Search {
@Override
public void close() {
if (thread_cached != null) {
- thread_cached.stop();
+ thread_cached.stop();
save("search_cache.txt");
}
}
@@ -133,16 +133,16 @@ public class SearchImpl implements Search {
sinceId = 0;
maxId = Long.MAX_VALUE;
}
-
+
public Request(String parse) {
String[] args = parse.split(":", 4);
-
+
this.screenName = args[0];
this.sinceId = Integer.parseInt(args[1]);
this.maxId = Integer.parseInt(args[2]);
this.numberOfTweets = Integer.parseInt(args[3]);
}
-
+
@Override
public String toString() {
return screenName + ":" + Long.toString(sinceId) + ":" + Long.toString(maxId) + ":" + Integer.toString(numberOfTweets);
@@ -187,7 +187,6 @@ public class SearchImpl implements Search {
try {
workerThread.interrupt();
workerThread.join();
- ioThread.interrupt();
ioThread.join();
} catch (InterruptedException ex) {
Logger.getLogger(getClass().getName())
@@ -207,7 +206,7 @@ public class SearchImpl implements Search {
*/
private final BlockingQueue<JsonObject> receivedObjects;
- private boolean running = true;
+ private volatile boolean running = true;
public Worker() {
receivedObjects = new LinkedBlockingQueue();
@@ -234,23 +233,20 @@ public class SearchImpl implements Search {
@Override
public void run() {
- while (true) {
- if (!requestQueue.isEmpty()) {
- Request request = null;
- try {
- request = requestQueue.take();
- executeRequest(request);
- } catch (InterruptedException ex) {
- if (request != null) {
- requestQueue.offer(request);
- }
- getLogger().log(Level.SEVERE, null, ex);
+ while (running) {
+ try {
+ Request request = requestQueue.take();
+ executeRequest(request);
+ } catch (InterruptedException ex) {
+ if (running) {
+ getLogger().log(Level.SEVERE,
+ "Interrupted while active", ex);
}
}
}
}
- private void executeRequest(Request request) {
+ private void executeRequest(Request request) throws InterruptedException {
try {
Builder url = oauth.build(USER_TIMELINE_RESOURCE)
.param("screen_name", request.screenName);
@@ -259,23 +255,22 @@ public class SearchImpl implements Search {
if (count >= request.numberOfTweets) {
count = request.numberOfTweets;
}
- request.numberOfTweets -= count;
- url = url.param("count", Integer.toString(count));
+ url.param("count", Integer.toString(count));
if (request.maxId > 0) {
- url = url.param("max_id", Long.toString(request.maxId - 1));
+ url.param("max_id", Long.toString(request.maxId - 1));
}
try {
Response result = url.request();
// Obtain the lowest id.
- JsonArray array = result.getResp().getAsJsonArray();
- for (int i = 0; i < array.size(); i++) {
- JsonObject object = array.get(i).getAsJsonObject();
+ JsonArray array = result.getJson().getAsJsonArray();
+ for (JsonElement element : array) {
+ JsonObject object = element.getAsJsonObject();
JsonElement id = object.get("id");
- if (id == null) {
+ if (id == null || !id.isJsonPrimitive()) {
getLogger().log(Level.WARNING, "Search {0}, id was not valid.", object);
continue;
}
@@ -286,29 +281,22 @@ public class SearchImpl implements Search {
receivedObjects.offer(object);
}
+ request.numberOfTweets -= count;
+ // request completed, do we need more requests?
if (request.numberOfTweets > 0) {
requestQueue.offer(request);
}
-
+
if (result.getRateLimitRemaining() == 0) {
- waitTil(result.getRateLimitReset());
+ sleep(result.getRateLimitRemainingTime());
}
} catch (RateLimitException ex) {
requestQueue.offer(request);
- waitTil(ex.getResetTime());
+ sleep(ex.getRateLimitRemainingTime());
}
} catch (IOException ex) {
- getLogger().log(Level.SEVERE, null, ex);
- }
- }
-
- private void waitTil(long seconds) {
- try {
- long interval = seconds * 1000 - System.currentTimeMillis() + 2;
- System.out.println("Rate limited, waiting " + interval + "seconds");
- sleep(interval);
- } catch (InterruptedException ex) {
+ requestQueue.offer(request);
getLogger().log(Level.SEVERE, null, ex);
}
}