kli19 opened a new issue, #756:
URL: https://github.com/apache/arrow-go/issues/756

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   **Summary**
   
   The `writeValues` method in `parquet/file/column_writer_types.gen.go.tmpl` 
(starting in v18.5.2) silently drops small ByteArray values that precede a ≥1MB 
value within the same write batch. The page header retains the full value count 
from def/rep levels, but the encoded data stream is short, producing corrupt 
Parquet files.
   
   **Impact**
   We write BSON documents to parquet using 
`ByteArrayColumnChunkWriter.WriteBatch`. After upgrading from v18.5.1 to 
v18.5.2, round-trip validation began failing. Readers (including ours, which is 
independent of `arrow-go`) misinterpret the truncated data stream. For example, 
a 65-byte string was read back as 2,483,541 bytes because the reader consumed 
bytes from subsequent values as a length prefix. _Reverting to v18.5.1 resolved 
the issue._
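
To illustrate how a misaligned read can turn payload bytes into a huge length, here is a minimal sketch. It assumes PLAIN encoding, where each BYTE_ARRAY value is stored as a 4-byte little-endian length followed by its bytes; the issue does not state which encoding or which exact offset was involved in our files, and `encodePlain` and `readLength` are hypothetical helpers, not `arrow-go` APIs.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodePlain lays out each value as a 4-byte little-endian length
// followed by the value bytes (the PLAIN layout for BYTE_ARRAY).
// Hypothetical helper for illustration only.
func encodePlain(values [][]byte) []byte {
	var out []byte
	for _, v := range values {
		var prefix [4]byte
		binary.LittleEndian.PutUint32(prefix[:], uint32(len(v)))
		out = append(out, prefix[:]...)
		out = append(out, v...)
	}
	return out
}

// readLength interprets the 4 bytes at offset as a length prefix,
// which is what a reader does at every position it believes starts a value.
func readLength(buf []byte, offset int) uint32 {
	return binary.LittleEndian.Uint32(buf[offset : offset+4])
}

func main() {
	buf := encodePlain([][]byte{[]byte("abcdefgh")})

	// Aligned: the reader sees the real length prefix.
	fmt.Println("aligned length:   ", readLength(buf, 0))

	// Misaligned by 4 bytes: the payload bytes "abcd" are decoded as a length.
	fmt.Println("misaligned length:", readLength(buf, 4))
}
```

Once the value count in the page header and the bytes in the data stream disagree, a reader has no way to resynchronize, so any later read may land on payload bytes like this.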
   
   **Root Cause**
   Relevant file: `parquet/file/column_writer_types.gen.go.tmpl`
   
   v18.5.1 had this one-liner:
   ```
     func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray, numNulls int64) {
         w.currentEncoder.(encoding.ByteArrayEncoder).Put(values)  // all values, one call
         ...
     }
   ```
   
   v18.5.2 replaced it with a loop that has two safety checks: a 1GB overflow 
check and a 1MB large-value check. The 1GB check is correct. The 1MB check has 
a bug. Here's the v18.5.2 code with annotations: 
   
   ```
     batchStart := 0
     for i := 0; i < len(values); i++ {
         valueSize := int64(len(values[i]))
    
         // 1GB CHECK — correctly flushes pending small values first
         if currentSize+valueSize >= maxSafeBufferSize {
             if i > batchStart {
                 encoder.Put(values[batchStart:i])  // correctly encodes pending batch
             }
             w.FlushCurrentPage()
             batchStart = i
             currentSize = 0
         }
    
         currentSize += valueSize + 4
    
         // 1MB CHECK — BUG: does NOT flush pending small values
         if valueSize >= largeValueThreshold {
             // MISSING: encoder.Put(values[batchStart:i])
             encoder.Put(values[i : i+1])  // only encodes the large value
             batchStart = i + 1            // skips everything from batchStart to i-1
             currentSize = w.currentEncoder.EstimatedDataEncodedSize()
         }
     }
    
     // encode whatever's left
     if batchStart < len(values) {
         encoder.Put(values[batchStart:])
     }
   ```
   
   Say we have 5 values: [10B, 20B, 30B, 2MB, 50B] and currentSize starts at 0.
   
   ```
   Iteration Breakdown:
   i = 0: valueSize = 10
       1GB check: 0 + 10 < 1GB --> skip
       Current Size: 14 (10 + 4)
       1MB check: 10 < 1MB --> skip
   
   i = 1: valueSize = 20
       1GB check: 14 + 20 < 1GB --> skip
       Current Size: 38 (14 + 24)
       1MB check: 20 < 1MB --> skip
   
   i = 2: valueSize = 30
       1GB check: 38 + 30 < 1GB --> skip
       Current Size: 72 (38 + 34)
       1MB check: 30 < 1MB --> skip
   
   i = 3: valueSize = 2MB
       1GB check: 72 + 2MB < 1GB --> skip
       Current Size: 2MB + 76
       1MB check: 2MB >= 1MB --> ENTER
           --> THIS IS WHERE THE BUG HAPPENS
           Action: encoder.Put(values[3:4]) --> Encodes only the 2MB value
           Batch Start: 4 --> values[0:3] are never passed to Put
           Current Size: encoder.EstimatedDataEncodedSize()
   
   i = 4: valueSize = 50
       1GB check: skip
       Current Size: currentSize += 54
       1MB check: skip
   
   End Condition:
       Final Check: batchStart = 4 < 5
           --> Action: encoder.Put(values[4:5])
       Values Not Encoded: values[0], values[1], values[2]
   ```
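
The walkthrough above can be reproduced with a small standalone model. This is a sketch, not the `arrow-go` code: it mirrors only the control flow of the quoted loop, works on value sizes instead of real byte slices, and replaces the encoder with a hypothetical `recorder` that notes which indices reach `Put`. The page flush and the `EstimatedDataEncodedSize()` refresh are left out because they do not affect which values are encoded in this example.

```go
package main

import "fmt"

const (
	largeValueThreshold = 1 << 20 // 1MB, as described in the issue
	maxSafeBufferSize   = 1 << 30 // 1GB, as described in the issue
)

// recorder is a hypothetical stand-in for the encoder: it only
// remembers which value indices were handed to put.
type recorder struct{ encoded []int }

func (r *recorder) put(start, end int) {
	for i := start; i < end; i++ {
		r.encoded = append(r.encoded, i)
	}
}

// buggyWriteValues models the v18.5.2 loop and returns the indices
// that were actually encoded.
func buggyWriteValues(sizes []int64) []int {
	rec := &recorder{}
	var currentSize int64
	batchStart := 0
	for i, valueSize := range sizes {
		// 1GB check: flushes the pending batch before starting a new page.
		if currentSize+valueSize >= maxSafeBufferSize {
			if i > batchStart {
				rec.put(batchStart, i)
			}
			batchStart = i
			currentSize = 0
		}
		currentSize += valueSize + 4

		// 1MB check: encodes only the large value, so the pending
		// range [batchStart, i) is skipped when batchStart moves past it.
		if valueSize >= largeValueThreshold {
			rec.put(i, i+1)
			batchStart = i + 1
		}
	}
	// Encode whatever is left after the last large value.
	if batchStart < len(sizes) {
		rec.put(batchStart, len(sizes))
	}
	return rec.encoded
}

func main() {
	sizes := []int64{10, 20, 30, 2 << 20, 50}
	fmt.Println(buggyWriteValues(sizes)) // prints [3 4]
}
```

Only indices 3 and 4 reach the encoder, matching the breakdown: the three small values before the 2MB value are lost.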
   
   **Suggested Fix**
   Flush the pending batch in the largeValueThreshold branch, matching what the 
maxSafeBufferSize branch already does:
   ```
     if valueSize >= largeValueThreshold {
         // flush pending small values before encoding the large one
         if i > batchStart {
             encoder.Put(values[batchStart:i])
         }
         encoder.Put(values[i : i+1])
         batchStart = i + 1
         currentSize = w.currentEncoder.EstimatedDataEncodedSize()
     }
   ```
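
As a sanity check on the suggested fix, the following sketch models only the 1MB branch with the extra flush and verifies that the resulting `Put` ranges cover every index exactly once and in order. It is an assumption-laden model rather than the real writer: `fixedPutRanges` and `coversAll` are hypothetical names, sizes stand in for values, and the 1GB branch is omitted because it already flushes correctly.

```go
package main

import "fmt"

const largeValueThreshold = 1 << 20 // 1MB, as described in the issue

// fixedPutRanges models the loop with the suggested fix and returns the
// half-open [start, end) ranges that would be passed to Put, in call order.
func fixedPutRanges(sizes []int64) [][2]int {
	var ranges [][2]int
	batchStart := 0
	for i, valueSize := range sizes {
		if valueSize >= largeValueThreshold {
			// Flush the pending small values first (the suggested fix).
			if i > batchStart {
				ranges = append(ranges, [2]int{batchStart, i})
			}
			// Then encode the large value on its own.
			ranges = append(ranges, [2]int{i, i + 1})
			batchStart = i + 1
		}
	}
	// Encode whatever is left.
	if batchStart < len(sizes) {
		ranges = append(ranges, [2]int{batchStart, len(sizes)})
	}
	return ranges
}

// coversAll reports whether the ranges are contiguous, in order,
// and together span exactly [0, n).
func coversAll(ranges [][2]int, n int) bool {
	next := 0
	for _, r := range ranges {
		if r[0] != next || r[1] <= r[0] {
			return false
		}
		next = r[1]
	}
	return next == n
}

func main() {
	sizes := []int64{10, 20, 30, 2 << 20, 50}
	ranges := fixedPutRanges(sizes)
	fmt.Println(ranges)                        // prints [[0 3] [3 4] [4 5]]
	fmt.Println(coversAll(ranges, len(sizes))) // prints true
}
```

A regression test in the same spirit could write a batch with small values ahead of a ≥1MB value and assert that every value reads back unchanged.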
   
   ### Component(s)
   
   Parquet

