This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new caa859d5 perf(compute): optimize Take kernel for list types (#573)
caa859d5 is described below

commit caa859d5c4f853d2422e17a4a9f99e262f934c72
Author: Alex <[email protected]>
AuthorDate: Fri Nov 14 12:40:28 2025 -0700

    perf(compute): optimize Take kernel for list types (#573)
    
    ### Rationale for this change
    
    This PR implements the same allocation improvements as
    https://github.com/apache/arrow-go/pull/557 but for the Take list
    kernel.
    
    ### What changes are included in this PR?
    
    - Pre-allocate upfront with a better estimate of the necessary buffer
    size to eliminate repeated reallocations.
    - Use exponential growth for additional allocations for O(log n) total
    reallocations.
    
    ### Are these changes tested?
    
    New benchmarks added for take list kernel
    
    ### Are there any user-facing changes?
    
    No
    
    ### Performance Comparison
    
    on `main`
    
    ```
    goos: darwin
    goarch: arm64
    pkg: github.com/apache/arrow-go/v18/arrow/compute
    cpu: Apple M3 Max
    BenchmarkTakeList/SmallBatch_ShortLists-16                  2320            
 555889 ns/op          1798923 rows/sec          3355390 B/op            396 
allocs/op
    BenchmarkTakeList/MediumBatch_ShortLists-16                   31           
37600434 ns/op           265955 rows/sec        324238694 B/op           3214 
allocs/op
    BenchmarkTakeList/LargeBatch_ShortLists-16                     2          
653682896 ns/op            76490 rows/sec       7877124660 B/op          15751 
allocs/op
    BenchmarkTakeList/XLargeBatch_ShortLists-16                    1         
2019873959 ns/op            49508 rows/sec      31380868976 B/op          31489 
allocs/op
    BenchmarkTakeList/SmallBatch_MediumLists-16                  334            
3435205 ns/op           291104 rows/sec         43059334 B/op           1086 
allocs/op
    BenchmarkTakeList/MediumBatch_MediumLists-16                   5          
230308533 ns/op            43420 rows/sec       4041679715 B/op          10098 
allocs/op
    BenchmarkTakeList/LargeBatch_MediumLists-16                    1         
4600489959 ns/op            10868 rows/sec     100213267960 B/op          50328 
allocs/op
    BenchmarkTakeList/XLargeBatch_MediumLists-16                   1        
16462343792 ns/op             6074 rows/sec     400427784144 B/op         
101032 allocs/op
    BenchmarkTakeList/LargeBatch_ShortLists_Large-16               1         
1682293042 ns/op            29721 rows/sec      31379855376 B/op          31422 
allocs/op
    BenchmarkTakeList/XLargeBatch_MediumLists_Large-16             1        
33800353917 ns/op             2959 rows/sec     800433389776 B/op         
102480 allocs/op
    BenchmarkTakeListPartitionPattern-16                           1         
3003105000 ns/op            16649 rows/sec      69581474288 B/op          46479 
allocs/op
    PASS
    ok      github.com/apache/arrow-go/v18/arrow/compute    71.015s
    ```
    
    on this branch
    
    ```
    goos: darwin
    goarch: arm64
    pkg: github.com/apache/arrow-go/v18/arrow/compute
    cpu: Apple M3 Max
    BenchmarkTakeList/SmallBatch_ShortLists-16                 25522            
 46138 ns/op          21674062 rows/sec        50668 B/op         83 allocs/op
    BenchmarkTakeList/MediumBatch_ShortLists-16                 3792            
316046 ns/op          31641022 rows/sec       457558 B/op         83 allocs/op
    BenchmarkTakeList/LargeBatch_ShortLists-16                   804           
1521240 ns/op          32867968 rows/sec      2232578 B/op         84 allocs/op
    BenchmarkTakeList/XLargeBatch_ShortLists-16                  416           
2832247 ns/op          35307705 rows/sec      4435314 B/op         84 allocs/op
    BenchmarkTakeList/SmallBatch_MediumLists-16                 9444            
125321 ns/op           7979591 rows/sec       173603 B/op         83 allocs/op
    BenchmarkTakeList/MediumBatch_MediumLists-16                1176            
999217 ns/op          10007857 rows/sec      1653926 B/op         83 allocs/op
    BenchmarkTakeList/LargeBatch_MediumLists-16                  232           
4913249 ns/op          10176590 rows/sec      8229752 B/op         85 allocs/op
    BenchmarkTakeList/XLargeBatch_MediumLists-16                 128           
9309120 ns/op          10742230 rows/sec     16428821 B/op         85 allocs/op
    BenchmarkTakeList/LargeBatch_ShortLists_Large-16             739           
1560044 ns/op          32050453 rows/sec      3428837 B/op         84 allocs/op
    BenchmarkTakeList/XLargeBatch_MediumLists_Large-16           122           
9712600 ns/op          10295969 rows/sec     24834215 B/op         85 allocs/op
    BenchmarkTakeListPartitionPattern-16                          96          
11756706 ns/op           4252901 rows/sec     18421151 B/op         98 allocs/op
    PASS
    ok      github.com/apache/arrow-go/v18/arrow/compute    17.869s
    
    ```
---
 arrow/compute/internal/kernels/vector_selection.go |  29 ++-
 arrow/compute/vector_selection_test.go             | 205 ++++++++++++++++++++-
 2 files changed, 229 insertions(+), 5 deletions(-)

diff --git a/arrow/compute/internal/kernels/vector_selection.go 
b/arrow/compute/internal/kernels/vector_selection.go
index 1d6c2187..62923407 100644
--- a/arrow/compute/internal/kernels/vector_selection.go
+++ b/arrow/compute/internal/kernels/vector_selection.go
@@ -1591,10 +1591,16 @@ func ListImpl[OffsetT int32 | int64](ctx 
*exec.KernelCtx, batch *exec.ExecSpan,
        if values.Len > 0 {
                dataLength := rawOffsets[values.Len] - rawOffsets[0]
                meanListLen := float64(dataLength) / float64(values.Len)
-               childIdxBuilder.reserve(int(meanListLen))
+               estimatedTotal := int(meanListLen * float64(outputLength))
+
+               // Cap the pre-allocation at a reasonable size
+               const maxPreAlloc = 16777216 // 16M elements
+               estimatedTotal = min(estimatedTotal, maxPreAlloc)
+               childIdxBuilder.reserve(estimatedTotal)
        }
 
        offsetBuilder.reserve(int(outputLength) + 1)
+       spaceAvail := childIdxBuilder.cap()
        var offset OffsetT
        err := fn(ctx, outputLength, values, selection, out,
                func(idx int64) error {
@@ -1602,10 +1608,29 @@ func ListImpl[OffsetT int32 | int64](ctx 
*exec.KernelCtx, batch *exec.ExecSpan,
                        valueOffset := rawOffsets[idx]
                        valueLength := rawOffsets[idx+1] - valueOffset
                        offset += valueLength
-                       childIdxBuilder.reserve(int(valueLength))
+                       if int(valueLength) > spaceAvail {
+                               // Calculate how much total capacity we need
+                               needed := childIdxBuilder.len() + 
int(valueLength)
+                               newCap := childIdxBuilder.cap()
+
+                               // Double capacity until we have enough space
+                               // This gives us O(log n) reallocations instead 
of O(n)
+                               if newCap == 0 {
+                                       newCap = int(valueLength)
+                               }
+                               for newCap < needed {
+                                       newCap = newCap * 2
+                               }
+
+                               // Reserve the additional capacity
+                               additional := newCap - childIdxBuilder.len()
+                               childIdxBuilder.reserve(additional)
+                               spaceAvail = childIdxBuilder.cap() - 
childIdxBuilder.len()
+                       }
                        for j := valueOffset; j < valueOffset+valueLength; j++ {
                                childIdxBuilder.unsafeAppend(j)
                        }
+                       spaceAvail -= int(valueLength)
                        return nil
                }, func() error {
                        offsetBuilder.unsafeAppend(offset)
diff --git a/arrow/compute/vector_selection_test.go 
b/arrow/compute/vector_selection_test.go
index 475d3821..a9d86a74 100644
--- a/arrow/compute/vector_selection_test.go
+++ b/arrow/compute/vector_selection_test.go
@@ -1677,7 +1677,7 @@ func BenchmarkTakeString(b *testing.B) {
 
        for _, bm := range benchmarks {
                b.Run(bm.name, func(b *testing.B) {
-                       mem := memory.NewGoAllocator()
+                       mem := memory.DefaultAllocator
                        ctx := compute.WithAllocator(context.Background(), mem)
 
                        // Create source array with strings of specified 
average length
@@ -1731,7 +1731,7 @@ func BenchmarkTakeString(b *testing.B) {
 func BenchmarkTakeStringPartitionPattern(b *testing.B) {
        // Simulate real-world partitioning workload where data is reorganized
        // into multiple partitions (e.g., by timestamp month + host)
-       mem := memory.NewGoAllocator()
+       mem := memory.DefaultAllocator
        ctx := compute.WithAllocator(context.Background(), mem)
 
        const numRows = 50000
@@ -1777,7 +1777,7 @@ func BenchmarkTakeStringPartitionPattern(b *testing.B) {
 func BenchmarkTakeMultiColumn(b *testing.B) {
        // Benchmark Take on a record batch with multiple string columns
        // to simulate real-world use cases (e.g., CloudFront logs with 20+ 
string columns)
-       mem := memory.NewGoAllocator()
+       mem := memory.DefaultAllocator
        ctx := compute.WithAllocator(context.Background(), mem)
 
        const numRows = 50000
@@ -1865,3 +1865,202 @@ func BenchmarkTakeMultiColumn(b *testing.B) {
 
        b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
 }
+
+// Benchmark tests for Take operation with list types
+// These benchmarks test the performance improvements from buffer 
pre-allocation
+// and exponential growth in ListImpl for nested data reorganization.
+
+func BenchmarkTakeList(b *testing.B) {
+       // Test various batch sizes and list lengths
+       benchmarks := []struct {
+               name       string
+               numRows    int64
+               avgListLen int
+               listType   arrow.DataType
+       }{
+               {"SmallBatch_ShortLists", 1000, 5, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"MediumBatch_ShortLists", 10000, 5, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"LargeBatch_ShortLists", 50000, 5, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"XLargeBatch_ShortLists", 100000, 5, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"SmallBatch_MediumLists", 1000, 20, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"MediumBatch_MediumLists", 10000, 20, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"LargeBatch_MediumLists", 50000, 20, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"XLargeBatch_MediumLists", 100000, 20, 
arrow.ListOf(arrow.PrimitiveTypes.Int32)},
+               {"LargeBatch_ShortLists_Large", 50000, 5, 
arrow.LargeListOf(arrow.PrimitiveTypes.Int32)},
+               {"XLargeBatch_MediumLists_Large", 100000, 20, 
arrow.LargeListOf(arrow.PrimitiveTypes.Int32)},
+       }
+
+       for _, bm := range benchmarks {
+               b.Run(bm.name, func(b *testing.B) {
+                       mem := memory.DefaultAllocator
+                       ctx := compute.WithAllocator(context.Background(), mem)
+
+                       // Create source array with lists of specified average 
length
+                       bldr := array.NewBuilder(mem, bm.listType)
+                       defer bldr.Release()
+
+                       // Generate test data with varying list lengths
+                       for i := int64(0); i < bm.numRows; i++ {
+                               // Vary list length around the average
+                               listLen := bm.avgListLen + int(i%5) - 2
+                               if listLen < 0 {
+                                       listLen = 0
+                               }
+
+                               switch b := bldr.(type) {
+                               case *array.ListBuilder:
+                                       b.Append(true)
+                                       valBldr := 
b.ValueBuilder().(*array.Int32Builder)
+                                       for j := 0; j < listLen; j++ {
+                                               valBldr.Append(int32(i*100 + 
int64(j)))
+                                       }
+                               case *array.LargeListBuilder:
+                                       b.Append(true)
+                                       valBldr := 
b.ValueBuilder().(*array.Int32Builder)
+                                       for j := 0; j < listLen; j++ {
+                                               valBldr.Append(int32(i*100 + 
int64(j)))
+                                       }
+                               }
+                       }
+                       values := bldr.NewArray()
+                       defer values.Release()
+
+                       // Create indices that simulate 
partitioning/reorganization
+                       // Use a pattern that would be common in partitioned 
writes:
+                       // reverse order to maximize data movement
+                       indicesBldr := array.NewInt64Builder(mem)
+                       defer indicesBldr.Release()
+                       for i := bm.numRows - 1; i >= 0; i-- {
+                               indicesBldr.Append(i)
+                       }
+                       indices := indicesBldr.NewArray()
+                       defer indices.Release()
+
+                       // Reset timer after setup
+                       b.ResetTimer()
+                       b.ReportAllocs()
+
+                       // Run benchmark
+                       for i := 0; i < b.N; i++ {
+                               result, err := compute.TakeArray(ctx, values, 
indices)
+                               if err != nil {
+                                       b.Fatal(err)
+                               }
+                               result.Release()
+                       }
+
+                       // Report throughput
+                       
b.ReportMetric(float64(bm.numRows*int64(b.N))/b.Elapsed().Seconds(), "rows/sec")
+               })
+       }
+}
+
+func BenchmarkTakeNestedList(b *testing.B) {
+       // Test nested list types (list<list<int32>>)
+       // This is particularly relevant for complex schemas like usage events 
with resources
+       mem := memory.DefaultAllocator
+       ctx := compute.WithAllocator(context.Background(), mem)
+
+       const numRows = 50000
+       const avgOuterListLen = 3
+       const avgInnerListLen = 4
+
+       // Create source data
+       dt := arrow.ListOf(arrow.ListOf(arrow.PrimitiveTypes.Int32))
+       bldr := array.NewBuilder(mem, dt).(*array.ListBuilder)
+       defer bldr.Release()
+
+       for i := 0; i < numRows; i++ {
+               bldr.Append(true)
+               innerListBldr := bldr.ValueBuilder().(*array.ListBuilder)
+               innerValBldr := 
innerListBldr.ValueBuilder().(*array.Int32Builder)
+
+               numInnerLists := avgOuterListLen + (i % 3) - 1
+               for j := 0; j < numInnerLists; j++ {
+                       innerListBldr.Append(true)
+                       numVals := avgInnerListLen + (j % 3) - 1
+                       for k := 0; k < numVals; k++ {
+                               innerValBldr.Append(int32(i*1000 + j*10 + k))
+                       }
+               }
+       }
+       values := bldr.NewArray()
+       defer values.Release()
+
+       // Create indices for partitioning pattern
+       indicesBldr := array.NewInt64Builder(mem)
+       defer indicesBldr.Release()
+       for i := numRows - 1; i >= 0; i-- {
+               indicesBldr.Append(int64(i))
+       }
+       indices := indicesBldr.NewArray()
+       defer indices.Release()
+
+       b.ResetTimer()
+       b.ReportAllocs()
+
+       for i := 0; i < b.N; i++ {
+               result, err := compute.TakeArray(ctx, values, indices)
+               if err != nil {
+                       b.Fatal(err)
+               }
+               result.Release()
+       }
+
+       b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
+
+func BenchmarkTakeListPartitionPattern(b *testing.B) {
+       // Simulate real-world partitioning workload where list data is 
reorganized
+       // into multiple partitions (e.g., usage events with resources by 
timestamp + source)
+       mem := memory.DefaultAllocator
+       ctx := compute.WithAllocator(context.Background(), mem)
+
+       const numRows = 50000
+       const numPartitions = 8
+       const avgListLen = 15
+
+       // Create source data
+       dt := arrow.ListOf(arrow.BinaryTypes.String)
+       bldr := array.NewBuilder(mem, dt).(*array.ListBuilder)
+       defer bldr.Release()
+
+       valBldr := bldr.ValueBuilder().(*array.StringBuilder)
+       for i := 0; i < numRows; i++ {
+               bldr.Append(true)
+               listLen := avgListLen + (i % 5) - 2
+               if listLen < 0 {
+                       listLen = 0
+               }
+               for j := 0; j < listLen; j++ {
+                       valBldr.Append(fmt.Sprintf("resource_%d_%d", i, j))
+               }
+       }
+       values := bldr.NewArray()
+       defer values.Release()
+
+       // Create indices that simulate partitioning by interleaving
+       // (every Nth row goes to partition N)
+       indicesBldr := array.NewInt64Builder(mem)
+       defer indicesBldr.Release()
+       for partition := 0; partition < numPartitions; partition++ {
+               for i := partition; i < numRows; i += numPartitions {
+                       indicesBldr.Append(int64(i))
+               }
+       }
+       indices := indicesBldr.NewArray()
+       defer indices.Release()
+
+       b.ResetTimer()
+       b.ReportAllocs()
+
+       for i := 0; i < b.N; i++ {
+               result, err := compute.TakeArray(ctx, values, indices)
+               if err != nil {
+                       b.Fatal(err)
+               }
+               result.Release()
+       }
+
+       b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}

Reply via email to