Skip to content

Commit d27dc82

Browse files
authored
[IcebergIO] Add ITs for RESTCatalog using BLMS (#35360)
* Add integration tests for RESTCatalog using BLMS
* Update the REST catalog configuration
* Use a top-level GCS bucket as the warehouse
1 parent d10374b commit d27dc82

4 files changed

Lines changed: 95 additions & 11 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 2
3+
"modification": 4
44
}

sdks/java/io/iceberg/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ dependencies {
9696
testImplementation project(":sdks:java:io:google-cloud-platform")
9797
testImplementation library.java.google_api_services_bigquery
9898

99+
testImplementation library.java.google_auth_library_oauth2_http
99100
testRuntimeOnly library.java.slf4j_jdk14
100101
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
101102
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io.iceberg.catalog;
1919

20+
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema;
21+
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
2022
import static org.apache.beam.sdk.managed.Managed.ICEBERG;
2123
import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC;
2224
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
@@ -313,7 +315,7 @@ public Row apply(Long num) {
313315
};
314316

315317
protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
316-
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
318+
beamSchemaToIcebergSchema(BEAM_SCHEMA);
317319
protected static final SimpleFunction<Row, Record> RECORD_FUNC =
318320
new SimpleFunction<Row, Record>() {
319321
@Override
@@ -346,7 +348,7 @@ private List<Row> populateTable(Table table, @Nullable String charOverride) thro
346348
}
347349
DataWriter<Record> writer =
348350
Parquet.writeData(file)
349-
.schema(ICEBERG_SCHEMA)
351+
.schema(table.schema())
350352
.createWriterFunc(GenericParquetWriter::create)
351353
.overwrite()
352354
.withSpec(table.spec())
@@ -652,7 +654,7 @@ public void testWrite() throws IOException {
652654
pipeline.run().waitUntilFinish();
653655

654656
Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
655-
assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));
657+
assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(table.schema()));
656658

657659
// Read back and check records are correct
658660
List<Record> returnedRecords = readRecords(table);
@@ -664,16 +666,23 @@ public void testWrite() throws IOException {
664666
public void testWriteToPartitionedTable() throws IOException {
665667
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
666668
int truncLength = "value_x".length();
667-
config.put(
668-
"partition_fields",
669-
Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")"));
669+
List<String> partitionFields =
670+
Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")");
671+
config.put("partition_fields", partitionFields);
670672
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
671673
input.apply(Managed.write(ICEBERG).withConfig(config));
672674
pipeline.run().waitUntilFinish();
673675

674676
// Read back and check records are correct
675677
Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
676678
List<Record> returnedRecords = readRecords(table);
679+
PartitionSpec expectedSpec =
680+
PartitionSpec.builderFor(table.schema())
681+
.identity("bool_field")
682+
.hour("datetime")
683+
.truncate("str", truncLength)
684+
.build();
685+
assertEquals(expectedSpec, table.spec());
677686
assertThat(
678687
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
679688
}
@@ -815,10 +824,8 @@ private void writeToDynamicDestinations(
815824
Table table3 = catalog.loadTable(TableIdentifier.parse(tableId() + "_3_d"));
816825
Table table4 = catalog.loadTable(TableIdentifier.parse(tableId() + "_4_e"));
817826

818-
org.apache.iceberg.Schema tableSchema =
819-
IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
820827
for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
821-
assertTrue(t.schema().sameSchema(tableSchema));
828+
assertEquals(rowFilter.outputSchema(), icebergSchemaToBeamSchema(t.schema()));
822829
}
823830

824831
// Read back and check records are correct
@@ -830,6 +837,7 @@ private void writeToDynamicDestinations(
830837
readRecords(table3),
831838
readRecords(table4));
832839

840+
org.apache.iceberg.Schema tableSchema = beamSchemaToIcebergSchema(rowFilter.outputSchema());
833841
SerializableFunction<Row, Record> recordFunc =
834842
row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row);
835843

@@ -936,7 +944,7 @@ public void testWriteToDynamicNamespaces() throws IOException {
936944
table3false,
937945
table4true,
938946
table4false)) {
939-
assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA));
947+
assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(t.schema()));
940948
}
941949

942950
// Read back and check records are correct
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.iceberg.catalog;

import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.rest.RESTCatalog;
import org.junit.After;
import org.junit.BeforeClass;

/** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake Metastore. */
public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT {
  // BigLake does not support using subfolders as a warehouse (yet), so this
  // suite points at a dedicated top-level bucket instead of the shared one.
  private static final String BIGLAKE_WAREHOUSE = "gs://managed-iceberg-biglake-its";

  // Catalog properties shared by createCatalog() and managedIcebergConfig();
  // populated once per class in setup().
  private static Map<String, String> catalogProps;

  /** Builds the REST catalog properties and redirects the warehouse to the BigLake bucket. */
  @BeforeClass
  public static void setup() {
    warehouse = BIGLAKE_WAREHOUSE;
    ImmutableMap.Builder<String, String> props = ImmutableMap.builder();
    props.put("type", "rest");
    props.put("uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog");
    props.put("warehouse", BIGLAKE_WAREHOUSE);
    props.put("header.x-goog-user-project", OPTIONS.getProject());
    props.put("rest-metrics-reporting-enabled", "false");
    props.put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO");
    props.put("rest.auth.type", "org.apache.iceberg.gcp.auth.GoogleAuthManager");
    catalogProps = props.build();
  }

  /** Re-points the warehouse before cleanup so teardown deletes from the correct bucket. */
  @After
  public void after() {
    warehouse = BIGLAKE_WAREHOUSE;
  }

  @Override
  public String type() {
    return "biglake";
  }

  /** Returns a {@link RESTCatalog} initialized against the BigLake REST endpoint. */
  @Override
  public Catalog createCatalog() {
    RESTCatalog rest = new RESTCatalog();
    rest.initialize(catalogName, catalogProps);
    return rest;
  }

  /** Returns the Managed IcebergIO config for {@code tableId} backed by the REST catalog. */
  @Override
  public Map<String, Object> managedIcebergConfig(String tableId) {
    return ImmutableMap.of(
        "table", tableId,
        "catalog_properties", catalogProps);
  }
}

0 commit comments

Comments
 (0)