Skip to content

Commit 2e29ea6

Browse files
committed
projection works
1 parent 0889e2d commit 2e29ea6

1 file changed

Lines changed: 108 additions & 58 deletions

File tree

parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/VariantReadBenchmark.java

Lines changed: 108 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
@Fork(0)
9696
@State(Scope.Benchmark)
9797
@Warmup(iterations = 3)
98-
@Measurement(iterations = 10)
98+
@Measurement(iterations = 5)
9999
@BenchmarkMode(Mode.SingleShotTime)
100100
@OutputTimeUnit(MILLISECONDS)
101101
@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
@@ -104,24 +104,36 @@ public class VariantReadBenchmark {
104104
private static final Logger LOG = LoggerFactory.getLogger(VariantReadBenchmark.class);
105105

106106
/** Number of rows written per file. */
107-
private static final int NUM_ROWS = 10_000_000;
107+
private static final int NUM_ROWS = 1_000_000;
108108

109-
/** 20 col4 values, one per category. */
109+
private static final int CATEGORIES = 20;
110+
111+
/** The col4 values, one per category. */
110112
private static final String[] COL4_VALUES;
111113

112114
static {
113-
COL4_VALUES = new String[20];
114-
for (int i = 0; i < 20; i++) {
115+
COL4_VALUES = new String[CATEGORIES];
116+
for (int i = 0; i < CATEGORIES; i++) {
115117
COL4_VALUES[i] = "col4_category_" + i;
116118
}
117119
}
118120

119121
@Param({"true", "false"})
120122
public boolean shredded;
121-
123+
/**
124+
* The record schema with the unshredded variant.
125+
*/
122126
private final MessageType unshreddedSchema;
127+
128+
/**
129+
* The shredded schema splits all expected variant group entries
130+
* into their own columns.
131+
*/
123132
private final MessageType shreddedSchema;
124-
private final MessageType projectionSchema;
133+
/**
134+
* Select schema.
135+
*/
136+
private final MessageType selectSchema;
125137
private MessageType activeSchema;
126138
private Configuration conf;
127139
private FileSystem fs;
@@ -130,6 +142,8 @@ public class VariantReadBenchmark {
130142
private Path dataFile;
131143

132144
public VariantReadBenchmark() {
145+
146+
133147
unshreddedSchema = parseMessageType("message vschema { "
134148
+ "required int64 id;"
135149
+ "required int32 category;"
@@ -166,9 +180,9 @@ public VariantReadBenchmark() {
166180
+ " }"
167181
+ "}");
168182

169-
projectionSchema = parseMessageType("message vschema { "
183+
184+
selectSchema = parseMessageType("message vschema { "
170185
+ "required int64 id;"
171-
+ "required int32 category;"
172186
+ "optional group nested (VARIANT(1)) {"
173187
+ " required binary metadata;"
174188
+ " optional binary value;"
@@ -177,7 +191,8 @@ public VariantReadBenchmark() {
177191
+ " optional binary value;"
178192
+ " optional int32 typed_value;"
179193
+ " }"
180-
+ " }"
194+
+ " }"
195+
+ " }"
181196
+ "}");
182197

183198
}
@@ -212,7 +227,7 @@ private void writeDataset(final MessageType schema, final Path path) throws IOEx
212227
try (ParquetWriter<RowRecord> writer =
213228
new RowWriterBuilder(HadoopOutputFile.fromPath(path, conf), schema, nestedGroup).build()) {
214229
for (int i = 0; i < NUM_ROWS; i++) {
215-
int category = i % 20;
230+
int category = i % CATEGORIES;
216231
writer.write(new RowRecord(i, category, buildVariant(i, category, COL4_VALUES[category])));
217232
}
218233
}
@@ -238,17 +253,19 @@ public void readFileWithoutProjection(Blackhole blackhole) throws IOException {
238253
}
239254

240255
/**
241-
* Like {@link #readFileWithoutProjection(Blackhole)}, but uses column projection to read only the
242-
* {@code nested.typed_value.varcategory} column, skipping {@code id}, {@code category},
243-
* {@code idstr}, {@code varid}, and {@code col4}.
256+
* Like {@link #readFileWithoutProjection(Blackhole)}, but uses column projection to read only
257+
* {@code id} and {@code nested.typed_value.varcategory}, skipping {@code category}, {@code
258+
* idstr}, {@code varid}, and {@code col4}.
244259
*/
245260
@Benchmark
246261
public void readFileProjected(Blackhole blackhole) throws IOException {
247-
try (ParquetReader<Variant> reader =
262+
try (ParquetReader<RowRecord> reader =
248263
new ProjectedReaderBuilder(HadoopInputFile.fromPath(dataFile, conf)).build()) {
249-
Variant nested;
250-
while ((nested = reader.read()) != null) {
251-
Variant varcategory = nested.getFieldByKey("varcategory");
264+
RowRecord row;
265+
while ((row = reader.read()) != null) {
266+
blackhole.consume(row.id);
267+
blackhole.consume(row.category);
268+
Variant varcategory = row.variant.getFieldByKey("varcategory");
252269
if (varcategory != null) {
253270
blackhole.consume(varcategory.getInt());
254271
}
@@ -528,40 +545,43 @@ Variant getCurrentVariant() {
528545
// ------------------------------------------------------------------
529546

530547
/** {@link ParquetReader.Builder} using {@link ProjectedReadSupport}. */
531-
private static final class ProjectedReaderBuilder extends ParquetReader.Builder<Variant> {
548+
private final class ProjectedReaderBuilder extends ParquetReader.Builder<RowRecord> {
532549
ProjectedReaderBuilder(InputFile file) {
533550
super(file);
534551
}
535552

536553
@Override
537-
protected ReadSupport<Variant> getReadSupport() {
554+
protected ReadSupport<RowRecord> getReadSupport() {
538555
return new ProjectedReadSupport();
539556
}
540557
}
541558

559+
private static final MessageType VARCATEGORY_PROJECTION = new MessageType(
560+
"vschema",
561+
Types.required(PrimitiveTypeName.INT64).named("id"),
562+
Types.required(PrimitiveTypeName.INT32).named("category"),
563+
Types.optionalGroup()
564+
.as(LogicalTypeAnnotation.variantType((byte) 1))
565+
.required(PrimitiveTypeName.BINARY)
566+
.named("metadata")
567+
.optional(PrimitiveTypeName.BINARY)
568+
.named("value")
569+
.addField(Types.optionalGroup()
570+
.addField(Types.optionalGroup()
571+
.optional(PrimitiveTypeName.BINARY)
572+
.named("value")
573+
.optional(PrimitiveTypeName.INT32)
574+
.named("typed_value")
575+
.named("varcategory"))
576+
.named("typed_value"))
577+
.named("nested"));
578+
542579
/**
543-
* {@link ReadSupport} that projects the file schema down to only the {@code nested} variant's
544-
* {@code varcategory} field, skipping {@code id}, {@code category}, {@code idstr},
545-
* {@code varid}, and {@code col4} column chunks entirely.
580+
* {@link ReadSupport} that projects the file schema down to {@code id} and only the {@code
581+
* nested.typed_value.varcategory} column, skipping {@code category}, {@code idstr}, {@code
582+
* varid}, and {@code col4} column chunks entirely.
546583
*/
547-
private static final class ProjectedReadSupport extends ReadSupport<Variant> {
548-
private static final MessageType VARCATEGORY_PROJECTION = new MessageType(
549-
"vschema",
550-
Types.optionalGroup()
551-
.as(LogicalTypeAnnotation.variantType((byte) 1))
552-
.required(PrimitiveTypeName.BINARY)
553-
.named("metadata")
554-
.optional(PrimitiveTypeName.BINARY)
555-
.named("value")
556-
.addField(Types.optionalGroup()
557-
.addField(Types.optionalGroup()
558-
.optional(PrimitiveTypeName.BINARY)
559-
.named("value")
560-
.optional(PrimitiveTypeName.INT32)
561-
.named("typed_value")
562-
.named("varcategory"))
563-
.named("typed_value"))
564-
.named("nested"));
584+
private final class ProjectedReadSupport extends ReadSupport<RowRecord> {
565585

566586
@Override
567587
public ReadContext init(InitContext context) {
@@ -576,24 +596,25 @@ public ReadContext init(InitContext context) {
576596
}
577597

578598
@Override
579-
public RecordMaterializer<Variant> prepareForRead(
599+
public RecordMaterializer<RowRecord> prepareForRead(
580600
Configuration conf,
581601
Map<String, String> keyValueMetaData,
582602
MessageType fileSchema,
583603
ReadContext readContext) {
584604
// Use the requested schema from the ReadContext — either VARCATEGORY_PROJECTION
585605
// (shredded) or the full file schema (unshredded fallback).
586-
GroupType nestedGroup = readContext.getRequestedSchema().getType("nested").asGroupType();
587-
return new ProjectedRecordMaterializer(nestedGroup);
606+
MessageType requestedSchema = readContext.getRequestedSchema();
607+
GroupType nestedGroup = requestedSchema.getType("nested").asGroupType();
608+
return new ProjectedRecordMaterializer(requestedSchema, nestedGroup);
588609
}
589610
}
590611

591-
/** Materializes the {@code nested} {@link Variant} from the projected single-field schema. */
592-
private static final class ProjectedRecordMaterializer extends RecordMaterializer<Variant> {
612+
/** Materializes a {@link RowRecord} from the projected schema. */
613+
private static final class ProjectedRecordMaterializer extends RecordMaterializer<RowRecord> {
593614
private final ProjectedMessageConverter root;
594615

595-
ProjectedRecordMaterializer(GroupType nestedGroup) {
596-
this.root = new ProjectedMessageConverter(nestedGroup);
616+
ProjectedRecordMaterializer(MessageType requestedSchema, GroupType nestedGroup) {
617+
this.root = new ProjectedMessageConverter(requestedSchema, nestedGroup);
597618
}
598619

599620
@Override
@@ -602,35 +623,64 @@ public GroupConverter getRootConverter() {
602623
}
603624

604625
@Override
605-
public Variant getCurrentRecord() {
606-
return root.getCurrentVariant();
626+
public RowRecord getCurrentRecord() {
627+
return root.getCurrentRecord();
607628
}
608629
}
609630

610631
/**
611-
* Root converter for the projected schema: single field 0 ({@code nested}). No converters for
612-
* {@code id} or {@code category} — those columns are not requested and never decoded.
632+
* Root converter for the projected schema. Routes {@code id}, {@code category}, and {@code
633+
* nested} to dedicated converters; indices are resolved dynamically from the requested schema so
634+
* both the shredded projection and the unshredded full-schema fallback work correctly.
613635
*/
614636
private static final class ProjectedMessageConverter extends GroupConverter {
637+
private final int idIndex;
638+
private final int categoryIndex;
639+
private final int nestedIndex;
640+
private final PrimitiveConverter idConverter;
641+
private final PrimitiveConverter categoryConverter;
615642
private final RowVariantGroupConverter variantConverter;
643+
private long id;
644+
private int category;
616645

617-
ProjectedMessageConverter(GroupType nestedGroup) {
618-
this.variantConverter = new RowVariantGroupConverter(nestedGroup);
646+
ProjectedMessageConverter(MessageType requestedSchema, GroupType nestedGroup) {
647+
idIndex = requestedSchema.getFieldIndex("id");
648+
categoryIndex = requestedSchema.getFieldIndex("category");
649+
nestedIndex = requestedSchema.getFieldIndex("nested");
650+
idConverter = new PrimitiveConverter() {
651+
@Override
652+
public void addLong(long value) {
653+
id = value;
654+
}
655+
};
656+
categoryConverter = new PrimitiveConverter() {
657+
@Override
658+
public void addInt(int value) {
659+
category = value;
660+
}
661+
};
662+
variantConverter = new RowVariantGroupConverter(nestedGroup);
619663
}
620664

621665
@Override
622666
public Converter getConverter(int fieldIndex) {
623-
return variantConverter;
667+
if (fieldIndex == idIndex) return idConverter;
668+
if (fieldIndex == categoryIndex) return categoryConverter;
669+
if (fieldIndex == nestedIndex) return variantConverter;
670+
throw new IllegalArgumentException("Unknown field index: " + fieldIndex);
624671
}
625672

626673
@Override
627-
public void start() {}
674+
public void start() {
675+
id = -1;
676+
category = -1;
677+
}
628678

629679
@Override
630680
public void end() {}
631681

632-
Variant getCurrentVariant() {
633-
return variantConverter.getCurrentVariant();
682+
RowRecord getCurrentRecord() {
683+
return new RowRecord(id, category, variantConverter.getCurrentVariant());
634684
}
635685
}
636686
}

0 commit comments

Comments (0)