bk-mz commented on code in PR #12856: URL: https://github.com/apache/iceberg/pull/12856#discussion_r2052921366
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -105,28 +108,54 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch(); this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch(); + LOG.info( + "Initializing SparkMicroBatchStream with params: branch={}, caseSensitive={}, " + + "splitSize={}, splitLookback={}, splitOpenFileCost={}, fromTimestamp={}, " + + "maxFilesPerMicroBatch={}, maxRecordsPerMicroBatch={}", + branch, + caseSensitive, + splitSize, + splitLookback, + splitOpenFileCost, + fromTimestamp, + maxFilesPerMicroBatch, + maxRecordsPerMicroBatch); + InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation, fromTimestamp); this.initialOffset = initialOffsetStore.initialOffset(); + LOG.debug("Initial offset set to {}", initialOffset); this.skipDelete = readConf.streamingSkipDeleteSnapshots(); this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots(); + LOG.debug("Skip delete snapshots={}, skip overwrite snapshots={}", skipDelete, skipOverwrite); } @Override public Offset latestOffset() { table.refresh(); - if (table.currentSnapshot() == null) { - return StreamingOffset.START_OFFSET; - } - - if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + LOG.debug( + "Refreshed table {}, current snapshot id={}", + table.name(), + table.currentSnapshot() != null ? table.currentSnapshot().snapshotId() : "null"); + + if (table.currentSnapshot() == null + || table.currentSnapshot().timestampMillis() < fromTimestamp) { + LOG.debug( + "No valid current snapshot or snapshot before fromTimestamp ({}), returning START_OFFSET", + fromTimestamp); return StreamingOffset.START_OFFSET; } Snapshot latestSnapshot = table.currentSnapshot(); Review Comment: fixed ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -222,37 +264,74 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse } Snapshot snapshot = table.snapshot(currentOffset.snapshotId()); - validateCurrentSnapshotExists(snapshot, currentOffset); if (!shouldProcess(snapshot)) { - LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name()); + LOG.debug( + "Skipping processing for snapshot id={} operation={}", + snapshot.snapshotId(), + snapshot.operation()); continue; } - Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId()); - if (currentOffset.snapshotId() == endOffset.snapshotId()) { - endFileIndex = endOffset.position(); - } else { - endFileIndex = addedFilesCount(currentSnapshot); - } + long endFileIndex = + currentOffset.snapshotId() == endOffset.snapshotId() + ? endOffset.position() + : addedFilesCount(snapshot); MicroBatch latestMicroBatch = - MicroBatches.from(currentSnapshot, table.io()) + MicroBatches.from(snapshot, table.io()) .caseSensitive(caseSensitive) .specsById(table.specs()) .generate( currentOffset.position(), endFileIndex, Long.MAX_VALUE, currentOffset.shouldScanAllFiles()); - - fileScanTasks.addAll(latestMicroBatch.tasks()); + List<FileScanTask> tasks = latestMicroBatch.tasks(); + Instant snapshotDt = Instant.ofEpochMilli(snapshot.timestampMillis()); + LOG.debug( + "Processing snapshot [id={}, dateTime={}, ageHours={}, startFileIndex={}, endFileIndex={}] generated {} file scan tasks", + currentOffset.snapshotId(), + DateTimeFormatter.ISO_INSTANT.format(snapshotDt), + ChronoUnit.HOURS.between(snapshotDt, Instant.now()), + currentOffset.position(), + endFileIndex, + tasks.size()); + + fileScanTasks.addAll(tasks); } while (currentOffset.snapshotId() != endOffset.snapshotId()); return fileScanTasks; } + public static class FileScanTaskSummary { + private long sizeInBytes = 0; + private int nrOfFiles = 0; + private long evalTimeTakenMs; + + public FileScanTaskSummary(List<FileScanTask> list) { + long currentTimeMillis = System.currentTimeMillis(); + for (var task : list) { + nrOfFiles += task.filesCount(); + sizeInBytes += task.sizeBytes(); + } + this.evalTimeTakenMs = System.currentTimeMillis() - currentTimeMillis; + } Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org