advancedxy commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1284027818


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +138,9 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (rowFilter != null) {
+      return CloseableIterable.filter(iter, rowFilter::filter);
+    }

Review Comment:
   @Fokko @stevenzwu We have an internal request to add row filter dynamically, 
current impl requires the filter to be supplied at job startup time.  After 
some investigation, I believe maybe we don't have to passing the filters all 
the way done to this class.. The task itself already has the row filter 
`task.residual()`. We could simply convert it to rowFilter here, such as:
   
   ```
   //    if (rowFilter != null) {
   //      return CloseableIterable.filter(iter, rowFilter::filter);
   //    }
       if (task.residual() != null && 
!task.residual().isEquivalentTo(Expressions.alwaysTrue())) {
         FlinkSourceFilter dataFilter =
                 new FlinkSourceFilter(this.projectedSchema, task.residual(), 
this.caseSensitive);
         return CloseableIterable.filter(iter, dataFilter::filter);
       }
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to