1818 */
1919package org .apache .parquet .column .values .rle ;
2020
21- import java .io .DataInputStream ;
21+ import java .io .EOFException ;
2222import java .io .IOException ;
2323import java .io .InputStream ;
2424import org .apache .parquet .Preconditions ;
@@ -48,6 +48,8 @@ private static enum MODE {
4848 private int currentCount ;
4949 private int currentValue ;
5050 private int [] currentBuffer ;
51+ private int currentBufferLength ;
52+ private byte [] packedBytes ;
5153
5254 public RunLengthBitPackingHybridDecoder (int bitWidth , InputStream in ) {
5355 LOG .debug ("decoding bitWidth {}" , bitWidth );
@@ -69,7 +71,7 @@ public int readInt() throws IOException {
6971 result = currentValue ;
7072 break ;
7173 case PACKED :
72- result = currentBuffer [currentBuffer . length - 1 - currentCount ];
74+ result = currentBuffer [currentBufferLength - 1 - currentCount ];
7375 break ;
7476 default :
7577 throw new ParquetDecodingException ("not a valid mode " + mode );
@@ -90,21 +92,39 @@ private void readNext() throws IOException {
9092 case PACKED :
9193 int numGroups = header >>> 1 ;
9294 currentCount = numGroups * 8 ;
95+ currentBufferLength = currentCount ;
9396 LOG .debug ("reading {} values BIT PACKED" , currentCount );
94- currentBuffer = new int [currentCount ]; // TODO: reuse a buffer
95- byte [] bytes = new byte [numGroups * bitWidth ];
97+ if (currentBuffer == null || currentBuffer .length < currentCount ) {
98+ currentBuffer = new int [currentCount ];
99+ }
100+ int bytesNeeded = numGroups * bitWidth ;
101+ if (packedBytes == null || packedBytes .length < bytesNeeded ) {
102+ packedBytes = new byte [bytesNeeded ];
103+ }
96104 // At the end of the file RLE data though, there might not be that many bytes left.
97105 int bytesToRead = (int ) Math .ceil (currentCount * bitWidth / 8.0 );
98106 bytesToRead = Math .min (bytesToRead , in .available ());
99- new DataInputStream ( in ). readFully (bytes , 0 , bytesToRead );
107+ readFully (in , packedBytes , bytesToRead );
100108 for (int valueIndex = 0 , byteIndex = 0 ;
101109 valueIndex < currentCount ;
102110 valueIndex += 8 , byteIndex += bitWidth ) {
103- packer .unpack8Values (bytes , byteIndex , currentBuffer , valueIndex );
111+ packer .unpack8Values (packedBytes , byteIndex , currentBuffer , valueIndex );
104112 }
105113 break ;
106114 default :
107115 throw new ParquetDecodingException ("not a valid mode " + mode );
108116 }
109117 }
118+
119+ private static void readFully (InputStream in , byte [] buf , int len ) throws IOException {
120+ int offset = 0 ;
121+ while (offset < len ) {
122+ int read = in .read (buf , offset , len - offset );
123+ if (read < 0 ) {
124+ throw new EOFException (
125+ "Unexpected end of stream: still needed " + (len - offset ) + " bytes" );
126+ }
127+ offset += read ;
128+ }
129+ }
110130}
0 commit comments