Skip to content

Commit 16c3cd2

Browse files
eyeofvinayravisuhag
authored andcommitted
feat: redis sink (#38)
* Initial commit: add class diagram * move diagram to docs/sinks * Add config files * Add files for KeyValue mode support * Refactor: follow checkstyle * Add and use REDIS_VALUE_BY_NAME property * Add logic for parsing key templates * Use JsonPath for nested Proto and Json field by name support * add config SINK_DEFAULT_DATATYPE_STRING_ENABLE, add getFieldByName() in OdpfParsedMessage * Fix instrumentation, error handling * Add logic to extract nested Proto fields * Simplify getFieldByName for proto, follow consistent convention for json fields * Optimization: cache schema mapping * Ref: Move Json parsing code to statis util method * Fix getMapping caching, move common code to RedisParser from RedisKeyValueParser * Add List parser module * Add Hashset parser module * Consistent key template formatting * Add tests for ttl, dataentry, parsers packages * Add test for JsonUtils class * Minor refactor, fix tests * Add tests for redis/client package * Add tests for redis/parsers package * Fix RedisHashSetParserTest * chore: review changes * test: add test for parseKeyTemplate(), fix test after review changes * fix client tests * feat: added response parsing * Reduce parseKey code, add test for multiple variables in template, fix tests * fix: review comments * chore: rearrange packages * chore: refactor packages * fix and add tests, checkstyle * chore: fix tests, improve coverage * Add test for getErrorsFromResponse() method * chore: minor change * Add RedisParserTest, optimize getSchema in RedisParser * feat: Template abstraction * chore: move entry package inside client * Fix tests * Add RedisSinkTest * Add StandaloneClientClientTest * Improve client tests * Fix Entry tests * add TTL response * fix: template constructor should validate arguments * chore: fix checkstyle * tests: add test for entries * tests: add tests for redis clients * fix: remove schema from entry classes * fix: minor fixes after qa Co-authored-by: Vinay Verma <vinayverma610@gmail.com>
1 parent cc6fbb4 commit 16c3cd2

77 files changed

Lines changed: 3632 additions & 45 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ dependencies {
3939
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
4040
implementation "io.grpc:grpc-all:1.38.0"
4141
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
42+
implementation group: 'redis.clients', name: 'jedis', version: '3.0.1'
43+
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
4244
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2.1'
4345
implementation 'org.json:json:20220320'
44-
46+
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.4.0'
4547
testImplementation group: 'junit', name: 'junit', version: '4.13'
4648
testImplementation 'org.hamcrest:hamcrest-all:1.3'
4749
testImplementation 'org.mockito:mockito-core:4.5.1'

src/main/java/io/odpf/depot/config/OdpfSinkConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public interface OdpfSinkConfig extends Config {
6464
@DefaultValue("")
6565
String getSinkConnectorSchemaProtoKeyClass();
6666

67+
@Key("SINK_CONNECTOR_SCHEMA_JSON_PARSER_STRING_MODE_ENABLED")
68+
@DefaultValue("true")
69+
boolean getSinkConnectorSchemaJsonParserStringModeEnabled();
70+
6771
@Key("SINK_CONNECTOR_SCHEMA_DATA_TYPE")
6872
@ConverterClass(SinkConnectorSchemaDataTypeConverter.class)
6973
@DefaultValue("PROTOBUF")
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.odpf.depot.config;
2+
3+
import io.odpf.depot.config.converter.JsonToPropertiesConverter;
4+
import io.odpf.depot.config.converter.RedisSinkDataTypeConverter;
5+
import io.odpf.depot.config.converter.RedisSinkDeploymentTypeConverter;
6+
import io.odpf.depot.config.converter.RedisSinkTtlTypeConverter;
7+
import io.odpf.depot.redis.enums.RedisSinkDataType;
8+
import io.odpf.depot.redis.enums.RedisSinkDeploymentType;
9+
import io.odpf.depot.redis.enums.RedisSinkTtlType;
10+
import org.aeonbits.owner.Config;
11+
12+
import java.util.Properties;
13+
14+
15+
@Config.DisableFeature(Config.DisableableFeature.PARAMETER_FORMATTING)
16+
public interface RedisSinkConfig extends OdpfSinkConfig {
17+
@Key("SINK_REDIS_URLS")
18+
String getSinkRedisUrls();
19+
20+
@Key("SINK_REDIS_KEY_TEMPLATE")
21+
String getSinkRedisKeyTemplate();
22+
23+
@Key("SINK_REDIS_DATA_TYPE")
24+
@DefaultValue("HASHSET")
25+
@ConverterClass(RedisSinkDataTypeConverter.class)
26+
RedisSinkDataType getSinkRedisDataType();
27+
28+
@Key("SINK_REDIS_TTL_TYPE")
29+
@DefaultValue("DISABLE")
30+
@ConverterClass(RedisSinkTtlTypeConverter.class)
31+
RedisSinkTtlType getSinkRedisTtlType();
32+
33+
@Key("SINK_REDIS_TTL_VALUE")
34+
@DefaultValue("0")
35+
long getSinkRedisTtlValue();
36+
37+
@Key("SINK_REDIS_DEPLOYMENT_TYPE")
38+
@DefaultValue("Standalone")
39+
@ConverterClass(RedisSinkDeploymentTypeConverter.class)
40+
RedisSinkDeploymentType getSinkRedisDeploymentType();
41+
42+
@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
43+
String getSinkRedisListDataProtoIndex();
44+
45+
@Key("SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME")
46+
String getSinkRedisKeyValueDataFieldName();
47+
48+
@Key("SINK_REDIS_LIST_DATA_FIELD_NAME")
49+
String getSinkRedisListDataFieldName();
50+
51+
@Key("SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING")
52+
@ConverterClass(JsonToPropertiesConverter.class)
53+
@DefaultValue("")
54+
Properties getSinkRedisHashsetFieldToColumnMapping();
55+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.odpf.depot.config.converter;
2+
3+
import com.google.common.base.Strings;
4+
import com.google.gson.Gson;
5+
import com.google.gson.reflect.TypeToken;
6+
7+
import java.lang.reflect.Method;
8+
import java.lang.reflect.Type;
9+
import java.util.ArrayList;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Properties;
14+
import java.util.Set;
15+
import java.util.function.Consumer;
16+
import java.util.stream.Stream;
17+
18+
19+
public class JsonToPropertiesConverter implements org.aeonbits.owner.Converter<Properties> {
20+
private static final Gson GSON = new Gson();
21+
22+
@Override
23+
public Properties convert(Method method, String input) {
24+
if (Strings.isNullOrEmpty(input)) {
25+
return null;
26+
}
27+
Type type = new TypeToken<Map<String, Object>>() {
28+
}.getType();
29+
Map<String, Object> m = GSON.fromJson(input, type);
30+
Properties properties = getProperties(m);
31+
validate(properties);
32+
return properties;
33+
}
34+
35+
private Properties getProperties(Map<String, Object> inputMap) {
36+
Properties properties = new Properties();
37+
for (String key : inputMap.keySet()) {
38+
Object value = inputMap.get(key);
39+
if (value instanceof String) {
40+
properties.put(key, value);
41+
} else if (value instanceof Map) {
42+
properties.put(key, getProperties((Map) value));
43+
}
44+
}
45+
return properties;
46+
}
47+
48+
private void validate(Properties properties) {
49+
DuplicateFinder duplicateFinder = flattenValues(properties)
50+
.collect(DuplicateFinder::new, DuplicateFinder::accept, DuplicateFinder::combine);
51+
if (duplicateFinder.duplicates.size() > 0) {
52+
throw new IllegalArgumentException("duplicates found in SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING for : " + duplicateFinder.duplicates);
53+
}
54+
}
55+
56+
private Stream<String> flattenValues(Properties properties) {
57+
return properties
58+
.values()
59+
.stream()
60+
.flatMap(v -> {
61+
if (v instanceof String) {
62+
return Stream.of((String) v);
63+
} else if (v instanceof Properties) {
64+
return flattenValues((Properties) v);
65+
} else {
66+
return Stream.empty();
67+
}
68+
});
69+
}
70+
71+
private static class DuplicateFinder implements Consumer<String> {
72+
private final Set<String> processedValues = new HashSet<>();
73+
private final List<String> duplicates = new ArrayList<>();
74+
75+
@Override
76+
public void accept(String o) {
77+
if (processedValues.contains(o)) {
78+
duplicates.add(o);
79+
} else {
80+
processedValues.add(o);
81+
}
82+
}
83+
84+
void combine(DuplicateFinder other) {
85+
other.processedValues
86+
.forEach(v -> {
87+
if (processedValues.contains(v)) {
88+
duplicates.add(v);
89+
} else {
90+
processedValues.add(v);
91+
}
92+
});
93+
}
94+
}
95+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.odpf.depot.config.converter;
2+
3+
import io.odpf.depot.redis.enums.RedisSinkDataType;
4+
import org.aeonbits.owner.Converter;
5+
6+
import java.lang.reflect.Method;
7+
8+
public class RedisSinkDataTypeConverter implements Converter<RedisSinkDataType> {
9+
@Override
10+
public RedisSinkDataType convert(Method method, String input) {
11+
return RedisSinkDataType.valueOf(input.toUpperCase());
12+
}
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.odpf.depot.config.converter;
2+
3+
import io.odpf.depot.redis.enums.RedisSinkDeploymentType;
4+
import org.aeonbits.owner.Converter;
5+
6+
import java.lang.reflect.Method;
7+
8+
public class RedisSinkDeploymentTypeConverter implements Converter<RedisSinkDeploymentType> {
9+
@Override
10+
public RedisSinkDeploymentType convert(Method method, String input) {
11+
return RedisSinkDeploymentType.valueOf(input.toUpperCase());
12+
}
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.odpf.depot.config.converter;
2+
3+
import io.odpf.depot.redis.enums.RedisSinkTtlType;
4+
import org.aeonbits.owner.Converter;
5+
6+
import java.lang.reflect.Method;
7+
8+
public class RedisSinkTtlTypeConverter implements Converter<RedisSinkTtlType> {
9+
@Override
10+
public RedisSinkTtlType convert(Method method, String input) {
11+
return RedisSinkTtlType.valueOf(input.toUpperCase());
12+
}
13+
}

src/main/java/io/odpf/depot/error/ErrorInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
@Data
99
public class ErrorInfo {
1010

11-
@EqualsAndHashCode.Exclude private Exception exception;
11+
@EqualsAndHashCode.Exclude
12+
private Exception exception;
1213
private ErrorType errorType;
1314

1415
public String toString() {
15-
return errorType.name();
16+
return String.format("Exception %s, ErrorType: %s", exception != null ? exception.getMessage() : "NULL", errorType.name());
1617
}
1718
}
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.odpf.depot.message;
2-
3-
import java.io.IOException;
4-
52
public interface OdpfMessageSchema {
63

7-
Object getSchema() throws IOException;
4+
Object getSchema();
85
}

src/main/java/io/odpf/depot/message/ParsedOdpfMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ public interface ParsedOdpfMessage {
1111
void validate(OdpfSinkConfig config);
1212

1313
Map<String, Object> getMapping(OdpfMessageSchema schema) throws IOException;
14+
15+
Object getFieldByName(String name, OdpfMessageSchema odpfMessageSchema);
1416
}

0 commit comments

Comments
 (0)