This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new caa859d5 perf(compute): optimize Take kernel for list types (#573)
caa859d5 is described below
commit caa859d5c4f853d2422e17a4a9f99e262f934c72
Author: Alex <[email protected]>
AuthorDate: Fri Nov 14 12:40:28 2025 -0700
perf(compute): optimize Take kernel for list types (#573)
### Rationale for this change
This PR implements the same allocation improvements as
https://github.com/apache/arrow-go/pull/557, but for the Take list kernel.
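
For context, Take gathers rows from a values array according to an indices
array; for list values, each selected list's child elements are copied
through a child-index buffer, which is the buffer whose allocation pattern
this PR improves. Below is a minimal usage sketch (illustrative data only),
built from the same public API the new benchmarks exercise:

```go
package main

import (
	"context"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/compute"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.DefaultAllocator
	ctx := compute.WithAllocator(context.Background(), mem)

	// Build a small list<int32> array: [[0 1] [2] [3 4 5]]
	bldr := array.NewBuilder(mem, arrow.ListOf(arrow.PrimitiveTypes.Int32)).(*array.ListBuilder)
	defer bldr.Release()
	valBldr := bldr.ValueBuilder().(*array.Int32Builder)
	for _, vals := range [][]int32{{0, 1}, {2}, {3, 4, 5}} {
		bldr.Append(true)
		valBldr.AppendValues(vals, nil)
	}
	values := bldr.NewArray()
	defer values.Release()

	// Indices that reverse the rows; each selected list's child
	// elements are gathered through the kernel's child-index buffer.
	idxBldr := array.NewInt64Builder(mem)
	defer idxBldr.Release()
	idxBldr.AppendValues([]int64{2, 1, 0}, nil)
	indices := idxBldr.NewArray()
	defer indices.Release()

	result, err := compute.TakeArray(ctx, values, indices)
	if err != nil {
		panic(err)
	}
	defer result.Release()
	fmt.Println(result) // expected: [[3 4 5] [2] [0 1]]
}
```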
### What changes are included in this PR?
- Pre-allocate the child-index buffer up front with a better estimate of
the necessary size (mean list length times the output length, capped at
16M elements) to eliminate most repeated reallocations.
- Use exponential growth (capacity doubling) for any additional
allocations, so the total number of reallocations is O(log n) instead of
O(n); see the sketch after this list.
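
In isolation, the growth strategy behaves like the sketch below. This is a
simplified standalone illustration, not the committed code (a plain
`[]int64` stands in for the kernel's internal `childIdxBuilder`); the real
change is in the diff further down:

```go
// ensureCap grows buf geometrically so that reaching a final size of
// needed elements triggers at most O(log n) reallocations, rather than
// one reallocation per append batch.
func ensureCap(buf []int64, needed int) []int64 {
	if cap(buf) >= needed {
		return buf
	}
	newCap := cap(buf)
	if newCap == 0 {
		newCap = needed
	}
	// Double until there is enough space.
	for newCap < needed {
		newCap *= 2
	}
	grown := make([]int64, len(buf), newCap)
	copy(grown, buf)
	return grown
}
```

Doubling keeps the number of reallocations on the way to n elements at
roughly log2(n), while the total bytes copied across all growths remain
amortized O(n).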
### Are these changes tested?
New benchmarks were added for the Take list kernel (`BenchmarkTakeList`,
`BenchmarkTakeNestedList`, and `BenchmarkTakeListPartitionPattern`).
### Are there any user-facing changes?
No
### Performance Comparison
on `main`
```
goos: darwin
goarch: arm64
pkg: github.com/apache/arrow-go/v18/arrow/compute
cpu: Apple M3 Max
BenchmarkTakeList/SmallBatch_ShortLists-16            2320       555889 ns/op    1798923 rows/sec      3355390 B/op     396 allocs/op
BenchmarkTakeList/MediumBatch_ShortLists-16             31     37600434 ns/op     265955 rows/sec    324238694 B/op    3214 allocs/op
BenchmarkTakeList/LargeBatch_ShortLists-16               2    653682896 ns/op      76490 rows/sec   7877124660 B/op   15751 allocs/op
BenchmarkTakeList/XLargeBatch_ShortLists-16              1   2019873959 ns/op      49508 rows/sec  31380868976 B/op   31489 allocs/op
BenchmarkTakeList/SmallBatch_MediumLists-16            334      3435205 ns/op     291104 rows/sec     43059334 B/op    1086 allocs/op
BenchmarkTakeList/MediumBatch_MediumLists-16             5    230308533 ns/op      43420 rows/sec   4041679715 B/op   10098 allocs/op
BenchmarkTakeList/LargeBatch_MediumLists-16              1   4600489959 ns/op      10868 rows/sec 100213267960 B/op   50328 allocs/op
BenchmarkTakeList/XLargeBatch_MediumLists-16             1  16462343792 ns/op       6074 rows/sec 400427784144 B/op  101032 allocs/op
BenchmarkTakeList/LargeBatch_ShortLists_Large-16         1   1682293042 ns/op      29721 rows/sec  31379855376 B/op   31422 allocs/op
BenchmarkTakeList/XLargeBatch_MediumLists_Large-16       1  33800353917 ns/op       2959 rows/sec 800433389776 B/op  102480 allocs/op
BenchmarkTakeListPartitionPattern-16                     1   3003105000 ns/op      16649 rows/sec  69581474288 B/op   46479 allocs/op
PASS
ok github.com/apache/arrow-go/v18/arrow/compute 71.015s
```
on this branch
```
goos: darwin
goarch: arm64
pkg: github.com/apache/arrow-go/v18/arrow/compute
cpu: Apple M3 Max
BenchmarkTakeList/SmallBatch_ShortLists-16           25522      46138 ns/op  21674062 rows/sec     50668 B/op  83 allocs/op
BenchmarkTakeList/MediumBatch_ShortLists-16           3792     316046 ns/op  31641022 rows/sec    457558 B/op  83 allocs/op
BenchmarkTakeList/LargeBatch_ShortLists-16             804    1521240 ns/op  32867968 rows/sec   2232578 B/op  84 allocs/op
BenchmarkTakeList/XLargeBatch_ShortLists-16            416    2832247 ns/op  35307705 rows/sec   4435314 B/op  84 allocs/op
BenchmarkTakeList/SmallBatch_MediumLists-16           9444     125321 ns/op   7979591 rows/sec    173603 B/op  83 allocs/op
BenchmarkTakeList/MediumBatch_MediumLists-16          1176     999217 ns/op  10007857 rows/sec   1653926 B/op  83 allocs/op
BenchmarkTakeList/LargeBatch_MediumLists-16            232    4913249 ns/op  10176590 rows/sec   8229752 B/op  85 allocs/op
BenchmarkTakeList/XLargeBatch_MediumLists-16           128    9309120 ns/op  10742230 rows/sec  16428821 B/op  85 allocs/op
BenchmarkTakeList/LargeBatch_ShortLists_Large-16       739    1560044 ns/op  32050453 rows/sec   3428837 B/op  84 allocs/op
BenchmarkTakeList/XLargeBatch_MediumLists_Large-16     122    9712600 ns/op  10295969 rows/sec  24834215 B/op  85 allocs/op
BenchmarkTakeListPartitionPattern-16                    96   11756706 ns/op   4252901 rows/sec  18421151 B/op  98 allocs/op
PASS
ok github.com/apache/arrow-go/v18/arrow/compute 17.869s
```
---
arrow/compute/internal/kernels/vector_selection.go | 29 ++-
arrow/compute/vector_selection_test.go | 205 ++++++++++++++++++++-
2 files changed, 229 insertions(+), 5 deletions(-)
diff --git a/arrow/compute/internal/kernels/vector_selection.go b/arrow/compute/internal/kernels/vector_selection.go
index 1d6c2187..62923407 100644
--- a/arrow/compute/internal/kernels/vector_selection.go
+++ b/arrow/compute/internal/kernels/vector_selection.go
@@ -1591,10 +1591,16 @@ func ListImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan,
 	if values.Len > 0 {
 		dataLength := rawOffsets[values.Len] - rawOffsets[0]
 		meanListLen := float64(dataLength) / float64(values.Len)
-		childIdxBuilder.reserve(int(meanListLen))
+		estimatedTotal := int(meanListLen * float64(outputLength))
+
+		// Cap the pre-allocation at a reasonable size
+		const maxPreAlloc = 16777216 // 16M elements
+		estimatedTotal = min(estimatedTotal, maxPreAlloc)
+		childIdxBuilder.reserve(estimatedTotal)
 	}
 
 	offsetBuilder.reserve(int(outputLength) + 1)
+	spaceAvail := childIdxBuilder.cap()
 	var offset OffsetT
 	err := fn(ctx, outputLength, values, selection, out,
 		func(idx int64) error {
@@ -1602,10 +1608,29 @@ func ListImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan,
 			valueOffset := rawOffsets[idx]
 			valueLength := rawOffsets[idx+1] - valueOffset
 			offset += valueLength
-			childIdxBuilder.reserve(int(valueLength))
+			if int(valueLength) > spaceAvail {
+				// Calculate how much total capacity we need
+				needed := childIdxBuilder.len() + int(valueLength)
+				newCap := childIdxBuilder.cap()
+
+				// Double capacity until we have enough space
+				// This gives us O(log n) reallocations instead of O(n)
+				if newCap == 0 {
+					newCap = int(valueLength)
+				}
+				for newCap < needed {
+					newCap = newCap * 2
+				}
+
+				// Reserve the additional capacity
+				additional := newCap - childIdxBuilder.len()
+				childIdxBuilder.reserve(additional)
+				spaceAvail = childIdxBuilder.cap() - childIdxBuilder.len()
+			}
 			for j := valueOffset; j < valueOffset+valueLength; j++ {
 				childIdxBuilder.unsafeAppend(j)
 			}
+			spaceAvail -= int(valueLength)
 			return nil
 		}, func() error {
 			offsetBuilder.unsafeAppend(offset)
diff --git a/arrow/compute/vector_selection_test.go b/arrow/compute/vector_selection_test.go
index 475d3821..a9d86a74 100644
--- a/arrow/compute/vector_selection_test.go
+++ b/arrow/compute/vector_selection_test.go
@@ -1677,7 +1677,7 @@ func BenchmarkTakeString(b *testing.B) {
 
 	for _, bm := range benchmarks {
 		b.Run(bm.name, func(b *testing.B) {
-			mem := memory.NewGoAllocator()
+			mem := memory.DefaultAllocator
 			ctx := compute.WithAllocator(context.Background(), mem)
 
 			// Create source array with strings of specified average length
@@ -1731,7 +1731,7 @@ func BenchmarkTakeString(b *testing.B) {
 func BenchmarkTakeStringPartitionPattern(b *testing.B) {
 	// Simulate real-world partitioning workload where data is reorganized
 	// into multiple partitions (e.g., by timestamp month + host)
-	mem := memory.NewGoAllocator()
+	mem := memory.DefaultAllocator
 	ctx := compute.WithAllocator(context.Background(), mem)
 
 	const numRows = 50000
@@ -1777,7 +1777,7 @@ func BenchmarkTakeStringPartitionPattern(b *testing.B) {
 func BenchmarkTakeMultiColumn(b *testing.B) {
 	// Benchmark Take on a record batch with multiple string columns
 	// to simulate real-world use cases (e.g., CloudFront logs with 20+ string columns)
-	mem := memory.NewGoAllocator()
+	mem := memory.DefaultAllocator
 	ctx := compute.WithAllocator(context.Background(), mem)
 
 	const numRows = 50000
@@ -1865,3 +1865,202 @@ func BenchmarkTakeMultiColumn(b *testing.B) {
b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
}
+
+// Benchmark tests for Take operation with list types
+// These benchmarks test the performance improvements from buffer
pre-allocation
+// and exponential growth in ListImpl for nested data reorganization.
+
+func BenchmarkTakeList(b *testing.B) {
+ // Test various batch sizes and list lengths
+ benchmarks := []struct {
+ name string
+ numRows int64
+ avgListLen int
+ listType arrow.DataType
+ }{
+ {"SmallBatch_ShortLists", 1000, 5,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"MediumBatch_ShortLists", 10000, 5,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"LargeBatch_ShortLists", 50000, 5,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"XLargeBatch_ShortLists", 100000, 5,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"SmallBatch_MediumLists", 1000, 20,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"MediumBatch_MediumLists", 10000, 20,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"LargeBatch_MediumLists", 50000, 20,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"XLargeBatch_MediumLists", 100000, 20,
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+ {"LargeBatch_ShortLists_Large", 50000, 5,
arrow.LargeListOf(arrow.PrimitiveTypes.Int32)},
+ {"XLargeBatch_MediumLists_Large", 100000, 20,
arrow.LargeListOf(arrow.PrimitiveTypes.Int32)},
+ }
+
+ for _, bm := range benchmarks {
+ b.Run(bm.name, func(b *testing.B) {
+ mem := memory.DefaultAllocator
+ ctx := compute.WithAllocator(context.Background(), mem)
+
+ // Create source array with lists of specified average
length
+ bldr := array.NewBuilder(mem, bm.listType)
+ defer bldr.Release()
+
+ // Generate test data with varying list lengths
+ for i := int64(0); i < bm.numRows; i++ {
+ // Vary list length around the average
+ listLen := bm.avgListLen + int(i%5) - 2
+ if listLen < 0 {
+ listLen = 0
+ }
+
+ switch b := bldr.(type) {
+ case *array.ListBuilder:
+ b.Append(true)
+ valBldr :=
b.ValueBuilder().(*array.Int32Builder)
+ for j := 0; j < listLen; j++ {
+ valBldr.Append(int32(i*100 +
int64(j)))
+ }
+ case *array.LargeListBuilder:
+ b.Append(true)
+ valBldr :=
b.ValueBuilder().(*array.Int32Builder)
+ for j := 0; j < listLen; j++ {
+ valBldr.Append(int32(i*100 +
int64(j)))
+ }
+ }
+ }
+ values := bldr.NewArray()
+ defer values.Release()
+
+ // Create indices that simulate
partitioning/reorganization
+ // Use a pattern that would be common in partitioned
writes:
+ // reverse order to maximize data movement
+ indicesBldr := array.NewInt64Builder(mem)
+ defer indicesBldr.Release()
+ for i := bm.numRows - 1; i >= 0; i-- {
+ indicesBldr.Append(i)
+ }
+ indices := indicesBldr.NewArray()
+ defer indices.Release()
+
+ // Reset timer after setup
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ // Run benchmark
+ for i := 0; i < b.N; i++ {
+ result, err := compute.TakeArray(ctx, values,
indices)
+ if err != nil {
+ b.Fatal(err)
+ }
+ result.Release()
+ }
+
+ // Report throughput
+
b.ReportMetric(float64(bm.numRows*int64(b.N))/b.Elapsed().Seconds(), "rows/sec")
+ })
+ }
+}
+
+func BenchmarkTakeNestedList(b *testing.B) {
+	// Test nested list types (list<list<int32>>)
+	// This is particularly relevant for complex schemas like usage events with resources
+	mem := memory.DefaultAllocator
+	ctx := compute.WithAllocator(context.Background(), mem)
+
+	const numRows = 50000
+	const avgOuterListLen = 3
+	const avgInnerListLen = 4
+
+	// Create source data
+	dt := arrow.ListOf(arrow.ListOf(arrow.PrimitiveTypes.Int32))
+	bldr := array.NewBuilder(mem, dt).(*array.ListBuilder)
+	defer bldr.Release()
+
+	for i := 0; i < numRows; i++ {
+		bldr.Append(true)
+		innerListBldr := bldr.ValueBuilder().(*array.ListBuilder)
+		innerValBldr := innerListBldr.ValueBuilder().(*array.Int32Builder)
+
+		numInnerLists := avgOuterListLen + (i % 3) - 1
+		for j := 0; j < numInnerLists; j++ {
+			innerListBldr.Append(true)
+			numVals := avgInnerListLen + (j % 3) - 1
+			for k := 0; k < numVals; k++ {
+				innerValBldr.Append(int32(i*1000 + j*10 + k))
+			}
+		}
+	}
+	values := bldr.NewArray()
+	defer values.Release()
+
+	// Create indices for partitioning pattern
+	indicesBldr := array.NewInt64Builder(mem)
+	defer indicesBldr.Release()
+	for i := numRows - 1; i >= 0; i-- {
+		indicesBldr.Append(int64(i))
+	}
+	indices := indicesBldr.NewArray()
+	defer indices.Release()
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		result, err := compute.TakeArray(ctx, values, indices)
+		if err != nil {
+			b.Fatal(err)
+		}
+		result.Release()
+	}
+
+	b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
+
+func BenchmarkTakeListPartitionPattern(b *testing.B) {
+	// Simulate real-world partitioning workload where list data is reorganized
+	// into multiple partitions (e.g., usage events with resources by timestamp + source)
+	mem := memory.DefaultAllocator
+	ctx := compute.WithAllocator(context.Background(), mem)
+
+	const numRows = 50000
+	const numPartitions = 8
+	const avgListLen = 15
+
+	// Create source data
+	dt := arrow.ListOf(arrow.BinaryTypes.String)
+	bldr := array.NewBuilder(mem, dt).(*array.ListBuilder)
+	defer bldr.Release()
+
+	valBldr := bldr.ValueBuilder().(*array.StringBuilder)
+	for i := 0; i < numRows; i++ {
+		bldr.Append(true)
+		listLen := avgListLen + (i % 5) - 2
+		if listLen < 0 {
+			listLen = 0
+		}
+		for j := 0; j < listLen; j++ {
+			valBldr.Append(fmt.Sprintf("resource_%d_%d", i, j))
+		}
+	}
+	values := bldr.NewArray()
+	defer values.Release()
+
+	// Create indices that simulate partitioning by interleaving
+	// (every Nth row goes to partition N)
+	indicesBldr := array.NewInt64Builder(mem)
+	defer indicesBldr.Release()
+	for partition := 0; partition < numPartitions; partition++ {
+		for i := partition; i < numRows; i += numPartitions {
+			indicesBldr.Append(int64(i))
+		}
+	}
+	indices := indicesBldr.NewArray()
+	defer indices.Release()
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		result, err := compute.TakeArray(ctx, values, indices)
+		if err != nil {
+			b.Fatal(err)
+		}
+		result.Release()
+	}
+
+	b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}