Skip to content

Commit e6ccbfc

Browse files
committed
WIP: a file read/write benchmark
Using the same structure as the Iceberg tests do
1 parent 4edec9a commit e6ccbfc

2 files changed

Lines changed: 355 additions & 55 deletions

File tree

parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/VariantBenchmark.java

Lines changed: 61 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.parquet.benchmarks;
2020

2121
import java.io.ByteArrayOutputStream;
22+
import java.io.EOFException;
2223
import java.io.IOException;
2324
import java.math.BigDecimal;
2425
import java.nio.ByteBuffer;
@@ -73,6 +74,8 @@
7374
import org.openjdk.jmh.annotations.Timeout;
7475
import org.openjdk.jmh.annotations.Warmup;
7576
import org.openjdk.jmh.infra.Blackhole;
77+
import org.slf4j.Logger;
78+
import org.slf4j.LoggerFactory;
7679

7780
/**
7881
* JMH benchmarks for {@link VariantBuilder}: construction, serialization and deserialization of
@@ -99,18 +102,18 @@
99102
* ./parquet-benchmarks/run.sh all org.apache.parquet.benchmarks.VariantBenchmark \
100103
* -wi 5 -i 5 -f 1 -rff target/results.json
101104
* </pre>
102-
*
103-
* Change fork to 1 before merge
104105
*/
105106
@Fork(1)
106107
@State(Scope.Benchmark)
107108
@Warmup(iterations = 100)
108-
@Measurement(iterations = 100)
109+
@Measurement(iterations = 250)
109110
@BenchmarkMode(Mode.SingleShotTime)
110111
@OutputTimeUnit(TimeUnit.MICROSECONDS)
111112
@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
112113
public class VariantBenchmark {
113114

115+
private static final Logger LOG = LoggerFactory.getLogger(VariantBenchmark.class);
116+
114117
/** Whether to include nested sub-objects in the field values. */
115118
public enum Depth {
116119
/** Flat structure: no nesting. */
@@ -149,7 +152,8 @@ public enum Depth {
149152
private static int count() {
150153
int c = counter++;
151154
if (c >= 512) {
152-
c = 0;
155+
counter = 0;
156+
c = counter;
153157
}
154158
return c;
155159
}
@@ -393,7 +397,7 @@ public void setupTrial() {
393397
* VariantBuilder#build()}. Measures object construction including dictionary encoding.
394398
*/
395399
@Benchmark
396-
public void benchmarkBuildVariant(Blackhole bh) {
400+
public void buildVariant(Blackhole bh) {
397401
for (int i = 0; i < ITERATIONS; i++) {
398402
Variant v = buildVariant();
399403
bh.consume(v.getValueBuffer());
@@ -409,7 +413,7 @@ public void benchmarkBuildVariant(Blackhole bh) {
409413
 * primarily measures the ByteBuffer access and the Blackhole overhead.
410414
*/
411415
@Benchmark
412-
public void benchmarkSerialize(Blackhole bh) {
416+
public void serializeVariant(Blackhole bh) {
413417
// duplicate() gives an independent position/limit on the same backing array –
414418
for (int i = 0; i < ITERATIONS; i++) {
415419
// equivalent to the Iceberg benchmark's outputBuffer.clear() + writeTo() pattern.
@@ -427,23 +431,9 @@ public void benchmarkSerialize(Blackhole bh) {
427431
* cost of sub-objects.
428432
*/
429433
@Benchmark
430-
public void benchmarkDeserialize(Blackhole bh) {
434+
public void deserializeVariant(Blackhole bh) {
431435
for (int j = 0; j < ITERATIONS; j++) {
432-
deserializeVariant(preBuiltVariant, bh);
433-
}
434-
}
435-
436-
/** Recursively deserialize a variant object, descending into any nested objects. */
437-
private void deserializeVariant(Variant v, Blackhole bh) {
438-
int n = v.numObjectElements();
439-
for (int i = 0; i < n; i++) {
440-
Variant.ObjectField field = v.getFieldAtIndex(i);
441-
bh.consume(field.key);
442-
if (field.value.getType() == Variant.Type.OBJECT) {
443-
deserializeVariant(field.value, bh);
444-
} else {
445-
bh.consume(field.value.getValueBuffer());
446-
}
436+
deserializeAndConsume(preBuiltVariant, bh);
447437
}
448438
}
449439

@@ -452,7 +442,7 @@ private void deserializeVariant(Variant v, Blackhole bh) {
452442
 * field matching, and recursive decomposition that {@link VariantValueWriter} performs
453443
*/
454444
@Benchmark
455-
public void writeShredded(Blackhole blackhole) {
445+
public void consumeRecordsShredded(Blackhole blackhole) {
456446
for (int i = 0; i < ITERATIONS; i++) {
457447
VariantValueWriter.write(noopConsumer, shreddedSchema, preBuiltVariant);
458448
blackhole.consume(noopConsumer);
@@ -462,33 +452,33 @@ public void writeShredded(Blackhole blackhole) {
462452
/**
463453
* Write {@link #FILE_ROWS} rows of the pre-built variant to an in-memory Parquet file using the
464454
* shredded schema. Measures end-to-end Parquet encoding cost including page/row-group framing.
465-
* Compare with {@link #writeShredded} to quantify the overhead over raw schema traversal.
455+
* Compare with {@link #consumeRecordsShredded} to quantify the overhead over raw schema traversal.
466456
*/
467457
@Benchmark
468-
public void writeFileShredded(Blackhole blackhole) throws IOException {
469-
writeThenConsume(blackhole, shreddedSchema);
458+
public void writeToMemoryFile(Blackhole blackhole) throws IOException {
459+
writeToMemory(blackhole, shreddedSchema);
470460
}
471461

472462
/**
473463
* Write the pre-built variant to an unshredded schema (metadata + value only).
474464
* This is the baseline: the entire variant is written as a single binary blob.
475-
* Compare with {@link #writeShredded} to see the cost of shredding.
465+
* Compare with {@link #consumeRecordsShredded} to see the cost of shredding.
476466
*/
477467
@Benchmark
478-
public void writeUnshredded(Blackhole bh) {
468+
public void consumeRecordsUnshredded(Blackhole blackhole) {
479469
for (int i = 0; i < ITERATIONS; i++) {
480470
VariantValueWriter.write(noopConsumer, unshreddedSchema, preBuiltVariant);
481-
bh.consume(noopConsumer);
471+
blackhole.consume(noopConsumer);
482472
}
483473
}
484474

485475
/**
486476
* Write {@link #FILE_ROWS} rows of the pre-built variant to an in-memory Parquet file using the
487-
* unshredded schema (metadata + value binary blobs only). Baseline for {@link #writeFileShredded}.
477+
* unshredded schema (metadata + value binary blobs only). Baseline for {@link #writeToMemoryFile}.
488478
*/
489479
@Benchmark
490-
public void writeFileUnshredded(Blackhole bh) throws IOException {
491-
writeThenConsume(bh, unshreddedSchema);
480+
public void writeToMemoryUnshredded(Blackhole blackhole) throws IOException {
481+
writeToMemory(blackhole, unshreddedSchema);
492482
}
493483

494484
/**
@@ -497,13 +487,8 @@ public void writeFileUnshredded(Blackhole bh) throws IOException {
497487
*/
498488
@Benchmark
499489
public void readFileShredded(Blackhole blackhole) throws IOException {
500-
try (ParquetReader<Variant> reader =
501-
new VariantReaderBuilder(new ByteArrayInputFile(shreddedFileBytes)).build()) {
502-
Variant v;
503-
while ((v = reader.read()) != null) {
504-
blackhole.consume(v);
505-
}
506-
}
490+
final ByteArrayInputFile inputFile = new ByteArrayInputFile(shreddedFileBytes);
491+
consumeInputFile(blackhole, inputFile);
507492
}
508493

509494
/**
@@ -512,22 +497,15 @@ public void readFileShredded(Blackhole blackhole) throws IOException {
512497
*/
513498
@Benchmark
514499
public void readFileUnshredded(Blackhole bh) throws IOException {
515-
try (ParquetReader<Variant> reader =
516-
new VariantReaderBuilder(new ByteArrayInputFile(unshreddedFileBytes)).build()) {
517-
Variant v;
518-
while ((v = reader.read()) != null) {
519-
bh.consume(v);
520-
}
521-
}
500+
consumeInputFile(bh, new ByteArrayInputFile(unshreddedFileBytes));
522501
}
523502

524503
// ------------------------------------------------------------------
525504
// Internal helpers
526505
// ------------------------------------------------------------------
527506

528507
/**
529-
* Build a complete Variant object from the pre-decided field types. This is the core logic shared
530-
* between {@link #benchmarkBuildVariant} and setup..
508+
* Build a complete Variant object from the pre-decided field types.
531509
*/
532510
private Variant buildVariant() {
533511
VariantBuilder builder = new VariantBuilder();
@@ -608,6 +586,20 @@ private GroupType buildShreddedSchema() {
608586
.named("variant_field");
609587
}
610588

589+
/** Recursively deserialize a variant object, descending into any nested objects. */
590+
private void deserializeAndConsume(Variant v, Blackhole bh) {
591+
int n = v.numObjectElements();
592+
for (int i = 0; i < n; i++) {
593+
Variant.ObjectField field = v.getFieldAtIndex(i);
594+
bh.consume(field.key);
595+
if (field.value.getType() == Variant.Type.OBJECT) {
596+
deserializeAndConsume(field.value, bh);
597+
} else {
598+
bh.consume(field.value.getValueBuffer());
599+
}
600+
}
601+
}
602+
611603
/**
612604
* Write {@link #FILE_ROWS} copies of {@link #preBuiltVariant} to a fresh in-memory Parquet file
613605
* using the given schema. Used both in {@link #setupTrial()} to pre-build read buffers and as the
@@ -620,6 +612,7 @@ private byte[] writeVariantsToMemory(GroupType schema) throws IOException {
620612
writer.write(preBuiltVariant);
621613
}
622614
}
615+
LOG.info("Written Parquet file has size: {}", out.size());
623616
return out.toByteArray();
624617
}
625618

@@ -630,14 +623,23 @@ private byte[] writeVariantsToMemory(GroupType schema) throws IOException {
630623
* @param schema schema
631624
* @throws IOException write failure
632625
*/
633-
private void writeThenConsume(final Blackhole blackhole, final GroupType schema) throws IOException {
634-
ByteArrayOutputFile out = new ByteArrayOutputFile();
635-
try (ParquetWriter<Variant> writer = new VariantWriterBuilder(out, schema).build()) {
636-
for (int i = 0; i < FILE_ROWS; i++) {
637-
writer.write(preBuiltVariant);
626+
private void writeToMemory(final Blackhole blackhole, final GroupType schema) throws IOException {
627+
blackhole.consume(writeVariantsToMemory(schema));
628+
}
629+
630+
/**
631+
 * Consume an input file.
632+
* @param blackhole black hole
633+
* @param inputFile input file
634+
*/
635+
private static void consumeInputFile(final Blackhole blackhole, final ByteArrayInputFile inputFile)
636+
throws IOException {
637+
try (ParquetReader<Variant> reader = new VariantReaderBuilder(inputFile).build()) {
638+
Variant v;
639+
while ((v = reader.read()) != null) {
640+
blackhole.consume(v);
638641
}
639642
}
640-
blackhole.consume(out.toByteArray());
641643
}
642644

643645
// ------------------------------------------------------------------
@@ -694,6 +696,10 @@ public boolean supportsBlockSize() {
694696
public long defaultBlockSize() {
695697
return 0;
696698
}
699+
700+
int size() {
701+
return baos.size();
702+
}
697703
}
698704

699705
/** An {@link InputFile} backed by a {@code byte[]}. */
@@ -747,7 +753,7 @@ public void readFully(byte[] bytes) throws IOException {
747753
@Override
748754
public void readFully(byte[] bytes, int start, int len) throws IOException {
749755
if (pos + len > data.length) {
750-
throw new IOException("Unexpected end of data");
756+
throw new EOFException("Unexpected end of data");
751757
}
752758
System.arraycopy(data, pos, bytes, start, len);
753759
pos += len;

0 commit comments

Comments
 (0)