tanmayrauth commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3291870263


##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) (*Table, error) {
 func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
        panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
 }
+
+// prepareBatchFilter binds the given Iceberg filter against schema and converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) (arrow.RecordBatch, error), error) {

Review Comment:
   I think the current contract is already consistent: ownership of the input transfers to the output, and the caller releases exactly once. Adding Retain() on the AlwaysTrue path would leak the input, because the caller would then have to release both the input and the output, whereas the contract today is "after postFilter(rec), release only the result".
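
   To make the ownership argument concrete, here is a minimal sketch using only the standard library. The `batch` type is a hypothetical stand-in that just counts references; it is not `arrow.RecordBatch`, and `makeFilter`, `makeLeakyFilter`, and `run` are illustrative names rather than code from this PR. Returning a fresh empty batch on the AlwaysFalse path is also an assumption.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a ref-counted record batch.
// It only tracks a reference count; it is NOT the Arrow API.
type batch struct {
	name string
	refs int
}

func newBatch(name string) *batch { return &batch{name: name, refs: 1} }
func (b *batch) Retain()          { b.refs++ }
func (b *batch) Release()         { b.refs-- }

type mode int

const (
	alwaysTrue mode = iota
	alwaysFalse
)

// makeFilter models the contract described above: ownership of the input
// moves to the output, so the caller releases only the result.
func makeFilter(m mode) func(*batch) *batch {
	return func(in *batch) *batch {
		if m == alwaysTrue {
			// Pass-through: no Retain, the single reference moves to the caller.
			return in
		}
		// AlwaysFalse fast path: the filter releases the input itself and
		// hands back a new batch (assumed here to be an empty one).
		in.Release()
		return newBatch("empty")
	}
}

// makeLeakyFilter models the rejected suggestion: Retain on pass-through.
func makeLeakyFilter() func(*batch) *batch {
	return func(in *batch) *batch {
		in.Retain()
		return in
	}
}

// run applies f to a fresh batch, releases only the result (as the contract
// requires), and reports how many references to the input are still held.
func run(f func(*batch) *batch) int {
	in := newBatch("in")
	out := f(in)
	out.Release()
	return in.refs
}

func main() {
	fmt.Println("AlwaysTrue, no Retain:", run(makeFilter(alwaysTrue)))
	fmt.Println("AlwaysFalse:", run(makeFilter(alwaysFalse)))
	fmt.Println("AlwaysTrue, with Retain:", run(makeLeakyFilter()))
}
```

   In this model the two contract-following paths leave no outstanding references, while the Retain variant leaves one, which is the leak described above.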
     



##########
table/transaction.go:
##########
@@ -1530,14 +1565,50 @@ func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, upda
        return nil
 }
 
-// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data.
+// fileSeqNum is the source file's file_sequence_number from its manifest entry; required to synthesize
+// _last_updated_sequence_number for rows whose value is null in the source file.
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, fileSeqNum *int64, filter iceberg.BooleanExpression, postFilter func(arrow.RecordBatch) (arrow.RecordBatch, error), caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {

Review Comment:
   Bundled the params into rewriteSingleFileArgs. On the wasted bind: I moved BindExpr into the !preserveRowLineage branch, so the per-file bind is no longer wasted on the lineage path. I left the prepareBatchFilter bind alone: it runs once per rewriteFilesWithFilter call rather than once per file, so its cost is amortized across all rewritten files in the operation.
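
   The amortization point can be sketched as follows, using only the standard library. `prepareFilter`, `rewriteFile`, `rewriteAll`, and the `bindCalls` counter are hypothetical names for illustration, and a string prefix check stands in for the real bind-and-convert work; none of this is the PR's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// bindCalls counts how often the (hypothetical) expensive setup runs.
var bindCalls int

// prepareFilter stands in for the one-time setup step. It does the costly
// work once and returns a cheap closure that can be reused for every row.
func prepareFilter(prefix string) func(string) bool {
	bindCalls++
	return func(row string) bool { return strings.HasPrefix(row, prefix) }
}

// rewriteFile applies an already-prepared filter to the rows of one file.
func rewriteFile(rows []string, keep func(string) bool) []string {
	var out []string
	for _, r := range rows {
		if keep(r) {
			out = append(out, r)
		}
	}
	return out
}

// rewriteAll prepares the filter once per call and shares it across files,
// so the setup cost does not grow with the number of files.
func rewriteAll(files [][]string, prefix string) [][]string {
	keep := prepareFilter(prefix)
	out := make([][]string, 0, len(files))
	for _, f := range files {
		out = append(out, rewriteFile(f, keep))
	}
	return out
}

func main() {
	files := [][]string{{"a1", "b1"}, {"a2", "a3"}, {"b2"}}
	kept := rewriteAll(files, "a")
	fmt.Println("files rewritten:", len(kept), "setup calls:", bindCalls)
}
```

   The setup counter stays at one regardless of how many files are passed in, which is the property the reply relies on.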



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

