Skip to content

Commit 66eb61d

Browse files
authored
Fix issue with ';' delimiter when specifying multiple table keys (#199)
1 parent 87f0343 commit 66eb61d

5 files changed

Lines changed: 21 additions & 12 deletions

File tree

deploy/docker/venice/schemas/keySchemaRecord.avsc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
"doc": "SampleTableKey",
55
"fields": [
66
{
7-
"name": "id",
7+
"name": "name",
8+
"type": "string"
9+
},
10+
{
11+
"name": "age",
812
"type": "int"
913
}
1014
]

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ static Map<String, String> addKeysAsOption(Map<String, String> options, RelDataT
117117
.filter(name -> name.startsWith(KEY_PREFIX))
118118
.collect(Collectors.joining(";"));
119119
if (!keyString.isEmpty()) {
120-
newOptions.put(KEY_OPTION, keyString);
120+
newOptions.put(KEY_OPTION, keyString.replaceAll("\\s+", ""));
121121
newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX);
122122
newOptions.put(KEY_TYPE_OPTION, "RECORD");
123123
}

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,10 @@ default String sql() {
148148
default String sql(SqlDialect dialect) {
149149
SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect));
150150
implement(w);
151-
return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll("\\s*;\\s*", ";\n").trim();
151+
// This logic is intended to split DDL/DML statements into multiple lines.
152+
// Flink SQL options (e.g. key.fields) can contain ";" characters as a delimiter.
153+
// TODO: make this logic more robust, it will still fail for non-trimmed strings containing ";" characters
154+
return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll("\\s+;\\s*", ";\n").trim();
152155
}
153156

154157
/** Generate SQL for a given dialect */

hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceSchemaValidationTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ public class VeniceSchemaValidationTest extends JdbcTestBase {
1616

1717
@Test
1818
public void testVeniceTableCreationAndSchema() throws Exception {
19-
sql("create or replace materialized view \"VENICE\".\"test-store$insert-partial\" (\"KEY_id\", \"intField\") "
20-
+ "as select \"KEY\", \"intField\" from \"VENICE\".\"test-store-primitive\"");
19+
sql("create or replace materialized view \"VENICE\".\"test-store$insert-partial\" (\"KEY_name\", \"KEY_age\", \"intField\") "
20+
+ "as select \"stringField\" AS \"KEY_name\", \"KEY\" AS \"KEY_age\", \"intField\" from \"VENICE\".\"test-store-primitive\"");
2121

2222
// Validate the table was created with expected schema
2323
Map<String, String> expectedColumns = Map.of(
24-
"KEY_id", "INTEGER",
24+
"KEY_name", "VARCHAR",
25+
"KEY_age", "INTEGER",
2526
"intField", "INTEGER"
2627
);
2728
validateTableSchema(List.of("VENICE", "test-store$insert-partial"), expectedColumns);

hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
!set outputformat mysql
22
!use k8s
33

4-
create or replace materialized view "VENICE"."test-store$insert-partial" ("KEY_id", "intField") as select "KEY", "intField" from "VENICE"."test-store-primitive";
4+
create or replace materialized view "VENICE"."test-store$insert-partial" ("KEY_name", "KEY_age", "intField") as select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive";
55
(0 rows modified)
66

77
!update
88

99
+------------+----------+------------+------------+
1010
| columnName | typeName | columnSize | isNullable |
1111
+------------+----------+------------+------------+
12-
| KEY_id | INTEGER | 10 | NO |
12+
| KEY_name | VARCHAR | null | YES |
13+
| KEY_age | INTEGER | 10 | NO |
1314
| intField | INTEGER | 10 | YES |
1415
+------------+----------+------------+------------+
15-
(2 rows)
16+
(3 rows)
1617

1718
!describe "VENICE"."test-store$insert-partial"
1819

@@ -21,7 +22,7 @@ drop materialized view "VENICE"."test-store$insert-partial";
2122

2223
!update
2324

24-
insert into "VENICE"."test-store" ("KEY_id", "intField") select "KEY", "intField" from "VENICE"."test-store-primitive";
25+
insert into "VENICE"."test-store" ("KEY_name", "KEY_age", "intField") select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive";
2526
apiVersion: flink.apache.org/v1beta1
2627
kind: FlinkSessionJob
2728
metadata:
@@ -34,8 +35,8 @@ spec:
3435
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
3536
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY')
3637
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
37-
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
38-
- INSERT INTO `VENICE`.`test-store` (`KEY_id`, `intField`) SELECT `KEY` AS `KEY_id`, `intField` FROM `VENICE`.`test-store-primitive`
38+
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_name` VARCHAR, `KEY_age` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_name;KEY_age', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
39+
- INSERT INTO `VENICE`.`test-store` (`KEY_name`, `KEY_age`, `intField`) SELECT `stringField` AS `KEY_name`, `KEY` AS `KEY_age`, `intField` FROM `VENICE`.`test-store-primitive`
3940
jarURI: file:///opt/hoptimator-flink-runner.jar
4041
parallelism: 1
4142
upgradeMode: stateless

0 commit comments

Comments
 (0)