Skip to content

Commit 0196987

Browse files
authored
Force ScriptImplementor to explicitly name columns in SELECT (#191)
* Force ScriptImplementor to explicitly name columns in SELECT
* Fix missed test
1 parent 3f8e7d5 commit 0196987

4 files changed

Lines changed: 107 additions & 15 deletions

File tree

hoptimator-k8s/src/test/resources/k8s-ddl.id

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ spec:
197197
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
198198
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
199199
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
200-
- INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS_source`
200+
- INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS_source`
201201
jarURI: file:///opt/hoptimator-flink-runner.jar
202202
parallelism: 1
203203
upgradeMode: stateless
@@ -218,7 +218,7 @@ spec:
218218
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
219219
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
220220
- CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
221-
- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS`
221+
- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN` AS `PAGE_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS`
222222
jarURI: file:///opt/hoptimator-flink-runner.jar
223223
parallelism: 1
224224
upgradeMode: stateless
@@ -245,3 +245,24 @@ spec:
245245
upgradeMode: stateless
246246
state: running
247247
!specify PAGE_VIEWS
248+
249+
create or replace materialized view PROFILE."MEMBERS$test" AS SELECT "PAGE_URN" AS "COMPANY_URN", "MEMBER_URN" FROM ADS.PAGE_VIEWS;
250+
apiVersion: flink.apache.org/v1beta1
251+
kind: FlinkSessionJob
252+
metadata:
253+
name: profile-database-members-test
254+
spec:
255+
deploymentName: basic-session-deployment
256+
job:
257+
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
258+
args:
259+
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
260+
- CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
261+
- CREATE DATABASE IF NOT EXISTS `PROFILE` WITH ()
262+
- CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='blackhole')
263+
- INSERT INTO `PROFILE`.`MEMBERS` (`COMPANY_URN`, `MEMBER_URN`) SELECT `PAGE_URN` AS `COMPANY_URN`, `MEMBER_URN` FROM `ADS`.`PAGE_VIEWS`
264+
jarURI: file:///opt/hoptimator-flink-runner.jar
265+
parallelism: 1
266+
upgradeMode: stateless
267+
state: running
268+
!specify MEMBERS

hoptimator-kafka/src/test/resources/kafka-ddl.id

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ spec:
3535
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k1'='v1', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
3636
- CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
3737
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k2'='v2', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
38-
- INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
38+
- INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2`
3939
jarURI: file:///opt/hoptimator-flink-runner.jar
4040
parallelism: 2
4141
upgradeMode: stateless

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.calcite.sql.type.SqlTypeName;
4343
import org.apache.calcite.sql.util.SqlShuttle;
4444
import org.apache.calcite.tools.RelBuilder;
45-
import org.apache.calcite.util.Util;
4645

4746

4847
/**
@@ -198,6 +197,20 @@ public void implement(SqlWriter w) {
198197
if (node instanceof SqlSelect && ((SqlSelect) node).getSelectList() != null) {
199198
SqlSelect select = (SqlSelect) node;
200199
select.setSelectList((SqlNodeList) Objects.requireNonNull(select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)));
200+
SqlNodeList selectList = select.getSelectList();
201+
202+
// Check if this is a SELECT * and replace with explicit columns
203+
if (selectList.size() == 1 && selectList.get(0) instanceof SqlIdentifier) {
204+
SqlIdentifier id = (SqlIdentifier) selectList.get(0);
205+
if (id.isStar()) {
206+
// Replace SELECT * with explicit column list
207+
List<SqlNode> explicitColumns = new ArrayList<>();
208+
for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
209+
explicitColumns.add(new SqlIdentifier(field.getName(), SqlParserPos.ZERO));
210+
}
211+
select.setSelectList(new SqlNodeList(explicitColumns, SqlParserPos.ZERO));
212+
}
213+
}
201214
}
202215
// Apply table name replacements if any
203216
if (!tableNameReplacements.isEmpty()) {
@@ -366,6 +379,7 @@ public void implement(SqlWriter w) {
366379
// Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
367380
private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
368381
List<Integer> cols = new ArrayList<>();
382+
List<String> aliases = new ArrayList<>();
369383
List<String> targetFieldNames = targetFields.rightList();
370384
List<RelDataTypeField> sourceFields = relNode.getRowType().getFieldList();
371385

@@ -377,10 +391,11 @@ private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, St
377391
if (targetFieldNames.contains(field.getName())
378392
&& !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
379393
cols.add(i);
394+
aliases.add(field.getName());
380395
}
381396
}
382397

383-
return createForceProject(relNode, cols);
398+
return createForceProject(relNode, cols, aliases);
384399
}
385400

386401
// Otherwise (e.g., TableScan), the projection was optimized away.
@@ -391,16 +406,17 @@ private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, St
391406
RelDataTypeField field = sourceFields.get(fieldIndex);
392407
if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
393408
cols.add(fieldIndex);
409+
aliases.add(targetFields.rightList().get(i));
394410
}
395411
}
396412
}
397413

398-
return createForceProject(relNode, cols);
414+
return createForceProject(relNode, cols, aliases);
399415
}
400416
}
401417

402-
static RelNode createForceProject(final RelNode child, final List<Integer> posList) {
403-
return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList);
418+
static RelNode createForceProject(final RelNode child, final List<Integer> posList, final List<String> aliases) {
419+
return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList, aliases);
404420
}
405421

406422
// By default, "projectNamed" will try to provide an optimization by not creating a new project if the
@@ -417,9 +433,7 @@ static RelNode createForceProject(final RelNode child, final List<Integer> posLi
417433
// This implementation is largely a duplicate of RelOptUtil.createProject(relNode, cols); which does not allow
418434
// overriding the "force" argument of "projectNamed".
419435
static RelNode createForceProject(final RelFactories.ProjectFactory factory,
420-
final RelNode child, final List<Integer> posList) {
421-
RelDataType rowType = child.getRowType();
422-
final List<String> fieldNames = rowType.getFieldNames();
436+
final RelNode child, final List<Integer> posList, final List<String> aliases) {
423437
final RelBuilder relBuilder =
424438
RelBuilder.proto(factory).create(child.getCluster(), null);
425439
final List<RexNode> exprs = new AbstractList<>() {
@@ -434,10 +448,9 @@ public RexNode get(int index) {
434448
return relBuilder.getRexBuilder().makeInputRef(child, pos);
435449
}
436450
};
437-
final List<String> names = Util.select(fieldNames, posList);
438451
return relBuilder
439452
.push(child)
440-
.projectNamed(exprs, names, true)
453+
.projectNamed(exprs, aliases, true)
441454
.build();
442455
}
443456

hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java renamed to hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import java.util.Arrays;
1919
import java.util.HashMap;
2020
import java.util.Map;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
2123
import static org.junit.jupiter.api.Assertions.assertTrue;
2224

2325
/**
24-
* Tests for ScriptImplementor suffix functionality to handle source/sink table name collisions.
26+
* Tests for ScriptImplementor
2527
*/
26-
public class ScriptImplementorSuffixTest {
28+
public class ScriptImplementorTest {
2729
@Test
2830
public void testConnectorWithSuffix() {
2931
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
@@ -171,4 +173,60 @@ public void testFullPipelineWithCollision() {
171173
assertTrue(sql.contains("INSERT INTO `ADS`.`AD_CLICKS_sink`"),
172174
"Should insert into sink table. Got: " + sql);
173175
}
176+
177+
@Test
178+
public void testExplicitColumnEnumeration() {
179+
// Test for Flink 1.20 regression where INSERT with SELECT * fails
180+
// when sink has more columns than source
181+
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
182+
183+
// Source table: 2 columns
184+
RelDataType sourceType = typeFactory.builder()
185+
.add("KEY_source", typeFactory.createSqlType(SqlTypeName.VARCHAR))
186+
.add("nestedValue_source", typeFactory.builder()
187+
.add("innerInt_source", typeFactory.createSqlType(SqlTypeName.INTEGER))
188+
.add("innerArray_source", typeFactory.createArrayType(
189+
typeFactory.createSqlType(SqlTypeName.INTEGER), -1))
190+
.build())
191+
.build();
192+
193+
// Create schema with source table
194+
SchemaPlus rootSchema = Frameworks.createRootSchema(true);
195+
SchemaPlus sourceSchema = rootSchema.add("source", new AbstractSchema());
196+
sourceSchema.add("table", new AbstractTable() {
197+
@Override
198+
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
199+
return sourceType;
200+
}
201+
});
202+
203+
RelBuilder builder = RelBuilder.create(
204+
Frameworks.newConfigBuilder()
205+
.defaultSchema(rootSchema)
206+
.build());
207+
208+
// Create a query: SELECT KEY_source, nestedValue_source FROM source.table (no aliases here; the *_sink aliases come from targetFields)
209+
// This simulates the materialized view query
210+
RelNode query = builder
211+
.scan("source", "table")
212+
.project(
213+
builder.field("KEY_source"),
214+
builder.field("nestedValue_source"))
215+
.build();
216+
217+
// Target fields for INSERT - only the 2 columns we're actually inserting
218+
ImmutablePairList<Integer, String> targetFields = ImmutablePairList.copyOf(Arrays.asList(
219+
new AbstractMap.SimpleEntry<>(0, "KEY_sink"),
220+
new AbstractMap.SimpleEntry<>(1, "nestedValue_sink")
221+
));
222+
223+
String sql = ScriptImplementor.empty()
224+
.insert(null, "sink", "mypipeline", null, query, targetFields)
225+
.sql();
226+
227+
assertEquals(
228+
"INSERT INTO `sink`.`mypipeline` (`KEY_sink`, `nestedValue_sink`) "
229+
+ "SELECT `KEY_source` AS `KEY_sink`, `nestedValue_source` AS `nestedValue_sink` "
230+
+ "FROM `source`.`table`;", sql);
231+
}
174232
}

0 commit comments

Comments
 (0)