Skip to content

Commit a9ffd5d

Browse files
author
thexia
committed
ORC-2619: Fix estimateRgEndOffset slop calculation for incompressible data
The stretchFactor calculation in estimateRgEndOffset did not account for the 2-byte RLEv2 DIRECT run header. This caused insufficient buffer allocation when data is incompressible, leading to 'Buffer size too small' errors. Fix: Include RLE_V2_HEADER_SIZE in the worst-case payload calculation. Add test demonstrating the issue with the old formula.
1 parent 3563ee5 commit a9ffd5d

2 files changed

Lines changed: 73 additions & 1 deletion

File tree

java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,9 @@ public static long estimateRgEndOffset(boolean isCompressed,
217217
// Stretch the slop by a factor to safely accommodate following compression blocks.
218218
// We need to calculate the maximum number of blocks(stretchFactor) by bufferSize accordingly.
219219
if (isCompressed) {
220-
int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
220+
// RLEv2 DIRECT runs can need a 2-byte header in addition to their value payload.
221+
int maxRleDirectRunSize = MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE;
222+
int stretchFactor = 2 + (maxRleDirectRunSize - 1) / bufferSize;
221223
slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
222224
}
223225
return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
@@ -300,6 +302,8 @@ public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
300302
// the maximum byte width for each value
301303
static final int MAX_BYTE_WIDTH =
302304
SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
305+
// RLEv2 DIRECT run header size in bytes
306+
public static final int RLE_V2_HEADER_SIZE = 2;
303307

304308
/**
305309
* Is this stream part of a dictionary?

java/core/src/test/org/apache/orc/impl/TestInStream.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,16 @@
3535
import java.util.ArrayList;
3636
import java.util.Arrays;
3737
import java.util.List;
38+
import java.util.Random;
3839

40+
import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
41+
import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
3942
import static org.junit.jupiter.api.Assertions.assertEquals;
4043
import static org.junit.jupiter.api.Assertions.assertNotEquals;
4144
import static org.junit.jupiter.api.Assertions.assertNotSame;
4245
import static org.junit.jupiter.api.Assertions.assertSame;
46+
import static org.junit.jupiter.api.Assertions.assertThrows;
47+
import static org.junit.jupiter.api.Assertions.assertTrue;
4348
import static org.junit.jupiter.api.Assertions.fail;
4449

4550
public class TestInStream {
@@ -1000,4 +1005,67 @@ public void testStreamResetWithoutIncreasedLength() throws IOException {
10001005
byte[] inBuffer = new byte[5];
10011006
assertEquals(5, inStream.read(inBuffer));
10021007
}
1008+
1009+
/**
1010+
* Demonstrates that the old estimateRgEndOffset slop calculation is insufficient.
1011+
* When a compressed stream is truncated at the old estimated end offset,
1012+
* reading a full RLE v2 DIRECT run fails because the estimated slop doesn't
1013+
* account for enough compressed blocks.
1014+
*/
1015+
@Test
1016+
public void testTruncatedRleV2DirectRunAtEstimatedEndFails() throws Exception {
1017+
final int bufferSize = 1024;
1018+
final int chunkSize = OutStream.HEADER_SIZE + bufferSize;
1019+
final int nextGroupOffset = bufferSize;
1020+
final int oldStretchFactor =
1021+
2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
1022+
final int oldEstimatedEnd = nextGroupOffset + oldStretchFactor * chunkSize;
1023+
1024+
TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
1025+
CompressionCodec codec = new ZlibCodec();
1026+
StreamOptions streamOptions = new StreamOptions(bufferSize)
1027+
.withCodec(codec, codec.getDefaultOptions());
1028+
byte[] data = new byte[bufferSize * 6];
1029+
new Random(42).nextBytes(data);
1030+
try (OutStream out = new OutStream("test", streamOptions, receiver)) {
1031+
out.write(data);
1032+
out.flush();
1033+
}
1034+
1035+
byte[] encoded = receiver.buffer.get();
1036+
assertEquals(nextGroupOffset + 5 * chunkSize, oldEstimatedEnd);
1037+
assertTrue(encoded.length > oldEstimatedEnd);
1038+
1039+
InStream stream = InStream.create("test",
1040+
new BufferChunk(ByteBuffer.wrap(encoded, 0, oldEstimatedEnd), 0),
1041+
0, oldEstimatedEnd,
1042+
InStream.options().withCodec(codec).withBufferSize(bufferSize));
1043+
byte[] rleDirectRun = new byte[MAX_VALUES_LENGTH * MAX_BYTE_WIDTH
1044+
+ RecordReaderUtils.RLE_V2_HEADER_SIZE];
1045+
1046+
stream.seek(new SimplePositionProvider(nextGroupOffset, 0));
1047+
IllegalArgumentException error = assertThrows(
1048+
IllegalArgumentException.class, () -> {
1049+
int offset = 0;
1050+
while (offset < rleDirectRun.length) {
1051+
offset += stream.read(
1052+
rleDirectRun, offset, rleDirectRun.length - offset);
1053+
}
1054+
});
1055+
assertTrue(error.getMessage().contains("Buffer size too small"));
1056+
}
1057+
1058+
private static class SimplePositionProvider implements PositionProvider {
1059+
private final long[] positions;
1060+
private int index = 0;
1061+
1062+
SimplePositionProvider(long... positions) {
1063+
this.positions = positions;
1064+
}
1065+
1066+
@Override
1067+
public long getNext() {
1068+
return positions[index++];
1069+
}
1070+
}
10031071
}

0 commit comments

Comments
 (0)