Commit 076e787
Author: thexia
ORC-2619: Fix estimateRgEndOffset slop calculation for incompressible data
The stretchFactor calculation in estimateRgEndOffset did not account for the 2-byte RLEv2 DIRECT run header. This caused insufficient buffer allocation when data is incompressible, leading to 'Buffer size too small' errors.

Fix: include RLE_V2_HEADER_SIZE in the worst-case payload calculation, and add a test demonstrating the issue with the old formula.
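To see why two bytes matter, work through the numbers the new tests use: MAX_BYTE_WIDTH is 8 (64 bits / 8, per the RecordReaderUtils diff below), and MAX_VALUES_LENGTH is assumed here to be 512, the RLEv2 maximum run length. With a 1024-byte compression buffer, the worst-case value payload of 512 * 8 = 4096 bytes fills exactly four blocks, so the 2-byte run header spills into a fifth block that the old formula never budgeted for. A minimal sketch of the arithmetic, under those assumed constants:

// Hypothetical standalone check: 512 and 8 stand in for MAX_VALUES_LENGTH and
// MAX_BYTE_WIDTH, and 1024 for the compression buffer size used in the new tests.
public class StretchFactorDemo {
  public static void main(String[] args) {
    int bufferSize = 1024;
    int payload = 512 * 8;                               // 4096 bytes of values
    int oldFactor = 2 + (payload - 1) / bufferSize;      // 2 + 3 = 5 blocks
    int newFactor = 2 + (payload + 2 - 1) / bufferSize;  // 2 + 4 = 6 blocks
    System.out.println(oldFactor + " -> " + newFactor);  // prints "5 -> 6"
  }
}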
1 parent 3563ee5 commit 076e787

4 files changed: 156 additions & 4 deletions


java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java

Lines changed: 5 additions & 1 deletion
@@ -217,7 +217,9 @@ public static long estimateRgEndOffset(boolean isCompressed,
     // Stretch the slop by a factor to safely accommodate following compression blocks.
     // We need to calculate the maximum number of blocks (stretchFactor) by bufferSize accordingly.
     if (isCompressed) {
-      int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+      // RLEv2 DIRECT runs can need a 2-byte header in addition to their value payload.
+      int maxRleDirectRunSize = MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE;
+      int stretchFactor = 2 + (maxRleDirectRunSize - 1) / bufferSize;
       slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
     }
     return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
@@ -300,6 +302,8 @@ public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
   // the maximum byte width for each value
   static final int MAX_BYTE_WIDTH =
       SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
+  // RLEv2 DIRECT run header size in bytes
+  public static final int RLE_V2_HEADER_SIZE = 2;
 
   /**
    * Is this stream part of a dictionary?
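Pieced together from the hunk above, the corrected method reads roughly as follows. This is a sketch, not the verbatim file: the parameter order and the initial uncompressed-path slop are assumptions, and OutStream.HEADER_SIZE is the per-block compression header.

// Sketch of the corrected estimate; everything besides the if-body and the
// return line is assumed rather than copied from the source file.
static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
    long nextGroupOffset, long streamLength, int bufferSize, long slop) {
  if (isCompressed) {
    // Worst case: a full RLEv2 DIRECT run plus its 2-byte header...
    int maxRleDirectRunSize = MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE;
    // ...rounded up to whole compression blocks, plus two blocks of margin.
    int stretchFactor = 2 + (maxRleDirectRunSize - 1) / bufferSize;
    slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
  }
  // Never estimate past the end of the stream itself.
  return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
}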

java/core/src/test/org/apache/orc/impl/TestInStream.java

Lines changed: 68 additions & 0 deletions
@@ -35,11 +35,16 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
+import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
+import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestInStream {
@@ -1000,4 +1005,67 @@ public void testStreamResetWithoutIncreasedLength() throws IOException {
     byte[] inBuffer = new byte[5];
     assertEquals(5, inStream.read(inBuffer));
   }
+
+  /**
+   * Demonstrates that the old estimateRgEndOffset slop calculation is insufficient.
+   * When a compressed stream is truncated at the old estimated end offset,
+   * reading a full RLE v2 DIRECT run fails because the estimated slop doesn't
+   * account for enough compressed blocks.
+   */
+  @Test
+  public void testTruncatedRleV2DirectRunAtEstimatedEndFails() throws Exception {
+    final int bufferSize = 1024;
+    final int chunkSize = OutStream.HEADER_SIZE + bufferSize;
+    final int nextGroupOffset = bufferSize;
+    final int oldStretchFactor =
+        2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+    final int oldEstimatedEnd = nextGroupOffset + oldStretchFactor * chunkSize;
+
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    StreamOptions streamOptions = new StreamOptions(bufferSize)
+        .withCodec(codec, codec.getDefaultOptions());
+    byte[] data = new byte[bufferSize * 6];
+    new Random(42).nextBytes(data);
+    try (OutStream out = new OutStream("test", streamOptions, receiver)) {
+      out.write(data);
+      out.flush();
+    }
+
+    byte[] encoded = receiver.buffer.get();
+    assertEquals(nextGroupOffset + 5 * chunkSize, oldEstimatedEnd);
+    assertTrue(encoded.length > oldEstimatedEnd);
+
+    InStream stream = InStream.create("test",
+        new BufferChunk(ByteBuffer.wrap(encoded, 0, oldEstimatedEnd), 0),
+        0, oldEstimatedEnd,
+        InStream.options().withCodec(codec).withBufferSize(bufferSize));
+    byte[] rleDirectRun = new byte[MAX_VALUES_LENGTH * MAX_BYTE_WIDTH
+        + RecordReaderUtils.RLE_V2_HEADER_SIZE];
+
+    stream.seek(new SimplePositionProvider(nextGroupOffset, 0));
+    IllegalArgumentException error = assertThrows(
+        IllegalArgumentException.class, () -> {
+          int offset = 0;
+          while (offset < rleDirectRun.length) {
+            offset += stream.read(
+                rleDirectRun, offset, rleDirectRun.length - offset);
+          }
+        });
+    assertTrue(error.getMessage().contains("Buffer size too small"));
+  }
+
+  private static class SimplePositionProvider implements PositionProvider {
+    private final long[] positions;
+    private int index = 0;
+
+    SimplePositionProvider(long... positions) {
+      this.positions = positions;
+    }
+
+    @Override
+    public long getNext() {
+      return positions[index++];
+    }
+  }
 }
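A back-of-the-envelope check of the stream geometry in the test above, assuming OutStream.HEADER_SIZE is 3 bytes and that ZLIB stores the random, incompressible payload as six "original" chunks of header plus bufferSize each:

// Hypothetical arithmetic mirroring the test; 3 stands in for OutStream.HEADER_SIZE.
public class TruncationDemo {
  public static void main(String[] args) {
    int chunkSize = 3 + 1024;                    // 1027 bytes per stored chunk
    int encodedLength = 6 * chunkSize;           // 6162: six incompressible chunks
    int oldEstimatedEnd = 1024 + 5 * chunkSize;  // 6159: nextGroupOffset + old slop
    // The old estimate stops 3 bytes short of the stream, so the final chunk is
    // cut off and the worst-case RLEv2 DIRECT read fails with the
    // "Buffer size too small" error the test asserts.
    System.out.println(encodedLength - oldEstimatedEnd);  // prints "3"
  }
}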

java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java

Lines changed: 81 additions & 0 deletions
@@ -22,7 +22,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
@@ -159,6 +163,83 @@ public void testConfigMaxChunkLimit() throws IOException {
     assertEquals(1000, ((RecordReaderImpl) recordReader).getMaxDiskRangeChunkLimit());
   }
 
+  @Test
+  public void testRleV2DirectSeekAtBufferBoundaryWithSkippedEndRowGroup()
+      throws IOException {
+    final int bufferSize = 1024;
+    final int rowIndexStride = 512 * 512 + 1;
+    final int rowGroupCount = 3;
+    final int selectedRowGroup = 1;
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("rg", TypeDescription.createLong())
+        .addField("value", TypeDescription.createLong());
+
+    writeRleV2BoundaryFile(schema, bufferSize, rowIndexStride, rowGroupCount);
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.ZLIB, reader.getCompressionKind());
+    assertEquals(bufferSize, reader.getCompressionSize());
+    assertEquals(1, reader.getStripes().size());
+
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .equals("rg", PredicateLeaf.Type.LONG, (long) selectedRowGroup)
+        .build();
+    Reader.Options options = reader.options()
+        .searchArgument(sarg, new String[] {"rg"})
+        .useSelected(true)
+        .allowSARGToFilter(true);
+
+    long rowsRead = 0;
+    VectorizedRowBatch batch = schema.createRowBatch();
+    try (RecordReader rows = reader.rows(options)) {
+      while (rows.nextBatch(batch)) {
+        rowsRead += validateSelectedRowGroup(batch, selectedRowGroup);
+      }
+    }
+    assertEquals(rowIndexStride, rowsRead);
+  }
+
+  private void writeRleV2BoundaryFile(TypeDescription schema, int bufferSize, int rowIndexStride, int rowGroupCount)
+      throws IOException {
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .fileSystem(fs)
+            .setSchema(schema)
+            .compress(CompressionKind.ZLIB)
+            .enforceBufferSize()
+            .bufferSize(bufferSize)
+            .stripeSize(100_000_000)
+            .rowIndexStride(rowIndexStride))) {
+      VectorizedRowBatch batch = schema.createRowBatch();
+      LongColumnVector rg = (LongColumnVector) batch.cols[0];
+      LongColumnVector value = (LongColumnVector) batch.cols[1];
+      Random random = new Random(42);
+      for (int row = 0; row < rowIndexStride * rowGroupCount; ++row) {
+        rg.vector[batch.size] = row / rowIndexStride;
+        value.vector[batch.size] = random.nextLong();
+        batch.size += 1;
+        if (batch.size == batch.getMaxSize()) {
+          writer.addRowBatch(batch);
+          batch.reset();
+        }
+      }
+      if (batch.size != 0) {
+        writer.addRowBatch(batch);
+      }
+    }
+  }
+
+  private long validateSelectedRowGroup(VectorizedRowBatch batch, int selectedRowGroup) {
+    LongColumnVector rg = (LongColumnVector) batch.cols[0];
+    long selectedRows = 0;
+    for (int i = 0; i < batch.size; ++i) {
+      int row = batch.selectedInUse ? batch.selected[i] : i;
+      assertEquals(selectedRowGroup, rg.vector[row]);
+      selectedRows += 1;
+    }
+    return selectedRows;
+  }
+
   @Test
   public void testStringDirectGreaterThan2GB() throws IOException {
     final Runtime rt = Runtime.getRuntime();

java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java

Lines changed: 2 additions & 3 deletions
@@ -85,9 +85,8 @@
 import java.util.List;
 import java.util.TimeZone;
 
-import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
-import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.apache.orc.OrcFile.CURRENT_WRITER;
+import static org.apache.orc.impl.RecordReaderUtils.*;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -1544,7 +1543,7 @@ public void testPartialPlanCompressed() throws Exception {
         new InStream.StreamOptions()
             .withCodec(OrcCodecPool.getCodec(CompressionKind.ZLIB))
             .withBufferSize(1024);
-    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / options.getBufferSize();
+    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE - 1) / options.getBufferSize();
     final int SLOP = stretchFactor * (OutStream.HEADER_SIZE + options.getBufferSize());
     MockDataReader dataReader = new MockDataReader(schema, options)
         .addStream(1, OrcProto.Stream.Kind.ROW_INDEX,
