m8928 opened a new issue, #1116:
URL: https://github.com/apache/arrow-java/issues/1116

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   ### Describe the bug
   
     We observed Arrow IPC streams written by Arrow Java where a 
ZSTD-compressed buffer has an invalid 8-byte uncompressed-length prefix.
   
     The compressed payload itself appears to be a valid ZSTD frame, but the 
Arrow IPC prefix is 0, while the ZSTD frame reports a non-zero decompressed 
content size.
   
     This makes downstream readers fail. In our case ClickHouse 26.1 fails with:
   
     Error while reading batch of Arrow data:
     IOError: ZSTD decompression failed: Destination buffer is too small
   
     The same file also fails when loaded with Arrow Java ArrowStreamReader.
   
     This has been observed with both:
   
     - Arrow Java 18.3.0 + zstd-jni 1.5.7-6
     - Arrow Java 19.0.0 + zstd-jni 1.5.7-7
   
     ### Environment
   
     Observed with both dependency sets:
   
     1. Original environment
         - Apache Arrow Java: 18.3.0
         - org.apache.arrow:arrow-vector:18.3.0
         - org.apache.arrow:arrow-memory-netty:18.3.0
         - org.apache.arrow:arrow-compression:18.3.0
         - com.github.luben:zstd-jni:1.5.7-6
     2. Also reproduced with
         - Apache Arrow Java: 19.0.0
         - org.apache.arrow:arrow-vector:19.0.0
         - org.apache.arrow:arrow-memory-netty:19.0.0
         - org.apache.arrow:arrow-compression:19.0.0
         - com.github.luben:zstd-jni:1.5.7-7
   
     Other context:
   
     - Java: 17
     - Writer API:
         - ArrowStreamWriter
         - CommonsCompressionFactory.INSTANCE
         - CompressionUtil.CodecType.ZSTD
   
     Writer creation:
   
     new ArrowStreamWriter(
         root,
         null,
         channel,
         IpcOption.DEFAULT,
         CommonsCompressionFactory.INSTANCE,
         CompressionUtil.CodecType.ZSTD
     );
   
     ### What happened
   
     One generated Arrow IPC stream contains 44 record batches and a valid end 
marker. The total row count from record batch metadata matches the sidecar 
count file.
   
     However, batch 21 fails to load.
   
     Inspection results:
   
     file size: 26090728 bytes
     schema fields: 504
     record batches: 44
     batch metadata row sum: 92936
     sidecar count: 92936
   
     The failure occurs on batch 21.
   
     batch=21
     rows=3469
     field[446]=update_date type=Timestamp(MILLISECOND, null)
     node length=3469
     node nullCount=3469
   
     For this Timestamp(MILLISECOND) vector, the data buffer should decompress 
to:
   
     3469 rows * 8 bytes = 27752 bytes
   
     But the compressed buffer contains:
   
     buffer[1264]
     compressedBytes=27
     declaredUncompressed=0
     zstdContentSize=27752
   
     So the Arrow IPC compressed buffer prefix says the uncompressed length is 
0, but the ZSTD frame itself reports 27752.
   
     Neighboring batches for the same buffer look correct:
   
     batch=20 buffer[1264]
     declaredUncompressed=22552
     zstdContentSize=22552
   
     batch=22 buffer[1264]
     declaredUncompressed=20672
     zstdContentSize=20672
   
     ### Expected behavior
   
     The 8-byte Arrow IPC compressed buffer prefix should match the 
uncompressed length of the compressed payload.
   
     For the failing buffer, it should have been:
   
     declaredUncompressed=27752
   
     ### Actual behavior
   
     The prefix is written as:
   
     declaredUncompressed=0
   
     while the ZSTD frame content size is:
   
     zstdContentSize=27752
   
     This causes readers to allocate a zero-sized or too-small destination 
buffer, then fail during ZSTD decompression.
   
     ### Arrow Java reader failure
   
     Loading the file with Arrow Java fails at batch 21:
   
     failedAfterBatches=20
     rowsBeforeFailure=39866
     bytesReadBeforeFailure=12083504
   
     java.lang.IndexOutOfBoundsException: index: 0, length: 512 (expected: range(0, 504))
         at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:690)
         at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:942)
         at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:443)
         at org.apache.arrow.vector.BaseFixedWidthVector.setValueCount(BaseFixedWidthVector.java:764)
         at org.apache.arrow.vector.VectorSchemaRoot.setRowCount(VectorSchemaRoot.java:247)
         at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:90)
         at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:213)
         at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:161)
   
     ### ClickHouse reader failure
   
     ClickHouse fails on the same Arrow stream:
   
     Code: 33. DB::Exception:
     Error while reading batch of Arrow data:
     IOError: ZSTD decompression failed: Destination buffer is too small:
     While executing WaitForAsyncInsert. (CANNOT_READ_ALL_DATA)
   
     ### Diagnostic code used
   
     The following kind of check was used to inspect the failing compressed 
buffer:
   
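     import com.github.luben.zstd.Zstd;
     import java.nio.ByteBuffer;
     import java.nio.ByteOrder;
     import java.util.Arrays;
   
     // bytes holds one compressed IPC buffer; len is the number of valid bytes in it.
     // The first 8 bytes are the little-endian uncompressed-length prefix.
     long declared = ByteBuffer.wrap(bytes, 0, 8)
         .order(ByteOrder.LITTLE_ENDIAN)
         .getLong();
   
     // Everything after the prefix is the ZSTD frame; read the size its header records.
     byte[] frame = Arrays.copyOfRange(bytes, 8, len);
     long zstdContentSize = Zstd.decompressedSize(frame);
   
     System.out.println("declaredUncompressed=" + declared);
     System.out.println("zstdContentSize=" + zstdContentSize);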
   
     For the failing buffer:
   
     declaredUncompressed=0
     zstdContentSize=27752
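
     The check above depends on zstd-jni. For anyone who wants to confirm the mismatch without that dependency, the following is a minimal sketch that reads the same two numbers using only the JDK: the 8-byte little-endian prefix and the Frame_Content_Size field of the ZSTD frame header (layout as described in RFC 8878). The class name ZstdPrefixCheck and its methods are hypothetical, and the sample bytes in main are a hand-built frame header chosen to mirror the reported values, not bytes taken from the failing file.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper: compares the Arrow IPC length prefix with the ZSTD frame header. */
final class ZstdPrefixCheck {
    private static final int ZSTD_MAGIC = 0xFD2FB528;
    private static final int PREFIX_BYTES = 8;

    /** Reads the 8-byte little-endian uncompressed-length prefix. */
    static long declaredLength(byte[] buffer) {
        return ByteBuffer.wrap(buffer, 0, PREFIX_BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getLong();
    }

    /**
     * Returns Frame_Content_Size from the ZSTD frame header that follows the
     * prefix, or -1 if the frame header does not record it.
     */
    static long frameContentSize(byte[] buffer) {
        ByteBuffer in = ByteBuffer.wrap(buffer, PREFIX_BYTES, buffer.length - PREFIX_BYTES)
                .order(ByteOrder.LITTLE_ENDIAN);
        if (in.getInt() != ZSTD_MAGIC) {
            throw new IllegalArgumentException("payload does not start with a ZSTD frame");
        }
        int descriptor = in.get() & 0xFF;
        int fcsFlag = descriptor >>> 6;                 // bits 7-6
        boolean singleSegment = (descriptor & 0x20) != 0; // bit 5
        int dictIdFlag = descriptor & 0x03;             // bits 1-0

        if (!singleSegment) {
            in.get(); // Window_Descriptor is present only without Single_Segment
        }
        int dictIdBytes = (dictIdFlag == 3) ? 4 : dictIdFlag; // 0, 1, 2 or 4 bytes
        in.position(in.position() + dictIdBytes);

        switch (fcsFlag) {
            case 0:  return singleSegment ? (in.get() & 0xFF) : -1L;
            case 1:  return (in.getShort() & 0xFFFF) + 256L; // 2-byte form is offset by 256
            case 2:  return in.getInt() & 0xFFFFFFFFL;
            default: return in.getLong();
        }
    }

    /** True when the prefix agrees with the frame header, or when nothing can be compared. */
    static boolean isConsistent(byte[] buffer) {
        long declared = declaredLength(buffer);
        if (declared == -1L) {
            return true; // -1 marks an uncompressed body in the Arrow IPC format
        }
        if (buffer.length == PREFIX_BYTES) {
            return declared == 0L; // no payload at all
        }
        long contentSize = frameContentSize(buffer);
        return contentSize == -1L || contentSize == declared;
    }

    public static void main(String[] args) {
        byte[] buffer = {
            0, 0, 0, 0, 0, 0, 0, 0,               // prefix: declared length 0
            0x28, (byte) 0xB5, 0x2F, (byte) 0xFD, // ZSTD magic number
            0x60,                                 // descriptor: single segment, 2-byte content size
            0x68, 0x6B                            // content size field: 27496, plus 256 = 27752
        };
        System.out.println("declaredUncompressed=" + declaredLength(buffer));
        System.out.println("zstdContentSize=" + frameContentSize(buffer));
        System.out.println("consistent=" + isConsistent(buffer));
    }
}
```

     Only the frame header is parsed, so this cannot tell whether the payload decompresses cleanly; it only shows whether the two declared sizes agree.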
   
     ### Notes
   
     We have not yet produced a small deterministic reproducer. The issue was 
found in production-generated Arrow IPC streams with a wide schema and repeated 
batch writes using the same VectorSchemaRoot.
   
     The writer pattern is:
   
     for each batch:
         root.clear();
         for (FieldVector vector : root.getFieldVectors()) {
             vector.allocateNew();
         }
         root.setRowCount(batchSize);
   
         // populate vectors
   
         for (FieldVector vector : vectors) {
             vector.setValueCount(batchSize);
         }
   
         writer.writeBatch();
   
     The failing column is a nullable Timestamp(MILLISECOND) vector where the 
entire failing batch has nulls for that column.
   
     A minimal test with only one nullable timestamp column and all-null values 
did not reproduce the issue, so this may require a wider schema, vector reuse, 
or a particular buffer lifecycle pattern.
   
     ### Additional reproduction note
   
     We also tested with Apache Arrow Java 19.0.0 and zstd-jni 1.5.7-7, and the 
same class of corruption still occurs: the ZSTD frame has a non-zero 
decompressed content size, but the Arrow IPC compressed buffer prefix is 
written as 0.
   
     This suggests the issue is not limited to Arrow Java 18.3.0 or zstd-jni 
1.5.7-6.
   
     ### Workaround
   
     We worked around this locally by replacing the default ZSTD codec with a 
custom codec that captures input.writerIndex() before compression, writes that 
value into the first 8 bytes of the compressed buffer, and verifies the prefix 
immediately after writing.
   
     This avoids producing a buffer where the ZSTD frame content size and Arrow 
IPC declared uncompressed length diverge.
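
     The capture-then-verify idea behind the workaround can be sketched with JDK types only. This is not the custom codec itself and does not use the Arrow compression API: PrefixedCompressor is a hypothetical name, a java.nio.ByteBuffer stands in for ArrowBuf (with remaining() playing the role of writerIndex()), and java.util.zip.Deflater stands in for ZSTD so the example runs without extra dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.Function;
import java.util.zip.Deflater;

/** Hypothetical stand-in for the custom codec; ByteBuffer replaces ArrowBuf. */
final class PrefixedCompressor {
    private static final int PREFIX_BYTES = 8;

    /**
     * Captures the input length before compressing, writes it as the 8-byte
     * little-endian prefix, and re-reads the prefix to verify it.
     */
    static byte[] compressWithPrefix(ByteBuffer input, Function<ByteBuffer, byte[]> compressor) {
        // Capture first: the compressor is free to move the buffer's position,
        // which would change what remaining() reports afterwards.
        final long uncompressedLength = input.remaining();
        byte[] payload = compressor.apply(input);

        ByteBuffer out = ByteBuffer.allocate(PREFIX_BYTES + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        out.putLong(uncompressedLength);
        out.put(payload);

        // Verify immediately after writing, as the workaround describes.
        long written = out.getLong(0);
        if (written != uncompressedLength) {
            throw new IllegalStateException(
                    "prefix " + written + " does not match input length " + uncompressedLength);
        }
        return out.array();
    }

    /** Deflater-based compressor used here only as a dependency-free substitute for ZSTD. */
    static byte[] deflate(ByteBuffer input) {
        byte[] data = new byte[input.remaining()];
        input.get(data); // consumes the buffer, so remaining() is 0 afterwards

        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 3469 rows * 8 bytes, the size of the failing data buffer, filled with zeros.
        ByteBuffer input = ByteBuffer.wrap(new byte[3469 * 8]);
        byte[] framed = compressWithPrefix(input, PrefixedCompressor::deflate);
        long prefix = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong();
        System.out.println("declaredUncompressed=" + prefix);
    }
}
```

     In this sketch the stand-in compressor consumes the input buffer, so reading remaining() after compression would yield 0. That is only an analogy for why the length is captured up front; it is not a claim about where the Arrow Java codec goes wrong.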
   
     ### Questions
   
     - Is this a known issue in Arrow Java IPC ZSTD compression around 
AbstractCompressionCodec.compress() / ZstdCompressionCodec.doCompress()?
     - Could uncompressedBuffer.writerIndex() become stale or zero between 
doCompress() and writeUncompressedLength() in the parent codec?
     - Are there known issues with reusing VectorSchemaRoot across many 
compressed IPC batches in Arrow Java 18.3.0 or 19.0.0?

