Skip to content

Commit 7a74bde

Browse files
authored
CREATE TABLE support for existing drivers (#195)
* Support Quidem describe command * Support CREATE TABLE for Kafka topics - Intentionally disabled in favor of strimzi yaml deployment via K8sYamlDeployer * Refactor Venice deployer, add pre-validation and tests * Support CREATE TABLE for MySQL databases and tables
1 parent 6a30ab0 commit 7a74bde

37 files changed

Lines changed: 3505 additions & 291 deletions

File tree

deploy/config/hoptimator-configmap.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,3 @@ data:
2222
flink.config: |
2323
flink.app.name=hoptimator-flink-runner
2424
flink.app.type=SQL
25-
26-
venice.config: |
27-
router.url=http://localhost:7777
28-
clusters=venice-cluster0

deploy/docker/mysql/init.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ INSERT INTO daily_metrics (metric_date, metric_name, metric_value) VALUES
7878
(DATE_SUB(CURDATE(), INTERVAL 1 DAY), 'revenue', 4800.00),
7979
(DATE_SUB(CURDATE(), INTERVAL 1 DAY), 'orders', 145.00);
8080

81-
-- Grant permissions to hoptimator user on both databases
82-
GRANT ALL PRIVILEGES ON testdb.* TO 'hoptimator'@'%';
83-
GRANT ALL PRIVILEGES ON analytics.* TO 'hoptimator'@'%';
81+
-- Grant permissions to hoptimator user on all databases
82+
GRANT ALL PRIVILEGES ON *.* TO 'hoptimator'@'%';
8483
FLUSH PRIVILEGES;

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ commons-cli = "commons-cli:commons-cli:1.4"
88
commons-dbcp2 = "org.apache.commons:commons-dbcp2:2.11.0"
99
cron-utils = "com.cronutils:cron-utils:9.2.1"
1010
findbugs = "com.google.code.findbugs:jsr305:3.0.0"
11+
findbugs-annotations = "com.google.code.findbugs:annotations:3.0.0"
1112
flink-clients = "org.apache.flink:flink-clients:1.18.1"
13+
mysql-connector = "mysql:mysql-connector-java:8.0.33"
1214
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
1315
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:3.2.0-1.18"
1416
flink-connector-mysql-cdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package com.linkedin.hoptimator.jdbc;

import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

/**
 * Utility methods for Deployer implementations.
 *
 * <p>Provides common functionality for:
 * <ul>
 *   <li>Parsing configuration options from maps</li>
 *   <li>Extracting connection properties from JDBC schemas</li>
 * </ul>
 */
public final class DeployerUtils {
  private static final Logger log = LoggerFactory.getLogger(DeployerUtils.class);

  private DeployerUtils() {
    // Utility class - prevent instantiation
  }

  /**
   * Shared parse-with-fallback logic for the numeric option parsers.
   *
   * @param options the options map to parse from
   * @param key the option key to look up
   * @param defaultValue the default value to return if the option is not set or invalid
   * @param parser the conversion function; expected to throw {@link NumberFormatException} on bad input
   * @param typeName human-readable type name used in the warning message
   * @return the parsed value, or defaultValue if not set or invalid
   */
  private static <T> T parseNumericOption(Map<String, String> options, String key, T defaultValue,
      Function<String, T> parser, String typeName) {
    String value = options.get(key);
    if (value == null) {
      return defaultValue;
    }
    try {
      return parser.apply(value);
    } catch (NumberFormatException e) {
      log.warn("Invalid {} value for option '{}': '{}'. Using default: {}", typeName, key, value, defaultValue);
      return defaultValue;
    }
  }

  /**
   * Parses an integer option from a map of options.
   *
   * @param options the options map to parse from
   * @param key the option key to look up
   * @param defaultValue the default value to return if the option is not set or invalid
   * @return the parsed integer value, or defaultValue if not set or invalid
   */
  public static Integer parseIntOption(Map<String, String> options, String key, Integer defaultValue) {
    return parseNumericOption(options, key, defaultValue, Integer::parseInt, "integer");
  }

  /**
   * Parses a long option from a map of options.
   *
   * @param options the options map to parse from
   * @param key the option key to look up
   * @param defaultValue the default value to return if the option is not set or invalid
   * @return the parsed long value, or defaultValue if not set or invalid
   */
  public static Long parseLongOption(Map<String, String> options, String key, Long defaultValue) {
    return parseNumericOption(options, key, defaultValue, Long::parseLong, "long");
  }

  /**
   * Parses a boolean option from a map of options.
   *
   * <p>Note: {@link Boolean#parseBoolean} never throws; any value other than
   * (case-insensitive) "true" yields {@code false}, so there is no "invalid" case here.
   *
   * @param options the options map to parse from
   * @param key the option key to look up
   * @param defaultValue the default value to return if the option is not set
   * @return the parsed boolean value, or defaultValue if not set
   */
  public static Boolean parseBooleanOption(Map<String, String> options, String key, Boolean defaultValue) {
    String value = options.get(key);
    if (value == null) {
      return defaultValue;
    }
    return Boolean.parseBoolean(value);
  }

  /**
   * Parses a double option from a map of options.
   *
   * @param options the options map to parse from
   * @param key the option key to look up
   * @param defaultValue the default value to return if the option is not set or invalid
   * @return the parsed double value, or defaultValue if not set or invalid
   */
  public static Double parseDoubleOption(Map<String, String> options, String key, Double defaultValue) {
    return parseNumericOption(options, key, defaultValue, Double::parseDouble, "double");
  }

  /**
   * Extracts connection properties from a JDBC-backed schema.
   *
   * <p>This method:
   * <ol>
   *   <li>Looks up the schema by name in the connection's root schema</li>
   *   <li>Unwraps it to get the HoptimatorJdbcSchema</li>
   *   <li>Extracts the JDBC URL from the BasicDataSource</li>
   *   <li>Parses the URL (after removing the connection prefix) into Properties</li>
   * </ol>
   *
   * <p>Best-effort by design: any lookup/parse failure is logged at debug level (when a
   * logger is supplied) and reported as {@code null} rather than propagated.
   *
   * @param catalogName Optional catalog name to look up
   * @param schemaName the name of the schema to look up
   * @param connection the connection to search in
   * @param connectionPrefix the JDBC connection prefix to strip (e.g., "jdbc:kafka://")
   * @param logger optional logger for debug messages
   * @return Properties extracted from the JDBC URL, or null if schema not found, not
   *         JDBC-backed, or the URL does not carry the expected prefix
   */
  public static Properties extractPropertiesFromJdbcSchema(@Nullable String catalogName, String schemaName,
      Connection connection, String connectionPrefix, @Nullable Logger logger) {

    if (schemaName == null) {
      return null;
    }

    try {
      // Only Hoptimator connections expose the Calcite root schema we need.
      if (!(connection instanceof HoptimatorConnection)) {
        return null;
      }

      HoptimatorConnection hoptimatorConnection = (HoptimatorConnection) connection;
      SchemaPlus rootSchema = hoptimatorConnection.calciteConnection().getRootSchema();
      if (catalogName != null) {
        rootSchema = rootSchema.subSchemas().get(catalogName);
        if (rootSchema == null) {
          return null;
        }
      }
      SchemaPlus subSchemaPlus = rootSchema.subSchemas().get(schemaName);

      if (subSchemaPlus == null) {
        return null;
      }

      HoptimatorJdbcSchema schema = subSchemaPlus.unwrap(HoptimatorJdbcSchema.class);
      if (schema == null) {
        return null;
      }

      String jdbcUrl = ((BasicDataSource) schema.getDataSource()).getUrl();

      // Fix: verify the prefix before stripping it. A blind substring() on a URL that is
      // null, shorter than the prefix, or differently-prefixed would either throw
      // (silently swallowed below) or hand ConnectStringParser a garbage string.
      if (jdbcUrl == null || !jdbcUrl.startsWith(connectionPrefix)) {
        if (logger != null) {
          logger.debug("JDBC URL for schema '{}' does not start with expected prefix '{}'",
              schemaName, connectionPrefix);
        }
        return null;
      }

      Properties properties = new Properties();
      properties.putAll(ConnectStringParser.parse(jdbcUrl.substring(connectionPrefix.length())));
      return properties;
    } catch (Exception e) {
      if (logger != null) {
        logger.debug("Could not extract properties from schema '{}': {}", schemaName, e.getMessage());
      }
    }
    return null;
  }
}

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger;
3838
import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger;
3939
import com.linkedin.hoptimator.util.DeploymentService;
40+
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcCatalogSchema;
4041
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
4142
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcTable;
4243
import com.linkedin.hoptimator.util.planner.PipelineRel;
@@ -376,9 +377,19 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
376377
throw new DdlException(create, e.getMessage(), e);
377378
}
378379

379-
final Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, true, create.name);
380+
boolean isNewSchema = false;
381+
Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, true, create.name);
380382
if (pair.left == null) {
381-
throw new DdlException(create, "Schema for " + create.name + " not found.");
383+
// If the schema is not found, it might be because it's a 3-level path (CATALOG.SCHEMA.TABLE)
384+
if (create.name.names.size() > 2) {
385+
pair = HoptimatorDdlUtils.catalog(context, true, create.name);
386+
isNewSchema = true;
387+
if (pair.left == null) {
388+
throw new DdlException(create, "Catalog for " + create.name + " not found.");
389+
}
390+
} else {
391+
throw new DdlException(create, "Schema for " + create.name + " not found.");
392+
}
382393
}
383394

384395
// TODO: Add support for populating new tables from a query as a one-time operation.
@@ -390,8 +401,17 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
390401
}
391402

392403
final SchemaPlus schemaPlus = pair.left.plus();
393-
final String tableName = pair.right;
394-
if (schemaPlus.tables().get(tableName) != null) {
404+
String database = null;
405+
String tableName;
406+
if (isNewSchema) {
407+
int idx = pair.right.indexOf(".");
408+
database = pair.right.substring(0, idx);
409+
tableName = pair.right.substring(idx + 1);
410+
} else {
411+
tableName = pair.right;
412+
}
413+
414+
if (!isNewSchema && schemaPlus.tables().get(tableName) != null) {
395415
if (!create.ifNotExists && !create.getReplace()) {
396416
// They did not specify IF NOT EXISTS, so give error.
397417
throw new DdlException(create,
@@ -402,11 +422,12 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
402422
Collection<Deployer> deployers = null;
403423
Pair<SchemaPlus, Table> schemaSnapshot = null;
404424
try {
405-
String database;
406-
if (pair.left.schema instanceof Database) {
407-
database = ((Database) pair.left.schema).databaseName();
408-
} else {
409-
database = connection.getSchema();
425+
if (database == null) {
426+
if (pair.left.schema instanceof Database) {
427+
database = ((Database) pair.left.schema).databaseName();
428+
} else {
429+
database = connection.getSchema();
430+
}
410431
}
411432

412433
final JavaTypeFactory typeFactory = context.getTypeFactory();
@@ -447,18 +468,37 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
447468
return super.newColumnDefaultValue(table, iColumn, context);
448469
}
449470
};
450-
// Table does not exist. Create it.
451-
Table currentViewTable = schemaPlus.tables().get(tableName);
452-
schemaSnapshot = Pair.of(schemaPlus, currentViewTable);
471+
// Snapshot current state for rollback
472+
if (!isNewSchema) {
473+
Table currentTable = schemaPlus.tables().get(tableName);
474+
schemaSnapshot = Pair.of(schemaPlus, currentTable);
475+
}
453476

477+
// Table does not exist. Create it.
454478
// Add a temporary table with the correct row type so deployers can resolve the schema
455479
// TODO: This may cause problems if we reuse connections, only the next connection will load this as a HoptimatorJdbcTable.
456-
Table tempTable = new TemporaryTable(rowType, ief, database);
457-
schemaPlus.add(tableName, tempTable);
458-
logger.info("Added table {} to schema {}", tableName, schemaPlus.getName());
480+
if (isNewSchema) {
481+
HoptimatorJdbcCatalogSchema catalogSchema = schemaPlus.unwrap(HoptimatorJdbcCatalogSchema.class);
482+
if (catalogSchema == null) {
483+
throw new DdlException(create, "Catalog for " + schemaPlus.getName() + " not found.");
484+
}
485+
SchemaPlus databaseSchema = schemaPlus.add(database, catalogSchema.createSchema(database));
486+
logger.info("Added schema {} to catalog {}", database, schemaPlus.getName());
487+
488+
Table tempTable = new TemporaryTable(rowType, ief, database);
489+
databaseSchema.add(tableName, tempTable);
490+
logger.info("Added table {} to schema {}", tableName, databaseSchema.getName());
491+
} else {
492+
Table tempTable = new TemporaryTable(rowType, ief, database);
493+
schemaPlus.add(tableName, tempTable);
494+
logger.info("Added table {} to schema {}", tableName, schemaPlus.getName());
495+
}
459496

460497
final List<String> schemaPath = pair.left.path(null);
461498
List<String> tablePath = new ArrayList<>(schemaPath);
499+
if (isNewSchema) {
500+
tablePath.add(database);
501+
}
462502
tablePath.add(tableName);
463503

464504
Map<String, String> tableOptions = HoptimatorDdlUtils.options(create.options);
@@ -491,6 +531,9 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
491531
schemaPlus.add(tableName, schemaSnapshot.right);
492532
logger.info("Restored schema for table {}", tableName);
493533
}
534+
} else {
535+
pair.left.removeSubSchema(database);
536+
logger.info("Removed schema {} from catalog", database);
494537
}
495538
throw new DdlException(create, e.getMessage(), e);
496539
}

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,21 @@ public static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context,
7777
return Pair.of(schema, name);
7878
}
7979

80+
/** Returns the catalog in which to create an object;
81+
* the left part is null if the catalog does not exist. */
82+
public static Pair<CalciteSchema, String> catalog(CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) {
83+
if (id.names.size() < 3) {
84+
throw new IllegalArgumentException("CATALOG.SCHEMA.TABLE identifier expected but found: " + id);
85+
}
86+
final List<String> schemaTablePath = Util.last(id.names, 2);
87+
final List<String> catalogPath = Util.skipLast(id.names, 2);
88+
CalciteSchema schema = mutable ? context.getMutableRootSchema() : context.getRootSchema();
89+
for (String p : catalogPath) {
90+
schema = Objects.requireNonNull(schema).getSubSchema(p, true);
91+
}
92+
return Pair.of(schema, String.join(".", schemaTablePath));
93+
}
94+
8095
// N.B. copy-pasted from Apache Calcite
8196
/** Wraps a query to rename its columns. Used by CREATE VIEW and CREATE
8297
* MATERIALIZED VIEW. */

0 commit comments

Comments
 (0)