Commit 8ee8b10

Add more ParquetIo write options (#37740)
* add more parquet options
* comments
* more tests and use default
1 parent: d2fead7

2 files changed: 149 additions & 3 deletions

sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java

Lines changed: 57 additions & 3 deletions
@@ -69,6 +69,7 @@
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -1005,8 +1006,11 @@ public static Sink sink(Schema schema) {
     return new AutoValue_ParquetIO_Sink.Builder()
         .setJsonSchema(schema.toString())
         .setCompressionCodec(CompressionCodecName.SNAPPY)
-        // This resembles the default value for ParquetWriter.rowGroupSize.
         .setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
+        .setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
+        .setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED)
+        .setEnableBloomFilter(ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)
+        .setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
         .build();
   }
 
@@ -1022,6 +1026,14 @@ public abstract static class Sink implements FileIO.Sink<GenericRecord> {
 
     abstract int getRowGroupSize();
 
+    abstract int getPageSize();
+
+    abstract boolean getEnableDictionary();
+
+    abstract boolean getEnableBloomFilter();
+
+    abstract int getMinRowCountForPageSizeCheck();
+
     abstract @Nullable Class<? extends GenericData> getAvroDataModelClass();
 
     abstract Builder toBuilder();
@@ -1036,6 +1048,14 @@ abstract static class Builder {
 
       abstract Builder setRowGroupSize(int rowGroupSize);
 
+      abstract Builder setPageSize(int pageSize);
+
+      abstract Builder setEnableDictionary(boolean enableDictionary);
+
+      abstract Builder setEnableBloomFilter(boolean enableBloomFilter);
+
+      abstract Builder setMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck);
+
       abstract Builder setAvroDataModelClass(Class<? extends GenericData> modelClass);
 
       abstract Sink build();
@@ -1064,6 +1084,34 @@ public Sink withRowGroupSize(int rowGroupSize) {
       return toBuilder().setRowGroupSize(rowGroupSize).build();
     }
 
+    /** Specify the page size for the Parquet writer. Defaults to {@code 1 MB}. */
+    public Sink withPageSize(int pageSize) {
+      checkArgument(pageSize > 0, "pageSize must be positive");
+      return toBuilder().setPageSize(pageSize).build();
+    }
+
+    /** Enable or disable dictionary encoding. Enabled by default. */
+    public Sink withDictionaryEncoding(boolean enableDictionary) {
+      return toBuilder().setEnableDictionary(enableDictionary).build();
+    }
+
+    /** Enable or disable bloom filters. Disabled by default. */
+    public Sink withBloomFilterEnabled(boolean enableBloomFilter) {
+      return toBuilder().setEnableBloomFilter(enableBloomFilter).build();
+    }
+
+    /**
+     * Specify the minimum number of rows to write before a page size check is performed. The writer
+     * buffers at least this many rows before checking whether the page size threshold has been
+     * reached. With large rows, the default ({@code 100}) can cause excessive memory use; set a
+     * lower value (e.g. {@code 1}) to flush pages more frequently.
+     */
+    public Sink withMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck) {
+      checkArgument(
+          minRowCountForPageSizeCheck > 0, "minRowCountForPageSizeCheck must be positive");
+      return toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build();
+    }
+
     /**
      * Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}.
      */
@@ -1079,6 +1127,7 @@ public void open(WritableByteChannel channel) throws IOException {
 
       Schema schema = new Schema.Parser().parse(getJsonSchema());
       Class<? extends GenericData> modelClass = getAvroDataModelClass();
+      Configuration conf = SerializableConfiguration.newConfiguration(getConfiguration());
 
       BeamParquetOutputFile beamParquetOutputFile =
           new BeamParquetOutputFile(Channels.newOutputStream(channel));
@@ -1088,8 +1137,13 @@ public void open(WritableByteChannel channel) throws IOException {
               .withSchema(schema)
               .withCompressionCodec(getCompressionCodec())
               .withWriteMode(OVERWRITE)
-              .withConf(SerializableConfiguration.newConfiguration(getConfiguration()))
-              .withRowGroupSize(getRowGroupSize());
+              .withConf(conf)
+              .withRowGroupSize(getRowGroupSize())
+              .withPageSize(getPageSize())
+              .withDictionaryEncoding(getEnableDictionary())
+              .withBloomFilterEnabled(getEnableBloomFilter())
+              .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck());
+
       if (modelClass != null) {
         try {
           builder.withDataModel(buildModelObject(modelClass));
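
The new options surface through ParquetIO.Sink's fluent API. As a usage sketch (not part of the commit): it assumes an existing PCollection<GenericRecord> named records, the Avro schema SCHEMA, and an output path; the option values are illustrative, not recommendations.

// Hypothetical usage of the Sink options added in this commit.
// `records` (a PCollection<GenericRecord>), SCHEMA, and the output
// path are assumed; the values are illustrative.
records.apply(
    FileIO.<GenericRecord>write()
        .via(
            ParquetIO.sink(SCHEMA)
                .withPageSize(512 * 1024) // 512 KB pages instead of the 1 MB default
                .withDictionaryEncoding(true) // the default, shown for clarity
                .withBloomFilterEnabled(true) // off by default
                .withMinRowCountForPageSizeCheck(10)) // default is 100; lower it for large rows
        .to("/path/to/output"));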

sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java

Lines changed: 92 additions & 0 deletions
@@ -21,12 +21,14 @@
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
@@ -57,8 +59,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.api.Binary;
 import org.junit.Rule;
 import org.junit.Test;
@@ -518,6 +523,93 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testWriteWithDefaultWriterProperties() throws Exception {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(ParquetIO.sink(SCHEMA))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters are absent by default.
+      boolean hasBloomFilter =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getBloomFilterOffset() >= 0);
+      assertFalse("Expected no bloom filters by default", hasBloomFilter);
+
+      // Verify dictionary encoding is enabled by default.
+      boolean hasDictionary =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getDictionaryPageOffset() > 0);
+      assertTrue("Expected dictionary pages to be present by default", hasDictionary);
+    }
+  }
+
+  @Test
+  public void testWriteWithWriterProperties() throws Exception {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(
+                    ParquetIO.sink(SCHEMA)
+                        .withPageSize(256 * 1024)
+                        .withDictionaryEncoding(false)
+                        .withBloomFilterEnabled(true)
+                        .withMinRowCountForPageSizeCheck(5))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    // Read back the file metadata and verify the settings took effect.
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters were written: at least one column should have a bloom filter.
+      boolean hasBloomFilter =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getBloomFilterOffset() >= 0);
+      assertTrue("Expected bloom filters to be present", hasBloomFilter);
+
+      // Verify dictionary encoding was disabled: no columns should have dictionary pages.
+      // getDictionaryPageOffset() returns 0 when no dictionary page is present.
+      boolean hasDictionary =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getDictionaryPageOffset() > 0);
+      assertFalse(
+          "Expected no dictionary pages when dictionary encoding is disabled", hasDictionary);
+    }
+
+    // Verify the data still round-trips correctly.
+    PCollection<GenericRecord> readBack =
+        readPipeline.apply(
+            ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() + "/*"));
+    PAssert.that(readBack).containsInAnyOrder(records);
+    readPipeline.run().waitUntilFinish();
+  }
+
   /** Returns list of JSON representation of GenericRecords. */
   private static List<String> convertRecordsToJson(List<GenericRecord> records) {
     return records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList());
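
The same footer checks the tests perform can be run against any Parquet file with a small standalone utility. A minimal sketch, using only the parquet-mr APIs the test above already imports; the class name and argument handling are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetFooterCheck {
  public static void main(String[] args) throws Exception {
    // args[0]: path to a Parquet file (local or HDFS).
    Path path = new Path(args[0]);
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      ParquetMetadata footer = reader.getFooter();
      // A bloom filter offset >= 0 means the column chunk wrote a bloom filter;
      // a dictionary page offset > 0 means dictionary encoding was used.
      boolean hasBloomFilter =
          footer.getBlocks().stream()
              .flatMap(block -> block.getColumns().stream())
              .anyMatch(col -> col.getBloomFilterOffset() >= 0);
      boolean hasDictionary =
          footer.getBlocks().stream()
              .flatMap(block -> block.getColumns().stream())
              .anyMatch(col -> col.getDictionaryPageOffset() > 0);
      System.out.println("bloom filters present: " + hasBloomFilter);
      System.out.println("dictionary pages present: " + hasDictionary);
    }
  }
}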
