From bfbfac24b457407278f37ec9b5a0d842bb801fd8 Mon Sep 17 00:00:00 2001 From: Peter Wu Date: Thu, 22 May 2014 17:36:36 +0200 Subject: Correctness fixes * Throw interrupted exception when interrupted instead of swallowing it, otherwise shutdown will take a longer time. * Decrease number of tweets that must be requested after confirming that the request was successful. * Style: convert to foreach; white-space. * Properly terminate if stop is requested. * Make executeRequest take care of re-queing requests if needed. --- src/io/RateLimitException.java | 15 ++++++++-- src/io/Response.java | 8 ++++++ src/io/SearchImpl.java | 64 +++++++++++++++++------------------------- 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/src/io/RateLimitException.java b/src/io/RateLimitException.java index 2bb2624..e196702 100644 --- a/src/io/RateLimitException.java +++ b/src/io/RateLimitException.java @@ -2,17 +2,26 @@ package io; /** * The exception that will be thrown when a ratelimit has been hit. - * + * * @author Maurice Laveaux */ public class RateLimitException extends Exception { + private final long rateLimitReset; - + public RateLimitException(long resetTime) { rateLimitReset = resetTime; } - + public long getResetTime() { return rateLimitReset; } + + /** + * @return Time in milliseconds that should be waited for before a new + * similar request can be executed. + */ + public long getRateLimitRemainingTime() { + return rateLimitReset * 1000 - System.currentTimeMillis(); + } } diff --git a/src/io/Response.java b/src/io/Response.java index 92b0f0c..e5ede8a 100644 --- a/src/io/Response.java +++ b/src/io/Response.java @@ -39,4 +39,12 @@ public class Response { public int getRateLimitReset() { return this.rateLimitReset; } + + /** + * @return Time in milliseconds that should be waited for before a new + * similar request can be executed. + */ + public long getRateLimitRemainingTime() { + return rateLimitReset * 1000 - System.currentTimeMillis(); + } } diff --git a/src/io/SearchImpl.java b/src/io/SearchImpl.java index 5bceadc..520a37c 100644 --- a/src/io/SearchImpl.java +++ b/src/io/SearchImpl.java @@ -119,7 +119,7 @@ public class SearchImpl implements Search { @Override public void close() { if (thread_cached != null) { - thread_cached.stop(); + thread_cached.stop(); save("search_cache.txt"); } } @@ -133,16 +133,16 @@ public class SearchImpl implements Search { sinceId = 0; maxId = Long.MAX_VALUE; } - + public Request(String parse) { String[] args = parse.split(":", 4); - + this.screenName = args[0]; this.sinceId = Integer.parseInt(args[1]); this.maxId = Integer.parseInt(args[2]); this.numberOfTweets = Integer.parseInt(args[3]); } - + @Override public String toString() { return screenName + ":" + Long.toString(sinceId) + ":" + Long.toString(maxId) + ":" + Integer.toString(numberOfTweets); @@ -187,7 +187,6 @@ public class SearchImpl implements Search { try { workerThread.interrupt(); workerThread.join(); - ioThread.interrupt(); ioThread.join(); } catch (InterruptedException ex) { Logger.getLogger(getClass().getName()) @@ -207,7 +206,7 @@ public class SearchImpl implements Search { */ private final BlockingQueue receivedObjects; - private boolean running = true; + private volatile boolean running = true; public Worker() { receivedObjects = new LinkedBlockingQueue(); @@ -234,23 +233,20 @@ public class SearchImpl implements Search { @Override public void run() { - while (true) { - if (!requestQueue.isEmpty()) { - Request request = null; - try { - request = requestQueue.take(); - executeRequest(request); - } catch (InterruptedException ex) { - if (request != null) { - requestQueue.offer(request); - } - getLogger().log(Level.SEVERE, null, ex); + while (running) { + try { + Request request = requestQueue.take(); + executeRequest(request); + } catch (InterruptedException ex) { + if (running) { + getLogger().log(Level.SEVERE, + "Interrupted while active", ex); } } } } - private void executeRequest(Request request) { + private void executeRequest(Request request) throws InterruptedException { try { Builder url = oauth.build(USER_TIMELINE_RESOURCE) .param("screen_name", request.screenName); @@ -259,23 +255,22 @@ public class SearchImpl implements Search { if (count >= request.numberOfTweets) { count = request.numberOfTweets; } - request.numberOfTweets -= count; - url = url.param("count", Integer.toString(count)); + url.param("count", Integer.toString(count)); if (request.maxId > 0) { - url = url.param("max_id", Long.toString(request.maxId - 1)); + url.param("max_id", Long.toString(request.maxId - 1)); } try { Response result = url.request(); // Obtain the lowest id. - JsonArray array = result.getResp().getAsJsonArray(); - for (int i = 0; i < array.size(); i++) { - JsonObject object = array.get(i).getAsJsonObject(); + JsonArray array = result.getJson().getAsJsonArray(); + for (JsonElement element : array) { + JsonObject object = element.getAsJsonObject(); JsonElement id = object.get("id"); - if (id == null) { + if (id == null || !id.isJsonPrimitive()) { getLogger().log(Level.WARNING, "Search {0}, id was not valid.", object); continue; } @@ -286,29 +281,22 @@ public class SearchImpl implements Search { receivedObjects.offer(object); } + request.numberOfTweets -= count; + // request completed, do we need more requests? if (request.numberOfTweets > 0) { requestQueue.offer(request); } - + if (result.getRateLimitRemaining() == 0) { - waitTil(result.getRateLimitReset()); + sleep(result.getRateLimitRemainingTime()); } } catch (RateLimitException ex) { requestQueue.offer(request); - waitTil(ex.getResetTime()); + sleep(ex.getRateLimitRemainingTime()); } } catch (IOException ex) { - getLogger().log(Level.SEVERE, null, ex); - } - } - - private void waitTil(long seconds) { - try { - long interval = seconds * 1000 - System.currentTimeMillis() + 2; - System.out.println("Rate limited, waiting " + interval + "seconds"); - sleep(interval); - } catch (InterruptedException ex) { + requestQueue.offer(request); getLogger().log(Level.SEVERE, null, ex); } } -- cgit v1.2.1