anoopj commented on code in PR #16210:
URL: https://github.com/apache/iceberg/pull/16210#discussion_r3196753333


##########
core/src/main/java/org/apache/iceberg/MicroBatches.java:
##########
@@ -58,11 +62,39 @@ public static CloseableIterable<FileScanTask> openManifestFile(
       Snapshot snapshot,
       ManifestFile manifestFile,
       boolean scanAllFiles) {
+    return openManifestFileWithFilter(
+        io, specsById, caseSensitive, snapshot, manifestFile, scanAllFiles, Lists.newArrayList());
+  }
+
+  public static CloseableIterable<FileScanTask> openManifestFileWithFilter(
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Snapshot snapshot,
+      ManifestFile manifestFile,
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
+
+    // 1. Get the field IDs used in the partition spec
+    Expression partitionExpr = Expressions.alwaysTrue();
+    Expression dataExpr = Expressions.alwaysTrue();
+
+    for (Expression filter : pushedFilters) {
+      if (isPartitionOnly(
+          filter, specsById.values().iterator().next().partitionType(), caseSensitive)) {

Review Comment:
   Tables with partition evolution have multiple specs. This code takes 
whichever spec the map iterator returns first, so the filter is classified 
against an arbitrary spec. Take a look at 
`ExpressionUtil.selectsPartitions()` for an example.
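
To make the concern concrete, here is a minimal sketch in plain Java. It does not use Iceberg's API: the class `PartitionOnlyCheck`, the method `isPartitionOnlyForAllSpecs`, and the modelling of a spec as a set of column names are all hypothetical simplifications for illustration. The point it shows is that a filter should count as partition-only only if that holds under every spec, not just the first one returned by the iterator.

```java
import java.util.Map;
import java.util.Set;

public class PartitionOnlyCheck {
  // Hypothetical model: each spec ID maps to the set of column names that spec partitions by.
  // A filter is represented only by the set of column names it references.
  static boolean isPartitionOnlyForAllSpecs(
      Set<String> filterColumns, Map<Integer, Set<String>> partitionColumnsBySpecId) {
    // With no specs there is nothing to check against, so be conservative.
    if (partitionColumnsBySpecId.isEmpty()) {
      return false;
    }
    // The filter qualifies only if every spec covers all columns it references.
    return partitionColumnsBySpecId.values().stream()
        .allMatch(partitionColumns -> partitionColumns.containsAll(filterColumns));
  }
}
```

With specs `{0: [day], 1: [day, region]}`, a filter on `day` qualifies under both specs, while a filter on `region` does not, because spec 0 has no `region` partition column. Checking only one spec could return either answer for `region`, depending on iteration order.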



##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -102,11 +106,13 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
     this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();
+    this.filters = filters;
 
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(
             table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOGGER.error("[jalpan] creating micro batch with filter {} ", filters);

Review Comment:
   Please remove these debug log statements, including the one below on L137.



##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -57,6 +58,7 @@
 import org.slf4j.LoggerFactory;
 
 public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkMicroBatchStream.class);

Review Comment:
   This class already declares a logger named `LOG` a few lines below, so 
this initializes a duplicate logger.



##########
core/src/main/java/org/apache/iceberg/MicroBatches.java:
##########
@@ -76,6 +108,20 @@ public static CloseableIterable<FileScanTask> openManifestFile(
     return manifestGroup.planFiles();
   }
 
+  // 2. The Helper Method using Iceberg's core Visitor
+  private static boolean isPartitionOnly(
+      Expression expr, Types.StructType partitionType, boolean caseSensitive) {
+    try {
+      // If this doesn't throw an error, it means the filter
+      // only uses columns present in the partition schema.
+      Binder.bind(partitionType, expr, caseSensitive);
+      return true;

Review Comment:
   It's not a good idea to use exceptions for control flow in normal 
operation. The code base already has `ExpressionUtil.selectsPartitions()`, 
which handles this.
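
As a generic illustration of the alternative, the sketch below answers the question "do all referenced names resolve?" with an explicit lookup that returns a boolean, instead of attempting a bind and catching the failure. It is plain Java and not Iceberg code: `FieldLookup`, `allResolve`, and the representation of a struct type as a set of field names are hypothetical.

```java
import java.util.Collection;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class FieldLookup {
  // Returns true when every referenced name matches a known field name.
  // A miss is an ordinary false result, not an exception.
  static boolean allResolve(
      Collection<String> referenced, Set<String> fieldNames, boolean caseSensitive) {
    if (caseSensitive) {
      return fieldNames.containsAll(referenced);
    }
    // Case-insensitive mode: compare lower-cased names on both sides.
    Set<String> lowered =
        fieldNames.stream().map(name -> name.toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
    return referenced.stream()
        .map(name -> name.toLowerCase(Locale.ROOT))
        .allMatch(lowered::contains);
  }
}
```

The caller can branch on the result directly, and genuine errors (for example, a malformed expression) remain the only cases that surface as exceptions.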



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

