
Commit 1aaf8a0

revert bucket partition validation
1 parent: dbf9731

3 files changed: 157 additions & 72 deletions

File tree

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 1 addition & 72 deletions

@@ -31,7 +31,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -97,10 +96,8 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mapping.MappingUtil;
@@ -111,8 +108,6 @@
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -573,11 +568,6 @@ private String getPartitionFromFilePath(String filePath) {
    * determine the partition. We also cannot fall back to a "null" partition, because that will
    * also get skipped by most queries.
    *
-   * <p>The Bucket partition transform is an exceptional case because it is not monotonic, meaning
-   * it's not enough to just compare the min and max values. There may be a middle value somewhere
-   * that gets hashed to a different value. For this transform, we'll need to read all the values
-   * in the column ensure they all get transformed to the same partition value.
-   *
    * <p>In these cases, we output the DataFile to the DLQ, because assigning an incorrect
    * partition may lead to it being incorrectly ignored by downstream queries.
    */
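
The deleted javadoc above is the rationale for the feature being reverted: bucket is a hash-based, non-monotonic transform, so matching min/max bounds say nothing about the values in between. A toy illustration of that failure mode — `bucket4` is a made-up stand-in, not Iceberg's murmur3-based bucket transform:

// Toy illustration only: a hypothetical stand-in for bucket[4],
// NOT Iceberg's actual murmur3-based bucket transform.
public class BucketDemo {
  // non-monotonic, like any hash-based transform
  static int bucket4(int value) {
    return Math.floorMod(value * 31, 4);
  }

  public static void main(String[] args) {
    // the file's min and max land in the same bucket...
    System.out.println(bucket4(8));  // 0
    System.out.println(bucket4(12)); // 0
    // ...but a value between them does not, so the file actually spans
    // two partitions even though its footer bounds agree
    System.out.println(bucket4(10)); // 2
  }
}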
@@ -614,22 +604,9 @@ static String getPartitionFromMetrics(Metrics metrics, InputFile inputFile, Tabl
 
     PartitionKey pk = new PartitionKey(table.spec(), table.schema());
 
-    HashMap<Integer, PartitionField> bucketPartitions = new HashMap<>();
+    // read metadata from footer and set partition based on min/max transformed values
     for (int i = 0; i < fields.size(); i++) {
       PartitionField field = fields.get(i);
-      Transform<?, ?> transform = field.transform();
-      if (transform.toString().contains("bucket[")) {
-        bucketPartitions.put(i, field);
-      }
-    }
-
-    // first, read only metadata for the non-bucket partition types
-    for (int i = 0; i < fields.size(); i++) {
-      PartitionField field = fields.get(i);
-      // skip bucket partitions (we will process them below)
-      if (bucketPartitions.containsKey(i)) {
-        continue;
-      }
       Type type = table.schema().findType(field.sourceId());
       Transform<?, ?> transform = field.transform();
 
@@ -658,54 +635,6 @@ static String getPartitionFromMetrics(Metrics metrics, InputFile inputFile, Tabl
       pk.set(i, lowerTransformedValue);
     }
 
-    // bucket transform needs extra processing (see java doc above)
-    if (!bucketPartitions.isEmpty()) {
-      // Optimize by only reading bucket-transformed columns into memory
-      org.apache.iceberg.Schema bucketCols =
-          TypeUtil.select(
-              table.schema(),
-              bucketPartitions.values().stream()
-                  .map(PartitionField::sourceId)
-                  .collect(Collectors.toSet()));
-
-      // Keep one instance of transformed value per column. Use this to compare against each
-      // record's transformed value.
-      // Values in the same columns must yield the same transformed value, otherwise we cannot
-      // determine a partition from this file.
-      Map<Integer, Object> transformedValues = new HashMap<>();
-
-      // Do a one-time read of the file and compare all bucket-transformed columns
-      try (CloseableIterable<Record> reader = ReadUtils.createReader(inputFile, bucketCols)) {
-        for (Record record : reader) {
-          for (Map.Entry<Integer, PartitionField> entry : bucketPartitions.entrySet()) {
-            int partitionIndex = entry.getKey();
-            PartitionField partitionField = entry.getValue();
-            Transform<?, ?> transform = partitionField.transform();
-            Types.NestedField field = table.schema().findField(partitionField.sourceId());
-            Object value = record.getField(field.name());
-
-            // set initial transformed value for this column
-            @Nullable Object transformedValue = transformedValues.get(partitionIndex);
-            Object currentTransformedValue = transformValue(transform, field.type(), value);
-            if (transformedValue == null) {
-              transformedValues.put(partitionIndex, checkStateNotNull(currentTransformedValue));
-              continue;
-            }
-
-            if (!Objects.deepEquals(currentTransformedValue, transformedValue)) {
-              throw new UnknownPartitionException(
-                  "Found records with conflicting transformed values, for column: "
-                      + field.name());
-            }
-          }
-        }
-      }
-
-      for (Map.Entry<Integer, Object> partitionCol : transformedValues.entrySet()) {
-        pk.set(partitionCol.getKey(), partitionCol.getValue());
-      }
-    }
     return pk.toPath();
   }
 }
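
What remains after the revert is the metrics-only path: transform each partition column's footer min/max and accept the file only when both bounds land on the same partition value. Below is a condensed sketch of that retained logic, reconstructed from the hunks above; `transformValue` is an AddFiles helper not shown in this diff, so the stub here (using `Transform.bind`) and the plain `IllegalStateException` in place of the internal `UnknownPartitionException` are assumptions:

import java.util.List;
import java.util.Objects;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

class PartitionFromBoundsSketch {
  static String partitionFromBounds(Table table, Metrics metrics) {
    PartitionKey pk = new PartitionKey(table.spec(), table.schema());
    List<PartitionField> fields = table.spec().fields();
    for (int i = 0; i < fields.size(); i++) {
      PartitionField field = fields.get(i);
      Type type = table.schema().findType(field.sourceId());
      Transform<?, ?> transform = field.transform();

      // decode the column's min/max from the file's footer metrics
      Object lower = Conversions.fromByteBuffer(type, metrics.lowerBounds().get(field.sourceId()));
      Object upper = Conversions.fromByteBuffer(type, metrics.upperBounds().get(field.sourceId()));

      Object lowerTransformed = transformValue(transform, type, lower);
      Object upperTransformed = transformValue(transform, type, upper);

      // for monotonic transforms, equal transformed bounds imply a single
      // partition value for the whole file; otherwise bail out (DLQ)
      if (!Objects.deepEquals(lowerTransformed, upperTransformed)) {
        throw new IllegalStateException("cannot determine partition for field " + field.name());
      }
      pk.set(i, lowerTransformed);
    }
    return pk.toPath();
  }

  // assumed stand-in for the AddFiles transformValue helper referenced in the diff
  @SuppressWarnings("unchecked")
  static Object transformValue(Transform<?, ?> transform, Type type, Object value) {
    return ((Transform<Object, Object>) transform).bind(type).apply(value);
  }
}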
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java

Lines changed: 149 additions & 0 deletions

@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.Date;
+import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.Time;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class BeamRowWrapper implements StructLike {
+
+  private final FieldType[] types;
+  private final @Nullable PositionalGetter<?>[] getters;
+  private @Nullable Row row = null;
+
+  public BeamRowWrapper(Schema schema, Types.StructType struct) {
+    int size = schema.getFieldCount();
+
+    types = (FieldType[]) Array.newInstance(FieldType.class, size);
+    getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);
+
+    for (int i = 0; i < size; i++) {
+      types[i] = schema.getField(i).getType();
+      getters[i] = buildGetter(types[i], struct.fields().get(i).type());
+    }
+  }
+
+  public BeamRowWrapper wrap(@Nullable Row row) {
+    this.row = row;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public <T> @Nullable T get(int pos, Class<T> javaClass) {
+    if (row == null || row.getValue(pos) == null) {
+      return null;
+    } else if (getters[pos] != null) {
+      return javaClass.cast(getters[pos].get(checkStateNotNull(row), pos));
+    }
+
+    return javaClass.cast(checkStateNotNull(row).getValue(pos));
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException(
+        "Could not set a field in the BeamRowWrapper because rowData is read-only");
+  }
+
+  private interface PositionalGetter<T> {
+    T get(Row data, int pos);
+  }
+
+  private static @Nullable PositionalGetter<?> buildGetter(FieldType beamType, Type icebergType) {
+    switch (beamType.getTypeName()) {
+      case BYTE:
+        return Row::getByte;
+      case INT16:
+        return Row::getInt16;
+      case STRING:
+        return Row::getString;
+      case BYTES:
+        return (row, pos) -> {
+          byte[] bytes = checkStateNotNull(row.getBytes(pos));
+          if (Type.TypeID.UUID == icebergType.typeId()) {
+            return UUIDUtil.convert(bytes);
+          } else {
+            return ByteBuffer.wrap(bytes);
+          }
+        };
+      case DECIMAL:
+        return Row::getDecimal;
+      case DATETIME:
+        return (row, pos) ->
+            TimeUnit.MILLISECONDS.toMicros(checkStateNotNull(row.getDateTime(pos)).getMillis());
+      case ROW:
+        Schema beamSchema = checkStateNotNull(beamType.getRowSchema());
+        Types.StructType structType = (Types.StructType) icebergType;
+
+        BeamRowWrapper nestedWrapper = new BeamRowWrapper(beamSchema, structType);
+        return (row, pos) -> nestedWrapper.wrap(row.getRow(pos));
+      case LOGICAL_TYPE:
+        if (beamType.isLogicalType(MicrosInstant.IDENTIFIER)) {
+          return (row, pos) -> {
+            Instant instant = checkStateNotNull(row.getLogicalTypeValue(pos, Instant.class));
+            return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1000;
+          };
+        } else if (beamType.isLogicalType(DateTime.IDENTIFIER)) {
+          return (row, pos) ->
+              DateTimeUtil.microsFromTimestamp(
+                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalDateTime.class)));
+        } else if (beamType.isLogicalType(Date.IDENTIFIER)) {
+          return (row, pos) ->
+              DateTimeUtil.daysFromDate(
+                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalDate.class)));
+        } else if (beamType.isLogicalType(Time.IDENTIFIER)) {
+          return (row, pos) ->
+              DateTimeUtil.microsFromTime(
+                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalTime.class)));
+        } else if (beamType.isLogicalType(FixedPrecisionNumeric.IDENTIFIER)) {
+          return (row, pos) -> row.getLogicalTypeValue(pos, BigDecimal.class);
+        } else {
+          return null;
+        }
+      default:
+        return null;
+    }
  }
+}
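
The diff shows no call site for the new wrapper, so here is a minimal, hypothetical usage sketch; the schema, field names, and values are invented, and the one real constraint (from the constructor above) is that the Beam schema and the Iceberg struct must align positionally:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;

public class BeamRowWrapperDemo {
  public static void main(String[] args) {
    // hypothetical positionally-aligned schemas
    Schema beamSchema = Schema.builder().addStringField("name").addInt64Field("id").build();
    Types.StructType icebergStruct =
        Types.StructType.of(
            Types.NestedField.required(1, "name", Types.StringType.get()),
            Types.NestedField.required(2, "id", Types.LongType.get()));

    // build one wrapper per schema and reuse it; wrap() just swaps the Row
    BeamRowWrapper wrapper = new BeamRowWrapper(beamSchema, icebergStruct);

    Row row = Row.withSchema(beamSchema).addValues("alice", 7L).build();
    StructLike struct = wrapper.wrap(row);

    // STRING has a dedicated getter; INT64 falls through to row.getValue(pos)
    String name = struct.get(0, String.class); // "alice"
    Long id = struct.get(1, Long.class); // 7
  }
}

Resolving the per-position getters once in the constructor keeps the per-record cost of wrap() and get() to a lambda call and a cast, which matters when a wrapper is applied to every element in a bundle.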

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java

Lines changed: 7 additions & 0 deletions

@@ -80,6 +80,7 @@
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -425,6 +426,12 @@ public void testPartitionPrefixErrors() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
+  /**
+   * We reverted the in-depth bucket-partition validation in
+   * https://github.com/apache/beam/pull/38039, partly because it was too resource intensive, and
+   * also because the Spark AddFiles equivalent performs zero validation.
+   */
+  @Ignore
   @Test
   public void testRecognizesBucketPartitionMismatch() throws IOException {
     String file1 = root + "data1.parquet";