bk-mz commented on code in PR #12856:
URL: https://github.com/apache/iceberg/pull/12856#discussion_r2052921366


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -105,28 +108,54 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
 
+    LOG.info(
+        "Initializing SparkMicroBatchStream with params: branch={}, caseSensitive={}, "
+            + "splitSize={}, splitLookback={}, splitOpenFileCost={}, fromTimestamp={}, "
+            + "maxFilesPerMicroBatch={}, maxRecordsPerMicroBatch={}",
+        branch,
+        caseSensitive,
+        splitSize,
+        splitLookback,
+        splitOpenFileCost,
+        fromTimestamp,
+        maxFilesPerMicroBatch,
+        maxRecordsPerMicroBatch);
+
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOG.debug("Initial offset set to {}", initialOffset);
 
     this.skipDelete = readConf.streamingSkipDeleteSnapshots();
     this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
+    LOG.debug("Skip delete snapshots={}, skip overwrite snapshots={}", 
skipDelete, skipOverwrite);
   }
 
   @Override
   public Offset latestOffset() {
     table.refresh();
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+    LOG.debug(
+        "Refreshed table {}, current snapshot id={}",
+        table.name(),
+        table.currentSnapshot() != null ? table.currentSnapshot().snapshotId() : "null");
+
+    if (table.currentSnapshot() == null
+        || table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      LOG.debug(
+          "No valid current snapshot or snapshot before fromTimestamp ({}), 
returning START_OFFSET",
+          fromTimestamp);
       return StreamingOffset.START_OFFSET;
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();

Review Comment:
   fixed
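
A side note on the null-safe logging added above: SLF4J parameterized
messages defer string formatting until the level is enabled, and SLF4J
renders a null argument as "null" on its own once the value has been
extracted safely. A minimal, self-contained sketch of the pattern (the
class name and helper methods below are illustrative, not from this PR):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SnapshotLoggingSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(SnapshotLoggingSketch.class);

      // Hypothetical stand-in for table.currentSnapshot().snapshotId();
      // returns null when the table has no snapshots yet.
      private Long currentSnapshotId() {
        return null;
      }

      void logRefresh(String tableName) {
        Long snapshotId = currentSnapshotId();
        // The {} placeholders are filled only when DEBUG is enabled;
        // a nullable boxed id can be passed directly once extracted.
        LOG.debug("Refreshed table {}, current snapshot id={}", tableName, snapshotId);

        // Guard explicitly when an argument is expensive to compute.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Snapshot detail: {}", describeSnapshot(snapshotId));
        }
      }

      private String describeSnapshot(Long snapshotId) {
        return "snapshot-" + snapshotId; // placeholder for a costly lookup
      }
    }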



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -222,37 +264,74 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
       }
 
       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
       validateCurrentSnapshotExists(snapshot, currentOffset);
 
       if (!shouldProcess(snapshot)) {
-        LOG.debug("Skipping snapshot: {} of table {}", 
currentOffset.snapshotId(), table.name());
+        LOG.debug(
+            "Skipping processing for snapshot id={} operation={}",
+            snapshot.snapshotId(),
+            snapshot.operation());
         continue;
       }
 
-      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
-      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
-        endFileIndex = endOffset.position();
-      } else {
-        endFileIndex = addedFilesCount(currentSnapshot);
-      }
+      long endFileIndex =
+          currentOffset.snapshotId() == endOffset.snapshotId()
+              ? endOffset.position()
+              : addedFilesCount(snapshot);
 
       MicroBatch latestMicroBatch =
-          MicroBatches.from(currentSnapshot, table.io())
+          MicroBatches.from(snapshot, table.io())
               .caseSensitive(caseSensitive)
               .specsById(table.specs())
               .generate(
                   currentOffset.position(),
                   endFileIndex,
                   Long.MAX_VALUE,
                   currentOffset.shouldScanAllFiles());
-
-      fileScanTasks.addAll(latestMicroBatch.tasks());
+      List<FileScanTask> tasks = latestMicroBatch.tasks();
+      Instant snapshotDt = Instant.ofEpochMilli(snapshot.timestampMillis());
+      LOG.debug(
+          "Processing snapshot [id={}, dateTime={}, ageHours={}, 
startFileIndex={}, endFileIndex={}] generated {} file scan tasks",
+          currentOffset.snapshotId(),
+          DateTimeFormatter.ISO_INSTANT.format(snapshotDt),
+          ChronoUnit.HOURS.between(snapshotDt, Instant.now()),
+          currentOffset.position(),
+          endFileIndex,
+          tasks.size());
+
+      fileScanTasks.addAll(tasks);
     } while (currentOffset.snapshotId() != endOffset.snapshotId());
 
     return fileScanTasks;
   }
 
+  public static class FileScanTaskSummary {
+    private long sizeInBytes = 0;
+    private int nrOfFiles = 0;
+    private long evalTimeTakenMs;
+
+    public FileScanTaskSummary(List<FileScanTask> list) {
+      long currentTimeMillis = System.currentTimeMillis();
+      for (var task : list) {
+        nrOfFiles += task.filesCount();
+        sizeInBytes += task.sizeBytes();
+      }
+      this.evalTimeTakenMs = System.currentTimeMillis() - currentTimeMillis;
+    }

Review Comment:
   fixed
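
For reference, the totals the new FileScanTaskSummary accumulates rely
only on the ScanTask accessors visible in the diff (filesCount() and
sizeBytes()). A standalone sketch of the same computation; the class
and method names here are illustrative, not part of the change:

    import java.util.List;
    import org.apache.iceberg.FileScanTask;

    class FileScanTaskSummarySketch {

      // Walks the planned tasks once, totaling file counts and byte
      // sizes, and records how long the walk itself took.
      static String summarize(List<FileScanTask> tasks) {
        long sizeInBytes = 0;
        int nrOfFiles = 0;
        long start = System.currentTimeMillis();
        for (FileScanTask task : tasks) {
          nrOfFiles += task.filesCount(); // files this task reads
          sizeInBytes += task.sizeBytes(); // bytes this task will scan
        }
        long evalTimeTakenMs = System.currentTimeMillis() - start;
        return "nrOfFiles=" + nrOfFiles
            + ", sizeInBytes=" + sizeInBytes
            + ", evalTimeTakenMs=" + evalTimeTakenMs;
      }
    }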



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

