amogh-jahagirdar commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147019781


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, 
normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }

Review Comment:
   Wondering if there is a clean way to do this with Java streams?



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, 
normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained 
via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = 
transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)
+        ? new ParallelIterable<>(tasks, scan.planExecutor())
+        : CloseableIterable.concat(tasks);

Review Comment:
   Can we avoid the branching behavior here? It looks like 
`scan.planExecutor()` will never be null so we can always return 
`ParallelIterable<>`?



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting

Review Comment:
   Do we need the annotation here? Ideally we can test the behavior through the 
existing tests without having to expose this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to