Skip to content

Commit 0cc728d

Browse files
committed
Wrap results in spec
1 parent dbd866a commit 0cc728d

4 files changed

Lines changed: 175 additions & 54 deletions

File tree

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
277277
String sql = split[1];
278278
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
279279
try {
280-
List<String> specs = HoptimatorDdlUtils.specifyFromSql(sql, conn);
280+
List<String> specs = HoptimatorDdlUtils.specifyFromSql(sql, conn).specs;
281281
specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
282282
} catch (SQLException e) {
283283
sqlline.error(e);

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,29 @@ public final class HoptimatorDdlUtils {
8989
private HoptimatorDdlUtils() {
9090
}
9191

92+
/**
93+
* The result of a {@link #specifyFromSql} call: the YAML artifact specs, the sink row type,
94+
* and the fully-qualified path of the sink (viewPath).
95+
*
96+
* <ul>
97+
* <li>{@code specs} — YAML artifacts produced by each deployer (empty for dry-run no-ops).
98+
* <li>{@code sinkRowType} — row type of the sink; same as the (renamed) query output.
99+
* <li>{@code viewPath} — fully-qualified path including catalog, schema, and view name.
100+
* </ul>
101+
*/
102+
public static final class SpecifyResult {
103+
public final List<String> specs;
104+
public final RelDataType sinkRowType;
105+
/** Fully-qualified path of the sink (catalog + schema + table). */
106+
public final List<String> viewPath;
107+
108+
SpecifyResult(List<String> specs, RelDataType sinkRowType, List<String> viewPath) {
109+
this.specs = Collections.unmodifiableList(specs);
110+
this.sinkRowType = sinkRowType;
111+
this.viewPath = Collections.unmodifiableList(viewPath);
112+
}
113+
}
114+
92115
/**
93116
* Controls whether a DDL operation performs a real deployment (CREATE or UPDATE)
94117
* or a dry-run (SPECIFY).
@@ -267,14 +290,23 @@ public static Pair<SchemaPlus, Table> snapshotAndSetSinkSchema(CalcitePrepare.Co
267290
* @return an empty list for CREATE/UPDATE, or the YAML spec strings for SPECIFY
268291
* @throws SQLException on validation or deployment errors
269292
*/
270-
static List<String> processCreateMaterializedView(CalcitePrepare.Context ctx,
293+
static SpecifyResult processCreateMaterializedView(CalcitePrepare.Context ctx,
271294
HoptimatorDriver.Prepare prepare, HoptimatorConnection conn,
272295
SqlCreateMaterializedView create, DdlMode mode) throws SQLException {
273296
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);
274297
// Validate the DDL statement.
275298
logger.info("Validating statement: {}", create);
276299
ValidationService.validateOrThrow(create);
277300

301+
// Extract query SQL (rename columns if a column list was provided) and plan the query.
302+
// This is done first — before schema/conflict checks — so that:
303+
// 1. sinkRowType is always available, even for IF NOT EXISTS early returns.
304+
// 2. root is computed once and reused for pipeline planning below.
305+
final SqlNode q = renameColumns(create.columnList, create.query);
306+
final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
307+
RelRoot root = prepare.convert(ctx, sql).root;
308+
RelDataType sinkRowType = root.rel.getRowType();
309+
278310
// Navigate to the schema (mutable only when actually deploying).
279311
final Pair<CalciteSchema, String> pair = schema(ctx, mode.mutable(), create.name);
280312
if (pair.left == null) {
@@ -297,7 +329,9 @@ static List<String> processCreateMaterializedView(CalcitePrepare.Context ctx,
297329
schemaPlus.removeTable(pair.right);
298330
} else {
299331
// IF NOT EXISTS — nothing to do.
300-
return Collections.emptyList();
332+
List<String> viewPath = new ArrayList<>(pair.left.path(null));
333+
viewPath.add(pair.right);
334+
return new SpecifyResult(Collections.emptyList(), sinkRowType, viewPath);
301335
}
302336
}
303337

@@ -306,9 +340,6 @@ static List<String> processCreateMaterializedView(CalcitePrepare.Context ctx,
306340
}
307341
String database = ((Database) pair.left.schema).databaseName();
308342

309-
// Extract query SQL (rename columns if a column list was provided).
310-
final SqlNode q = renameColumns(create.columnList, create.query);
311-
final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
312343
final List<String> schemaPath = pair.left.path(null);
313344
String viewName = pair.right;
314345
List<String> viewPath = new ArrayList<>(schemaPath);
@@ -327,8 +358,7 @@ static List<String> processCreateMaterializedView(CalcitePrepare.Context ctx,
327358
Properties connectionProperties = conn.connectionProperties();
328359
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName);
329360

330-
// Plan the pipeline to materialize the view.
331-
RelRoot root = prepare.convert(ctx, sql).root;
361+
// Plan the pipeline
332362
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
333363

334364
// Snapshot the current schema state and set the sink schema for planning.
@@ -358,12 +388,12 @@ static List<String> processCreateMaterializedView(CalcitePrepare.Context ctx,
358388
} else {
359389
logger.info("Specifying materialized view {}", viewName);
360390
}
361-
List<String> result = mode.executeDeployers(deployers, conn);
391+
List<String> specs = mode.executeDeployers(deployers, conn);
362392
if (mode.mutable()) {
363393
logger.info("Deployed materialized view {}", viewName);
364394
}
365395
success = true;
366-
return result;
396+
return new SpecifyResult(specs, sinkRowType, viewPath);
367397
} catch (SQLException | RuntimeException e) {
368398
logger.info("Failed to deploy materialized view {}", viewName);
369399
if (deployers != null) {
@@ -404,7 +434,7 @@ static List<String> processCreateMaterializedView(CalcitePrepare.Context ctx,
404434
* @return an empty list for CREATE/UPDATE, or the YAML spec strings for SPECIFY
405435
* @throws SQLException on validation or deployment errors
406436
*/
407-
static List<String> processCreateTable(CalcitePrepare.Context ctx, HoptimatorConnection conn,
437+
static SpecifyResult processCreateTable(CalcitePrepare.Context ctx, HoptimatorConnection conn,
408438
SqlCreateTable create, DdlMode mode) throws SQLException {
409439
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);
410440

@@ -552,12 +582,12 @@ public RexNode newColumnDefaultValue(RelOptTable table, int iColumn,
552582
} else {
553583
logger.info("Specifying table {}", source);
554584
}
555-
List<String> result = mode.executeDeployers(deployers, conn);
585+
List<String> specs = mode.executeDeployers(deployers, conn);
556586
if (mode.mutable()) {
557587
logger.info("Deployed table {}", source);
558588
}
559589
success = true;
560-
return result;
590+
return new SpecifyResult(specs, rowType, tablePath);
561591
} catch (SQLException | RuntimeException e) {
562592
logger.info("Failed to deploy table {}", tableName);
563593
if (deployers != null) {
@@ -594,8 +624,19 @@ public RexNode newColumnDefaultValue(RelOptTable table, int iColumn,
594624
* <p>This is the shared implementation behind {@code !specify} in Quidem tests, the
595625
* interactive CLI, and any gRPC plan endpoint. It is a strict dry-run: no schema
596626
* mutations, no deployer create/update calls.
627+
* Returns both the YAML artifact specs and the sink row type for the given SQL statement in
628+
* a single pass.
629+
*
630+
* <p>Supported statement types:
631+
* <ul>
632+
* <li><b>CREATE MATERIALIZED VIEW</b> — column renames applied; query planned once.
633+
* <li><b>CREATE TABLE</b> — row type from column declarations; no query planning.
634+
* <li><b>SELECT / INSERT INTO</b> — query output row type.
635+
* </ul>
636+
*
637+
* @throws SQLException for unsupported DDL (e.g. DROP, CREATE VIEW)
597638
*/
598-
public static List<String> specifyFromSql(String sql, HoptimatorConnection conn) throws SQLException {
639+
public static SpecifyResult specifyFromSql(String sql, HoptimatorConnection conn) throws SQLException {
599640
SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql);
600641

601642
if (sqlNode instanceof SqlCreateTable) {
@@ -612,19 +653,18 @@ public static List<String> specifyFromSql(String sql, HoptimatorConnection conn)
612653
}
613654

614655
// Plain SELECT / INSERT INTO path.
615-
String querySql = sql;
616656
String viewName = "sink";
617-
618-
RelRoot root = HoptimatorDriver.convert(conn, querySql).root;
657+
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
658+
RelDataType sinkRowType = root.rel.getRowType();
619659
Properties connectionProperties = conn.connectionProperties();
620660
RelOptTable table = root.rel.getTable();
621661
if (table != null) {
622662
List<String> qualifiedName = table.getQualifiedName();
623663
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION,
624664
String.join(".", qualifiedName));
625-
// Use the sink table's simple name as the pipeline/job name (matches !specify TABLE behavior)
626665
viewName = qualifiedName.get(qualifiedName.size() - 1);
627666
}
667+
List<String> viewPath = new ArrayList<>(List.of("DEFAULT", viewName));
628668

629669
PipelineRel.Implementor plan = DeploymentService.plan(root, Collections.emptyList(), connectionProperties);
630670

@@ -635,7 +675,7 @@ public static List<String> specifyFromSql(String sql, HoptimatorConnection conn)
635675
}
636676
specs.addAll(DeploymentService.specify(pipeline.sink(), conn));
637677
specs.addAll(DeploymentService.specify(pipeline.job(), conn));
638-
return specs;
678+
return new SpecifyResult(specs, sinkRowType, viewPath);
639679
}
640680

641681
public static Map<String, String> options(SqlNodeList optionList) {

0 commit comments

Comments
 (0)