This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 32115946 Implement RLE dictionary decoder using generics (#477)
32115946 is described below

commit 321159468711a8fac9a6c52a956424a390274da2
Author: daniel-adam-tfs <[email protected]>
AuthorDate: Wed Aug 27 20:38:25 2025 +0200

    Implement RLE dictionary decoder using generics (#477)
    
    ### Rationale for this change
    
    Performance improvement of reading data from RLE-dictionary encoded
    parquet columns.
    
    ### What changes are included in this PR?
    
    The DictionaryConverter interface was changed to DictionaryConverter[T]
    interface, so the methods could take []T as a parameter instead of
    interface{}. This required changed to several types implementing the
    interface.
    
    The template code generation for RleDecoder was removed and the methods
    for the parquet types are now handled by the generic TypedRleDecoder
    type.
    
    Additionally, some utils.Min/utils.Max calls were replace with calls to
    min/max intrinsics
    
    ### Are these changes tested?
    
    Yes, tests verifying the RLE dictionary already existed and I've added
    BenchmarkReadInt32Column to measure the performance improvement.
    
    ### Are there any user-facing changes?
    
    Some exported types functions were changed:
    DictionaryConverter -> DictionaryConverter[T]
    RleDecoder -> split into RleDecoder and TypeRleDecoder[T]
    
    func (b *BitReader) GetAligned(nbytes int, v interface{}) bool -> split
    into getAlignedUint8, getAlignedUint16, getAlignedUint32,
    getAlignedUint64
---
 parquet/file/column_reader_test.go                |   82 ++
 parquet/internal/encoding/byte_array_decoder.go   |    2 +-
 parquet/internal/encoding/decoder.go              |   42 +-
 parquet/internal/encoding/typed_encoder.go        |   58 +-
 parquet/internal/utils/bit_reader.go              |  104 +-
 parquet/internal/utils/dictionary.go              |   71 +-
 parquet/internal/utils/physical_types.tmpldata    |   52 -
 parquet/internal/utils/rle.go                     |  198 +--
 parquet/internal/utils/typed_rle_dict.gen.go      | 1377 ---------------------
 parquet/internal/utils/typed_rle_dict.gen.go.tmpl |  220 ----
 parquet/internal/utils/typed_rle_dict.go          |  230 ++++
 11 files changed, 476 insertions(+), 1960 deletions(-)

diff --git a/parquet/file/column_reader_test.go 
b/parquet/file/column_reader_test.go
index 575bda4f..25c26bc8 100644
--- a/parquet/file/column_reader_test.go
+++ b/parquet/file/column_reader_test.go
@@ -21,6 +21,8 @@ import (
        "fmt"
        "math"
        "math/rand"
+       "os"
+       "path/filepath"
        "reflect"
        "runtime"
        "sync"
@@ -810,3 +812,83 @@ func TestFullSeekRow(t *testing.T) {
                })
        }
 }
+
+func BenchmarkReadInt32Column(b *testing.B) {
+       // generate parquet with RLE-dictionary encoded int32 column
+       tempdir := b.TempDir()
+       filepath := filepath.Join(tempdir, "rle-dict-int32.parquet")
+
+       props := parquet.NewWriterProperties(
+               parquet.WithDictionaryDefault(true),
+               parquet.WithDataPageSize(128*1024*1024), // 128MB
+               parquet.WithBatchSize(128*1024*1024),
+               parquet.WithMaxRowGroupLength(100_000),
+               parquet.WithDataPageVersion(parquet.DataPageV2),
+               parquet.WithVersion(parquet.V2_LATEST),
+       )
+       outFile, err := os.Create(filepath)
+       require.NoError(b, err)
+
+       sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, 
schema.FieldList{
+               schema.NewInt32Node("col", parquet.Repetitions.Required, -1),
+       }, -1)
+       require.NoError(b, err)
+
+       writer := file.NewParquetWriter(outFile, sc, 
file.WithWriterProps(props))
+
+       // 10 row groups of 100 000 rows = 1 000 000 rows in total
+       value := int32(1)
+       for range 10 {
+               rgWriter := writer.AppendBufferedRowGroup()
+               cwr, _ := rgWriter.Column(0)
+               cw := cwr.(*file.Int32ColumnChunkWriter)
+               valuesIn := make([]int32, 0, 100_000)
+               repeats := 1
+               for len(valuesIn) < 100_000 {
+                       repeatedValue := make([]int32, repeats)
+                       for i := range repeatedValue {
+                               repeatedValue[i] = value
+                       }
+                       if len(valuesIn)+len(repeatedValue) > 100_000 {
+                               repeatedValue = 
repeatedValue[:100_000-len(valuesIn)]
+                       }
+                       valuesIn = append(valuesIn, repeatedValue[:]...)
+                       // repeat values from 1 to 50 times
+                       repeats = (repeats % 50) + 1
+                       value++
+               }
+               cw.WriteBatch(valuesIn, nil, nil)
+               rgWriter.Close()
+       }
+       err = writer.Close()
+       require.NoError(b, err)
+
+       reader, err := file.OpenParquetFile(filepath, false)
+       require.NoError(b, err)
+       defer reader.Close()
+
+       numValues := reader.NumRows()
+       values := make([]int32, numValues)
+       b.StopTimer()
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               startIndex := 0
+               for rg := 0; rg < reader.NumRowGroups(); rg++ {
+                       rgReader := reader.RowGroup(rg)
+                       colReader, err := rgReader.Column(0)
+                       require.NoError(b, err)
+
+                       cr, ok := colReader.(*file.Int32ColumnChunkReader)
+                       require.True(b, ok)
+
+                       b.StartTimer()
+                       _, valuesRead, err := cr.ReadBatch(rgReader.NumRows(), 
values, nil, nil)
+                       b.StopTimer()
+                       require.NoError(b, err)
+
+                       startIndex += valuesRead
+                       require.Equal(b, rgReader.NumRows(), int64(valuesRead))
+               }
+               require.Equal(b, numValues, int64(startIndex))
+       }
+}
diff --git a/parquet/internal/encoding/byte_array_decoder.go 
b/parquet/internal/encoding/byte_array_decoder.go
index 1253ccea..f6b1027b 100644
--- a/parquet/internal/encoding/byte_array_decoder.go
+++ b/parquet/internal/encoding/byte_array_decoder.go
@@ -118,7 +118,7 @@ func (pbad *PlainByteArrayDecoder) DecodeSpaced(out 
[]parquet.ByteArray, nullCou
 }
 
 func (d *DictByteArrayDecoder) InsertDictionary(bldr array.Builder) error {
-       conv := d.dictValueDecoder.(*ByteArrayDictConverter)
+       conv := d.dictValueDecoder.(*dictConverter[parquet.ByteArray])
        dictLength := cap(conv.dict)
        conv.ensure(pqutils.IndexType(dictLength))
 
diff --git a/parquet/internal/encoding/decoder.go 
b/parquet/internal/encoding/decoder.go
index b2a012e4..60afcc55 100644
--- a/parquet/internal/encoding/decoder.go
+++ b/parquet/internal/encoding/decoder.go
@@ -18,7 +18,6 @@ package encoding
 
 import (
        "bytes"
-       "reflect"
 
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -103,31 +102,31 @@ func (d *decoder) ValuesLeft() int { return d.nvals }
 // Encoding returns the encoding type used by this decoder to decode the bytes.
 func (d *decoder) Encoding() parquet.Encoding { return 
parquet.Encoding(d.encoding) }
 
-type dictDecoder struct {
+type dictDecoder[T parquet.ColumnTypes] struct {
        decoder
        mem              memory.Allocator
-       dictValueDecoder utils.DictionaryConverter
-       idxDecoder       *utils.RleDecoder
+       dictValueDecoder utils.DictionaryConverter[T]
+       idxDecoder       *utils.TypedRleDecoder[T]
 
        idxScratchSpace []uint64
 }
 
 // SetDict sets a decoder that can be used to decode the dictionary that is
 // used for this column in order to return the proper values.
-func (d *dictDecoder) SetDict(dict TypedDecoder) {
+func (d *dictDecoder[T]) SetDict(dict TypedDecoder) {
        if dict.Type() != d.descr.PhysicalType() {
                panic("parquet: mismatch dictionary and column data type")
        }
 
-       d.dictValueDecoder = NewDictConverter(dict)
+       d.dictValueDecoder = NewDictConverter[T](dict)
 }
 
 // SetData sets the index value data into the decoder.
-func (d *dictDecoder) SetData(nvals int, data []byte) error {
+func (d *dictDecoder[T]) SetData(nvals int, data []byte) error {
        d.nvals = nvals
        if len(data) == 0 {
                // no data, bitwidth can safely be 0
-               d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 0 /* 
bitwidth */)
+               d.idxDecoder = 
utils.NewTypedRleDecoder[T](bytes.NewReader(data), 0 /* bitwidth */)
                return nil
        }
 
@@ -138,29 +137,29 @@ func (d *dictDecoder) SetData(nvals int, data []byte) 
error {
        }
 
        // pass the rest of the data, minus that first byte, to the decoder
-       d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), 
int(width))
+       d.idxDecoder = utils.NewTypedRleDecoder[T](bytes.NewReader(data[1:]), 
int(width))
        return nil
 }
 
-func (d *dictDecoder) discard(n int) (int, error) {
+func (d *dictDecoder[T]) discard(n int) (int, error) {
        n = d.idxDecoder.Discard(n)
        d.nvals -= n
        return n, nil
 }
 
-func (d *dictDecoder) decode(out interface{}) (int, error) {
+func (d *dictDecoder[T]) decode(out []T) (int, error) {
        n, err := d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out)
        d.nvals -= n
        return n, err
 }
 
-func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits 
[]byte, validBitsOffset int64) (int, error) {
+func (d *dictDecoder[T]) decodeSpaced(out []T, nullCount int, validBits 
[]byte, validBitsOffset int64) (int, error) {
        n, err := d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, 
nullCount, validBits, validBitsOffset)
        d.nvals -= n
        return n, err
 }
 
-func (d *dictDecoder) DecodeIndices(numValues int, bldr array.Builder) (int, 
error) {
+func (d *dictDecoder[T]) DecodeIndices(numValues int, bldr array.Builder) 
(int, error) {
        n := shared_utils.Min(numValues, d.nvals)
        if cap(d.idxScratchSpace) < n {
                d.idxScratchSpace = make([]uint64, n, bitutil.NextPowerOf2(n))
@@ -179,7 +178,7 @@ func (d *dictDecoder) DecodeIndices(numValues int, bldr 
array.Builder) (int, err
        return n, nil
 }
 
-func (d *dictDecoder) DecodeIndicesSpaced(numValues, nullCount int, validBits 
[]byte, offset int64, bldr array.Builder) (int, error) {
+func (d *dictDecoder[T]) DecodeIndicesSpaced(numValues, nullCount int, 
validBits []byte, offset int64, bldr array.Builder) (int, error) {
        if cap(d.idxScratchSpace) < numValues {
                d.idxScratchSpace = make([]uint64, numValues, 
bitutil.NextPowerOf2(numValues))
        } else {
@@ -207,15 +206,8 @@ func (d *dictDecoder) DecodeIndicesSpaced(numValues, 
nullCount int, validBits []
 // spacedExpand is used to take a slice of data and utilize the bitmap 
provided to fill in nulls into the
 // correct slots according to the bitmap in order to produce a fully expanded 
result slice with nulls
 // in the correct slots.
-func spacedExpand(buffer interface{}, nullCount int, validBits []byte, 
validBitsOffset int64) int {
-       bufferRef := reflect.ValueOf(buffer)
-       if bufferRef.Kind() != reflect.Slice {
-               panic("invalid spacedexpand type, not slice")
-       }
-
-       var (
-               numValues = bufferRef.Len()
-       )
+func spacedExpand[T parquet.ColumnTypes](buffer []T, nullCount int, validBits 
[]byte, validBitsOffset int64) int {
+       numValues := len(buffer)
 
        idxDecode := int64(numValues - nullCount)
        if idxDecode == 0 { // if there's nothing to decode there's nothing to 
do.
@@ -236,8 +228,8 @@ func spacedExpand(buffer interface{}, nullCount int, 
validBits []byte, validBits
                // overwrite any existing data with the correctly spaced data. 
Any data that happens to be left in the null
                // slots is fine since it shouldn't matter and saves us work.
                idxDecode -= run.Length
-               n := reflect.Copy(bufferRef.Slice(int(run.Pos), 
bufferRef.Len()), bufferRef.Slice(int(idxDecode), 
int(int64(idxDecode)+run.Length)))
-               debug.Assert(n == int(run.Length), "reflect.Copy copied 
incorrect number of elements in spacedExpand")
+               n := copy(buffer[run.Pos:], 
buffer[idxDecode:int64(idxDecode)+run.Length])
+               debug.Assert(n == int(run.Length), "copy copied incorrect 
number of elements in spacedExpand")
        }
 
        return numValues
diff --git a/parquet/internal/encoding/typed_encoder.go 
b/parquet/internal/encoding/typed_encoder.go
index b6227c22..cec8c55c 100644
--- a/parquet/internal/encoding/typed_encoder.go
+++ b/parquet/internal/encoding/typed_encoder.go
@@ -184,7 +184,7 @@ func (intDecoderTraits[T]) BytesRequired(n int) int {
 
 func (intDecoderTraits[T]) Decoder(e parquet.Encoding, descr *schema.Column, 
useDict bool, mem memory.Allocator) TypedDecoder {
        if useDict {
-               return &typedDictDecoder[T]{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+               return &typedDictDecoder[T]{dictDecoder[T]{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
        }
 
        switch e {
@@ -213,7 +213,7 @@ func (floatDecoderTraits[T]) BytesRequired(n int) int {
 
 func (floatDecoderTraits[T]) Decoder(e parquet.Encoding, descr *schema.Column, 
useDict bool, mem memory.Allocator) TypedDecoder {
        if useDict {
-               return &typedDictDecoder[T]{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+               return &typedDictDecoder[T]{dictDecoder: 
dictDecoder[T]{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), 
mem: mem}}
        }
 
        switch e {
@@ -227,7 +227,7 @@ func (floatDecoderTraits[T]) Decoder(e parquet.Encoding, 
descr *schema.Column, u
 }
 
 type typedDictDecoder[T int32 | int64 | float32 | float64 | parquet.Int96 | 
parquet.ByteArray | parquet.FixedLenByteArray] struct {
-       dictDecoder
+       dictDecoder[T]
 }
 
 func (d *typedDictDecoder[T]) Type() parquet.Type {
@@ -307,12 +307,15 @@ func (dc *dictConverter[T]) IsValid(idxes 
...utils.IndexType) bool {
        return min >= 0 && int(min) < len(dc.dict) && int(max) >= 0 && int(max) 
< len(dc.dict)
 }
 
-func (dc *dictConverter[T]) Fill(out any, val utils.IndexType) error {
-       o := out.([]T)
+func (dc *dictConverter[T]) IsValidSingle(idx utils.IndexType) bool {
+       dc.ensure(idx)
+       return int(idx) >= 0 && int(idx) < len(dc.dict)
+}
+
+func (dc *dictConverter[T]) Fill(o []T, val utils.IndexType) error {
        if err := dc.ensure(val); err != nil {
                return err
        }
-
        o[0] = dc.dict[val]
        for i := 1; i < len(o); i *= 2 {
                copy(o[i:], o[:i])
@@ -320,30 +323,20 @@ func (dc *dictConverter[T]) Fill(out any, val 
utils.IndexType) error {
        return nil
 }
 
-func (dc *dictConverter[T]) FillZero(out any) {
-       o := out.([]T)
+func (dc *dictConverter[T]) FillZero(o []T) {
        o[0] = dc.zeroVal
        for i := 1; i < len(o); i *= 2 {
                copy(o[i:], o[:i])
        }
 }
 
-func (dc *dictConverter[T]) Copy(out any, vals []utils.IndexType) error {
-       o := out.([]T)
+func (dc *dictConverter[T]) Copy(o []T, vals []utils.IndexType) error {
        for idx, val := range vals {
                o[idx] = dc.dict[val]
        }
        return nil
 }
 
-type Int32DictConverter = dictConverter[int32]
-type Int64DictConverter = dictConverter[int64]
-type Float32DictConverter = dictConverter[float32]
-type Float64DictConverter = dictConverter[float64]
-type Int96DictConverter = dictConverter[parquet.Int96]
-type ByteArrayDictConverter = dictConverter[parquet.ByteArray]
-type FixedLenByteArrayDictConverter = dictConverter[parquet.FixedLenByteArray]
-
 // the int96EncoderTraits struct is used to make it easy to create encoders 
and decoders based on type
 type int96EncoderTraits struct{}
 
@@ -374,7 +367,7 @@ func (int96DecoderTraits) BytesRequired(n int) int {
 // Decoder returns a decoder for int96 typed data of the requested encoding 
type if available
 func (int96DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, 
useDict bool, mem memory.Allocator) TypedDecoder {
        if useDict {
-               return &DictInt96Decoder{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+               return &DictInt96Decoder{dictDecoder[parquet.Int96]{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
        }
 
        switch e {
@@ -518,7 +511,7 @@ func (byteArrayDecoderTraits) BytesRequired(n int) int {
 // Decoder returns a decoder for byteArray typed data of the requested 
encoding type if available
 func (byteArrayDecoderTraits) Decoder(e parquet.Encoding, descr 
*schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
        if useDict {
-               return &DictByteArrayDecoder{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+               return 
&DictByteArrayDecoder{dictDecoder[parquet.ByteArray]{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
        }
 
        switch e {
@@ -558,7 +551,7 @@ func (enc *DictByteArrayEncoder) Type() parquet.Type {
 
 // DictByteArrayDecoder is a decoder for decoding dictionary encoded data for 
parquet.ByteArray columns
 type DictByteArrayDecoder struct {
-       dictDecoder
+       dictDecoder[parquet.ByteArray]
 }
 
 // Type returns the underlying physical type that can be decoded with this 
decoder
@@ -639,7 +632,7 @@ func (fixedLenByteArrayDecoderTraits) BytesRequired(n int) 
int {
 // Decoder returns a decoder for fixedLenByteArray typed data of the requested 
encoding type if available
 func (fixedLenByteArrayDecoderTraits) Decoder(e parquet.Encoding, descr 
*schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
        if useDict {
-               return &DictFixedLenByteArrayDecoder{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
+               return 
&DictFixedLenByteArrayDecoder{dictDecoder[parquet.FixedLenByteArray]{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
        }
 
        switch e {
@@ -664,25 +657,8 @@ func (enc *DictFixedLenByteArrayEncoder) Type() 
parquet.Type {
 
 // NewDictConverter creates a dict converter of the appropriate type, using 
the passed in
 // decoder as the decoder to decode the dictionary index.
-func NewDictConverter(dict TypedDecoder) utils.DictionaryConverter {
-       switch dict.Type() {
-       case parquet.Types.Int32:
-               return &Int32DictConverter{valueDecoder: dict.(Int32Decoder), 
dict: make([]int32, 0, dict.ValuesLeft())}
-       case parquet.Types.Int64:
-               return &Int64DictConverter{valueDecoder: dict.(Int64Decoder), 
dict: make([]int64, 0, dict.ValuesLeft())}
-       case parquet.Types.Int96:
-               return &Int96DictConverter{valueDecoder: dict.(Int96Decoder), 
dict: make([]parquet.Int96, 0, dict.ValuesLeft())}
-       case parquet.Types.Float:
-               return &Float32DictConverter{valueDecoder: 
dict.(Float32Decoder), dict: make([]float32, 0, dict.ValuesLeft())}
-       case parquet.Types.Double:
-               return &Float64DictConverter{valueDecoder: 
dict.(Float64Decoder), dict: make([]float64, 0, dict.ValuesLeft())}
-       case parquet.Types.ByteArray:
-               return &ByteArrayDictConverter{valueDecoder: 
dict.(ByteArrayDecoder), dict: make([]parquet.ByteArray, 0, dict.ValuesLeft())}
-       case parquet.Types.FixedLenByteArray:
-               return &FixedLenByteArrayDictConverter{valueDecoder: 
dict.(FixedLenByteArrayDecoder), dict: make([]parquet.FixedLenByteArray, 0, 
dict.ValuesLeft())}
-       default:
-               return nil
-       }
+func NewDictConverter[T parquet.ColumnTypes](dict TypedDecoder) 
utils.DictionaryConverter[T] {
+       return &dictConverter[T]{valueDecoder: dict.(Decoder[T]), dict: 
make([]T, 0, dict.ValuesLeft())}
 }
 
 var (
diff --git a/parquet/internal/utils/bit_reader.go 
b/parquet/internal/utils/bit_reader.go
index 10805292..ced0f265 100644
--- a/parquet/internal/utils/bit_reader.go
+++ b/parquet/internal/utils/bit_reader.go
@@ -21,7 +21,6 @@ import (
        "errors"
        "io"
        "math"
-       "reflect"
        "unsafe"
 
        "github.com/apache/arrow-go/v18/arrow"
@@ -122,28 +121,46 @@ func (b *BitReader) GetZigZagVlqInt() (int64, bool) {
 // error if there aren't enough bytes left.
 func (b *BitReader) ReadByte() (byte, error) {
        var tmp byte
-       if ok := b.GetAligned(1, &tmp); !ok {
+       if ok := b.getAlignedUint8(1, &tmp); !ok {
                return 0, errors.New("failed to read byte")
        }
 
        return tmp, nil
 }
 
-// GetAligned reads nbytes from the underlying stream into the passed 
interface value.
+// getAlignedUint8 reads nbytes from the underlying stream into the passed 
uint8 value.
 // Returning false if there aren't enough bytes remaining in the stream or if 
an invalid
 // type is passed. The bytes are read aligned to byte boundaries.
-//
-// v must be a pointer to a byte or sized uint type (*byte, *uint16, *uint32, 
*uint64).
-// encoded values are assumed to be little endian.
-func (b *BitReader) GetAligned(nbytes int, v interface{}) bool {
-       // figure out the number of bytes to represent v
-       typBytes := int(reflect.TypeOf(v).Elem().Size())
-       if nbytes > typBytes {
+func (b *BitReader) getAlignedUint8(nbytes int, v *uint8) bool {
+       if nbytes > 1 {
                return false
        }
 
        bread := bitutil.BytesForBits(int64(b.bitoffset))
+       b.byteoffset += bread
+       n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
+       if err != nil && err != io.EOF {
+               return false
+       }
+       if n != nbytes {
+               return false
+       }
+
+       *v = b.raw[0]
+
+       b.byteoffset += int64(nbytes)
+       b.bitoffset = 0
+       b.fillbuffer()
+       return true
+}
 
+// getAlignedUint16 reads nbytes from the underlying stream into the passed 
uint16 value.
+func (b *BitReader) getAlignedUint16(nbytes int, v *uint16) bool {
+       if nbytes > 2 {
+               return false
+       }
+
+       bread := bitutil.BytesForBits(int64(b.bitoffset))
        b.byteoffset += bread
        n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
        if err != nil && err != io.EOF {
@@ -152,24 +169,67 @@ func (b *BitReader) GetAligned(nbytes int, v interface{}) 
bool {
        if n != nbytes {
                return false
        }
+
        // zero pad the bytes
-       memory.Set(b.raw[n:typBytes], 0)
-
-       switch v := v.(type) {
-       case *byte:
-               *v = b.raw[0]
-       case *uint64:
-               *v = binary.LittleEndian.Uint64(b.raw[:typBytes])
-       case *uint32:
-               *v = binary.LittleEndian.Uint32(b.raw[:typBytes])
-       case *uint16:
-               *v = binary.LittleEndian.Uint16(b.raw[:typBytes])
-       default:
+       memory.Set(b.raw[n:2], 0)
+
+       *v = binary.LittleEndian.Uint16(b.raw[:2])
+
+       b.byteoffset += int64(nbytes)
+       b.bitoffset = 0
+       b.fillbuffer()
+       return true
+}
+
+// getAlignedUint32 reads nbytes from the underlying stream into the passed 
uint32 value.
+func (b *BitReader) getAlignedUint32(nbytes int, v *uint32) bool {
+       if nbytes > 4 {
                return false
        }
 
+       bread := bitutil.BytesForBits(int64(b.bitoffset))
+       b.byteoffset += bread
+       n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
+       if err != nil && err != io.EOF {
+               return false
+       }
+       if n != nbytes {
+               return false
+       }
+
+       // zero pad the bytes
+       memory.Set(b.raw[n:4], 0)
+
+       *v = binary.LittleEndian.Uint32(b.raw[:4])
+
        b.byteoffset += int64(nbytes)
+       b.bitoffset = 0
+       b.fillbuffer()
+       return true
+}
 
+// getAlignedUint64 reads nbytes from the underlying stream into the passed 
uint64 value.
+func (b *BitReader) getAlignedUint64(nbytes int, v *uint64) bool {
+       if nbytes > 8 {
+               return false
+       }
+
+       bread := bitutil.BytesForBits(int64(b.bitoffset))
+       b.byteoffset += bread
+       n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
+       if err != nil && err != io.EOF {
+               return false
+       }
+       if n != nbytes {
+               return false
+       }
+
+       // zero pad the bytes
+       memory.Set(b.raw[n:8], 0)
+
+       *v = binary.LittleEndian.Uint64(b.raw[:8])
+
+       b.byteoffset += int64(nbytes)
        b.bitoffset = 0
        b.fillbuffer()
        return true
diff --git a/parquet/internal/utils/dictionary.go 
b/parquet/internal/utils/dictionary.go
index 4d5ef13f..8c315da1 100644
--- a/parquet/internal/utils/dictionary.go
+++ b/parquet/internal/utils/dictionary.go
@@ -18,7 +18,8 @@ package utils
 
 import (
        "math"
-       "reflect"
+
+       "github.com/apache/arrow-go/v18/parquet"
 )
 
 // IndexType is the type we're going to use for Dictionary indexes, currently
@@ -31,57 +32,53 @@ const (
        MinIndexType = math.MinInt32
 )
 
-// DictionaryConverter is an interface used for dealing with RLE decoding and 
encoding
-// when working with dictionaries to get values from indexes.
-type DictionaryConverter interface {
-       // Copy takes an interface{} which must be a slice of the appropriate 
type, and will be populated
-       // by the dictionary values at the indexes from the IndexType slice
-       Copy(interface{}, []IndexType) error
-       // Fill fills interface{} which must be a slice of the appropriate 
type, with the value
-       // specified by the dictionary index passed in.
-       Fill(interface{}, IndexType) error
-       // FillZero fills interface{}, which must be a slice of the appropriate 
type, with the zero value
-       // for the given type.
-       FillZero(interface{})
+type DictionaryConverter[T parquet.ColumnTypes | uint64] interface {
+       // Copy populates the input slice by the dictionary values at the 
indexes from the IndexType slice
+       Copy([]T, []IndexType) error
+       // Fill fills the input slice with the value specified by the 
dictionary index passed in.
+       Fill([]T, IndexType) error
+       // FillZero fills the input slice with the zero value for the given 
type.
+       FillZero([]T)
        // IsValid validates that all of the indexes passed in are valid 
indexes for the dictionary
        IsValid(...IndexType) bool
+       // IsValidSingle validates that the index passed in is a valid index 
for the dictionary
+       // This is an optimisation, to avoid allocating a slice for a single 
value
+       IsValidSingle(IndexType) bool
 }
 
 // converter for getspaced that handles runs that get returned directly
 // as output, rather than using a dictionary
-type plainConverter struct{}
+type plainConverter[T ~uint64] struct{}
 
-func (plainConverter) IsValid(...IndexType) bool { return true }
-func (plainConverter) Fill(values interface{}, val IndexType) error {
-       v := reflect.ValueOf(values)
-       switch v.Type().Elem().Kind() {
-       case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, 
reflect.Int64:
-               v.Index(0).SetInt(int64(val))
-       case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, 
reflect.Uint64:
-               v.Index(0).SetUint(uint64(val))
-       }
+func (plainConverter[T]) IsValid(...IndexType) bool { return true }
+
+func (plainConverter[T]) IsValidSingle(IndexType) bool { return true }
 
-       for i := 1; i < v.Len(); i *= 2 {
-               reflect.Copy(v.Slice(i, v.Len()), v.Slice(0, i))
+func (plainConverter[T]) Fill(values []T, val IndexType) error {
+       if len(values) == 0 {
+               return nil
+       }
+       values[0] = T(val)
+       for i := 1; i < len(values); i *= 2 {
+               copy(values[i:], values[0:i])
        }
        return nil
 }
 
-func (plainConverter) FillZero(values interface{}) {
-       v := reflect.ValueOf(values)
-       zeroVal := reflect.New(v.Type().Elem()).Elem()
-
-       v.Index(0).Set(zeroVal)
-       for i := 1; i < v.Len(); i *= 2 {
-               reflect.Copy(v.Slice(i, v.Len()), v.Slice(0, i))
+func (plainConverter[T]) FillZero(values []T) {
+       if len(values) == 0 {
+               return
+       }
+       var zero T
+       values[0] = zero
+       for i := 1; i < len(values); i *= 2 {
+               copy(values[i:], values[0:i])
        }
 }
 
-func (plainConverter) Copy(out interface{}, values []IndexType) error {
-       vout := reflect.ValueOf(out)
-       vin := reflect.ValueOf(values)
-       for i := 0; i < vin.Len(); i++ {
-               vout.Index(i).Set(vin.Index(i).Convert(vout.Type().Elem()))
+func (plainConverter[T]) Copy(out []T, values []IndexType) error {
+       for i := range values {
+               out[i] = T(values[i])
        }
        return nil
 }
diff --git a/parquet/internal/utils/physical_types.tmpldata 
b/parquet/internal/utils/physical_types.tmpldata
deleted file mode 100644
index 0adeb995..00000000
--- a/parquet/internal/utils/physical_types.tmpldata
+++ /dev/null
@@ -1,52 +0,0 @@
-[
-  {
-    "Name": "Int32",
-    "name": "int32",
-    "lower": "int32",
-    "prefix": "arrow"
-  },
-  {
-    "Name": "Int64",
-    "name": "int64",
-    "lower": "int64",
-    "prefix": "arrow"
-  },
-  {
-    "Name": "Int96",
-    "name": "parquet.Int96",
-    "lower": "int96",
-    "prefix": "parquet"
-  },
-  {
-    "Name": "Float32",
-    "name": "float32",
-    "lower": "float32",
-    "prefix": "arrow",
-    "physical": "Float"
-  },
-  {
-    "Name": "Float64",
-    "name": "float64",
-    "lower": "float64",
-    "prefix": "arrow",
-    "physical": "Double"
-  },
-  {
-    "Name": "Boolean",
-    "name": "bool",
-    "lower": "bool",
-    "prefix": "arrow"
-  },
-  {
-    "Name": "ByteArray",
-    "name": "parquet.ByteArray",
-    "lower": "byteArray",
-    "prefix": "parquet"
-  },
-  {
-    "Name": "FixedLenByteArray",
-    "name": "parquet.FixedLenByteArray",
-    "lower": "fixedLenByteArray",
-    "prefix": "parquet"
-  }
-]
diff --git a/parquet/internal/utils/rle.go b/parquet/internal/utils/rle.go
index 12eca7b2..94a40a56 100644
--- a/parquet/internal/utils/rle.go
+++ b/parquet/internal/utils/rle.go
@@ -26,13 +26,9 @@ import (
 
        "github.com/apache/arrow-go/v18/arrow/bitutil"
        "github.com/apache/arrow-go/v18/internal/bitutils"
-       "github.com/apache/arrow-go/v18/internal/utils"
-       "github.com/apache/arrow-go/v18/parquet"
        "golang.org/x/xerrors"
 )
 
-//go:generate go run ../../../arrow/_tools/tmpl/main.go -i 
-data=physical_types.tmpldata typed_rle_dict.gen.go.tmpl
-
 const (
        MaxValuesPerLiteralRun = (1 << 6) * 8
 )
@@ -40,7 +36,7 @@ const (
 func MinRLEBufferSize(bitWidth int) int {
        maxLiteralRunSize := 1 + 
bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth))
        maxRepeatedRunSize := binary.MaxVarintLen32 + 
bitutil.BytesForBits(int64(bitWidth))
-       return int(utils.Max(maxLiteralRunSize, maxRepeatedRunSize))
+       return int(max(maxLiteralRunSize, maxRepeatedRunSize))
 }
 
 func MaxRLEBufferSize(width, numValues int) int {
@@ -51,7 +47,7 @@ func MaxRLEBufferSize(width, numValues int) int {
        minRepeatedRunSize := 1 + int(bitutil.BytesForBits(int64(width)))
        repeatedMaxSize := int(bitutil.BytesForBits(int64(numValues))) * 
minRepeatedRunSize
 
-       return utils.Max(literalMaxSize, repeatedMaxSize)
+       return max(literalMaxSize, repeatedMaxSize)
 }
 
 // Utility classes to do run length encoding (RLE) for fixed bit width values. 
 If runs
@@ -146,24 +142,24 @@ func (r *RleDecoder) Next() bool {
                nbytes := int(bitutil.BytesForBits(int64(r.bitWidth)))
                switch {
                case nbytes > 4:
-                       if !r.r.GetAligned(nbytes, &r.curVal) {
+                       if !r.r.getAlignedUint64(nbytes, &r.curVal) {
                                return false
                        }
                case nbytes > 2:
                        var val uint32
-                       if !r.r.GetAligned(nbytes, &val) {
+                       if !r.r.getAlignedUint32(nbytes, &val) {
                                return false
                        }
                        r.curVal = uint64(val)
                case nbytes > 1:
                        var val uint16
-                       if !r.r.GetAligned(nbytes, &val) {
+                       if !r.r.getAlignedUint16(nbytes, &val) {
                                return false
                        }
                        r.curVal = uint64(val)
                default:
                        var val uint8
-                       if !r.r.GetAligned(nbytes, &val) {
+                       if !r.r.getAlignedUint8(nbytes, &val) {
                                return false
                        }
                        r.curVal = uint64(val)
@@ -184,11 +180,11 @@ func (r *RleDecoder) Discard(n int) int {
                remain := n - read
 
                if r.repCount > 0 {
-                       repbatch := int(math.Min(float64(remain), 
float64(r.repCount)))
+                       repbatch := min(remain, int(r.repCount))
                        r.repCount -= int32(repbatch)
                        read += repbatch
                } else if r.litCount > 0 {
-                       litbatch := int(math.Min(float64(remain), 
float64(r.litCount)))
+                       litbatch := min(remain, int(r.litCount))
                        n, _ := r.r.Discard(uint(r.bitWidth), litbatch)
                        if n != litbatch {
                                return read
@@ -214,7 +210,7 @@ func (r *RleDecoder) GetBatch(values []uint64) int {
                remain := size - read
 
                if r.repCount > 0 {
-                       repbatch := int(math.Min(float64(remain), 
float64(r.repCount)))
+                       repbatch := min(remain, int(r.repCount))
                        for i := 0; i < repbatch; i++ {
                                out[i] = r.curVal
                        }
@@ -223,7 +219,7 @@ func (r *RleDecoder) GetBatch(values []uint64) int {
                        read += repbatch
                        out = out[repbatch:]
                } else if r.litCount > 0 {
-                       litbatch := int(math.Min(float64(remain), 
float64(r.litCount)))
+                       litbatch := min(remain, int(r.litCount))
                        n, _ := r.r.GetBatch(uint(r.bitWidth), out[:litbatch])
                        if n != litbatch {
                                return read
@@ -246,7 +242,7 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, 
nullcount int, validBits []by
                return r.GetBatch(vals), nil
        }
 
-       converter := plainConverter{}
+       converter := plainConverter[uint64]{}
        blockCounter := bitutils.NewBitBlockCounter(validBits, validBitsOffset, 
int64(len(vals)))
 
        var (
@@ -268,7 +264,7 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, 
nullcount int, validBits []by
                        converter.FillZero(vals[:block.Len])
                        processed = int(block.Len)
                } else {
-                       processed, err = r.getspaced(converter, vals, 
int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset)
+                       processed, err = getspaced(r, converter, vals, 
int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset)
                        if err != nil {
                                return totalProcessed, err
                        }
@@ -285,92 +281,6 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, 
nullcount int, validBits []by
        return totalProcessed, nil
 }
 
-func (r *RleDecoder) getspaced(dc DictionaryConverter, vals interface{}, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
-       switch vals := vals.(type) {
-       case []int32:
-               return r.getspacedInt32(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       case []int64:
-               return r.getspacedInt64(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       case []float32:
-               return r.getspacedFloat32(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       case []float64:
-               return r.getspacedFloat64(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       case []parquet.ByteArray:
-               return r.getspacedByteArray(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       case []parquet.FixedLenByteArray:
-               return r.getspacedFixedLenByteArray(dc, vals, batchSize, 
nullCount, validBits, validBitsOffset)
-       case []parquet.Int96:
-               return r.getspacedInt96(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       case []uint64:
-               return r.getspacedUint64(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
-       default:
-               return 0, xerrors.New("parquet/rle: getspaced invalid type")
-       }
-}
-
-func (r *RleDecoder) getspacedUint64(dc DictionaryConverter, vals []uint64, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsUint64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
 func (r *RleDecoder) consumeRepeatCounts(read, batchSize, remain int, run 
bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun) {
        // Consume the entire repeat counts incrementing repeat_batch to
        // be the total of nulls + values consumed, we only need to
@@ -379,7 +289,7 @@ func (r *RleDecoder) consumeRepeatCounts(read, batchSize, 
remain int, run bituti
        repeatBatch := 0
        for r.repCount > 0 && (read+repeatBatch) < batchSize {
                if run.Set {
-                       updateSize := int(utils.Min(run.Len, int64(r.repCount)))
+                       updateSize := int(min(run.Len, int64(r.repCount)))
                        r.repCount -= int32(updateSize)
                        repeatBatch += updateSize
                        run.Len -= int64(updateSize)
@@ -396,88 +306,6 @@ func (r *RleDecoder) consumeRepeatCounts(read, batchSize, 
remain int, run bituti
        return repeatBatch, remain, run
 }
 
-func (r *RleDecoder) consumeLiteralsUint64(dc DictionaryConverter, vals 
[]uint64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDict(dc DictionaryConverter, vals 
interface{}) (int, error) {
-       switch vals := vals.(type) {
-       case []int32:
-               return r.GetBatchWithDictInt32(dc, vals)
-       case []int64:
-               return r.GetBatchWithDictInt64(dc, vals)
-       case []float32:
-               return r.GetBatchWithDictFloat32(dc, vals)
-       case []float64:
-               return r.GetBatchWithDictFloat64(dc, vals)
-       case []parquet.ByteArray:
-               return r.GetBatchWithDictByteArray(dc, vals)
-       case []parquet.FixedLenByteArray:
-               return r.GetBatchWithDictFixedLenByteArray(dc, vals)
-       case []parquet.Int96:
-               return r.GetBatchWithDictInt96(dc, vals)
-       default:
-               return 0, xerrors.New("parquet/rle: GetBatchWithDict invalid 
type")
-       }
-}
-
-func (r *RleDecoder) GetBatchWithDictSpaced(dc DictionaryConverter, vals 
interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, 
error) {
-       switch vals := vals.(type) {
-       case []int32:
-               return r.GetBatchWithDictSpacedInt32(dc, vals, nullCount, 
validBits, validBitsOffset)
-       case []int64:
-               return r.GetBatchWithDictSpacedInt64(dc, vals, nullCount, 
validBits, validBitsOffset)
-       case []float32:
-               return r.GetBatchWithDictSpacedFloat32(dc, vals, nullCount, 
validBits, validBitsOffset)
-       case []float64:
-               return r.GetBatchWithDictSpacedFloat64(dc, vals, nullCount, 
validBits, validBitsOffset)
-       case []parquet.ByteArray:
-               return r.GetBatchWithDictSpacedByteArray(dc, vals, nullCount, 
validBits, validBitsOffset)
-       case []parquet.FixedLenByteArray:
-               return r.GetBatchWithDictSpacedFixedLenByteArray(dc, vals, 
nullCount, validBits, validBitsOffset)
-       case []parquet.Int96:
-               return r.GetBatchWithDictSpacedInt96(dc, vals, nullCount, 
validBits, validBitsOffset)
-       default:
-               return 0, xerrors.New("parquet/rle: GetBatchWithDictSpaced 
invalid type")
-       }
-}
-
 type RleEncoder struct {
        w *BitWriter
 
diff --git a/parquet/internal/utils/typed_rle_dict.gen.go 
b/parquet/internal/utils/typed_rle_dict.gen.go
deleted file mode 100644
index 1c1b487d..00000000
--- a/parquet/internal/utils/typed_rle_dict.gen.go
+++ /dev/null
@@ -1,1377 +0,0 @@
-// Code generated by typed_rle_dict.gen.go.tmpl. DO NOT EDIT.
-
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package utils
-
-import (
-       "github.com/apache/arrow-go/v18/internal/bitutils"
-       "github.com/apache/arrow-go/v18/internal/utils"
-       "github.com/apache/arrow-go/v18/parquet"
-       "golang.org/x/xerrors"
-)
-
-func (r *RleDecoder) GetBatchWithDictSpacedInt32(dc DictionaryConverter, vals 
[]int32, nullCount int, validBits []byte, validBitsOffset int64) 
(totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictInt32(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = r.GetBatchWithDictInt32(dc, 
vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedInt32(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedInt32(dc DictionaryConverter, vals []int32, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsInt32(dc, vals, remain, indexbuffer[:], validRun, bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsInt32(dc DictionaryConverter, vals 
[]int32, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictInt32(dc DictionaryConverter, vals 
[]int32) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedInt64(dc DictionaryConverter, vals 
[]int64, nullCount int, validBits []byte, validBitsOffset int64) 
(totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictInt64(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = r.GetBatchWithDictInt64(dc, 
vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedInt64(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedInt64(dc DictionaryConverter, vals []int64, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsInt64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsInt64(dc DictionaryConverter, vals 
[]int64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictInt64(dc DictionaryConverter, vals 
[]int64) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedInt96(dc DictionaryConverter, vals 
[]parquet.Int96, nullCount int, validBits []byte, validBitsOffset int64) 
(totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictInt96(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = r.GetBatchWithDictInt96(dc, 
vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedInt96(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedInt96(dc DictionaryConverter, vals 
[]parquet.Int96, batchSize, nullCount int, validBits []byte, validBitsOffset 
int64) (int, error) {
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsInt96(dc, vals, remain, indexbuffer[:], validRun, bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsInt96(dc DictionaryConverter, vals 
[]parquet.Int96, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictInt96(dc DictionaryConverter, vals 
[]parquet.Int96) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedFloat32(dc DictionaryConverter, 
vals []float32, nullCount int, validBits []byte, validBitsOffset int64) 
(totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictFloat32(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = r.GetBatchWithDictFloat32(dc, 
vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedFloat32(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedFloat32(dc DictionaryConverter, vals []float32, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsFloat32(dc, vals, remain, indexbuffer[:], validRun, bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsFloat32(dc DictionaryConverter, vals 
[]float32, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictFloat32(dc DictionaryConverter, vals 
[]float32) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedFloat64(dc DictionaryConverter, 
vals []float64, nullCount int, validBits []byte, validBitsOffset int64) 
(totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictFloat64(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = r.GetBatchWithDictFloat64(dc, 
vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedFloat64(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedFloat64(dc DictionaryConverter, vals []float64, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsFloat64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsFloat64(dc DictionaryConverter, vals 
[]float64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictFloat64(dc DictionaryConverter, vals 
[]float64) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedByteArray(dc DictionaryConverter, 
vals []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset 
int64) (totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictByteArray(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = r.GetBatchWithDictByteArray(dc, 
vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedByteArray(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedByteArray(dc DictionaryConverter, vals 
[]parquet.ByteArray, batchSize, nullCount int, validBits []byte, 
validBitsOffset int64) (int, error) {
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsByteArray(dc, vals, remain, indexbuffer[:], validRun, 
bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsByteArray(dc DictionaryConverter, vals 
[]parquet.ByteArray, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictByteArray(dc DictionaryConverter, vals 
[]parquet.ByteArray) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictSpacedFixedLenByteArray(dc 
DictionaryConverter, vals []parquet.FixedLenByteArray, nullCount int, validBits 
[]byte, validBitsOffset int64) (totalProcessed int, err error) {
-       if nullCount == 0 {
-               return r.GetBatchWithDictFixedLenByteArray(dc, vals)
-       }
-
-       var (
-               blockCounter = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
-               processed    = 0
-               block        bitutils.BitBlockCount
-       )
-
-       for {
-               block = blockCounter.NextFourWords()
-               if block.Len == 0 {
-                       break
-               }
-
-               switch {
-               case block.AllSet():
-                       processed, err = 
r.GetBatchWithDictFixedLenByteArray(dc, vals[:block.Len])
-               case block.NoneSet():
-                       dc.FillZero(vals[:block.Len])
-                       processed = int(block.Len)
-               default:
-                       processed, err = r.getspacedFixedLenByteArray(dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-               }
-
-               if err != nil {
-                       break
-               }
-
-               totalProcessed += processed
-               vals = vals[int(block.Len):]
-               validBitsOffset += int64(block.Len)
-               if processed != int(block.Len) {
-                       break
-               }
-       }
-       return
-}
-
-func (r *RleDecoder) getspacedFixedLenByteArray(dc DictionaryConverter, vals 
[]parquet.FixedLenByteArray, batchSize, nullCount int, validBits []byte, 
validBitsOffset int64) (int, error) {
-       if nullCount == batchSize {
-               dc.FillZero(vals[:batchSize])
-               return batchSize, nil
-       }
-
-       read := 0
-       remain := batchSize - nullCount
-
-       const bufferSize = 1024
-       var indexbuffer [bufferSize]IndexType
-
-       // assume no bits to start
-       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-       validRun := bitReader.NextRun()
-       for read < batchSize {
-               if validRun.Len == 0 {
-                       validRun = bitReader.NextRun()
-               }
-
-               if !validRun.Set {
-                       dc.FillZero(vals[:int(validRun.Len)])
-                       vals = vals[int(validRun.Len):]
-                       read += int(validRun.Len)
-                       validRun.Len = 0
-                       continue
-               }
-
-               if r.repCount == 0 && r.litCount == 0 {
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-
-               var batch int
-               switch {
-               case r.repCount > 0:
-                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
-                       current := IndexType(r.curVal)
-                       if !dc.IsValid(current) {
-                               return read, nil
-                       }
-                       dc.Fill(vals[:batch], current)
-               case r.litCount > 0:
-                       var (
-                               litread int
-                               skipped int
-                               err     error
-                       )
-                       litread, skipped, validRun, err = 
r.consumeLiteralsFixedLenByteArray(dc, vals, remain, indexbuffer[:], validRun, 
bitReader)
-                       if err != nil {
-                               return read, err
-                       }
-                       batch = litread + skipped
-                       remain -= litread
-               }
-
-               vals = vals[batch:]
-               read += batch
-       }
-       return read, nil
-}
-
-func (r *RleDecoder) consumeLiteralsFixedLenByteArray(dc DictionaryConverter, 
vals []parquet.FixedLenByteArray, remain int, buf []IndexType, run 
bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun, 
error) {
-       batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDictFixedLenByteArray(dc DictionaryConverter, 
vals []parquet.FixedLenByteArray) (int, error) {
-       var (
-               read        = 0
-               size        = len(vals)
-               indexbuffer [1024]IndexType
-       )
-
-       for read < size {
-               remain := size - read
-
-               switch {
-               case r.repCount > 0:
-                       idx := IndexType(r.curVal)
-                       if !dc.IsValid(idx) {
-                               return read, nil
-                       }
-                       batch := utils.Min(remain, int(r.repCount))
-                       if err := dc.Fill(vals[:batch], idx); err != nil {
-                               return read, err
-                       }
-                       r.repCount -= int32(batch)
-                       read += batch
-                       vals = vals[batch:]
-               case r.litCount > 0:
-                       litbatch := utils.Min(utils.Min(remain, 
int(r.litCount)), 1024)
-                       buf := indexbuffer[:litbatch]
-                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-                       if n != litbatch {
-                               return read, nil
-                       }
-                       if !dc.IsValid(buf...) {
-                               return read, nil
-                       }
-                       if err := dc.Copy(vals, buf); err != nil {
-                               return read, nil
-                       }
-                       r.litCount -= int32(litbatch)
-                       read += litbatch
-                       vals = vals[litbatch:]
-               default:
-                       if !r.Next() {
-                               return read, nil
-                       }
-               }
-       }
-
-       return read, nil
-}
diff --git a/parquet/internal/utils/typed_rle_dict.gen.go.tmpl 
b/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
deleted file mode 100644
index 4bce437f..00000000
--- a/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
+++ /dev/null
@@ -1,220 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package utils
-
-import (
-  "github.com/apache/arrow-go/v18/parquet"
-  "github.com/apache/arrow-go/v18/internal/bitutils"
-  "github.com/apache/arrow-go/v18/internal/utils"
-)
-
-{{range .In}}
-{{if ne .Name "Boolean"}}
-func (r *RleDecoder) GetBatchWithDictSpaced{{.Name}}(dc DictionaryConverter, 
vals []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) 
(totalProcessed int, err error) {
-  if nullCount == 0 {
-    return r.GetBatchWithDict{{.Name}}(dc, vals)
-  }
-
-  var (
-    blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, 
int64(len(vals)))
-    processed = 0
-    block bitutils.BitBlockCount
-  )
-
-  for {
-    block = blockCounter.NextFourWords()
-    if block.Len == 0 {
-      break
-    }
-
-    switch {
-    case block.AllSet():
-      processed, err = r.GetBatchWithDict{{.Name}}(dc, vals[:block.Len])
-    case block.NoneSet():
-      dc.FillZero(vals[:block.Len])
-      processed = int(block.Len)
-    default:
-      processed, err = r.getspaced{{.Name}}(dc, vals, int(block.Len), 
int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
-    }
-
-    if err != nil {
-      break
-    }
-
-    totalProcessed += processed
-    vals = vals[int(block.Len):]
-    validBitsOffset += int64(block.Len)
-    if processed != int(block.Len) {
-      break
-    }
-  }
-  return
-}
-
-func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals 
[]{{.name}}, batchSize, nullCount int, validBits []byte, validBitsOffset int64) 
(int, error) {
-  if nullCount == batchSize {
-    dc.FillZero(vals[:batchSize])
-    return batchSize, nil
-  }
-
-  read := 0
-  remain := batchSize - nullCount
-
-  const bufferSize = 1024
-  var indexbuffer [bufferSize]IndexType
-
-  // assume no bits to start
-  bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
-  validRun := bitReader.NextRun()
-  for read < batchSize {
-    if validRun.Len == 0 {
-      validRun = bitReader.NextRun()
-    }
-
-    if !validRun.Set {
-      dc.FillZero(vals[:int(validRun.Len)])
-      vals = vals[int(validRun.Len):]
-      read += int(validRun.Len)
-      validRun.Len = 0
-      continue
-    }
-
-    if r.repCount == 0 && r.litCount == 0 {
-      if !r.Next() {
-        return read, nil
-      }
-    }
-
-    var batch int
-    switch {
-    case r.repCount > 0:
-      batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, 
validRun, bitReader)
-      current := IndexType(r.curVal)
-      if !dc.IsValid(current) {
-        return read, nil
-      }
-      dc.Fill(vals[:batch], current)
-    case r.litCount > 0:
-      var (
-        litread int
-        skipped int
-        err error
-      )
-      litread, skipped, validRun, err = r.consumeLiterals{{.Name}}(dc, vals, 
remain, indexbuffer[:], validRun, bitReader)
-       if err != nil {
-        return read, err
-      }
-      batch = litread + skipped
-      remain -= litread
-    }
-
-    vals = vals[batch:]
-    read += batch
-  }
-  return read, nil
-}
-
-func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals 
[]{{.name}}, remain int, buf []IndexType, run bitutils.BitRun, bitRdr 
bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
-  batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
-       buf = buf[:batch]
-
-       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-       if n != batch {
-               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
-       }
-
-       if !dc.IsValid(buf...) {
-               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
-       }
-
-       var (
-               read    int
-               skipped int
-       )
-       for read < batch {
-               if run.Set {
-                       updateSize := utils.Min(batch-read, int(run.Len))
-                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
-                               return 0, 0, run, err
-                       }
-                       read += updateSize
-                       vals = vals[updateSize:]
-                       run.Len -= int64(updateSize)
-               } else {
-                       dc.FillZero(vals[:int(run.Len)])
-                       vals = vals[int(run.Len):]
-                       skipped += int(run.Len)
-                       run.Len = 0
-               }
-               if run.Len == 0 {
-                       run = bitRdr.NextRun()
-               }
-       }
-       r.litCount -= int32(batch)
-       return read, skipped, run, nil
-}
-
-func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals 
[]{{.name}}) (int, error) {
-  var (
-    read = 0
-    size = len(vals)
-    indexbuffer [1024]IndexType
-  )
-
-  for read < size {
-    remain := size - read
-
-    switch {
-    case r.repCount > 0:
-      idx := IndexType(r.curVal)
-      if !dc.IsValid(idx) {
-        return read, nil
-      }
-      batch := utils.Min(remain, int(r.repCount))
-      if err := dc.Fill(vals[:batch], idx); err != nil {
-        return read, err
-      }
-      r.repCount -= int32(batch)
-      read += batch
-      vals = vals[batch:]
-    case r.litCount > 0:
-      litbatch := utils.Min(utils.Min(remain, int(r.litCount)), 1024)
-      buf := indexbuffer[:litbatch]
-      n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
-      if n != litbatch {
-        return read, nil
-      }
-      if !dc.IsValid(buf...) {
-        return read, nil
-      }
-      if err := dc.Copy(vals, buf); err != nil {
-        return read, nil
-      }
-      r.litCount -= int32(litbatch)
-      read += litbatch
-      vals = vals[litbatch:]
-    default:
-      if !r.Next() {
-        return read, nil
-      }
-    }
-  }
-
-  return read, nil
-}
-{{end}}
-{{end}}
diff --git a/parquet/internal/utils/typed_rle_dict.go 
b/parquet/internal/utils/typed_rle_dict.go
new file mode 100644
index 00000000..d7cb6d69
--- /dev/null
+++ b/parquet/internal/utils/typed_rle_dict.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+       "bytes"
+
+       "github.com/apache/arrow-go/v18/internal/bitutils"
+       "github.com/apache/arrow-go/v18/parquet"
+       "golang.org/x/xerrors"
+)
+
+func getspaced[T parquet.ColumnTypes | uint64](r *RleDecoder, dc 
DictionaryConverter[T], vals []T, batchSize, nullCount int, validBits []byte, 
validBitsOffset int64) (int, error) {
+       if nullCount == batchSize {
+               dc.FillZero(vals[:batchSize])
+               return batchSize, nil
+       }
+
+       read := 0
+       remain := batchSize - nullCount
+
+       const bufferSize = 1024
+       var indexbuffer [bufferSize]IndexType
+
+       // assume no bits to start
+       bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
+       validRun := bitReader.NextRun()
+       for read < batchSize {
+               if validRun.Len == 0 {
+                       validRun = bitReader.NextRun()
+               }
+
+               if !validRun.Set {
+                       dc.FillZero(vals[:int(validRun.Len)])
+                       vals = vals[int(validRun.Len):]
+                       read += int(validRun.Len)
+                       validRun.Len = 0
+                       continue
+               }
+
+               if r.repCount == 0 && r.litCount == 0 {
+                       if !r.Next() {
+                               return read, nil
+                       }
+               }
+
+               var batch int
+               switch {
+               case r.repCount > 0:
+                       batch, remain, validRun = r.consumeRepeatCounts(read, 
batchSize, remain, validRun, bitReader)
+                       current := IndexType(r.curVal)
+                       if !dc.IsValidSingle(current) {
+                               return read, nil
+                       }
+                       dc.Fill(vals[:batch], current)
+               case r.litCount > 0:
+                       var (
+                               litread int
+                               skipped int
+                               err     error
+                       )
+                       litread, skipped, validRun, err = consumeLiterals(r, 
dc, vals, remain, indexbuffer[:], validRun, bitReader)
+                       if err != nil {
+                               return read, err
+                       }
+                       batch = litread + skipped
+                       remain -= litread
+               }
+
+               vals = vals[batch:]
+               read += batch
+       }
+       return read, nil
+}
+
+func consumeLiterals[T parquet.ColumnTypes | uint64](r *RleDecoder, dc 
DictionaryConverter[T], vals []T, remain int, buf []IndexType, run 
bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun, 
error) {
+       batch := min(remain, int(r.litCount), len(buf))
+       buf = buf[:batch]
+
+       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
+       if n != batch {
+               return 0, 0, run, xerrors.New("was not able to retrieve correct 
number of indexes")
+       }
+
+       if !dc.IsValid(buf...) {
+               return 0, 0, run, xerrors.New("invalid index values found for 
dictionary converter")
+       }
+
+       var (
+               read    int
+               skipped int
+       )
+       for read < batch {
+               if run.Set {
+                       updateSize := min(batch-read, int(run.Len))
+                       if err := dc.Copy(vals, buf[read:read+updateSize]); err 
!= nil {
+                               return 0, 0, run, err
+                       }
+                       read += updateSize
+                       vals = vals[updateSize:]
+                       run.Len -= int64(updateSize)
+               } else {
+                       dc.FillZero(vals[:int(run.Len)])
+                       vals = vals[int(run.Len):]
+                       skipped += int(run.Len)
+                       run.Len = 0
+               }
+               if run.Len == 0 {
+                       run = bitRdr.NextRun()
+               }
+       }
+       r.litCount -= int32(batch)
+       return read, skipped, run, nil
+}
+
+type TypedRleDecoder[T parquet.ColumnTypes | uint64] struct {
+       RleDecoder
+}
+
+func NewTypedRleDecoder[T parquet.ColumnTypes | uint64](data *bytes.Reader, 
width int) *TypedRleDecoder[T] {
+       return &TypedRleDecoder[T]{
+               RleDecoder: RleDecoder{r: NewBitReader(data), bitWidth: width},
+       }
+}
+
+func (r *TypedRleDecoder[T]) GetBatchWithDict(dc DictionaryConverter[T], vals 
[]T) (int, error) {
+       var (
+               read        = 0
+               size        = len(vals)
+               indexbuffer [1024]IndexType
+       )
+
+       for read < size {
+               remain := size - read
+
+               switch {
+               case r.repCount > 0:
+                       idx := IndexType(r.curVal)
+                       if !dc.IsValidSingle(idx) {
+                               return read, nil
+                       }
+                       batch := min(remain, int(r.repCount))
+                       if err := dc.Fill(vals[:batch], idx); err != nil {
+                               return read, err
+                       }
+                       r.repCount -= int32(batch)
+                       read += batch
+                       vals = vals[batch:]
+               case r.litCount > 0:
+                       litbatch := min(remain, int(r.litCount), 1024)
+                       buf := indexbuffer[:litbatch]
+                       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
+                       if n != litbatch {
+                               return read, nil
+                       }
+                       if !dc.IsValid(buf...) {
+                               return read, nil
+                       }
+                       if err := dc.Copy(vals, buf); err != nil {
+                               return read, nil
+                       }
+                       r.litCount -= int32(litbatch)
+                       read += litbatch
+                       vals = vals[litbatch:]
+               default:
+                       if !r.Next() {
+                               return read, nil
+                       }
+               }
+       }
+
+       return read, nil
+}
+
+func (r *TypedRleDecoder[T]) GetBatchWithDictSpaced(dc DictionaryConverter[T], 
vals []T, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
+       if nullCount == 0 {
+               return r.GetBatchWithDict(dc, vals)
+       }
+
+       var (
+               blockCounter   = bitutils.NewBitBlockCounter(validBits, 
validBitsOffset, int64(len(vals)))
+               processed      = 0
+               totalProcessed = 0
+               block          bitutils.BitBlockCount
+               err            error
+       )
+
+       for {
+               block = blockCounter.NextFourWords()
+               if block.Len == 0 {
+                       break
+               }
+
+               switch {
+               case block.AllSet():
+                       processed, err = r.GetBatchWithDict(dc, 
vals[:block.Len])
+               case block.NoneSet():
+                       dc.FillZero(vals[:block.Len])
+                       processed = int(block.Len)
+               default:
+                       processed, err = getspaced(&r.RleDecoder, dc, vals, 
int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
+               }
+
+               if err != nil {
+                       break
+               }
+
+               totalProcessed += processed
+               vals = vals[int(block.Len):]
+               validBitsOffset += int64(block.Len)
+               if processed != int(block.Len) {
+                       break
+               }
+       }
+       return totalProcessed, err
+}

Reply via email to