bk-mz commented on code in PR #12856:
URL: https://github.com/apache/iceberg/pull/12856#discussion_r2052921152


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -222,37 +264,74 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
       }
 
       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
       validateCurrentSnapshotExists(snapshot, currentOffset);
 
       if (!shouldProcess(snapshot)) {
-        LOG.debug("Skipping snapshot: {} of table {}", 
currentOffset.snapshotId(), table.name());
+        LOG.debug(
+            "Skipping processing for snapshot id={} operation={}",
+            snapshot.snapshotId(),
+            snapshot.operation());
         continue;
       }
 
-      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
-      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
-        endFileIndex = endOffset.position();
-      } else {
-        endFileIndex = addedFilesCount(currentSnapshot);
-      }
+      long endFileIndex =
+          currentOffset.snapshotId() == endOffset.snapshotId()
+              ? endOffset.position()
+              : addedFilesCount(snapshot);
 
       MicroBatch latestMicroBatch =
-          MicroBatches.from(currentSnapshot, table.io())
+          MicroBatches.from(snapshot, table.io())
               .caseSensitive(caseSensitive)
               .specsById(table.specs())
               .generate(
                   currentOffset.position(),
                   endFileIndex,
                   Long.MAX_VALUE,
                   currentOffset.shouldScanAllFiles());
-
-      fileScanTasks.addAll(latestMicroBatch.tasks());
+      List<FileScanTask> tasks = latestMicroBatch.tasks();
+      Instant snapshotDt = Instant.ofEpochMilli(snapshot.timestampMillis());
+      LOG.debug(
+          "Processing snapshot [id={}, dateTime={}, ageHours={}, 
startFileIndex={}, endFileIndex={}] generated {} file scan tasks",
+          currentOffset.snapshotId(),
+          DateTimeFormatter.ISO_INSTANT.format(snapshotDt),
+          ChronoUnit.HOURS.between(snapshotDt, Instant.now()),
+          currentOffset.position(),
+          endFileIndex,
+          tasks.size());
+
+      fileScanTasks.addAll(tasks);
     } while (currentOffset.snapshotId() != endOffset.snapshotId());
 
     return fileScanTasks;
   }
 
+  public static class FileScanTaskSummary {

Review Comment:
   fixed
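
For context, the ageHours/dateTime fields in the new debug line come straight from java.time. A minimal, self-contained sketch of that computation (the timestamp literal below is hypothetical; in the PR it comes from Snapshot.timestampMillis()):

    import java.time.Instant;
    import java.time.format.DateTimeFormatter;
    import java.time.temporal.ChronoUnit;

    public class SnapshotAgeExample {
      public static void main(String[] args) {
        long timestampMillis = 1_713_800_000_000L; // hypothetical snapshot timestamp
        Instant snapshotDt = Instant.ofEpochMilli(timestampMillis);
        // ISO-8601 rendering, matching DateTimeFormatter.ISO_INSTANT in the diff
        String dateTime = DateTimeFormatter.ISO_INSTANT.format(snapshotDt);
        // whole-hour age of the snapshot relative to now, truncated toward zero
        long ageHours = ChronoUnit.HOURS.between(snapshotDt, Instant.now());
        System.out.printf("dateTime=%s ageHours=%d%n", dateTime, ageHours);
      }
    }

Note that ChronoUnit.HOURS.between returns complete hours only, so a snapshot 59 minutes old logs ageHours=0.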



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -86,7 +89,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
 
-  SparkMicroBatchStream(
+  public SparkMicroBatchStream(

Review Comment:
   fixed



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -105,28 +108,54 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
 
+    LOG.info(
+        "Initializing SparkMicroBatchStream with params: branch={}, 
caseSensitive={}, "
+            + "splitSize={}, splitLookback={}, splitOpenFileCost={}, 
fromTimestamp={}, "
+            + "maxFilesPerMicroBatch={}, maxRecordsPerMicroBatch={}",
+        branch,
+        caseSensitive,
+        splitSize,
+        splitLookback,
+        splitOpenFileCost,
+        fromTimestamp,
+        maxFilesPerMicroBatch,
+        maxRecordsPerMicroBatch);
+
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOG.debug("Initial offset set to {}", initialOffset);
 
     this.skipDelete = readConf.streamingSkipDeleteSnapshots();
     this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
+    LOG.debug("Skip delete snapshots={}, skip overwrite snapshots={}", 
skipDelete, skipOverwrite);
   }
 
   @Override
   public Offset latestOffset() {
     table.refresh();
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+    LOG.debug(
+        "Refreshed table {}, current snapshot id={}",

Review Comment:
   fixed
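
For context, the new initialization log relies on SLF4J's parameterized style: the "{}" placeholders are substituted only when the level is enabled, so no string is built for suppressed levels. A minimal sketch of the pattern, assuming slf4j-api on the classpath (class and parameter names here are illustrative, not from the PR):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class InitLogging {
      private static final Logger LOG = LoggerFactory.getLogger(InitLogging.class);

      void init(String branch, boolean caseSensitive, long splitSize) {
        // Placeholders defer formatting until INFO is known to be enabled,
        // avoiding eager string concatenation in the constructor.
        LOG.info(
            "Initializing with params: branch={}, caseSensitive={}, splitSize={}",
            branch, caseSensitive, splitSize);
      }
    }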


