This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 32115946 Implement RLE dictionary decoder using generics (#477)
32115946 is described below
commit 321159468711a8fac9a6c52a956424a390274da2
Author: daniel-adam-tfs <[email protected]>
AuthorDate: Wed Aug 27 20:38:25 2025 +0200
Implement RLE dictionary decoder using generics (#477)
### Rationale for this change
Improve the performance of reading data from RLE-dictionary encoded
Parquet columns.
### What changes are included in this PR?
The DictionaryConverter interface was changed to a generic
DictionaryConverter[T] interface, so the methods can take []T as a
parameter instead of interface{} (see the sketch below). This required
changes to several types implementing the interface.
The template code generation for RleDecoder was removed and the methods
for the parquet types are now handled by the generic TypedRleDecoder
type.
Additionally, some utils.Min/utils.Max calls were replaced with calls to
the built-in min/max functions.
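For illustration, a minimal sketch of the idea (hypothetical names, not
the actual arrow-go API): making the converter generic lets the decoder
hand it a typed slice directly, so the hot path needs no interface{}
boxing, type switches, or reflection.
```go
package main

import "fmt"

// indexType stands in for a dictionary index (analogous in spirit to
// utils.IndexType; the name here is hypothetical).
type indexType = int32

// converter is a hypothetical, simplified analogue of a generic
// DictionaryConverter[T]: it works on typed slices instead of interface{}.
type converter[T any] interface {
	// fill writes the dictionary value at idx into every element of out.
	fill(out []T, idx indexType) error
	// copyValues maps each index in idxes to its dictionary value.
	copyValues(out []T, idxes []indexType) error
}

// dictConverter holds the decoded dictionary values for one column chunk.
type dictConverter[T any] struct {
	dict []T
}

func (dc *dictConverter[T]) fill(out []T, idx indexType) error {
	if len(out) == 0 {
		return nil
	}
	out[0] = dc.dict[idx]
	// doubling copy: fill the rest of the slice in O(log n) copy calls
	for i := 1; i < len(out); i *= 2 {
		copy(out[i:], out[:i])
	}
	return nil
}

func (dc *dictConverter[T]) copyValues(out []T, idxes []indexType) error {
	for i, idx := range idxes {
		out[i] = dc.dict[idx]
	}
	return nil
}

func main() {
	var dc converter[string] = &dictConverter[string]{dict: []string{"a", "b", "c"}}
	out := make([]string, 5)
	_ = dc.fill(out, 2) // a repeated RLE run: every slot gets dict[2]
	fmt.Println(out)    // [c c c c c]
	_ = dc.copyValues(out, []indexType{0, 1, 2, 1, 0}) // a literal run of indexes
	fmt.Println(out)                                   // [a b c b a]
}
```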
### Are these changes tested?
Yes, tests verifying RLE dictionary decoding already existed, and I've
added BenchmarkReadInt32Column to measure the performance improvement.
### Are there any user-facing changes?
Some exported types and functions were changed:
DictionaryConverter -> DictionaryConverter[T]
RleDecoder -> split into RleDecoder and TypedRleDecoder[T]
func (b *BitReader) GetAligned(nbytes int, v interface{}) bool -> split
into getAlignedUint8, getAlignedUint16, getAlignedUint32 and
getAlignedUint64 (see the sketch below)
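As a rough illustration of the aligned-read split (a simplified sketch,
not the arrow-go implementation; readAlignedUint32 and the plain
byte-slice input are hypothetical), each width-specific helper decodes
little-endian bytes with zero padding and no reflection-based size
lookup:
```go
package main

import (
	"encoding/binary"
	"fmt"
)

// readAlignedUint32 decodes up to 4 little-endian bytes from raw into v,
// zero-padding the missing high bytes. It is a simplified, hypothetical
// stand-in for a width-specific aligned-read helper.
func readAlignedUint32(raw []byte, nbytes int, v *uint32) bool {
	if nbytes > 4 || nbytes > len(raw) {
		return false
	}
	var buf [4]byte // the zero value already provides the padding
	copy(buf[:], raw[:nbytes])
	*v = binary.LittleEndian.Uint32(buf[:])
	return true
}

func main() {
	var v uint32
	ok := readAlignedUint32([]byte{0x01, 0x02, 0x03}, 3, &v)
	fmt.Println(ok, v) // true 197121 (= 0x030201)
}
```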
---
parquet/file/column_reader_test.go | 82 ++
parquet/internal/encoding/byte_array_decoder.go | 2 +-
parquet/internal/encoding/decoder.go | 42 +-
parquet/internal/encoding/typed_encoder.go | 58 +-
parquet/internal/utils/bit_reader.go | 104 +-
parquet/internal/utils/dictionary.go | 71 +-
parquet/internal/utils/physical_types.tmpldata | 52 -
parquet/internal/utils/rle.go | 198 +--
parquet/internal/utils/typed_rle_dict.gen.go | 1377 ---------------------
parquet/internal/utils/typed_rle_dict.gen.go.tmpl | 220 ----
parquet/internal/utils/typed_rle_dict.go | 230 ++++
11 files changed, 476 insertions(+), 1960 deletions(-)
diff --git a/parquet/file/column_reader_test.go b/parquet/file/column_reader_test.go
index 575bda4f..25c26bc8 100644
--- a/parquet/file/column_reader_test.go
+++ b/parquet/file/column_reader_test.go
@@ -21,6 +21,8 @@ import (
"fmt"
"math"
"math/rand"
+ "os"
+ "path/filepath"
"reflect"
"runtime"
"sync"
@@ -810,3 +812,83 @@ func TestFullSeekRow(t *testing.T) {
})
}
}
+
+func BenchmarkReadInt32Column(b *testing.B) {
+ // generate parquet with RLE-dictionary encoded int32 column
+ tempdir := b.TempDir()
+ filepath := filepath.Join(tempdir, "rle-dict-int32.parquet")
+
+ props := parquet.NewWriterProperties(
+ parquet.WithDictionaryDefault(true),
+ parquet.WithDataPageSize(128*1024*1024), // 128MB
+ parquet.WithBatchSize(128*1024*1024),
+ parquet.WithMaxRowGroupLength(100_000),
+ parquet.WithDataPageVersion(parquet.DataPageV2),
+ parquet.WithVersion(parquet.V2_LATEST),
+ )
+ outFile, err := os.Create(filepath)
+ require.NoError(b, err)
+
+ sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
+ schema.NewInt32Node("col", parquet.Repetitions.Required, -1),
+ }, -1)
+ require.NoError(b, err)
+
+ writer := file.NewParquetWriter(outFile, sc, file.WithWriterProps(props))
+
+ // 10 row groups of 100 000 rows = 1 000 000 rows in total
+ value := int32(1)
+ for range 10 {
+ rgWriter := writer.AppendBufferedRowGroup()
+ cwr, _ := rgWriter.Column(0)
+ cw := cwr.(*file.Int32ColumnChunkWriter)
+ valuesIn := make([]int32, 0, 100_000)
+ repeats := 1
+ for len(valuesIn) < 100_000 {
+ repeatedValue := make([]int32, repeats)
+ for i := range repeatedValue {
+ repeatedValue[i] = value
+ }
+ if len(valuesIn)+len(repeatedValue) > 100_000 {
+ repeatedValue = repeatedValue[:100_000-len(valuesIn)]
+ }
+ valuesIn = append(valuesIn, repeatedValue[:]...)
+ // repeat values from 1 to 50 times
+ repeats = (repeats % 50) + 1
+ value++
+ }
+ cw.WriteBatch(valuesIn, nil, nil)
+ rgWriter.Close()
+ }
+ err = writer.Close()
+ require.NoError(b, err)
+
+ reader, err := file.OpenParquetFile(filepath, false)
+ require.NoError(b, err)
+ defer reader.Close()
+
+ numValues := reader.NumRows()
+ values := make([]int32, numValues)
+ b.StopTimer()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ startIndex := 0
+ for rg := 0; rg < reader.NumRowGroups(); rg++ {
+ rgReader := reader.RowGroup(rg)
+ colReader, err := rgReader.Column(0)
+ require.NoError(b, err)
+
+ cr, ok := colReader.(*file.Int32ColumnChunkReader)
+ require.True(b, ok)
+
+ b.StartTimer()
+ _, valuesRead, err := cr.ReadBatch(rgReader.NumRows(), values, nil, nil)
+ b.StopTimer()
+ require.NoError(b, err)
+
+ startIndex += valuesRead
+ require.Equal(b, rgReader.NumRows(), int64(valuesRead))
+ }
+ require.Equal(b, numValues, int64(startIndex))
+ }
+}
diff --git a/parquet/internal/encoding/byte_array_decoder.go b/parquet/internal/encoding/byte_array_decoder.go
index 1253ccea..f6b1027b 100644
--- a/parquet/internal/encoding/byte_array_decoder.go
+++ b/parquet/internal/encoding/byte_array_decoder.go
@@ -118,7 +118,7 @@ func (pbad *PlainByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCou
}
func (d *DictByteArrayDecoder) InsertDictionary(bldr array.Builder) error {
- conv := d.dictValueDecoder.(*ByteArrayDictConverter)
+ conv := d.dictValueDecoder.(*dictConverter[parquet.ByteArray])
dictLength := cap(conv.dict)
conv.ensure(pqutils.IndexType(dictLength))
diff --git a/parquet/internal/encoding/decoder.go b/parquet/internal/encoding/decoder.go
index b2a012e4..60afcc55 100644
--- a/parquet/internal/encoding/decoder.go
+++ b/parquet/internal/encoding/decoder.go
@@ -18,7 +18,6 @@ package encoding
import (
"bytes"
- "reflect"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -103,31 +102,31 @@ func (d *decoder) ValuesLeft() int { return d.nvals }
// Encoding returns the encoding type used by this decoder to decode the bytes.
func (d *decoder) Encoding() parquet.Encoding { return parquet.Encoding(d.encoding) }
-type dictDecoder struct {
+type dictDecoder[T parquet.ColumnTypes] struct {
decoder
mem memory.Allocator
- dictValueDecoder utils.DictionaryConverter
- idxDecoder *utils.RleDecoder
+ dictValueDecoder utils.DictionaryConverter[T]
+ idxDecoder *utils.TypedRleDecoder[T]
idxScratchSpace []uint64
}
// SetDict sets a decoder that can be used to decode the dictionary that is
// used for this column in order to return the proper values.
-func (d *dictDecoder) SetDict(dict TypedDecoder) {
+func (d *dictDecoder[T]) SetDict(dict TypedDecoder) {
if dict.Type() != d.descr.PhysicalType() {
panic("parquet: mismatch dictionary and column data type")
}
- d.dictValueDecoder = NewDictConverter(dict)
+ d.dictValueDecoder = NewDictConverter[T](dict)
}
// SetData sets the index value data into the decoder.
-func (d *dictDecoder) SetData(nvals int, data []byte) error {
+func (d *dictDecoder[T]) SetData(nvals int, data []byte) error {
d.nvals = nvals
if len(data) == 0 {
// no data, bitwidth can safely be 0
- d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 0 /* bitwidth */)
+ d.idxDecoder = utils.NewTypedRleDecoder[T](bytes.NewReader(data), 0 /* bitwidth */)
return nil
}
@@ -138,29 +137,29 @@ func (d *dictDecoder) SetData(nvals int, data []byte) error {
}
// pass the rest of the data, minus that first byte, to the decoder
- d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), int(width))
+ d.idxDecoder = utils.NewTypedRleDecoder[T](bytes.NewReader(data[1:]), int(width))
return nil
}
-func (d *dictDecoder) discard(n int) (int, error) {
+func (d *dictDecoder[T]) discard(n int) (int, error) {
n = d.idxDecoder.Discard(n)
d.nvals -= n
return n, nil
}
-func (d *dictDecoder) decode(out interface{}) (int, error) {
+func (d *dictDecoder[T]) decode(out []T) (int, error) {
n, err := d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out)
d.nvals -= n
return n, err
}
-func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
+func (d *dictDecoder[T]) decodeSpaced(out []T, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
n, err := d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset)
d.nvals -= n
return n, err
}
-func (d *dictDecoder) DecodeIndices(numValues int, bldr array.Builder) (int, error) {
+func (d *dictDecoder[T]) DecodeIndices(numValues int, bldr array.Builder) (int, error) {
n := shared_utils.Min(numValues, d.nvals)
if cap(d.idxScratchSpace) < n {
d.idxScratchSpace = make([]uint64, n, bitutil.NextPowerOf2(n))
@@ -179,7 +178,7 @@ func (d *dictDecoder) DecodeIndices(numValues int, bldr array.Builder) (int, err
return n, nil
}
-func (d *dictDecoder) DecodeIndicesSpaced(numValues, nullCount int, validBits []byte, offset int64, bldr array.Builder) (int, error) {
+func (d *dictDecoder[T]) DecodeIndicesSpaced(numValues, nullCount int, validBits []byte, offset int64, bldr array.Builder) (int, error) {
if cap(d.idxScratchSpace) < numValues {
d.idxScratchSpace = make([]uint64, numValues, bitutil.NextPowerOf2(numValues))
} else {
@@ -207,15 +206,8 @@ func (d *dictDecoder) DecodeIndicesSpaced(numValues, nullCount int, validBits []
// spacedExpand is used to take a slice of data and utilize the bitmap provided to fill in nulls into the
// correct slots according to the bitmap in order to produce a fully expanded result slice with nulls
// in the correct slots.
-func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int {
- bufferRef := reflect.ValueOf(buffer)
- if bufferRef.Kind() != reflect.Slice {
- panic("invalid spacedexpand type, not slice")
- }
-
- var (
- numValues = bufferRef.Len()
- )
+func spacedExpand[T parquet.ColumnTypes](buffer []T, nullCount int, validBits []byte, validBitsOffset int64) int {
+ numValues := len(buffer)
idxDecode := int64(numValues - nullCount)
if idxDecode == 0 { // if there's nothing to decode there's nothing to do.
@@ -236,8 +228,8 @@ func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBits
// overwrite any existing data with the correctly spaced data. Any data that happens to be left in the null
// slots is fine since it shouldn't matter and saves us work.
idxDecode -= run.Length
- n := reflect.Copy(bufferRef.Slice(int(run.Pos), bufferRef.Len()), bufferRef.Slice(int(idxDecode), int(int64(idxDecode)+run.Length)))
- debug.Assert(n == int(run.Length), "reflect.Copy copied incorrect number of elements in spacedExpand")
+ n := copy(buffer[run.Pos:], buffer[idxDecode:int64(idxDecode)+run.Length])
+ debug.Assert(n == int(run.Length), "copy copied incorrect number of elements in spacedExpand")
}
return numValues
diff --git a/parquet/internal/encoding/typed_encoder.go b/parquet/internal/encoding/typed_encoder.go
index b6227c22..cec8c55c 100644
--- a/parquet/internal/encoding/typed_encoder.go
+++ b/parquet/internal/encoding/typed_encoder.go
@@ -184,7 +184,7 @@ func (intDecoderTraits[T]) BytesRequired(n int) int {
func (intDecoderTraits[T]) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
- return &typedDictDecoder[T]{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+ return &typedDictDecoder[T]{dictDecoder[T]{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
}
switch e {
@@ -213,7 +213,7 @@ func (floatDecoderTraits[T]) BytesRequired(n int) int {
func (floatDecoderTraits[T]) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
- return &typedDictDecoder[T]{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+ return &typedDictDecoder[T]{dictDecoder: dictDecoder[T]{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
}
switch e {
@@ -227,7 +227,7 @@ func (floatDecoderTraits[T]) Decoder(e parquet.Encoding, descr *schema.Column, u
}
type typedDictDecoder[T int32 | int64 | float32 | float64 | parquet.Int96 | parquet.ByteArray | parquet.FixedLenByteArray] struct {
- dictDecoder
+ dictDecoder[T]
}
func (d *typedDictDecoder[T]) Type() parquet.Type {
@@ -307,12 +307,15 @@ func (dc *dictConverter[T]) IsValid(idxes ...utils.IndexType) bool {
return min >= 0 && int(min) < len(dc.dict) && int(max) >= 0 && int(max) < len(dc.dict)
}
-func (dc *dictConverter[T]) Fill(out any, val utils.IndexType) error {
- o := out.([]T)
+func (dc *dictConverter[T]) IsValidSingle(idx utils.IndexType) bool {
+ dc.ensure(idx)
+ return int(idx) >= 0 && int(idx) < len(dc.dict)
+}
+
+func (dc *dictConverter[T]) Fill(o []T, val utils.IndexType) error {
if err := dc.ensure(val); err != nil {
return err
}
-
o[0] = dc.dict[val]
for i := 1; i < len(o); i *= 2 {
copy(o[i:], o[:i])
@@ -320,30 +323,20 @@ func (dc *dictConverter[T]) Fill(out any, val utils.IndexType) error {
return nil
}
-func (dc *dictConverter[T]) FillZero(out any) {
- o := out.([]T)
+func (dc *dictConverter[T]) FillZero(o []T) {
o[0] = dc.zeroVal
for i := 1; i < len(o); i *= 2 {
copy(o[i:], o[:i])
}
}
-func (dc *dictConverter[T]) Copy(out any, vals []utils.IndexType) error {
- o := out.([]T)
+func (dc *dictConverter[T]) Copy(o []T, vals []utils.IndexType) error {
for idx, val := range vals {
o[idx] = dc.dict[val]
}
return nil
}
-type Int32DictConverter = dictConverter[int32]
-type Int64DictConverter = dictConverter[int64]
-type Float32DictConverter = dictConverter[float32]
-type Float64DictConverter = dictConverter[float64]
-type Int96DictConverter = dictConverter[parquet.Int96]
-type ByteArrayDictConverter = dictConverter[parquet.ByteArray]
-type FixedLenByteArrayDictConverter = dictConverter[parquet.FixedLenByteArray]
-
// the int96EncoderTraits struct is used to make it easy to create encoders and decoders based on type
type int96EncoderTraits struct{}
@@ -374,7 +367,7 @@ func (int96DecoderTraits) BytesRequired(n int) int {
// Decoder returns a decoder for int96 typed data of the requested encoding type if available
func (int96DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
- return &DictInt96Decoder{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+ return &DictInt96Decoder{dictDecoder[parquet.Int96]{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
}
switch e {
@@ -518,7 +511,7 @@ func (byteArrayDecoderTraits) BytesRequired(n int) int {
// Decoder returns a decoder for byteArray typed data of the requested encoding type if available
func (byteArrayDecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
- return &DictByteArrayDecoder{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+ return &DictByteArrayDecoder{dictDecoder[parquet.ByteArray]{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
}
switch e {
@@ -558,7 +551,7 @@ func (enc *DictByteArrayEncoder) Type() parquet.Type {
// DictByteArrayDecoder is a decoder for decoding dictionary encoded data for parquet.ByteArray columns
type DictByteArrayDecoder struct {
- dictDecoder
+ dictDecoder[parquet.ByteArray]
}
// Type returns the underlying physical type that can be decoded with this decoder
@@ -639,7 +632,7 @@ func (fixedLenByteArrayDecoderTraits) BytesRequired(n int) int {
// Decoder returns a decoder for fixedLenByteArray typed data of the requested encoding type if available
func (fixedLenByteArrayDecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
- return &DictFixedLenByteArrayDecoder{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+ return &DictFixedLenByteArrayDecoder{dictDecoder[parquet.FixedLenByteArray]{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
}
switch e {
@@ -664,25 +657,8 @@ func (enc *DictFixedLenByteArrayEncoder) Type() parquet.Type {
// NewDictConverter creates a dict converter of the appropriate type, using the passed in
// decoder as the decoder to decode the dictionary index.
-func NewDictConverter(dict TypedDecoder) utils.DictionaryConverter {
- switch dict.Type() {
- case parquet.Types.Int32:
- return &Int32DictConverter{valueDecoder: dict.(Int32Decoder), dict: make([]int32, 0, dict.ValuesLeft())}
- case parquet.Types.Int64:
- return &Int64DictConverter{valueDecoder: dict.(Int64Decoder), dict: make([]int64, 0, dict.ValuesLeft())}
- case parquet.Types.Int96:
- return &Int96DictConverter{valueDecoder: dict.(Int96Decoder), dict: make([]parquet.Int96, 0, dict.ValuesLeft())}
- case parquet.Types.Float:
- return &Float32DictConverter{valueDecoder: dict.(Float32Decoder), dict: make([]float32, 0, dict.ValuesLeft())}
- case parquet.Types.Double:
- return &Float64DictConverter{valueDecoder: dict.(Float64Decoder), dict: make([]float64, 0, dict.ValuesLeft())}
- case parquet.Types.ByteArray:
- return &ByteArrayDictConverter{valueDecoder: dict.(ByteArrayDecoder), dict: make([]parquet.ByteArray, 0, dict.ValuesLeft())}
- case parquet.Types.FixedLenByteArray:
- return &FixedLenByteArrayDictConverter{valueDecoder: dict.(FixedLenByteArrayDecoder), dict: make([]parquet.FixedLenByteArray, 0, dict.ValuesLeft())}
- default:
- return nil
- }
+func NewDictConverter[T parquet.ColumnTypes](dict TypedDecoder) utils.DictionaryConverter[T] {
+ return &dictConverter[T]{valueDecoder: dict.(Decoder[T]), dict: make([]T, 0, dict.ValuesLeft())}
}
var (
diff --git a/parquet/internal/utils/bit_reader.go b/parquet/internal/utils/bit_reader.go
index 10805292..ced0f265 100644
--- a/parquet/internal/utils/bit_reader.go
+++ b/parquet/internal/utils/bit_reader.go
@@ -21,7 +21,6 @@ import (
"errors"
"io"
"math"
- "reflect"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
@@ -122,28 +121,46 @@ func (b *BitReader) GetZigZagVlqInt() (int64, bool) {
// error if there aren't enough bytes left.
func (b *BitReader) ReadByte() (byte, error) {
var tmp byte
- if ok := b.GetAligned(1, &tmp); !ok {
+ if ok := b.getAlignedUint8(1, &tmp); !ok {
return 0, errors.New("failed to read byte")
}
return tmp, nil
}
-// GetAligned reads nbytes from the underlying stream into the passed interface value.
+// getAlignedUint8 reads nbytes from the underlying stream into the passed uint8 value.
// Returning false if there aren't enough bytes remaining in the stream or if an invalid
// type is passed. The bytes are read aligned to byte boundaries.
-//
-// v must be a pointer to a byte or sized uint type (*byte, *uint16, *uint32, *uint64).
-// encoded values are assumed to be little endian.
-func (b *BitReader) GetAligned(nbytes int, v interface{}) bool {
- // figure out the number of bytes to represent v
- typBytes := int(reflect.TypeOf(v).Elem().Size())
- if nbytes > typBytes {
+func (b *BitReader) getAlignedUint8(nbytes int, v *uint8) bool {
+ if nbytes > 1 {
return false
}
bread := bitutil.BytesForBits(int64(b.bitoffset))
+ b.byteoffset += bread
+ n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
+ if err != nil && err != io.EOF {
+ return false
+ }
+ if n != nbytes {
+ return false
+ }
+
+ *v = b.raw[0]
+
+ b.byteoffset += int64(nbytes)
+ b.bitoffset = 0
+ b.fillbuffer()
+ return true
+}
+// getAlignedUint16 reads nbytes from the underlying stream into the passed uint16 value.
+func (b *BitReader) getAlignedUint16(nbytes int, v *uint16) bool {
+ if nbytes > 2 {
+ return false
+ }
+
+ bread := bitutil.BytesForBits(int64(b.bitoffset))
b.byteoffset += bread
n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
if err != nil && err != io.EOF {
@@ -152,24 +169,67 @@ func (b *BitReader) GetAligned(nbytes int, v interface{}) bool {
if n != nbytes {
return false
}
+
// zero pad the bytes
- memory.Set(b.raw[n:typBytes], 0)
-
- switch v := v.(type) {
- case *byte:
- *v = b.raw[0]
- case *uint64:
- *v = binary.LittleEndian.Uint64(b.raw[:typBytes])
- case *uint32:
- *v = binary.LittleEndian.Uint32(b.raw[:typBytes])
- case *uint16:
- *v = binary.LittleEndian.Uint16(b.raw[:typBytes])
- default:
+ memory.Set(b.raw[n:2], 0)
+
+ *v = binary.LittleEndian.Uint16(b.raw[:2])
+
+ b.byteoffset += int64(nbytes)
+ b.bitoffset = 0
+ b.fillbuffer()
+ return true
+}
+
+// getAlignedUint32 reads nbytes from the underlying stream into the passed uint32 value.
+func (b *BitReader) getAlignedUint32(nbytes int, v *uint32) bool {
+ if nbytes > 4 {
return false
}
+ bread := bitutil.BytesForBits(int64(b.bitoffset))
+ b.byteoffset += bread
+ n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
+ if err != nil && err != io.EOF {
+ return false
+ }
+ if n != nbytes {
+ return false
+ }
+
+ // zero pad the bytes
+ memory.Set(b.raw[n:4], 0)
+
+ *v = binary.LittleEndian.Uint32(b.raw[:4])
+
b.byteoffset += int64(nbytes)
+ b.bitoffset = 0
+ b.fillbuffer()
+ return true
+}
+// getAlignedUint64 reads nbytes from the underlying stream into the passed uint64 value.
+func (b *BitReader) getAlignedUint64(nbytes int, v *uint64) bool {
+ if nbytes > 8 {
+ return false
+ }
+
+ bread := bitutil.BytesForBits(int64(b.bitoffset))
+ b.byteoffset += bread
+ n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
+ if err != nil && err != io.EOF {
+ return false
+ }
+ if n != nbytes {
+ return false
+ }
+
+ // zero pad the bytes
+ memory.Set(b.raw[n:8], 0)
+
+ *v = binary.LittleEndian.Uint64(b.raw[:8])
+
+ b.byteoffset += int64(nbytes)
b.bitoffset = 0
b.fillbuffer()
return true
diff --git a/parquet/internal/utils/dictionary.go b/parquet/internal/utils/dictionary.go
index 4d5ef13f..8c315da1 100644
--- a/parquet/internal/utils/dictionary.go
+++ b/parquet/internal/utils/dictionary.go
@@ -18,7 +18,8 @@ package utils
import (
"math"
- "reflect"
+
+ "github.com/apache/arrow-go/v18/parquet"
)
// IndexType is the type we're going to use for Dictionary indexes, currently
@@ -31,57 +32,53 @@ const (
MinIndexType = math.MinInt32
)
-// DictionaryConverter is an interface used for dealing with RLE decoding and encoding
-// when working with dictionaries to get values from indexes.
-type DictionaryConverter interface {
- // Copy takes an interface{} which must be a slice of the appropriate type, and will be populated
- // by the dictionary values at the indexes from the IndexType slice
- Copy(interface{}, []IndexType) error
- // Fill fills interface{} which must be a slice of the appropriate type, with the value
- // specified by the dictionary index passed in.
- Fill(interface{}, IndexType) error
- // FillZero fills interface{}, which must be a slice of the appropriate type, with the zero value
- // for the given type.
- FillZero(interface{})
+type DictionaryConverter[T parquet.ColumnTypes | uint64] interface {
+ // Copy populates the input slice by the dictionary values at the indexes from the IndexType slice
+ Copy([]T, []IndexType) error
+ // Fill fills the input slice with the value specified by the dictionary index passed in.
+ Fill([]T, IndexType) error
+ // FillZero fills the input slice with the zero value for the given type.
+ FillZero([]T)
// IsValid validates that all of the indexes passed in are valid indexes for the dictionary
IsValid(...IndexType) bool
+ // IsValidSingle validates that the index passed in is a valid index for the dictionary
+ // This is an optimisation, to avoid allocating a slice for a single value
+ IsValidSingle(IndexType) bool
}
// converter for getspaced that handles runs that get returned directly
// as output, rather than using a dictionary
-type plainConverter struct{}
+type plainConverter[T ~uint64] struct{}
-func (plainConverter) IsValid(...IndexType) bool { return true }
-func (plainConverter) Fill(values interface{}, val IndexType) error {
- v := reflect.ValueOf(values)
- switch v.Type().Elem().Kind() {
- case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
- v.Index(0).SetInt(int64(val))
- case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
- v.Index(0).SetUint(uint64(val))
- }
+func (plainConverter[T]) IsValid(...IndexType) bool { return true }
+
+func (plainConverter[T]) IsValidSingle(IndexType) bool { return true }
- for i := 1; i < v.Len(); i *= 2 {
- reflect.Copy(v.Slice(i, v.Len()), v.Slice(0, i))
+func (plainConverter[T]) Fill(values []T, val IndexType) error {
+ if len(values) == 0 {
+ return nil
+ }
+ values[0] = T(val)
+ for i := 1; i < len(values); i *= 2 {
+ copy(values[i:], values[0:i])
}
return nil
}
-func (plainConverter) FillZero(values interface{}) {
- v := reflect.ValueOf(values)
- zeroVal := reflect.New(v.Type().Elem()).Elem()
-
- v.Index(0).Set(zeroVal)
- for i := 1; i < v.Len(); i *= 2 {
- reflect.Copy(v.Slice(i, v.Len()), v.Slice(0, i))
+func (plainConverter[T]) FillZero(values []T) {
+ if len(values) == 0 {
+ return
+ }
+ var zero T
+ values[0] = zero
+ for i := 1; i < len(values); i *= 2 {
+ copy(values[i:], values[0:i])
}
}
-func (plainConverter) Copy(out interface{}, values []IndexType) error {
- vout := reflect.ValueOf(out)
- vin := reflect.ValueOf(values)
- for i := 0; i < vin.Len(); i++ {
- vout.Index(i).Set(vin.Index(i).Convert(vout.Type().Elem()))
+func (plainConverter[T]) Copy(out []T, values []IndexType) error {
+ for i := range values {
+ out[i] = T(values[i])
}
return nil
}
diff --git a/parquet/internal/utils/physical_types.tmpldata b/parquet/internal/utils/physical_types.tmpldata
deleted file mode 100644
index 0adeb995..00000000
--- a/parquet/internal/utils/physical_types.tmpldata
+++ /dev/null
@@ -1,52 +0,0 @@
-[
- {
- "Name": "Int32",
- "name": "int32",
- "lower": "int32",
- "prefix": "arrow"
- },
- {
- "Name": "Int64",
- "name": "int64",
- "lower": "int64",
- "prefix": "arrow"
- },
- {
- "Name": "Int96",
- "name": "parquet.Int96",
- "lower": "int96",
- "prefix": "parquet"
- },
- {
- "Name": "Float32",
- "name": "float32",
- "lower": "float32",
- "prefix": "arrow",
- "physical": "Float"
- },
- {
- "Name": "Float64",
- "name": "float64",
- "lower": "float64",
- "prefix": "arrow",
- "physical": "Double"
- },
- {
- "Name": "Boolean",
- "name": "bool",
- "lower": "bool",
- "prefix": "arrow"
- },
- {
- "Name": "ByteArray",
- "name": "parquet.ByteArray",
- "lower": "byteArray",
- "prefix": "parquet"
- },
- {
- "Name": "FixedLenByteArray",
- "name": "parquet.FixedLenByteArray",
- "lower": "fixedLenByteArray",
- "prefix": "parquet"
- }
-]
diff --git a/parquet/internal/utils/rle.go b/parquet/internal/utils/rle.go
index 12eca7b2..94a40a56 100644
--- a/parquet/internal/utils/rle.go
+++ b/parquet/internal/utils/rle.go
@@ -26,13 +26,9 @@ import (
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/internal/bitutils"
- "github.com/apache/arrow-go/v18/internal/utils"
- "github.com/apache/arrow-go/v18/parquet"
"golang.org/x/xerrors"
)
-//go:generate go run ../../../arrow/_tools/tmpl/main.go -i -data=physical_types.tmpldata typed_rle_dict.gen.go.tmpl
-
const (
MaxValuesPerLiteralRun = (1 << 6) * 8
)
@@ -40,7 +36,7 @@ const (
func MinRLEBufferSize(bitWidth int) int {
maxLiteralRunSize := 1 + bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth))
maxRepeatedRunSize := binary.MaxVarintLen32 + bitutil.BytesForBits(int64(bitWidth))
- return int(utils.Max(maxLiteralRunSize, maxRepeatedRunSize))
+ return int(max(maxLiteralRunSize, maxRepeatedRunSize))
}
func MaxRLEBufferSize(width, numValues int) int {
@@ -51,7 +47,7 @@ func MaxRLEBufferSize(width, numValues int) int {
minRepeatedRunSize := 1 + int(bitutil.BytesForBits(int64(width)))
repeatedMaxSize := int(bitutil.BytesForBits(int64(numValues))) * minRepeatedRunSize
- return utils.Max(literalMaxSize, repeatedMaxSize)
+ return max(literalMaxSize, repeatedMaxSize)
}
// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs
@@ -146,24 +142,24 @@ func (r *RleDecoder) Next() bool {
nbytes := int(bitutil.BytesForBits(int64(r.bitWidth)))
switch {
case nbytes > 4:
- if !r.r.GetAligned(nbytes, &r.curVal) {
+ if !r.r.getAlignedUint64(nbytes, &r.curVal) {
return false
}
case nbytes > 2:
var val uint32
- if !r.r.GetAligned(nbytes, &val) {
+ if !r.r.getAlignedUint32(nbytes, &val) {
return false
}
r.curVal = uint64(val)
case nbytes > 1:
var val uint16
- if !r.r.GetAligned(nbytes, &val) {
+ if !r.r.getAlignedUint16(nbytes, &val) {
return false
}
r.curVal = uint64(val)
default:
var val uint8
- if !r.r.GetAligned(nbytes, &val) {
+ if !r.r.getAlignedUint8(nbytes, &val) {
return false
}
r.curVal = uint64(val)
@@ -184,11 +180,11 @@ func (r *RleDecoder) Discard(n int) int {
remain := n - read
if r.repCount > 0 {
- repbatch := int(math.Min(float64(remain), float64(r.repCount)))
+ repbatch := min(remain, int(r.repCount))
r.repCount -= int32(repbatch)
read += repbatch
} else if r.litCount > 0 {
- litbatch := int(math.Min(float64(remain), float64(r.litCount)))
+ litbatch := min(remain, int(r.litCount))
n, _ := r.r.Discard(uint(r.bitWidth), litbatch)
if n != litbatch {
return read
@@ -214,7 +210,7 @@ func (r *RleDecoder) GetBatch(values []uint64) int {
remain := size - read
if r.repCount > 0 {
- repbatch := int(math.Min(float64(remain), float64(r.repCount)))
+ repbatch := min(remain, int(r.repCount))
for i := 0; i < repbatch; i++ {
out[i] = r.curVal
}
@@ -223,7 +219,7 @@ func (r *RleDecoder) GetBatch(values []uint64) int {
read += repbatch
out = out[repbatch:]
} else if r.litCount > 0 {
- litbatch := int(math.Min(float64(remain), float64(r.litCount)))
+ litbatch := min(remain, int(r.litCount))
n, _ := r.r.GetBatch(uint(r.bitWidth), out[:litbatch])
if n != litbatch {
return read
@@ -246,7 +242,7 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []by
return r.GetBatch(vals), nil
}
- converter := plainConverter{}
+ converter := plainConverter[uint64]{}
blockCounter := bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
var (
@@ -268,7 +264,7 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []by
converter.FillZero(vals[:block.Len])
processed = int(block.Len)
} else {
- processed, err = r.getspaced(converter, vals, int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset)
+ processed, err = getspaced(r, converter, vals, int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset)
if err != nil {
return totalProcessed, err
}
@@ -285,92 +281,6 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []by
return totalProcessed, nil
}
-func (r *RleDecoder) getspaced(dc DictionaryConverter, vals interface{},
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
{
- switch vals := vals.(type) {
- case []int32:
- return r.getspacedInt32(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- case []int64:
- return r.getspacedInt64(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- case []float32:
- return r.getspacedFloat32(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- case []float64:
- return r.getspacedFloat64(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- case []parquet.ByteArray:
- return r.getspacedByteArray(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- case []parquet.FixedLenByteArray:
- return r.getspacedFixedLenByteArray(dc, vals, batchSize,
nullCount, validBits, validBitsOffset)
- case []parquet.Int96:
- return r.getspacedInt96(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- case []uint64:
- return r.getspacedUint64(dc, vals, batchSize, nullCount,
validBits, validBitsOffset)
- default:
- return 0, xerrors.New("parquet/rle: getspaced invalid type")
- }
-}
-
-func (r *RleDecoder) getspacedUint64(dc DictionaryConverter, vals []uint64,
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
{
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsUint64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
func (r *RleDecoder) consumeRepeatCounts(read, batchSize, remain int, run
bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun) {
// Consume the entire repeat counts incrementing repeat_batch to
// be the total of nulls + values consumed, we only need to
@@ -379,7 +289,7 @@ func (r *RleDecoder) consumeRepeatCounts(read, batchSize,
remain int, run bituti
repeatBatch := 0
for r.repCount > 0 && (read+repeatBatch) < batchSize {
if run.Set {
- updateSize := int(utils.Min(run.Len, int64(r.repCount)))
+ updateSize := int(min(run.Len, int64(r.repCount)))
r.repCount -= int32(updateSize)
repeatBatch += updateSize
run.Len -= int64(updateSize)
@@ -396,88 +306,6 @@ func (r *RleDecoder) consumeRepeatCounts(read, batchSize,
remain int, run bituti
return repeatBatch, remain, run
}
-func (r *RleDecoder) consumeLiteralsUint64(dc DictionaryConverter, vals
[]uint64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDict(dc DictionaryConverter, vals
interface{}) (int, error) {
- switch vals := vals.(type) {
- case []int32:
- return r.GetBatchWithDictInt32(dc, vals)
- case []int64:
- return r.GetBatchWithDictInt64(dc, vals)
- case []float32:
- return r.GetBatchWithDictFloat32(dc, vals)
- case []float64:
- return r.GetBatchWithDictFloat64(dc, vals)
- case []parquet.ByteArray:
- return r.GetBatchWithDictByteArray(dc, vals)
- case []parquet.FixedLenByteArray:
- return r.GetBatchWithDictFixedLenByteArray(dc, vals)
- case []parquet.Int96:
- return r.GetBatchWithDictInt96(dc, vals)
- default:
- return 0, xerrors.New("parquet/rle: GetBatchWithDict invalid
type")
- }
-}
-
-func (r *RleDecoder) GetBatchWithDictSpaced(dc DictionaryConverter, vals
interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int,
error) {
- switch vals := vals.(type) {
- case []int32:
- return r.GetBatchWithDictSpacedInt32(dc, vals, nullCount,
validBits, validBitsOffset)
- case []int64:
- return r.GetBatchWithDictSpacedInt64(dc, vals, nullCount,
validBits, validBitsOffset)
- case []float32:
- return r.GetBatchWithDictSpacedFloat32(dc, vals, nullCount,
validBits, validBitsOffset)
- case []float64:
- return r.GetBatchWithDictSpacedFloat64(dc, vals, nullCount,
validBits, validBitsOffset)
- case []parquet.ByteArray:
- return r.GetBatchWithDictSpacedByteArray(dc, vals, nullCount,
validBits, validBitsOffset)
- case []parquet.FixedLenByteArray:
- return r.GetBatchWithDictSpacedFixedLenByteArray(dc, vals,
nullCount, validBits, validBitsOffset)
- case []parquet.Int96:
- return r.GetBatchWithDictSpacedInt96(dc, vals, nullCount,
validBits, validBitsOffset)
- default:
- return 0, xerrors.New("parquet/rle: GetBatchWithDictSpaced
invalid type")
- }
-}
-
type RleEncoder struct {
w *BitWriter
diff --git a/parquet/internal/utils/typed_rle_dict.gen.go b/parquet/internal/utils/typed_rle_dict.gen.go
deleted file mode 100644
index 1c1b487d..00000000
--- a/parquet/internal/utils/typed_rle_dict.gen.go
+++ /dev/null
@@ -1,1377 +0,0 @@
-// Code generated by typed_rle_dict.gen.go.tmpl. DO NOT EDIT.
-
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package utils
-
-import (
- "github.com/apache/arrow-go/v18/internal/bitutils"
- "github.com/apache/arrow-go/v18/internal/utils"
- "github.com/apache/arrow-go/v18/parquet"
- "golang.org/x/xerrors"
-)
-
-func (r *RleDecoder) GetBatchWithDictSpacedInt32(dc DictionaryConverter, vals
[]int32, nullCount int, validBits []byte, validBitsOffset int64)
(totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictInt32(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDictInt32(dc,
vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedInt32(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedInt32(dc DictionaryConverter, vals []int32,
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
{
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsInt32(dc, vals, remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsInt32(dc DictionaryConverter, vals
[]int32, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictInt32(dc DictionaryConverter, vals
[]int32) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedInt64(dc DictionaryConverter, vals
[]int64, nullCount int, validBits []byte, validBitsOffset int64)
(totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictInt64(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDictInt64(dc,
vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedInt64(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedInt64(dc DictionaryConverter, vals []int64,
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
{
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsInt64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsInt64(dc DictionaryConverter, vals
[]int64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictInt64(dc DictionaryConverter, vals
[]int64) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedInt96(dc DictionaryConverter, vals
[]parquet.Int96, nullCount int, validBits []byte, validBitsOffset int64)
(totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictInt96(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDictInt96(dc,
vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedInt96(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedInt96(dc DictionaryConverter, vals
[]parquet.Int96, batchSize, nullCount int, validBits []byte, validBitsOffset
int64) (int, error) {
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsInt96(dc, vals, remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsInt96(dc DictionaryConverter, vals
[]parquet.Int96, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictInt96(dc DictionaryConverter, vals
[]parquet.Int96) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedFloat32(dc DictionaryConverter,
vals []float32, nullCount int, validBits []byte, validBitsOffset int64)
(totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictFloat32(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDictFloat32(dc,
vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedFloat32(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedFloat32(dc DictionaryConverter, vals []float32,
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
{
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsFloat32(dc, vals, remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsFloat32(dc DictionaryConverter, vals
[]float32, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictFloat32(dc DictionaryConverter, vals
[]float32) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedFloat64(dc DictionaryConverter,
vals []float64, nullCount int, validBits []byte, validBitsOffset int64)
(totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictFloat64(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDictFloat64(dc,
vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedFloat64(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedFloat64(dc DictionaryConverter, vals []float64,
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
{
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsFloat64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsFloat64(dc DictionaryConverter, vals
[]float64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictFloat64(dc DictionaryConverter, vals
[]float64) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedByteArray(dc DictionaryConverter,
vals []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset
int64) (totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictByteArray(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDictByteArray(dc,
vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedByteArray(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedByteArray(dc DictionaryConverter, vals
[]parquet.ByteArray, batchSize, nullCount int, validBits []byte,
validBitsOffset int64) (int, error) {
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsByteArray(dc, vals, remain, indexbuffer[:], validRun,
bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsByteArray(dc DictionaryConverter, vals
[]parquet.ByteArray, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictByteArray(dc DictionaryConverter, vals
[]parquet.ByteArray) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedFixedLenByteArray(dc
DictionaryConverter, vals []parquet.FixedLenByteArray, nullCount int, validBits
[]byte, validBitsOffset int64) (totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDictFixedLenByteArray(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err =
r.GetBatchWithDictFixedLenByteArray(dc, vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspacedFixedLenByteArray(dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspacedFixedLenByteArray(dc DictionaryConverter, vals
[]parquet.FixedLenByteArray, batchSize, nullCount int, validBits []byte,
validBitsOffset int64) (int, error) {
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err =
r.consumeLiteralsFixedLenByteArray(dc, vals, remain, indexbuffer[:], validRun,
bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsFixedLenByteArray(dc DictionaryConverter,
vals []parquet.FixedLenByteArray, remain int, buf []IndexType, run
bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun,
error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictFixedLenByteArray(dc DictionaryConverter,
vals []parquet.FixedLenByteArray) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain,
int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
diff --git a/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
b/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
deleted file mode 100644
index 4bce437f..00000000
--- a/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
+++ /dev/null
@@ -1,220 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package utils
-
-import (
- "github.com/apache/arrow-go/v18/parquet"
- "github.com/apache/arrow-go/v18/internal/bitutils"
- "github.com/apache/arrow-go/v18/internal/utils"
-)
-
-{{range .In}}
-{{if ne .Name "Boolean"}}
-func (r *RleDecoder) GetBatchWithDictSpaced{{.Name}}(dc DictionaryConverter,
vals []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64)
(totalProcessed int, err error) {
- if nullCount == 0 {
- return r.GetBatchWithDict{{.Name}}(dc, vals)
- }
-
- var (
- blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset,
int64(len(vals)))
- processed = 0
- block bitutils.BitBlockCount
- )
-
- for {
- block = blockCounter.NextFourWords()
- if block.Len == 0 {
- break
- }
-
- switch {
- case block.AllSet():
- processed, err = r.GetBatchWithDict{{.Name}}(dc, vals[:block.Len])
- case block.NoneSet():
- dc.FillZero(vals[:block.Len])
- processed = int(block.Len)
- default:
- processed, err = r.getspaced{{.Name}}(dc, vals, int(block.Len),
int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
- }
-
- if err != nil {
- break
- }
-
- totalProcessed += processed
- vals = vals[int(block.Len):]
- validBitsOffset += int64(block.Len)
- if processed != int(block.Len) {
- break
- }
- }
- return
-}
-
-func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals
[]{{.name}}, batchSize, nullCount int, validBits []byte, validBitsOffset int64)
(int, error) {
- if nullCount == batchSize {
- dc.FillZero(vals[:batchSize])
- return batchSize, nil
- }
-
- read := 0
- remain := batchSize - nullCount
-
- const bufferSize = 1024
- var indexbuffer [bufferSize]IndexType
-
- // assume no bits to start
- bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
- validRun := bitReader.NextRun()
- for read < batchSize {
- if validRun.Len == 0 {
- validRun = bitReader.NextRun()
- }
-
- if !validRun.Set {
- dc.FillZero(vals[:int(validRun.Len)])
- vals = vals[int(validRun.Len):]
- read += int(validRun.Len)
- validRun.Len = 0
- continue
- }
-
- if r.repCount == 0 && r.litCount == 0 {
- if !r.Next() {
- return read, nil
- }
- }
-
- var batch int
- switch {
- case r.repCount > 0:
- batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain,
validRun, bitReader)
- current := IndexType(r.curVal)
- if !dc.IsValid(current) {
- return read, nil
- }
- dc.Fill(vals[:batch], current)
- case r.litCount > 0:
- var (
- litread int
- skipped int
- err error
- )
- litread, skipped, validRun, err = r.consumeLiterals{{.Name}}(dc, vals,
remain, indexbuffer[:], validRun, bitReader)
- if err != nil {
- return read, err
- }
- batch = litread + skipped
- remain -= litread
- }
-
- vals = vals[batch:]
- read += batch
- }
- return read, nil
-}
-
-func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals
[]{{.name}}, remain int, buf []IndexType, run bitutils.BitRun, bitRdr
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
- batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
- buf = buf[:batch]
-
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != batch {
- return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
- }
-
- if !dc.IsValid(buf...) {
- return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
- }
-
- var (
- read int
- skipped int
- )
- for read < batch {
- if run.Set {
- updateSize := utils.Min(batch-read, int(run.Len))
- if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
- return 0, 0, run, err
- }
- read += updateSize
- vals = vals[updateSize:]
- run.Len -= int64(updateSize)
- } else {
- dc.FillZero(vals[:int(run.Len)])
- vals = vals[int(run.Len):]
- skipped += int(run.Len)
- run.Len = 0
- }
- if run.Len == 0 {
- run = bitRdr.NextRun()
- }
- }
- r.litCount -= int32(batch)
- return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals
[]{{.name}}) (int, error) {
- var (
- read = 0
- size = len(vals)
- indexbuffer [1024]IndexType
- )
-
- for read < size {
- remain := size - read
-
- switch {
- case r.repCount > 0:
- idx := IndexType(r.curVal)
- if !dc.IsValid(idx) {
- return read, nil
- }
- batch := utils.Min(remain, int(r.repCount))
- if err := dc.Fill(vals[:batch], idx); err != nil {
- return read, err
- }
- r.repCount -= int32(batch)
- read += batch
- vals = vals[batch:]
- case r.litCount > 0:
- litbatch := utils.Min(utils.Min(remain, int(r.litCount)), 1024)
- buf := indexbuffer[:litbatch]
- n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
- if n != litbatch {
- return read, nil
- }
- if !dc.IsValid(buf...) {
- return read, nil
- }
- if err := dc.Copy(vals, buf); err != nil {
- return read, nil
- }
- r.litCount -= int32(litbatch)
- read += litbatch
- vals = vals[litbatch:]
- default:
- if !r.Next() {
- return read, nil
- }
- }
- }
-
- return read, nil
-}
-{{end}}
-{{end}}
diff --git a/parquet/internal/utils/typed_rle_dict.go
b/parquet/internal/utils/typed_rle_dict.go
new file mode 100644
index 00000000..d7cb6d69
--- /dev/null
+++ b/parquet/internal/utils/typed_rle_dict.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+ "bytes"
+
+ "github.com/apache/arrow-go/v18/internal/bitutils"
+ "github.com/apache/arrow-go/v18/parquet"
+ "golang.org/x/xerrors"
+)
+
+func getspaced[T parquet.ColumnTypes | uint64](r *RleDecoder, dc
DictionaryConverter[T], vals []T, batchSize, nullCount int, validBits []byte,
validBitsOffset int64) (int, error) {
+ if nullCount == batchSize {
+ dc.FillZero(vals[:batchSize])
+ return batchSize, nil
+ }
+
+ read := 0
+ remain := batchSize - nullCount
+
+ const bufferSize = 1024
+ var indexbuffer [bufferSize]IndexType
+
+ // assume no bits to start
+ bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset,
int64(batchSize))
+ validRun := bitReader.NextRun()
+ for read < batchSize {
+ if validRun.Len == 0 {
+ validRun = bitReader.NextRun()
+ }
+
+ if !validRun.Set {
+ dc.FillZero(vals[:int(validRun.Len)])
+ vals = vals[int(validRun.Len):]
+ read += int(validRun.Len)
+ validRun.Len = 0
+ continue
+ }
+
+ if r.repCount == 0 && r.litCount == 0 {
+ if !r.Next() {
+ return read, nil
+ }
+ }
+
+ var batch int
+ switch {
+ case r.repCount > 0:
+ batch, remain, validRun = r.consumeRepeatCounts(read,
batchSize, remain, validRun, bitReader)
+ current := IndexType(r.curVal)
+ if !dc.IsValidSingle(current) {
+ return read, nil
+ }
+ dc.Fill(vals[:batch], current)
+ case r.litCount > 0:
+ var (
+ litread int
+ skipped int
+ err error
+ )
+ litread, skipped, validRun, err = consumeLiterals(r,
dc, vals, remain, indexbuffer[:], validRun, bitReader)
+ if err != nil {
+ return read, err
+ }
+ batch = litread + skipped
+ remain -= litread
+ }
+
+ vals = vals[batch:]
+ read += batch
+ }
+ return read, nil
+}
+
+func consumeLiterals[T parquet.ColumnTypes | uint64](r *RleDecoder, dc
DictionaryConverter[T], vals []T, remain int, buf []IndexType, run
bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun,
error) {
+ batch := min(remain, int(r.litCount), len(buf))
+ buf = buf[:batch]
+
+ n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
+ if n != batch {
+ return 0, 0, run, xerrors.New("was not able to retrieve correct
number of indexes")
+ }
+
+ if !dc.IsValid(buf...) {
+ return 0, 0, run, xerrors.New("invalid index values found for
dictionary converter")
+ }
+
+ var (
+ read int
+ skipped int
+ )
+ for read < batch {
+ if run.Set {
+ updateSize := min(batch-read, int(run.Len))
+ if err := dc.Copy(vals, buf[read:read+updateSize]); err
!= nil {
+ return 0, 0, run, err
+ }
+ read += updateSize
+ vals = vals[updateSize:]
+ run.Len -= int64(updateSize)
+ } else {
+ dc.FillZero(vals[:int(run.Len)])
+ vals = vals[int(run.Len):]
+ skipped += int(run.Len)
+ run.Len = 0
+ }
+ if run.Len == 0 {
+ run = bitRdr.NextRun()
+ }
+ }
+ r.litCount -= int32(batch)
+ return read, skipped, run, nil
+}
+
+type TypedRleDecoder[T parquet.ColumnTypes | uint64] struct {
+ RleDecoder
+}
+
+func NewTypedRleDecoder[T parquet.ColumnTypes | uint64](data *bytes.Reader,
width int) *TypedRleDecoder[T] {
+ return &TypedRleDecoder[T]{
+ RleDecoder: RleDecoder{r: NewBitReader(data), bitWidth: width},
+ }
+}
+
+func (r *TypedRleDecoder[T]) GetBatchWithDict(dc DictionaryConverter[T], vals
[]T) (int, error) {
+ var (
+ read = 0
+ size = len(vals)
+ indexbuffer [1024]IndexType
+ )
+
+ for read < size {
+ remain := size - read
+
+ switch {
+ case r.repCount > 0:
+ idx := IndexType(r.curVal)
+ if !dc.IsValidSingle(idx) {
+ return read, nil
+ }
+ batch := min(remain, int(r.repCount))
+ if err := dc.Fill(vals[:batch], idx); err != nil {
+ return read, err
+ }
+ r.repCount -= int32(batch)
+ read += batch
+ vals = vals[batch:]
+ case r.litCount > 0:
+ litbatch := min(remain, int(r.litCount), 1024)
+ buf := indexbuffer[:litbatch]
+ n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
+ if n != litbatch {
+ return read, nil
+ }
+ if !dc.IsValid(buf...) {
+ return read, nil
+ }
+ if err := dc.Copy(vals, buf); err != nil {
+ return read, nil
+ }
+ r.litCount -= int32(litbatch)
+ read += litbatch
+ vals = vals[litbatch:]
+ default:
+ if !r.Next() {
+ return read, nil
+ }
+ }
+ }
+
+ return read, nil
+}
+
+func (r *TypedRleDecoder[T]) GetBatchWithDictSpaced(dc DictionaryConverter[T],
vals []T, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
+ if nullCount == 0 {
+ return r.GetBatchWithDict(dc, vals)
+ }
+
+ var (
+ blockCounter = bitutils.NewBitBlockCounter(validBits,
validBitsOffset, int64(len(vals)))
+ processed = 0
+ totalProcessed = 0
+ block bitutils.BitBlockCount
+ err error
+ )
+
+ for {
+ block = blockCounter.NextFourWords()
+ if block.Len == 0 {
+ break
+ }
+
+ switch {
+ case block.AllSet():
+ processed, err = r.GetBatchWithDict(dc,
vals[:block.Len])
+ case block.NoneSet():
+ dc.FillZero(vals[:block.Len])
+ processed = int(block.Len)
+ default:
+ processed, err = getspaced(&r.RleDecoder, dc, vals,
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
+ }
+
+ if err != nil {
+ break
+ }
+
+ totalProcessed += processed
+ vals = vals[int(block.Len):]
+ validBitsOffset += int64(block.Len)
+ if processed != int(block.Len) {
+ break
+ }
+ }
+ return totalProcessed, err
+}
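
For readers skimming the new file above, here is a minimal, illustrative sketch (not part of this commit) of how TypedRleDecoder[T] and DictionaryConverter[T] are intended to fit together. The converter type below (int32DictConverter) is hypothetical and implements only the methods actually exercised in this hunk (Fill, FillZero, Copy, IsValid, IsValidSingle); the real DictionaryConverter[T] interface is defined elsewhere in the utils/encoding packages touched by this diff and may require more. Note also that parquet/internal/utils is internal to the arrow-go module, so a snippet like this would only compile from inside the repository; external callers go through the column readers, which the typed_encoder.go changes wire up to the new decoder.

    package utilsexample // hypothetical package, for illustration only

    import (
        "bytes"

        "github.com/apache/arrow-go/v18/parquet/internal/utils"
    )

    // int32DictConverter is a hypothetical DictionaryConverter[int32] backed by a
    // plain dictionary slice; it sketches only the method set used above.
    type int32DictConverter struct {
        dict []int32
    }

    func (c int32DictConverter) FillZero(out []int32) {
        for i := range out {
            out[i] = 0
        }
    }

    func (c int32DictConverter) Fill(out []int32, idx utils.IndexType) error {
        v := c.dict[idx]
        for i := range out {
            out[i] = v
        }
        return nil
    }

    func (c int32DictConverter) Copy(out []int32, idxes []utils.IndexType) error {
        for i, idx := range idxes {
            out[i] = c.dict[idx]
        }
        return nil
    }

    func (c int32DictConverter) IsValid(idxes ...utils.IndexType) bool {
        for _, idx := range idxes {
            if !c.IsValidSingle(idx) {
                return false
            }
        }
        return true
    }

    func (c int32DictConverter) IsValidSingle(idx utils.IndexType) bool {
        return idx >= 0 && int(idx) < len(c.dict)
    }

    // decodeRLEDict decodes RLE/bit-packed dictionary indices from encoded and
    // resolves each index against dict, writing the values into out. It returns
    // the number of values decoded.
    func decodeRLEDict(encoded []byte, bitWidth int, dict, out []int32) (int, error) {
        dec := utils.NewTypedRleDecoder[int32](bytes.NewReader(encoded), bitWidth)
        return dec.GetBatchWithDict(int32DictConverter{dict: dict}, out)
    }

Compared with the removed code, the type parameter replaces the per-type GetBatchWithDictInt32/Float64/ByteArray/... method families and the template that generated them, so one generic code path now serves every physical parquet type.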