Skip to content

Commit f1b823a

Browse files
committed
Add parameterized forward index compression codecs
1 parent 0c6521a commit f1b823a

33 files changed

Lines changed: 915 additions & 88 deletions

File tree

pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import org.apache.pinot.common.tier.TierFactory;
3030
import org.apache.pinot.spi.config.table.CompletionConfig;
31+
import org.apache.pinot.spi.config.table.CompressionCodecSpec;
3132
import org.apache.pinot.spi.config.table.DedupConfig;
3233
import org.apache.pinot.spi.config.table.FieldConfig;
3334
import org.apache.pinot.spi.config.table.HashFunction;
@@ -240,7 +241,9 @@ public void testSerDe()
240241
Lists.newArrayList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, properties),
241242
new FieldConfig("column2", null, Collections.emptyList(), null, null),
242243
new FieldConfig("column3", FieldConfig.EncodingType.RAW, Collections.emptyList(),
243-
FieldConfig.CompressionCodec.SNAPPY, null));
244+
FieldConfig.CompressionCodec.SNAPPY, null),
245+
new FieldConfig.Builder("column4").withEncodingType(FieldConfig.EncodingType.RAW)
246+
.withCompressionCodecSpec(CompressionCodecSpec.fromString("zstd(3)")).build());
244247
TableConfig tableConfig = tableConfigBuilder.setFieldConfigList(fieldConfigList).build();
245248

246249
checkFieldConfig(tableConfig);
@@ -546,7 +549,7 @@ private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
546549
private void checkFieldConfig(TableConfig tableConfig) {
547550
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
548551
assertNotNull(fieldConfigList);
549-
assertEquals(fieldConfigList.size(), 3);
552+
assertEquals(fieldConfigList.size(), 4);
550553

551554
FieldConfig firstFieldConfig = fieldConfigList.get(0);
552555
assertEquals(firstFieldConfig.getName(), "column1");
@@ -571,6 +574,14 @@ private void checkFieldConfig(TableConfig tableConfig) {
571574
assertNull(thirdFieldConfig.getIndexType());
572575
assertEquals(thirdFieldConfig.getCompressionCodec(), FieldConfig.CompressionCodec.SNAPPY);
573576
assertNull(thirdFieldConfig.getProperties());
577+
578+
FieldConfig fourthFieldConfig = fieldConfigList.get(3);
579+
assertEquals(fourthFieldConfig.getName(), "column4");
580+
assertEquals(fourthFieldConfig.getEncodingType(), FieldConfig.EncodingType.RAW);
581+
assertEquals(fourthFieldConfig.getCompressionCodec(), FieldConfig.CompressionCodec.ZSTANDARD);
582+
assertEquals(fourthFieldConfig.getCompressionCodecSpec(), CompressionCodecSpec.of(
583+
FieldConfig.CompressionCodec.ZSTANDARD, 3));
584+
assertNull(fourthFieldConfig.getProperties());
574585
}
575586

576587
private void checkTableConfigWithUpsertConfig(TableConfig tableConfig) {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.segment.local.io.compression;
2020

21+
import javax.annotation.Nullable;
2122
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2223
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
2324
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
@@ -39,7 +40,7 @@ private ChunkCompressorFactory() {
3940
* @return Compressor for the specified type.
4041
*/
4142
public static ChunkCompressor getCompressor(ChunkCompressionType compressionType) {
42-
return getCompressor(compressionType, false);
43+
return getCompressor(compressionType, null, false);
4344
}
4445

4546
/**
@@ -51,30 +52,49 @@ public static ChunkCompressor getCompressor(ChunkCompressionType compressionType
5152
* @return Compressor for the specified type.
5253
*/
5354
public static ChunkCompressor getCompressor(ChunkCompressionType compressionType, boolean upgradeToLengthPrefixed) {
55+
return getCompressor(compressionType, null, upgradeToLengthPrefixed);
56+
}
57+
58+
/**
59+
* Returns the chunk compressor for the specified name and optional compression level.
60+
*/
61+
public static ChunkCompressor getCompressor(ChunkCompressionType compressionType, @Nullable Integer compressionLevel,
62+
boolean upgradeToLengthPrefixed) {
5463
switch (compressionType) {
5564

5665
case PASS_THROUGH:
66+
validateUnsupportedLevel(compressionType, compressionLevel);
5767
return PassThroughCompressor.INSTANCE;
5868

5969
case SNAPPY:
70+
validateUnsupportedLevel(compressionType, compressionLevel);
6071
return SnappyCompressor.INSTANCE;
6172

6273
case ZSTANDARD:
63-
return ZstandardCompressor.INSTANCE;
74+
return compressionLevel != null ? new ZstandardCompressor(compressionLevel) : ZstandardCompressor.INSTANCE;
6475

6576
case LZ4:
77+
if (compressionLevel != null) {
78+
return upgradeToLengthPrefixed ? new LZ4WithLengthCompressor(compressionLevel)
79+
: new LZ4Compressor(compressionLevel);
80+
}
6681
return upgradeToLengthPrefixed ? LZ4WithLengthCompressor.INSTANCE : LZ4Compressor.INSTANCE;
6782

6883
case LZ4_LENGTH_PREFIXED:
84+
if (compressionLevel != null) {
85+
return new LZ4WithLengthCompressor(compressionLevel);
86+
}
6987
return LZ4WithLengthCompressor.INSTANCE;
7088

7189
case GZIP:
72-
return new GzipCompressor();
90+
return compressionLevel != null ? new GzipCompressor(compressionLevel) : new GzipCompressor();
7391

7492
case DELTA:
93+
validateUnsupportedLevel(compressionType, compressionLevel);
7594
return DeltaCompressor.INSTANCE;
7695

7796
case DELTADELTA:
97+
validateUnsupportedLevel(compressionType, compressionLevel);
7898
return DeltaDeltaCompressor.INSTANCE;
7999

80100
default:
@@ -118,4 +138,12 @@ public static ChunkDecompressor getDecompressor(ChunkCompressionType compression
118138
throw new IllegalArgumentException("Illegal decompressor name " + compressionType);
119139
}
120140
}
141+
142+
private static void validateUnsupportedLevel(ChunkCompressionType compressionType,
143+
@Nullable Integer compressionLevel) {
144+
if (compressionLevel != null) {
145+
throw new IllegalArgumentException(
146+
String.format("Compression type %s does not support an explicit compression level", compressionType));
147+
}
148+
}
121149
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ public GzipCompressor() {
3636
_compressor = new Deflater();
3737
}
3838

39+
public GzipCompressor(int compressionLevel) {
40+
_compressor = new Deflater(compressionLevel);
41+
}
42+
3943
@Override
4044
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
4145
throws IOException {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,20 @@ class LZ4Compressor implements ChunkCompressor {
3535

3636
static final LZ4Compressor INSTANCE = new LZ4Compressor();
3737

38+
private final net.jpountz.lz4.LZ4Compressor _compressor;
39+
3840
private LZ4Compressor() {
41+
_compressor = LZ4_FACTORY.fastCompressor();
42+
}
43+
44+
LZ4Compressor(int compressionLevel) {
45+
_compressor = LZ4_FACTORY.highCompressor(compressionLevel);
3946
}
4047

4148
@Override
4249
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
4350
throws IOException {
44-
LZ4_FACTORY.fastCompressor().compress(inUncompressed, outCompressed);
51+
_compressor.compress(inUncompressed, outCompressed);
4552
// When the compress method returns successfully,
4653
// dstBuf's position() will be set to its current position() plus the compressed size of the data.
4754
// and srcBuf's position() will be set to its limit()
@@ -52,7 +59,7 @@ public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
5259

5360
@Override
5461
public int maxCompressedSize(int uncompressedSize) {
55-
return LZ4_FACTORY.fastCompressor().maxCompressedLength(uncompressedSize);
62+
return _compressor.maxCompressedLength(uncompressedSize);
5663
}
5764

5865
@Override

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ private LZ4WithLengthCompressor() {
3939
_compressor = new LZ4CompressorWithLength(LZ4Compressor.LZ4_FACTORY.fastCompressor());
4040
}
4141

42+
LZ4WithLengthCompressor(int compressionLevel) {
43+
_compressor = new LZ4CompressorWithLength(LZ4Compressor.LZ4_FACTORY.highCompressor(compressionLevel));
44+
}
45+
4246
@Override
4347
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
4448
throws IOException {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,20 @@ class ZstandardCompressor implements ChunkCompressor {
3333

3434
static final ZstandardCompressor INSTANCE = new ZstandardCompressor();
3535

36+
private final int _compressionLevel;
37+
3638
private ZstandardCompressor() {
39+
_compressionLevel = Zstd.defaultCompressionLevel();
40+
}
41+
42+
ZstandardCompressor(int compressionLevel) {
43+
_compressionLevel = compressionLevel;
3744
}
3845

3946
@Override
4047
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
4148
throws IOException {
42-
int compressedSize = Zstd.compress(outCompressed, inUncompressed);
49+
int compressedSize = Zstd.compress(outCompressed, inUncompressed, _compressionLevel);
4350
// When the compress method returns successfully,
4451
// dstBuf's position() will be set to its current position() plus the compressed size of the data.
4552
// and srcBuf's position() will be set to its limit()

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.RandomAccessFile;
2626
import java.nio.ByteBuffer;
2727
import java.nio.channels.FileChannel;
28+
import javax.annotation.Nullable;
2829
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
2930
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
3031
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
@@ -88,11 +89,18 @@ public abstract class BaseChunkForwardIndexWriter implements Closeable {
8889
protected BaseChunkForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
8990
int numDocsPerChunk, long chunkSize, int sizeOfEntry, int version, boolean fixed)
9091
throws IOException {
92+
this(file, compressionType, null, totalDocs, numDocsPerChunk, chunkSize, sizeOfEntry, version, fixed);
93+
}
94+
95+
protected BaseChunkForwardIndexWriter(File file, ChunkCompressionType compressionType,
96+
@Nullable Integer compressionLevel, int totalDocs, int numDocsPerChunk, long chunkSize, int sizeOfEntry,
97+
int version, boolean fixed)
98+
throws IOException {
9199
Preconditions.checkArgument(version == 2 || version == 3 || (fixed && (version == 4 || version == 5)),
92100
"Illegal version: %s for %s bytes values", version, fixed ? "fixed" : "variable");
93101
Preconditions.checkArgument(chunkSize <= Integer.MAX_VALUE, "Chunk size limited to 2GB");
94102
_chunkSize = (int) chunkSize;
95-
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
103+
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, compressionLevel, false);
96104
_headerEntryChunkOffsetSize = version == 2 ? Integer.BYTES : Long.BYTES;
97105
_dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
98106
_chunkBuffer = ByteBuffer.allocateDirect(_chunkSize);

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.File;
2222
import java.io.FileNotFoundException;
2323
import java.io.IOException;
24+
import javax.annotation.Nullable;
2425
import javax.annotation.concurrent.NotThreadSafe;
2526
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2627

@@ -48,7 +49,13 @@ public class FixedByteChunkForwardIndexWriter extends BaseChunkForwardIndexWrite
4849
public FixedByteChunkForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
4950
int numDocsPerChunk, int sizeOfEntry, int writerVersion)
5051
throws IOException {
51-
super(file, compressionType, totalDocs, normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
52+
this(file, compressionType, null, totalDocs, numDocsPerChunk, sizeOfEntry, writerVersion);
53+
}
54+
55+
public FixedByteChunkForwardIndexWriter(File file, ChunkCompressionType compressionType,
56+
@Nullable Integer compressionLevel, int totalDocs, int numDocsPerChunk, int sizeOfEntry, int writerVersion)
57+
throws IOException {
58+
super(file, compressionType, compressionLevel, totalDocs, normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
5259
(long) sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk), sizeOfEntry, writerVersion, true);
5360
_chunkDataOffset = 0;
5461
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.FileNotFoundException;
2323
import java.io.IOException;
2424
import java.math.BigDecimal;
25+
import javax.annotation.Nullable;
2526
import javax.annotation.concurrent.NotThreadSafe;
2627
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
2728
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -66,7 +67,14 @@ public class VarByteChunkForwardIndexWriter extends BaseChunkForwardIndexWriter
6667
public VarByteChunkForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
6768
int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
6869
throws IOException {
69-
super(file, compressionType, totalDocs, numDocsPerChunk,
70+
this(file, compressionType, null, totalDocs, numDocsPerChunk, lengthOfLongestEntry, writerVersion);
71+
}
72+
73+
public VarByteChunkForwardIndexWriter(File file, ChunkCompressionType compressionType,
74+
@Nullable Integer compressionLevel, int totalDocs, int numDocsPerChunk, int lengthOfLongestEntry,
75+
int writerVersion)
76+
throws IOException {
77+
super(file, compressionType, compressionLevel, totalDocs, numDocsPerChunk,
7078
numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + (long) lengthOfLongestEntry),
7179
// chunkSize
7280
lengthOfLongestEntry, writerVersion, false);

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.ByteOrder;
2828
import java.nio.channels.FileChannel;
2929
import java.nio.charset.StandardCharsets;
30+
import javax.annotation.Nullable;
3031
import javax.annotation.concurrent.NotThreadSafe;
3132
import org.apache.commons.io.FileUtils;
3233
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
@@ -95,10 +96,16 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
9596

9697
public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType compressionType, int chunkSize)
9798
throws IOException {
99+
this(file, compressionType, null, chunkSize);
100+
}
101+
102+
public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType compressionType,
103+
@Nullable Integer compressionLevel, int chunkSize)
104+
throws IOException {
98105
_dataBuffer = new File(file.getParentFile(), file.getName() + DATA_BUFFER_SUFFIX);
99106
_output = new RandomAccessFile(file, "rw");
100107
_dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
101-
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, true);
108+
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, compressionLevel, true);
102109
_chunkBuffer = ByteBuffer.allocateDirect(chunkSize).order(ByteOrder.LITTLE_ENDIAN);
103110
_compressionBuffer =
104111
ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize)).order(ByteOrder.LITTLE_ENDIAN);

0 commit comments

Comments
 (0)