Skip to content

Commit c087c86

Browse files
docs: redis sink docs (#40)
* docs: redis sink docs * docs: redis sink docs * docs: redis sink docs * docs: redis sink docs * docs: add keyvalue mode in redis * docs: redis sink docs * docs: redis sink docs * docs: redis sink docs
1 parent 16c3cd2 commit c087c86

7 files changed

Lines changed: 111 additions & 7 deletions

File tree

docs/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ GRPC)
1010
* Instrumentation support with statsd
1111
* Log Sink
1212
* Bigquery Sink
13+
* Redis Sink
1314

1415
Depot is a sink connector, which acts as a bridge between data processing systems and real sink. The APIs in this
1516
library can be used to push data to various sinks. Common sinks implementations will be added in this repo.

docs/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
## Sink
77
* [Bigquery](sinks/bigquery.md)
8+
* [Redis](sinks/redis.md)
89

910
## Reference
1011

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Redis
2+
3+
A Redis sink in Depot requires the following environment variables to be set along with Generic ones
4+
5+
### `SINK_REDIS_URLS`
6+
7+
REDIS server instance hostname/IP address followed by its port.
8+
9+
- Example value: `localhost:6379,localhost:6380`
10+
- Type: `required`
11+
12+
### `SINK_REDIS_DATA_TYPE`
13+
14+
To select whether you want to push your data as a `KEYVALUE`, `HASHSET` or as a `LIST` data type.
15+
16+
- Example value: `Hashset`
17+
- Type: `required`
18+
- Default value: `Hashset`
19+
20+
### `SINK_REDIS_KEY_TEMPLATE`
21+
22+
The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key.
23+
24+
- Example value: `Service\_%%s,1`
25+
26+
This will take the value with index 1 from proto and create the Redis keys as per the template.
27+
28+
- Type: `required`
29+
30+
### `SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`
31+
32+
This is the field that decides what all data will be stored in the HashSet for each message.
33+
- Example value: `{"order_number":"ORDER_NUMBER","event_timestamp":"TIMESTAMP"}`
34+
- Type: `required (For Hashset)`
35+
36+
### `SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`
37+
38+
This field decides what data will be stored in the value part of key-value pair
39+
40+
- Example value: `customer_id`
41+
42+
This will get the value of the field with name `customer_id` in your proto and push that to the Redis as value with the corresponding keyTemplate
43+
44+
- Type: `required (For KeyValue)`
45+
46+
### `SINK_REDIS_LIST_DATA_FIELD_NAME`
47+
48+
This field decides what all data will be stored in the List for each message.
49+
50+
- Example value: `customer_id`
51+
52+
This will get the value of the field with name `customer_id` in your proto and push that to the Redis list with the corresponding keyTemplate
53+
54+
- Type: `required (For List)`
55+
56+
### `SINK_REDIS_TTL_TYPE`
57+
58+
- Example value: `DURATION`
59+
- Type: `optional`
60+
- Default value: `DISABLE`
61+
- Choice of Redis TTL type.It can be:
62+
- `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\)
63+
- `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired
64+
65+
### `SINK_REDIS_TTL_VALUE`
66+
67+
Redis TTL value in Unix Timestamp for `EXACT_TIME` TTL type, In Seconds for `DURATION` TTL type.
68+
69+
- Example value: `100000`
70+
- Type: `optional`
71+
- Default value: `0`
72+
73+
### `SINK_REDIS_DEPLOYMENT_TYPE`
74+
75+
The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types.
76+
77+
- Example value: `Standalone`
78+
- Type: `required`
79+
- Default value: `Standalone`

docs/roadmap.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ If you have feedback about the roadmap section itself,
1212
such as how the issues are presented,
1313
let us know through [discussions](https://github.com/odpf/depot/discussions).
1414

15-
## depot 0.1.4
15+
## Depot 0.1.4
1616

1717
### Feature enhancements
1818

docs/sinks/redis.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Redis Sink
2+
3+
### Data Types
4+
Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](../reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List
5+
- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](../reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping)
6+
- `List`: For each message entry of the format `key : value` is generated and pushed to Redis. The value is fetched for the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](../reference/configuration/redis.md#sink_redis_list_data_field_name)
7+
- `KeyValue`: For each message entry of the format `key : value` is generated and pushed to Redis. The value is fetched for the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](../reference/configuration/redis.md#sink_redis_key_value_data_field_name)
8+
9+
The `key` is picked up from a field in the message itself.
10+
11+
Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now.
12+
13+
### Deployment Types
14+
Redis sink, as of now, supports two different Deployment Types `Standalone` and `Cluster`. This can be configured in the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.

src/main/java/io/odpf/depot/config/RedisSinkConfig.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ public interface RedisSinkConfig extends OdpfSinkConfig {
3939
@ConverterClass(RedisSinkDeploymentTypeConverter.class)
4040
RedisSinkDeploymentType getSinkRedisDeploymentType();
4141

42-
@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
43-
String getSinkRedisListDataProtoIndex();
44-
4542
@Key("SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME")
4643
String getSinkRedisKeyValueDataFieldName();
4744

src/main/java/io/odpf/depot/redis/RedisSinkFactory.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,26 @@ public RedisSinkFactory(RedisSinkConfig sinkConfig) {
3636

3737
public void init() throws IOException {
3838
Instrumentation instrumentation = new Instrumentation(statsDReporter, RedisSinkFactory.class);
39-
String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s"
40-
+ "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d",
39+
String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.data.type = %s"
40+
+ "\n\tredis.deployment.type = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d\n\t",
4141
sinkConfig.getSinkRedisUrls(),
4242
sinkConfig.getSinkRedisKeyTemplate(),
4343
sinkConfig.getSinkRedisDataType().toString(),
44-
sinkConfig.getSinkRedisListDataProtoIndex(),
44+
sinkConfig.getSinkRedisDeploymentType().toString(),
4545
sinkConfig.getSinkRedisTtlType().toString(),
4646
sinkConfig.getSinkRedisTtlValue());
47+
switch (sinkConfig.getSinkRedisDataType()) {
48+
case LIST:
49+
redisConfig += "redis.list.data.field.name=" + sinkConfig.getSinkRedisListDataFieldName();
50+
break;
51+
case KEYVALUE:
52+
redisConfig += "redis.keyvalue.data.field.name=" + sinkConfig.getSinkRedisKeyValueDataFieldName();
53+
break;
54+
case HASHSET:
55+
redisConfig += "redis.hashset.field.to.column.mapping=" + sinkConfig.getSinkRedisHashsetFieldToColumnMapping().toString();
56+
break;
57+
default:
58+
}
4759
instrumentation.logInfo(redisConfig);
4860
instrumentation.logInfo("Redis server type = {}", sinkConfig.getSinkRedisDeploymentType());
4961
OdpfMessageParser messageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter);

0 commit comments

Comments
 (0)