This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new b0f6e2c2 fix(parquet/file): regression with decompressing data (#652)
b0f6e2c2 is described below

commit b0f6e2c2a92d5937be9bf9fd630d94b704baaaab
Author: Matt Topol <[email protected]>
AuthorDate: Wed Jan 28 10:39:32 2026 -0500

    fix(parquet/file): regression with decompressing data (#652)
    
    ### Rationale for this change
    Fixes #619: with `BufferedStreamEnabled=true`, dictionary encoding plus compression produced "dict spaced eof exception" and "snappy: corrupt input" errors.
    
    ### What changes are included in this PR?
    Reads compressed page bytes directly into the decompression buffer's backing slice with `io.ReadFull`, instead of appending through a `bytes.Buffer` that can reallocate and leave the backing slice stale.
    
    ### Are these changes tested?
    A test case was added that reproduces the issue and verifies the fix.
    
    ### Are there any user-facing changes?
    Only the bug fix.
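    
    For context, a minimal standalone sketch of the aliasing pitfall behind the
    regression (illustrative only, not part of this commit):
    `bytes.Buffer.ReadFrom` grows the buffer's internal slice before each read,
    so wrapping a backing slice in `bytes.NewBuffer(s[:0])` does not guarantee
    that the copied bytes ever land in that slice.
    
    ```go
    package main
    
    import (
        "bytes"
        "fmt"
        "io"
        "strings"
    )
    
    func main() {
        // Fixed-capacity backing slice, standing in for the page reader's
        // decompressBuffer after ResizeNoShrink.
        backing := make([]byte, 8)
    
        // bytes.Buffer.ReadFrom reallocates to keep spare capacity, so the
        // copy lands in a new array and `backing` never sees the data.
        b := bytes.NewBuffer(backing[:0])
        if _, err := io.CopyN(b, strings.NewReader("abcdefgh"), 8); err != nil {
            panic(err)
        }
    
        fmt.Printf("buffer sees:  %q\n", b.Bytes()) // "abcdefgh"
        fmt.Printf("backing sees: %q\n", backing)   // still all zero bytes
    }
    ```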
---
 parquet/file/file_writer_test.go | 116 +++++++++++++++++++++++++++++++++++++++
 parquet/file/page_reader.go      |  25 ++++++---
 2 files changed, 133 insertions(+), 8 deletions(-)

diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index b098c54d..5997b107 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -1198,3 +1198,119 @@ func TestWriteBloomFilters(t *testing.T) {
        assert.False(t, byteArrayFilter.Check(parquet.ByteArray("bar")))
        assert.False(t, byteArrayFilter.Check(parquet.ByteArray("baz")))
 }
+
+// TestBufferedStreamDictionaryCompressed tests the fix for issue #619
+// where BufferedStreamEnabled=true with dictionary encoding and compression
+// caused "dict spaced eof exception" and "snappy: corrupt input" errors.
+// This was due to a bug in the decompress() method that read from the wrong buffer.
+func TestBufferedStreamDictionaryCompressed(t *testing.T) {
+       // Create schema with a string column that will use dictionary encoding
+       fields := schema.FieldList{
+               schema.NewByteArrayNode("dict_col", parquet.Repetitions.Required, -1),
+               schema.NewInt32Node("value_col", parquet.Repetitions.Required, -1),
+       }
+       sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
+       require.NoError(t, err)
+
+       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+       defer sink.Release()
+
+       // Write with dictionary encoding and Snappy compression
+       props := parquet.NewWriterProperties(
+               parquet.WithDictionaryDefault(true),
+               parquet.WithCompression(compress.Codecs.Snappy),
+       )
+
+       writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props))
+       rgWriter := writer.AppendBufferedRowGroup()
+
+       // Write dictionary column with repeated values to trigger dictionary encoding
+       dictCol, err := rgWriter.Column(0)
+       require.NoError(t, err)
+       dictWriter := dictCol.(*file.ByteArrayColumnChunkWriter)
+
+       const numValues = 1000
+       dictValues := make([]parquet.ByteArray, numValues)
+       for i := 0; i < numValues; i++ {
+               // Use only 10 unique values to ensure dictionary encoding is used
+               dictValues[i] = parquet.ByteArray(fmt.Sprintf("value_%d", i%10))
+       }
+       _, err = dictWriter.WriteBatch(dictValues, nil, nil)
+       require.NoError(t, err)
+
+       // Write value column
+       valueCol, err := rgWriter.Column(1)
+       require.NoError(t, err)
+       valueWriter := valueCol.(*file.Int32ColumnChunkWriter)
+
+       values := make([]int32, numValues)
+       for i := 0; i < numValues; i++ {
+               values[i] = int32(i)
+       }
+       _, err = valueWriter.WriteBatch(values, nil, nil)
+       require.NoError(t, err)
+
+       require.NoError(t, rgWriter.Close())
+       require.NoError(t, writer.Close())
+
+       buffer := sink.Finish()
+       defer buffer.Release()
+
+       // Verify dictionary page was written
+       reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()))
+       require.NoError(t, err)
+       defer reader.Close()
+
+       rgReader := reader.RowGroup(0)
+       chunk, err := rgReader.MetaData().ColumnChunk(0)
+       require.NoError(t, err)
+       assert.True(t, chunk.HasDictionaryPage(), "Expected dictionary page to be written")
+
+       // Now read with BufferedStreamEnabled=true (the issue #619 condition)
+       readProps := parquet.NewReaderProperties(memory.DefaultAllocator)
+       readProps.BufferSize = 1024
+       readProps.BufferedStreamEnabled = true
+
+       bufferedReader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(readProps))
+       require.NoError(t, err)
+       defer bufferedReader.Close()
+
+       // Read the data back
+       bufferedRgReader := bufferedReader.RowGroup(0)
+       assert.EqualValues(t, numValues, bufferedRgReader.NumRows())
+
+       // Read dictionary column
+       dictColReader, err := bufferedRgReader.Column(0)
+       require.NoError(t, err)
+       dictChunkReader := dictColReader.(*file.ByteArrayColumnChunkReader)
+
+       readDictValues := make([]parquet.ByteArray, numValues)
+       defLevels := make([]int16, numValues)
+       repLevels := make([]int16, numValues)
+
+       total, valuesRead, err := dictChunkReader.ReadBatch(int64(numValues), readDictValues, defLevels, repLevels)
+       require.NoError(t, err, "Should not get 'dict spaced eof exception' or 'snappy: corrupt input'")
+       assert.EqualValues(t, numValues, total)
+       assert.EqualValues(t, numValues, valuesRead)
+
+       // Verify the data is correct
+       for i := 0; i < numValues; i++ {
+               expected := parquet.ByteArray(fmt.Sprintf("value_%d", i%10))
+               assert.Equal(t, expected, readDictValues[i], "Value mismatch at index %d", i)
+       }
+
+       // Read value column to ensure it also works
+       valueColReader, err := bufferedRgReader.Column(1)
+       require.NoError(t, err)
+       valueChunkReader := valueColReader.(*file.Int32ColumnChunkReader)
+
+       readValues := make([]int32, numValues)
+       total, valuesRead, err = valueChunkReader.ReadBatch(int64(numValues), readValues, defLevels, repLevels)
+       require.NoError(t, err)
+       assert.EqualValues(t, numValues, total)
+       assert.EqualValues(t, numValues, valuesRead)
+
+       for i := 0; i < numValues; i++ {
+               assert.Equal(t, int32(i), readValues[i])
+       }
+}
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 307c6c6a..903f2ec0 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -17,7 +17,6 @@
 package file
 
 import (
-       "bytes"
        "errors"
        "fmt"
        "io"
@@ -502,14 +501,19 @@ func (p *serializedPageReader) Page() Page {
 
 func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
        p.decompressBuffer.ResizeNoShrink(lenCompressed)
-       b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
-       if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+
+       // Read directly into the memory.Buffer's backing slice
+       n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
+       if err != nil {
                return nil, err
        }
+       if n != lenCompressed {
+               return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
+       }
 
-       data := p.decompressBuffer.Bytes()
+       data := p.decompressBuffer.Bytes()[:lenCompressed]
        if p.cryptoCtx.DataDecryptor != nil {
-               data = p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+               data = p.cryptoCtx.DataDecryptor.Decrypt(data)
        }
 
        return p.codec.Decode(buf, data), nil
@@ -518,11 +522,16 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
 func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
        // if encrypted, we need to decrypt before decompressing
        p.decompressBuffer.ResizeNoShrink(lenCompressed)
-       b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
-       if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+
+       n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
+       if err != nil {
                return err
        }
-       data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+       if n != lenCompressed {
+               return fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
+       }
+
+       data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()[:lenCompressed])
        // encrypted + uncompressed -> just copy the decrypted data to output buffer
        if !compressed {
                copy(buf, data)
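
For reference, the corrected read pattern in isolation: io.ReadFull either
fills exactly the requested slice in place or returns an error
(io.ErrUnexpectedEOF on a short read), so the slice wrapped by the
memory.Buffer always holds the bytes that were read. A minimal standalone
sketch (the readCompressed helper below is hypothetical, not arrow-go API):

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readCompressed reads exactly n bytes from rd into dst's backing array,
// mirroring the fixed decompress() logic (hypothetical helper).
func readCompressed(rd io.Reader, dst []byte, n int) ([]byte, error) {
	if _, err := io.ReadFull(rd, dst[:n]); err != nil {
		// io.ReadFull returns io.ErrUnexpectedEOF on a short read, so a
		// nil error guarantees exactly n bytes were written into dst.
		return nil, err
	}
	return dst[:n], nil
}

func main() {
	buf := make([]byte, 16)
	data, err := readCompressed(strings.NewReader("compressed-bytes"), buf, 16)
	fmt.Println(string(data), err, &data[0] == &buf[0]) // compressed-bytes <nil> true
}
```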
