This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new b0f6e2c2 fix(parquet/file): regression with decompressing data (#652)
b0f6e2c2 is described below
commit b0f6e2c2a92d5937be9bf9fd630d94b704baaaab
Author: Matt Topol <[email protected]>
AuthorDate: Wed Jan 28 10:39:32 2026 -0500
fix(parquet/file): regression with decompressing data (#652)
### Rationale for this change
fixes #619
### What changes are included in this PR?
Uses the correct buffer during decompression
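For context, the core of the change (shown in full in the diff below) is to read the page's compressed bytes straight into the reusable decompress buffer with `io.ReadFull`, and to hand exactly those bytes on to decryption/decoding, instead of copying through an intermediate `bytes.Buffer`. A minimal, self-contained sketch of that pattern follows; the `readCompressed` helper, `scratch` slice, and reader are illustrative stand-ins, not the actual arrow-go internals:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readCompressed reads exactly lenCompressed bytes from rd into the reusable
// scratch slice and returns only the bytes that were actually read, mirroring
// the fixed decompress() flow.
func readCompressed(rd io.Reader, scratch []byte, lenCompressed int) ([]byte, error) {
	n, err := io.ReadFull(rd, scratch[:lenCompressed])
	if err != nil {
		return nil, err
	}
	if n != lenCompressed { // defensive check, as in the fix
		return nil, fmt.Errorf("expected %d compressed bytes, got %d", lenCompressed, n)
	}
	return scratch[:lenCompressed], nil
}

func main() {
	rd := strings.NewReader("0123456789abcdef")
	scratch := make([]byte, 32) // stand-in for the page reader's decompress buffer
	data, err := readCompressed(rd, scratch, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // 0123456789
}
```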
### Are these changes tested?
A test case was added to confirm the issue
### Are there any user-facing changes?
Only the bug fix.
---
parquet/file/file_writer_test.go | 116 +++++++++++++++++++++++++++++++++++++++
parquet/file/page_reader.go | 25 ++++++---
2 files changed, 133 insertions(+), 8 deletions(-)
diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index b098c54d..5997b107 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -1198,3 +1198,119 @@ func TestWriteBloomFilters(t *testing.T) {
assert.False(t, byteArrayFilter.Check(parquet.ByteArray("bar")))
assert.False(t, byteArrayFilter.Check(parquet.ByteArray("baz")))
}
+
+// TestBufferedStreamDictionaryCompressed tests the fix for issue #619
+// where BufferedStreamEnabled=true with dictionary encoding and compression
+// caused "dict spaced eof exception" and "snappy: corrupt input" errors.
+// This was due to a bug in the decompress() method that read from the wrong buffer.
+func TestBufferedStreamDictionaryCompressed(t *testing.T) {
+ // Create schema with a string column that will use dictionary encoding
+ fields := schema.FieldList{
+ schema.NewByteArrayNode("dict_col", parquet.Repetitions.Required, -1),
+ schema.NewInt32Node("value_col", parquet.Repetitions.Required, -1),
+ }
+ sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
+ require.NoError(t, err)
+
+ sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+ defer sink.Release()
+
+ // Write with dictionary encoding and Snappy compression
+ props := parquet.NewWriterProperties(
+ parquet.WithDictionaryDefault(true),
+ parquet.WithCompression(compress.Codecs.Snappy),
+ )
+
+ writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props))
+ rgWriter := writer.AppendBufferedRowGroup()
+
+ // Write dictionary column with repeated values to trigger dictionary encoding
+ dictCol, err := rgWriter.Column(0)
+ require.NoError(t, err)
+ dictWriter := dictCol.(*file.ByteArrayColumnChunkWriter)
+
+ const numValues = 1000
+ dictValues := make([]parquet.ByteArray, numValues)
+ for i := 0; i < numValues; i++ {
+ // Use only 10 unique values to ensure dictionary encoding is used
+ dictValues[i] = parquet.ByteArray(fmt.Sprintf("value_%d", i%10))
+ }
+ _, err = dictWriter.WriteBatch(dictValues, nil, nil)
+ require.NoError(t, err)
+
+ // Write value column
+ valueCol, err := rgWriter.Column(1)
+ require.NoError(t, err)
+ valueWriter := valueCol.(*file.Int32ColumnChunkWriter)
+
+ values := make([]int32, numValues)
+ for i := 0; i < numValues; i++ {
+ values[i] = int32(i)
+ }
+ _, err = valueWriter.WriteBatch(values, nil, nil)
+ require.NoError(t, err)
+
+ require.NoError(t, rgWriter.Close())
+ require.NoError(t, writer.Close())
+
+ buffer := sink.Finish()
+ defer buffer.Release()
+
+ // Verify dictionary page was written
+ reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()))
+ require.NoError(t, err)
+ defer reader.Close()
+
+ rgReader := reader.RowGroup(0)
+ chunk, err := rgReader.MetaData().ColumnChunk(0)
+ require.NoError(t, err)
+ assert.True(t, chunk.HasDictionaryPage(), "Expected dictionary page to be written")
+
+ // Now read with BufferedStreamEnabled=true (the issue #619 condition)
+ readProps := parquet.NewReaderProperties(memory.DefaultAllocator)
+ readProps.BufferSize = 1024
+ readProps.BufferedStreamEnabled = true
+
+ bufferedReader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(readProps))
+ require.NoError(t, err)
+ defer bufferedReader.Close()
+
+ // Read the data back
+ bufferedRgReader := bufferedReader.RowGroup(0)
+ assert.EqualValues(t, numValues, bufferedRgReader.NumRows())
+
+ // Read dictionary column
+ dictColReader, err := bufferedRgReader.Column(0)
+ require.NoError(t, err)
+ dictChunkReader := dictColReader.(*file.ByteArrayColumnChunkReader)
+
+ readDictValues := make([]parquet.ByteArray, numValues)
+ defLevels := make([]int16, numValues)
+ repLevels := make([]int16, numValues)
+
+ total, valuesRead, err := dictChunkReader.ReadBatch(int64(numValues), readDictValues, defLevels, repLevels)
+ require.NoError(t, err, "Should not get 'dict spaced eof exception' or 'snappy: corrupt input'")
+ assert.EqualValues(t, numValues, total)
+ assert.EqualValues(t, numValues, valuesRead)
+
+ // Verify the data is correct
+ for i := 0; i < numValues; i++ {
+ expected := parquet.ByteArray(fmt.Sprintf("value_%d", i%10))
+ assert.Equal(t, expected, readDictValues[i], "Value mismatch at index %d", i)
+ }
+
+ // Read value column to ensure it also works
+ valueColReader, err := bufferedRgReader.Column(1)
+ require.NoError(t, err)
+ valueChunkReader := valueColReader.(*file.Int32ColumnChunkReader)
+
+ readValues := make([]int32, numValues)
+ total, valuesRead, err = valueChunkReader.ReadBatch(int64(numValues), readValues, defLevels, repLevels)
+ require.NoError(t, err)
+ assert.EqualValues(t, numValues, total)
+ assert.EqualValues(t, numValues, valuesRead)
+
+ for i := 0; i < numValues; i++ {
+ assert.Equal(t, int32(i), readValues[i])
+ }
+}
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 307c6c6a..903f2ec0 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -17,7 +17,6 @@
package file
import (
- "bytes"
"errors"
"fmt"
"io"
@@ -502,14 +501,19 @@ func (p *serializedPageReader) Page() Page {
func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
p.decompressBuffer.ResizeNoShrink(lenCompressed)
- b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
- if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+
+ // Read directly into the memory.Buffer's backing slice
+ n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
+ if err != nil {
return nil, err
}
+ if n != lenCompressed {
+ return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
+ }
- data := p.decompressBuffer.Bytes()
+ data := p.decompressBuffer.Bytes()[:lenCompressed]
if p.cryptoCtx.DataDecryptor != nil {
- data = p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+ data = p.cryptoCtx.DataDecryptor.Decrypt(data)
}
return p.codec.Decode(buf, data), nil
@@ -518,11 +522,16 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
// if encrypted, we need to decrypt before decompressing
p.decompressBuffer.ResizeNoShrink(lenCompressed)
- b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
- if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+
+ n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
+ if err != nil {
return err
}
- data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+ if n != lenCompressed {
+ return fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
+ }
+
+ data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()[:lenCompressed])
// encrypted + uncompressed -> just copy the decrypted data to output buffer
if !compressed {
copy(buf, data)