Skip to content

Commit 478a046

Browse files
committed
INSERT INTO / SELECT fixes
1 parent dcf9cc7 commit 478a046

2 files changed

Lines changed: 144 additions & 11 deletions

File tree

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ public RexNode newColumnDefaultValue(RelOptTable table, int iColumn,
622622
* {@code CREATE TABLE}, {@code CREATE MATERIALIZED VIEW}, or {@code INSERT INTO}.
623623
*
624624
* <p>This is the shared implementation behind {@code !specify} in Quidem tests, the
625-
* interactive CLI, and any gRPC plan endpoint. It is a strict dry-run: no schema
625+
* interactive CLI, and more. It is a strict dry-run: no schema
626626
* mutations, no deployer create/update calls.
627627
* Returns both the YAML artifact specs and the sink row type for the given SQL statement in
628628
* a single pass.
@@ -653,29 +653,65 @@ public static SpecifyResult specifyFromSql(String sql, HoptimatorConnection conn
653653
}
654654

655655
// Plain SELECT / INSERT INTO path.
656-
String viewName = "sink";
656+
String viewName = "SINK";
657657
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
658658
RelDataType sinkRowType = root.rel.getRowType();
659659
Properties connectionProperties = conn.connectionProperties();
660660
RelOptTable table = root.rel.getTable();
661+
List<String> viewPath;
661662
if (table != null) {
662663
List<String> qualifiedName = table.getQualifiedName();
663664
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION,
664665
String.join(".", qualifiedName));
665666
viewName = qualifiedName.get(qualifiedName.size() - 1);
667+
viewPath = new ArrayList<>(qualifiedName);
668+
} else {
669+
// No INSERT INTO target — name the virtual sink "SINK" and record it as the pipeline.
670+
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, viewName);
671+
viewPath = new ArrayList<>(List.of("DEFAULT", viewName));
666672
}
667-
List<String> viewPath = new ArrayList<>(List.of("DEFAULT", viewName));
668673

669-
PipelineRel.Implementor plan = DeploymentService.plan(root, Collections.emptyList(), connectionProperties);
674+
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
670675

671-
Pipeline pipeline = plan.pipeline(viewName, conn);
672-
List<String> specs = new ArrayList<>();
673-
for (Source source : pipeline.sources()) {
674-
specs.addAll(DeploymentService.specify(source, conn));
676+
// For plain SELECT queries (no INSERT INTO target), the planner has no sink.
677+
// Set up a virtual sink so that pipeline.job().sink() is non-null.
678+
CalcitePrepare.Context ctx = conn.createPrepareContext();
679+
Pair<SchemaPlus, Table> sinkSnapshot = null;
680+
if (table == null) {
681+
ViewTable sinkViewTable = viewTable(ctx, sql, new HoptimatorDriver.Prepare(conn),
682+
List.of("DEFAULT"), viewPath);
683+
sinkRowType = sinkViewTable.getRowType(ctx.getTypeFactory());
684+
plan.setSink("DEFAULT", viewPath, sinkRowType, Collections.emptyMap());
685+
// Register the virtual sink in the schema so deployers can resolve it; snapshot for rollback.
686+
Pair<CalciteSchema, String> sinkSchemaPair = schema(ctx, false,
687+
new SqlIdentifier(viewName, SqlParserPos.ZERO));
688+
if (sinkSchemaPair.left != null) {
689+
SchemaPlus sinkSchemaPlus = sinkSchemaPair.left.plus();
690+
Table existing = sinkSchemaPlus.tables().get(sinkSchemaPair.right);
691+
sinkSnapshot = Pair.of(sinkSchemaPlus, existing);
692+
sinkSchemaPlus.add(sinkSchemaPair.right, new MaterializedViewTable(sinkViewTable));
693+
}
694+
}
695+
696+
try {
697+
Pipeline pipeline = plan.pipeline(viewName, conn);
698+
List<String> specs = new ArrayList<>();
699+
for (Source source : pipeline.sources()) {
700+
specs.addAll(DeploymentService.specify(source, conn));
701+
}
702+
specs.addAll(DeploymentService.specify(pipeline.sink(), conn));
703+
specs.addAll(DeploymentService.specify(pipeline.job(), conn));
704+
return new SpecifyResult(specs, sinkRowType, viewPath);
705+
} finally {
706+
// Restore the schema — the virtual sink must not persist after this call.
707+
if (sinkSnapshot != null) {
708+
if (sinkSnapshot.right != null) {
709+
sinkSnapshot.left.add(viewName, sinkSnapshot.right);
710+
} else {
711+
sinkSnapshot.left.removeTable(viewName);
712+
}
713+
}
675714
}
676-
specs.addAll(DeploymentService.specify(pipeline.sink(), conn));
677-
specs.addAll(DeploymentService.specify(pipeline.job(), conn));
678-
return new SpecifyResult(specs, sinkRowType, viewPath);
679715
}
680716

681717
public static Map<String, String> options(SqlNodeList optionList) {

hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtilsTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package com.linkedin.hoptimator.jdbc;
22

33
import com.linkedin.hoptimator.Deployer;
4+
import com.linkedin.hoptimator.Job;
5+
import com.linkedin.hoptimator.Pipeline;
6+
import com.linkedin.hoptimator.Sink;
47
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
8+
import com.linkedin.hoptimator.util.DeploymentService;
59
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
610
import com.linkedin.hoptimator.util.planner.PipelineRel;
711
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -26,6 +30,8 @@
2630
import org.apache.calcite.util.Pair;
2731
import org.junit.jupiter.api.Test;
2832
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.mockito.Mock;
34+
import org.mockito.MockedStatic;
2935
import org.mockito.junit.jupiter.MockitoExtension;
3036

3137
import java.sql.SQLException;
@@ -41,6 +47,7 @@
4147
import static org.junit.jupiter.api.Assertions.assertNull;
4248
import static org.junit.jupiter.api.Assertions.assertThrows;
4349
import static org.junit.jupiter.api.Assertions.assertTrue;
50+
import static org.mockito.ArgumentMatchers.any;
4451
import static org.mockito.Mockito.mock;
4552
import static org.mockito.Mockito.when;
4653

@@ -50,6 +57,9 @@
5057
justification = "Mock objects created in stubbing setup don't need resource management")
5158
class HoptimatorDdlUtilsTest {
5259

60+
@Mock
61+
MockedStatic<DeploymentService> mockDeploymentService;
62+
5363
@Test
5464
void testRenameColumnsReturnsQueryWhenColumnListNull() {
5565
SqlNode query = SqlLiteral.createCharString("test", SqlParserPos.ZERO);
@@ -528,6 +538,93 @@ void specifyFromSqlWithCreateMaterializedViewDryRunDoesNotThrowOnSchemaFailure()
528538
}
529539
}
530540

541+
// ── specifyFromSql() plain SELECT path ───────────────────────────────────────
542+
543+
@Test
544+
void specifyFromSqlForPlainSelectReturnsSinkPathAndRowType() throws Exception {
545+
// Regression test: before the fix, pipeline.job().sink() was null for plain SELECT
546+
// (no INSERT INTO target), causing NullPointerException. Now setSink() anchors a virtual
547+
// "DEFAULT"."SINK" so the pipeline can be fully constructed.
548+
Pipeline mockPipeline = mock(Pipeline.class);
549+
Sink mockSink = mock(Sink.class);
550+
Job mockJob = mock(Job.class);
551+
when(mockPipeline.sources()).thenReturn(Collections.emptyList());
552+
when(mockPipeline.sink()).thenReturn(mockSink);
553+
when(mockPipeline.job()).thenReturn(mockJob);
554+
555+
PipelineRel.Implementor mockPlan = mock(PipelineRel.Implementor.class);
556+
when(mockPlan.pipeline(any(), any())).thenReturn(mockPipeline);
557+
558+
mockDeploymentService.when(() -> DeploymentService.plan(any(), any(), any())).thenReturn(mockPlan);
559+
mockDeploymentService.when(() -> DeploymentService.specify(any(), any()))
560+
.thenReturn(Collections.emptyList());
561+
562+
HoptimatorDriver driver = new HoptimatorDriver();
563+
try (HoptimatorConnection conn =
564+
(HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
565+
HoptimatorDdlUtils.SpecifyResult result = HoptimatorDdlUtils.specifyFromSql(
566+
"SELECT 1 AS \"KEY\", 'value' AS \"VAL\"", conn);
567+
568+
// Specs: empty (no deployers return anything)
569+
assertNotNull(result.specs);
570+
assertTrue(result.specs.isEmpty());
571+
// viewPath last element is "SINK" (uppercase) for a bare SELECT, matching original plan() behavior
572+
assertNotNull(result.viewPath);
573+
assertEquals("SINK", result.viewPath.get(result.viewPath.size() - 1));
574+
// sinkRowType matches the SELECT output columns
575+
assertNotNull(result.sinkRowType);
576+
assertEquals(2, result.sinkRowType.getFieldCount());
577+
assertEquals("KEY", result.sinkRowType.getFieldList().get(0).getName());
578+
assertEquals("VAL", result.sinkRowType.getFieldList().get(1).getName());
579+
}
580+
}
581+
582+
@Test
583+
void specifyFromSqlCreateTableViewPathMatchesSchemaAndName() throws Exception {
584+
// For CREATE TABLE, the SpecifyResult.viewPath should include the schema path + table name.
585+
HoptimatorDriver driver = new HoptimatorDriver();
586+
try (HoptimatorConnection conn =
587+
(HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
588+
conn.calciteConnection().getRootSchema().add("VP_DB", new TestDatabaseSchema("vp-db"));
589+
HoptimatorDdlUtils.SpecifyResult result = HoptimatorDdlUtils.specifyFromSql(
590+
"CREATE TABLE \"VP_DB\".\"events\" (\"id\" INTEGER, \"msg\" VARCHAR)", conn);
591+
assertNotNull(result.viewPath);
592+
assertFalse(result.viewPath.isEmpty());
593+
assertEquals("events", result.viewPath.get(result.viewPath.size() - 1));
594+
assertTrue(result.viewPath.contains("VP_DB"),
595+
"viewPath should contain the schema: " + result.viewPath);
596+
}
597+
}
598+
599+
@Test
600+
void specifyFromSqlForPlainSelectRestoresSchemaAfterCall() throws Exception {
601+
// The virtual "SINK" table registered during the SELECT path must not persist after
602+
// specifyFromSql returns (even when it throws at the deployer level).
603+
HoptimatorDriver driver = new HoptimatorDriver();
604+
try (HoptimatorConnection conn =
605+
(HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
606+
CalcitePrepare.Context ctx = conn.createPrepareContext();
607+
608+
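// Capture schema state before the call. NOTE(review): this looks up lowercase "sink", but specifyFromSql registers the virtual sink as "SINK" - confirm the lookup is case-insensitive, otherwise the before/after comparison below passes vacuously.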
609+
Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(ctx, false,
610+
new SqlIdentifier("sink", SqlParserPos.ZERO));
611+
boolean existedBefore = pair.left != null
612+
&& pair.left.plus().tables().get(pair.right) != null;
613+
614+
try {
615+
HoptimatorDdlUtils.specifyFromSql("SELECT 1 AS \"COL1\"", conn);
616+
} catch (Exception ignored) {
617+
// Expected: planning/deployer failure in the test environment.
618+
}
619+
620+
// Schema must be in the same state as before.
621+
boolean existsAfter = pair.left != null
622+
&& pair.left.plus().tables().get(pair.right) != null;
623+
assertEquals(existedBefore, existsAfter,
624+
"Virtual SINK table must be removed after specifyFromSql returns");
625+
}
626+
}
627+
531628
@Test
532629
void optionsParsesSingleKeyValue() {
533630
SqlNodeList list = new SqlNodeList(SqlParserPos.ZERO);

0 commit comments

Comments
 (0)