11package com .simplesteph .kafka ;
22
3- import com .mashape .unirest .http .Headers ;
4- import com .mashape .unirest .http .HttpResponse ;
5- import com .mashape .unirest .http .JsonNode ;
6- import com .mashape .unirest .http .Unirest ;
7- import com .mashape .unirest .http .exceptions .UnirestException ;
8- import com .mashape .unirest .request .GetRequest ;
93import com .simplesteph .kafka .model .Issue ;
104import com .simplesteph .kafka .model .PullRequest ;
115import com .simplesteph .kafka .model .User ;
126import com .simplesteph .kafka .utils .DateUtils ;
137import org .apache .kafka .connect .data .Struct ;
14- import org .apache .kafka .connect .errors .ConnectException ;
158import org .apache .kafka .connect .source .SourceRecord ;
169import org .apache .kafka .connect .source .SourceTask ;
1710import org .json .JSONArray ;
2013import org .slf4j .LoggerFactory ;
2114
2215import java .time .Instant ;
23- import java .time .LocalDateTime ;
24- import java .time .ZoneOffset ;
2516import java .util .*;
2617import static com .simplesteph .kafka .GitHubSchemas .*;
2718
2819
2920public class GitHubSourceTask extends SourceTask {
30- static final Logger log = LoggerFactory .getLogger (GitHubSourceTask .class );
21+ private static final Logger log = LoggerFactory .getLogger (GitHubSourceTask .class );
3122 public GitHubSourceConnectorConfig config ;
3223
3324 protected Instant nextQuerySince ;
3425 protected Integer lastIssueNumber ;
3526 protected Integer nextPageToVisit = 1 ;
3627 protected Instant lastUpdatedAt ;
3728
38- GitHubHttpAPIClient gitHubHttpAPIClient = new GitHubHttpAPIClient ();
39-
40- // for efficient http requests
41- private Integer XRateLimit = 9999 ;
42- private Integer XRateRemaining = 9999 ;
43- private long XRateReset = Instant .MAX .getEpochSecond ();
29+ GitHubAPIHttpClient gitHubHttpAPIClient ;
4430
4531 @ Override
4632 public String version () {
@@ -52,6 +38,7 @@ public void start(Map<String, String> map) {
5238 //Do things here that are required to start your task. This could be open a connection to a database, etc.
5339 config = new GitHubSourceConnectorConfig (map );
5440 initializeLastVariables ();
41+ gitHubHttpAPIClient = new GitHubAPIHttpClient (config );
5542 }
5643
5744 private void initializeLastVariables (){
@@ -77,24 +64,15 @@ private void initializeLastVariables(){
7764 }
7865 }
7966
80- public void sleep () throws InterruptedException {
81- long sleepTime = (long ) Math .ceil (
82- (double ) (XRateReset - Instant .now ().getEpochSecond ()) / XRateRemaining );
83- log .debug (String .format ("Sleeping for %s seconds" , sleepTime ));
84- Thread .sleep (1000 * sleepTime );
85- }
67+
8668
8769 @ Override
8870 public List <SourceRecord > poll () throws InterruptedException {
89- // Sleep if needed
90- if (XRateRemaining <= 10 && XRateRemaining > 0 ){
91- log .info (String .format ("Approaching limit soon, you have %s requests left" , XRateRemaining ));
92- sleep ();
93- }
71+ gitHubHttpAPIClient .sleepIfNeed ();
9472
9573 // fetch data
9674 final ArrayList <SourceRecord > records = new ArrayList <>();
97- JSONArray issues = gitHubHttpAPIClient .getNextIssues ();
75+ JSONArray issues = gitHubHttpAPIClient .getNextIssues (nextPageToVisit , nextQuerySince );
9876 // we'll count how many results we get with i
9977 int i = 0 ;
10078 for (Object obj : issues ) {
@@ -112,7 +90,7 @@ public List<SourceRecord> poll() throws InterruptedException {
11290 else {
11391 nextQuerySince = lastUpdatedAt .plusSeconds (1 );
11492 nextPageToVisit = 1 ;
115- sleep ();
93+ gitHubHttpAPIClient . sleep ();
11694 }
11795 return records ;
11896 }
@@ -190,70 +168,4 @@ private Struct buildRecordValue(Issue issue){
190168 return valueStruct ;
191169 }
192170
193- // GitHubHttpAPIClient used to launch HTTP Get requests
194- public class GitHubHttpAPIClient {
195-
196- protected JSONArray getNextIssues () throws InterruptedException {
197-
198- HttpResponse <JsonNode > jsonResponse ;
199- try {
200- jsonResponse = getNextIssuesAPI ();
201-
202- // deal with headers in any case
203- Headers headers = jsonResponse .getHeaders ();
204- XRateLimit = Integer .valueOf (headers .getFirst ("X-RateLimit-Limit" ));
205- XRateRemaining = Integer .valueOf (headers .getFirst ("X-RateLimit-Remaining" ));
206- XRateReset = Integer .valueOf (headers .getFirst ("X-RateLimit-Reset" ));
207- switch (jsonResponse .getStatus ()){
208- case 200 :
209- return jsonResponse .getBody ().getArray ();
210- case 401 :
211- throw new ConnectException ("Bad GitHub credentials provided, please edit your config" );
212- case 403 :
213- // we have issues too many requests.
214- log .info (jsonResponse .getBody ().getObject ().getString ("message" ));
215- log .info (String .format ("Your rate limit is %s" , XRateLimit ));
216- log .info (String .format ("Your remaining calls is %s" , XRateRemaining ));
217- log .info (String .format ("The limit will reset at %s" ,
218- LocalDateTime .ofInstant (Instant .ofEpochSecond (XRateReset ), ZoneOffset .systemDefault ())));
219- long sleepTime = XRateReset - Instant .now ().getEpochSecond ();
220- log .info (String .format ("Sleeping for %s seconds" , sleepTime ));
221- Thread .sleep (1000 * sleepTime );
222- return getNextIssues ();
223- default :
224- log .error (String .valueOf (jsonResponse .getStatus ()));
225- log .error (jsonResponse .getBody ().toString ());
226- log .error (jsonResponse .getHeaders ().toString ());
227- log .error ("Unknown error: Sleeping 5 seconds " +
228- "before re-trying" );
229- Thread .sleep (5000L );
230- return getNextIssues ();
231- }
232- } catch (UnirestException e ) {
233- e .printStackTrace ();
234- Thread .sleep (5000L );
235- return new JSONArray ();
236- }
237- }
238-
239- protected HttpResponse <JsonNode > getNextIssuesAPI () throws UnirestException {
240- GetRequest unirest = Unirest .get (constructUrl ());
241- if (!config .getAuthUsername ().isEmpty () && !config .getAuthPassword ().isEmpty () ){
242- unirest = unirest .basicAuth (config .getAuthUsername (), config .getAuthPassword ());
243- }
244- log .debug (String .format ("GET %s" , unirest .getUrl ()));
245- return unirest .asJson ();
246- }
247-
248- protected String constructUrl (){
249- return String .format (
250- "https://api.github.com/repos/%s/%s/issues?page=%s&per_page=%s&since=%s&state=all&direction=asc&sort=updated" ,
251- config .getOwnerConfig (),
252- config .getRepoConfig (),
253- nextPageToVisit ,
254- config .getBatchSize (),
255- nextQuerySince .toString ());
256- }
257- }
258-
259171}
0 commit comments