This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 4ff615f fix(arrow/compute): Fix scalar comparison batches (#465)
4ff615f is described below
commit 4ff615f063e3beb31e8175ce40cb8c4bfe168145
Author: Matt Topol <[email protected]>
AuthorDate: Fri Aug 8 16:04:02 2025 -0400
fix(arrow/compute): Fix scalar comparison batches (#465)
### Rationale for this change
Fixes #464
### What changes are included in this PR?
Ensure the size of the slice used in scalar comparisons stays within the
expected batch size
### Are these changes tested?
Yes, a test is added.
### Are there any user-facing changes?
A condition that previously resulted in a panic will now succeed.
---
arrow/compute/exprs/exec_test.go | 82 ++++++++++++++++++++++
.../compute/internal/kernels/scalar_comparisons.go | 2 +-
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/arrow/compute/exprs/exec_test.go b/arrow/compute/exprs/exec_test.go
index b1cbec3..f56b9f7 100644
--- a/arrow/compute/exprs/exec_test.go
+++ b/arrow/compute/exprs/exec_test.go
@@ -699,3 +699,85 @@ func TestLargeTypes(t *testing.T) {
defer result.Release()
})
}
+
+func TestDecimalFilterLarge(t *testing.T) {
+ t.Parallel()
+
+ tt := []struct {
+ name string
+ n int
+ }{
+ {
+ name: "arrow.DECIMAL128 - number of records < 33 ok",
+ n: 32,
+ },
+ {
+ name: "arrow.DECIMAL128 - number of records >= 33 panic",
+ n: 33,
+ },
+ }
+
+ for _, tc := range tt {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+ rq := require.New(t)
+
+ typ := &arrow.Decimal128Type{Precision: 3, Scale: 1}
+ field := arrow.Field{
+ Name: "col",
+ Type: typ,
+ Nullable: true,
+ }
+ schema := arrow.NewSchema([]arrow.Field{field}, nil)
+
+ db := array.NewDecimal128Builder(memory.DefaultAllocator, typ)
+ defer db.Release()
+
+ for i := 0; i < tc.n; i++ {
+ d, err := decimal.Decimal128FromFloat(float64(i), 3, 1)
+ rq.NoError(err, "Failed to create Decimal128 value")
+
+ db.Append(d)
+ }
+
+ rec := array.NewRecord(schema, []arrow.Array{db.NewArray()}, int64(tc.n))
+
+ extSet := exprs.GetExtensionIDSet(ctx)
+ builder := exprs.NewExprBuilder(extSet)
+
+ err := builder.SetInputSchema(schema)
+ rq.NoError(err, "Failed to set input schema")
+
+ v, p, s, err := expr.DecimalStringToBytes("10.0")
+ rq.NoError(err, "Failed to convert decimal string to bytes")
+
+ lit, err := expr.NewLiteral(&types.Decimal{
+ Value: v[:16],
+ Precision: p,
+ Scale: s,
+ }, true)
+ rq.NoError(err, "Failed to create Decimal128 literal")
+
+ b, err := builder.CallScalar("less", nil,
+ builder.FieldRef("col"),
+ builder.Literal(lit),
+ )
+
+ rq.NoError(err, "Failed to call scalar")
+
+ e, err := b.BuildExpr()
+ rq.NoError(err, "Failed to build expression")
+
+ ctx = exprs.WithExtensionIDSet(ctx, extSet)
+
+ dr := compute.NewDatum(rec)
+ defer dr.Release()
+
+ _, err = exprs.ExecuteScalarExpression(ctx, schema, e, dr)
+ rq.NoError(err, "Failed to execute scalar expression")
+ })
+ }
+}
diff --git a/arrow/compute/internal/kernels/scalar_comparisons.go b/arrow/compute/internal/kernels/scalar_comparisons.go
index e4a5054..8cf23a2 100644
--- a/arrow/compute/internal/kernels/scalar_comparisons.go
+++ b/arrow/compute/internal/kernels/scalar_comparisons.go
@@ -147,7 +147,7 @@ func comparePrimitiveScalarArray[T arrow.FixedWidthType](op cmpScalarLeft[T, T])
}
for j := 0; j < nbatches; j++ {
- op(leftVal, right, tmpOutSlice)
+ op(leftVal, right[:batchSize], tmpOutSlice)
right = right[batchSize:]
packBits(tmpOutput, out)
out = out[batchSize/8:]