tanmayrauth commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3291870263
##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) (*Table, error) {
func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
}
+
+// prepareBatchFilter binds the given Iceberg filter against schema and converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) (arrow.RecordBatch, error), error) {
Review Comment:
I think the current contract is consistent: input ownership transfers to
output, caller releases once. Adding Retain() on the AlwaysTrue path would leak
the input (caller would have to release input AND output, but the contract
today is "after postFilter(rec), only release the result").
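To make the ownership argument concrete, here is a minimal sketch of the contract using a hand-rolled reference counter. The `batch` type and the `passThrough`, `passThroughRetain`, `dropAll`, and `leakedRefs` names are hypothetical stand-ins, not the Arrow or iceberg-go API, and returning a fresh empty batch on the drop-all path is an assumption made only for illustration.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a reference-counted record batch.
// It starts with exactly one reference, owned by whoever created it.
type batch struct {
	name string
	refs int
}

func newBatch(name string) *batch { return &batch{name: name, refs: 1} }

func (b *batch) Retain() { b.refs++ }

func (b *batch) Release() { b.refs-- }

// passThrough models the AlwaysTrue path under the current contract:
// the input is returned as-is, so its single reference moves to the caller.
func passThrough(in *batch) *batch { return in }

// passThroughRetain models the suggested change: Retain() before returning.
func passThroughRetain(in *batch) *batch {
	in.Retain()
	return in
}

// dropAll models the AlwaysFalse fast path: the filter releases the input
// itself and hands back a different batch (assumed empty here).
func dropAll(in *batch) *batch {
	in.Release()
	return newBatch("empty")
}

// leakedRefs plays the caller: call the filter, release only the result,
// and report how many references are still outstanding on the input.
func leakedRefs(filter func(*batch) *batch) int {
	in := newBatch("input")
	out := filter(in)
	out.Release()
	return in.refs
}

func main() {
	fmt.Println("pass-through:", leakedRefs(passThrough))
	fmt.Println("pass-through with Retain:", leakedRefs(passThroughRetain))
	fmt.Println("drop-all:", leakedRefs(dropAll))
}
```

In this model, only the variant that adds Retain() leaves a reference outstanding when the caller releases just the result, which is the leak described above.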
##########
table/transaction.go:
##########
@@ -1530,14 +1565,50 @@ func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, upda
return nil
}
-// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data.
+// fileSeqNum is the source file's file_sequence_number from its manifest entry; required to synthesize
+// _last_updated_sequence_number for rows whose value is null in the source file.
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, fileSeqNum *int64, filter iceberg.BooleanExpression, postFilter func(arrow.RecordBatch) (arrow.RecordBatch, error), caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
Review Comment:
Bundled the params into rewriteSingleFileArgs. On the wasted bind: hoisted
BindExpr into the !preserveRowLineage branch so the per-file bind is no longer
wasted on the lineage path. Left the prepareBatchFilter bind alone — it runs
once per rewriteFilesWithFilter call rather than per file, so the cost is
amortized across all rewritten files in the operation.
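For readers following the amortization point, a minimal sketch of the prepare-once pattern: the setup runs a single time and the returned closure is reused for every batch of every file. The names `bind`, `prepareFilter`, `rewriteFiles`, and the `bindCalls` counter are hypothetical, and plain integer slices stand in for record batches; none of this is the iceberg-go code.

```go
package main

import "fmt"

// bindCalls counts how often the (hypothetical) expensive setup runs.
var bindCalls int

// bind stands in for the costly bind/convert step and returns a predicate.
func bind(threshold int) func(int) bool {
	bindCalls++
	return func(v int) bool { return v >= threshold }
}

// prepareFilter performs the setup once and returns a closure that can be
// applied to any number of batches without repeating that setup.
func prepareFilter(threshold int) func([]int) []int {
	keep := bind(threshold)
	return func(batch []int) []int {
		out := make([]int, 0, len(batch))
		for _, v := range batch {
			if keep(v) {
				out = append(out, v)
			}
		}
		return out
	}
}

// rewriteFiles prepares the filter once per call, then reuses it for every
// batch of every file, and returns the total number of rows kept.
func rewriteFiles(files [][][]int, threshold int) int {
	filter := prepareFilter(threshold)
	kept := 0
	for _, batches := range files {
		for _, b := range batches {
			kept += len(filter(b))
		}
	}
	return kept
}

func main() {
	// Two "files": the first with two batches, the second with one.
	files := [][][]int{{{1, 5, 9}, {2, 8}}, {{7, 3}}}
	kept := rewriteFiles(files, 5)
	fmt.Println("rows kept:", kept, "bind calls:", bindCalls)
}
```

However many files and batches go through `rewriteFiles`, the counter only advances once per call, which is the sense in which the bind cost is spread over the whole operation.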
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]