Skip to content

Commit b490a1a

Browse files
committed
added optional Schema demo with PR field
1 parent 832e316 commit b490a1a

5 files changed

Lines changed: 72 additions & 18 deletions

File tree

src/main/java/com/simplesteph/kafka/GitHubSchemas.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,16 @@ public class GitHubSchemas {
2626
public static String USER_ID_FIELD = "id";
2727
public static String USER_LOGIN_FIELD = "login";
2828

29+
// PR fields
30+
public static String PR_FIELD = "pull_request";
31+
public static String PR_URL_FIELD = "url";
32+
public static String PR_HTML_URL_FIELD = "html_url";
33+
2934
// Schema names
3035
public static String SCHEMA_KEY = "GitHub Issue Key";
31-
public static String SCHEMA_VALUE_ISSUE = "GitHub Issue";
36+
public static String SCHEMA_VALUE_ISSUE = "Issue";
3237
public static String SCHEMA_VALUE_USER = "User";
38+
public static String SCHEMA_VALUE_PR = "PR";
3339

3440
// Key Schema
3541
public static Schema KEY_SCHEMA = SchemaBuilder.struct().name(SCHEMA_KEY)
@@ -47,6 +53,14 @@ public class GitHubSchemas {
4753
.field(USER_LOGIN_FIELD, Schema.STRING_SCHEMA)
4854
.build();
4955

56+
// optional schema
57+
public static Schema PR_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_PR)
58+
.version(1)
59+
.field(PR_URL_FIELD, Schema.STRING_SCHEMA)
60+
.field(PR_HTML_URL_FIELD, Schema.STRING_SCHEMA)
61+
.optional()
62+
.build();
63+
5064
public static Schema VALUE_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_ISSUE)
5165
.version(1)
5266
.field(URL_FIELD, Schema.STRING_SCHEMA)
@@ -55,6 +69,7 @@ public class GitHubSchemas {
5569
.field(UPDATED_AT_FIELD, Schema.INT64_SCHEMA)
5670
.field(NUMBER_FIELD, Schema.INT32_SCHEMA)
5771
.field(STATE_FIELD, Schema.STRING_SCHEMA)
58-
.field(USER_FIELD, USER_SCHEMA)
72+
.field(USER_FIELD, USER_SCHEMA) // mandatory
73+
.field(PR_FIELD, PR_SCHEMA) // optional
5974
.build();
6075
}

src/main/java/com/simplesteph/kafka/GitHubSourceTask.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.mashape.unirest.http.exceptions.UnirestException;
88
import com.mashape.unirest.request.GetRequest;
99
import com.simplesteph.kafka.model.Issue;
10+
import com.simplesteph.kafka.model.PullRequest;
1011
import com.simplesteph.kafka.model.User;
1112
import com.simplesteph.kafka.utils.DateUtils;
1213
import org.apache.kafka.connect.data.Struct;
@@ -219,20 +220,32 @@ private Struct buildRecordKey(Issue issue){
219220
}
220221

221222
private Struct buildRecordValue(Issue issue){
222-
User user = issue.getUser();
223-
Struct userStruct = new Struct(USER_SCHEMA)
224-
.put(USER_URL_FIELD, user.getUrl())
225-
.put(USER_ID_FIELD, user.getId())
226-
.put(USER_LOGIN_FIELD, user.getLogin());
227223

224+
// Issue top level fields
228225
Struct valueStruct = new Struct(VALUE_SCHEMA)
229226
.put(URL_FIELD, issue.getUrl())
230227
.put(TITLE_FIELD, issue.getTitle())
231228
.put(CREATED_AT_FIELD, issue.getCreatedAt().toEpochMilli())
232229
.put(UPDATED_AT_FIELD, issue.getUpdatedAt().toEpochMilli())
233230
.put(NUMBER_FIELD, issue.getNumber())
234-
.put(STATE_FIELD, issue.getState())
235-
.put(USER_FIELD, userStruct);
231+
.put(STATE_FIELD, issue.getState());
232+
233+
// User is mandatory
234+
User user = issue.getUser();
235+
Struct userStruct = new Struct(USER_SCHEMA)
236+
.put(USER_URL_FIELD, user.getUrl())
237+
.put(USER_ID_FIELD, user.getId())
238+
.put(USER_LOGIN_FIELD, user.getLogin());
239+
valueStruct.put(USER_FIELD, userStruct);
240+
241+
// Pull request is optional
242+
PullRequest pullRequest = issue.getPullRequest();
243+
if (pullRequest != null) {
244+
Struct prStruct = new Struct(PR_SCHEMA)
245+
.put(PR_URL_FIELD, pullRequest.getUrl())
246+
.put(PR_HTML_URL_FIELD, pullRequest.getHtmlUrl());
247+
valueStruct.put(PR_FIELD, prStruct);
248+
}
236249

237250
return valueStruct;
238251
}

src/main/java/com/simplesteph/kafka/model/Issue.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11

22
package com.simplesteph.kafka.model;
33

4-
import org.apache.kafka.connect.data.Struct;
54
import org.json.JSONObject;
65

76
import java.time.Instant;
@@ -396,8 +395,6 @@ public Issue withAdditionalProperty(String name, Object value) {
396395

397396
public static Issue fromJson(JSONObject jsonObject) {
398397

399-
User user = User.fromJson(jsonObject.getJSONObject("user"));
400-
401398
Issue issue = new Issue();
402399
issue.withUrl(jsonObject.getString(URL_FIELD));
403400
issue.withHtmlUrl(jsonObject.getString(HTML_URL_FIELD));
@@ -406,7 +403,18 @@ public static Issue fromJson(JSONObject jsonObject) {
406403
issue.withUpdatedAt(Instant.parse(jsonObject.getString(UPDATED_AT_FIELD)));
407404
issue.withNumber(jsonObject.getInt(NUMBER_FIELD));
408405
issue.withState(jsonObject.getString(STATE_FIELD));
406+
407+
// user is mandatory
408+
User user = User.fromJson(jsonObject.getJSONObject("user"));
409409
issue.withUser(user);
410+
411+
// pull request is an optional fields
412+
if (jsonObject.has("pull_request")){
413+
System.out.println("pull!");
414+
PullRequest pullRequest = PullRequest.fromJson(jsonObject.getJSONObject("pull_request"));
415+
issue.withPullRequest(pullRequest);
416+
}
417+
410418
return issue;
411419
}
412420
}

src/main/java/com/simplesteph/kafka/model/PullRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11

22
package com.simplesteph.kafka.model;
33

4+
import org.json.JSONObject;
5+
46
import java.util.HashMap;
57
import java.util.Map;
8+
import static com.simplesteph.kafka.GitHubSchemas.*;
69

710
public class PullRequest {
811

@@ -99,4 +102,10 @@ public PullRequest withAdditionalProperty(String name, Object value) {
99102
return this;
100103
}
101104

105+
public static PullRequest fromJson(JSONObject pull_request) {
106+
return new PullRequest()
107+
.withUrl(pull_request.getString(PR_URL_FIELD))
108+
.withHtmlUrl(pull_request.getString(PR_HTML_URL_FIELD));
109+
110+
}
102111
}

src/test/java/com/simplesteph/kafka/model/IssueTest.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,8 @@
33
import org.json.JSONObject;
44
import org.junit.Test;
55

6-
import java.time.Instant;
7-
8-
import static com.simplesteph.kafka.GitHubSchemas.*;
9-
import static com.simplesteph.kafka.GitHubSchemas.NUMBER_FIELD;
10-
import static com.simplesteph.kafka.GitHubSchemas.STATE_FIELD;
116
import static org.junit.Assert.assertEquals;
7+
import static org.junit.Assert.assertNotNull;
128

139
public class IssueTest {
1410
String issueStr = "{\n" +
@@ -64,15 +60,28 @@ public class IssueTest {
6460

6561
@Test
6662
public void canParseJson(){
63+
// issue
6764
Issue issue = Issue.fromJson(issueJson);
68-
6965
assertEquals(issue.getUrl(), "https://api.github.com/repos/apache/kafka/issues/2800");
7066
assertEquals(issue.getTitle(), "added interface to allow producers to create a ProducerRecord without…");
7167
assertEquals(issue.getCreatedAt().toString(), "2017-04-04T06:47:09Z");
7268
assertEquals(issue.getUpdatedAt().toString(), "2017-04-19T22:36:21Z");
7369
assertEquals(issue.getNumber(), (Integer) 2800);
7470
assertEquals(issue.getState(), "closed");
7571

72+
// user
73+
User user = issue.getUser();
74+
assertEquals(user.getId(), (Integer) 20851561);
75+
assertEquals(user.getUrl(), "https://api.github.com/users/simplesteph");
76+
assertEquals(user.getHtmlUrl(), "https://github.com/simplesteph");
77+
assertEquals(user.getLogin(), "simplesteph");
78+
79+
// pr
80+
PullRequest pullRequest = issue.getPullRequest();
81+
assertNotNull(pullRequest);
82+
assertEquals(pullRequest.getUrl(), "https://api.github.com/repos/apache/kafka/pulls/2800");
83+
assertEquals(pullRequest.getHtmlUrl(), "https://github.com/apache/kafka/pull/2800");
84+
7685
}
7786

7887
}

0 commit comments

Comments
 (0)