Skip to content

Commit dd2c20f

Browse files
committed
separated the http client
1 parent 6921826 commit dd2c20f

2 files changed

Lines changed: 71 additions & 66 deletions

File tree

src/main/java/com/simplesteph/kafka/GitHubSourceTask.java

Lines changed: 69 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ public class GitHubSourceTask extends SourceTask {
3030
static final Logger log = LoggerFactory.getLogger(GitHubSourceTask.class);
3131
public GitHubSourceConnectorConfig config;
3232

33-
3433
protected Instant nextQuerySince;
3534
protected Integer lastIssueNumber;
3635
protected Integer nextPageToVisit = 1;
3736
protected Instant lastUpdatedAt;
3837

38+
GitHubHttpAPIClient gitHubHttpAPIClient = new GitHubHttpAPIClient();
39+
3940
// for efficient http requests
4041
private Integer XRateLimit = 9999;
4142
private Integer XRateRemaining = 9999;
@@ -93,7 +94,7 @@ public List<SourceRecord> poll() throws InterruptedException {
9394

9495
// fetch data
9596
final ArrayList<SourceRecord> records = new ArrayList<>();
96-
JSONArray issues = getNextIssues();
97+
JSONArray issues = gitHubHttpAPIClient.getNextIssues();
9798
// we'll count how many results we get with i
9899
int i = 0;
99100
for (Object obj : issues) {
@@ -129,68 +130,6 @@ private SourceRecord generateSourceRecord(Issue issue) {
129130
issue.getUpdatedAt().toEpochMilli());
130131
}
131132

132-
protected JSONArray getNextIssues() throws InterruptedException {
133-
134-
HttpResponse<JsonNode> jsonResponse;
135-
try {
136-
jsonResponse = getNextIssuesAPI();
137-
138-
// deal with headers in any case
139-
Headers headers = jsonResponse.getHeaders();
140-
XRateLimit = Integer.valueOf(headers.getFirst("X-RateLimit-Limit"));
141-
XRateRemaining = Integer.valueOf(headers.getFirst("X-RateLimit-Remaining"));
142-
XRateReset = Integer.valueOf(headers.getFirst("X-RateLimit-Reset"));
143-
switch (jsonResponse.getStatus()){
144-
case 200:
145-
return jsonResponse.getBody().getArray();
146-
case 401:
147-
throw new ConnectException("Bad GitHub credentials provided, please edit your config");
148-
case 403:
149-
// we have issues too many requests.
150-
log.info(jsonResponse.getBody().getObject().getString("message"));
151-
log.info(String.format("Your rate limit is %s", XRateLimit));
152-
log.info(String.format("Your remaining calls is %s", XRateRemaining));
153-
log.info(String.format("The limit will reset at %s",
154-
LocalDateTime.ofInstant(Instant.ofEpochSecond(XRateReset), ZoneOffset.systemDefault())));
155-
long sleepTime = XRateReset - Instant.now().getEpochSecond();
156-
log.info(String.format("Sleeping for %s seconds", sleepTime ));
157-
Thread.sleep(1000 * sleepTime);
158-
return getNextIssues();
159-
default:
160-
log.error(String.valueOf(jsonResponse.getStatus()));
161-
log.error(jsonResponse.getBody().toString());
162-
log.error(jsonResponse.getHeaders().toString());
163-
log.error("Unknown error: Sleeping 5 seconds " +
164-
"before re-trying");
165-
Thread.sleep(5000L);
166-
return getNextIssues();
167-
}
168-
} catch (UnirestException e) {
169-
e.printStackTrace();
170-
Thread.sleep(5000L);
171-
return new JSONArray();
172-
}
173-
}
174-
175-
protected HttpResponse<JsonNode> getNextIssuesAPI() throws UnirestException {
176-
GetRequest unirest = Unirest.get(constructUrl());
177-
if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){
178-
unirest = unirest.basicAuth(config.getAuthUsername(), config.getAuthPassword());
179-
}
180-
log.debug(String.format("GET %s", unirest.getUrl()));
181-
return unirest.asJson();
182-
}
183-
184-
protected String constructUrl(){
185-
return String.format(
186-
"https://api.github.com/repos/%s/%s/issues?page=%s&per_page=%s&since=%s&state=all&direction=asc&sort=updated",
187-
config.getOwnerConfig(),
188-
config.getRepoConfig(),
189-
nextPageToVisit,
190-
config.getBatchSize(),
191-
nextQuerySince.toString());
192-
}
193-
194133
@Override
195134
public void stop() {
196135
// Do whatever is required to stop your task.
@@ -251,4 +190,70 @@ private Struct buildRecordValue(Issue issue){
251190
return valueStruct;
252191
}
253192

193+
// GitHubHttpAPIClient used to launch HTTP Get requests
194+
public class GitHubHttpAPIClient {
195+
196+
protected JSONArray getNextIssues() throws InterruptedException {
197+
198+
HttpResponse<JsonNode> jsonResponse;
199+
try {
200+
jsonResponse = getNextIssuesAPI();
201+
202+
// deal with headers in any case
203+
Headers headers = jsonResponse.getHeaders();
204+
XRateLimit = Integer.valueOf(headers.getFirst("X-RateLimit-Limit"));
205+
XRateRemaining = Integer.valueOf(headers.getFirst("X-RateLimit-Remaining"));
206+
XRateReset = Integer.valueOf(headers.getFirst("X-RateLimit-Reset"));
207+
switch (jsonResponse.getStatus()){
208+
case 200:
209+
return jsonResponse.getBody().getArray();
210+
case 401:
211+
throw new ConnectException("Bad GitHub credentials provided, please edit your config");
212+
case 403:
213+
// we have issues too many requests.
214+
log.info(jsonResponse.getBody().getObject().getString("message"));
215+
log.info(String.format("Your rate limit is %s", XRateLimit));
216+
log.info(String.format("Your remaining calls is %s", XRateRemaining));
217+
log.info(String.format("The limit will reset at %s",
218+
LocalDateTime.ofInstant(Instant.ofEpochSecond(XRateReset), ZoneOffset.systemDefault())));
219+
long sleepTime = XRateReset - Instant.now().getEpochSecond();
220+
log.info(String.format("Sleeping for %s seconds", sleepTime ));
221+
Thread.sleep(1000 * sleepTime);
222+
return getNextIssues();
223+
default:
224+
log.error(String.valueOf(jsonResponse.getStatus()));
225+
log.error(jsonResponse.getBody().toString());
226+
log.error(jsonResponse.getHeaders().toString());
227+
log.error("Unknown error: Sleeping 5 seconds " +
228+
"before re-trying");
229+
Thread.sleep(5000L);
230+
return getNextIssues();
231+
}
232+
} catch (UnirestException e) {
233+
e.printStackTrace();
234+
Thread.sleep(5000L);
235+
return new JSONArray();
236+
}
237+
}
238+
239+
protected HttpResponse<JsonNode> getNextIssuesAPI() throws UnirestException {
240+
GetRequest unirest = Unirest.get(constructUrl());
241+
if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){
242+
unirest = unirest.basicAuth(config.getAuthUsername(), config.getAuthPassword());
243+
}
244+
log.debug(String.format("GET %s", unirest.getUrl()));
245+
return unirest.asJson();
246+
}
247+
248+
protected String constructUrl(){
249+
return String.format(
250+
"https://api.github.com/repos/%s/%s/issues?page=%s&per_page=%s&since=%s&state=all&direction=asc&sort=updated",
251+
config.getOwnerConfig(),
252+
config.getRepoConfig(),
253+
nextPageToVisit,
254+
config.getBatchSize(),
255+
nextQuerySince.toString());
256+
}
257+
}
258+
254259
}

src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public void test() throws UnirestException {
3535
gitHubSourceTask.config = new GitHubSourceConnectorConfig(initialConfig());
3636
gitHubSourceTask.nextPageToVisit = 1;
3737
gitHubSourceTask.nextQuerySince = Instant.parse("2017-01-01T00:00:00Z");
38-
String url = gitHubSourceTask.constructUrl();
38+
String url = gitHubSourceTask.gitHubHttpAPIClient.constructUrl();
3939
System.out.println(url);
40-
HttpResponse<JsonNode> httpResponse = gitHubSourceTask.getNextIssuesAPI();
40+
HttpResponse<JsonNode> httpResponse = gitHubSourceTask.gitHubHttpAPIClient.getNextIssuesAPI();
4141
if (httpResponse.getStatus() != 403) {
4242
assert (httpResponse.getStatus() == 200);
4343
Set<String> headers = httpResponse.getHeaders().keySet();

0 commit comments

Comments
 (0)