singhpk234 commented on code in PR #12856:
URL: https://github.com/apache/iceberg/pull/12856#discussion_r2052647100
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -86,7 +89,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;

-  SparkMicroBatchStream(
+  public SparkMicroBatchStream(

Review Comment:
   Why public?

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -222,37 +264,74 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
       }

       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-      validateCurrentSnapshotExists(snapshot, currentOffset);

       if (!shouldProcess(snapshot)) {
-        LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
+        LOG.debug(
+            "Skipping processing for snapshot id={} operation={}",
+            snapshot.snapshotId(),
+            snapshot.operation());
         continue;
       }

-      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
-      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
-        endFileIndex = endOffset.position();
-      } else {
-        endFileIndex = addedFilesCount(currentSnapshot);
-      }
+      long endFileIndex =
+          currentOffset.snapshotId() == endOffset.snapshotId()
+              ? endOffset.position()
+              : addedFilesCount(snapshot);

       MicroBatch latestMicroBatch =
-          MicroBatches.from(currentSnapshot, table.io())
+          MicroBatches.from(snapshot, table.io())
               .caseSensitive(caseSensitive)
               .specsById(table.specs())
               .generate(
                   currentOffset.position(),
                   endFileIndex,
                   Long.MAX_VALUE,
                   currentOffset.shouldScanAllFiles());
-
-      fileScanTasks.addAll(latestMicroBatch.tasks());
+      List<FileScanTask> tasks = latestMicroBatch.tasks();
+      Instant snapshotDt = Instant.ofEpochMilli(snapshot.timestampMillis());
+      LOG.debug(
+          "Processing snapshot [id={}, dateTime={}, ageHours={}, startFileIndex={}, endFileIndex={}] generated {} file scan tasks",
+          currentOffset.snapshotId(),
+          DateTimeFormatter.ISO_INSTANT.format(snapshotDt),
+          ChronoUnit.HOURS.between(snapshotDt, Instant.now()),
+          currentOffset.position(),
+          endFileIndex,
+          tasks.size());
+
+      fileScanTasks.addAll(tasks);
     } while (currentOffset.snapshotId() != endOffset.snapshotId());

     return fileScanTasks;
   }

+  public static class FileScanTaskSummary {

Review Comment:
   Why public?
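   For reference, a toy sketch of what keeping package-private visibility looks like (class and member names here are hypothetical, not from the PR):

   ```java
   package org.apache.iceberg.spark.source;

   // Package-private members stay reachable from same-package callers
   // (including tests under org.apache.iceberg.spark.source) without
   // becoming part of the public API surface.
   class PackagePrivateExample {

     // package-private constructor: no "public" modifier
     PackagePrivateExample() {}

     // package-private nested helper, mirroring the FileScanTaskSummary question
     static class Summary {}
   }
   ```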
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -105,28 +108,54 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();

+    LOG.info(
+        "Initializing SparkMicroBatchStream with params: branch={}, caseSensitive={}, "
+            + "splitSize={}, splitLookback={}, splitOpenFileCost={}, fromTimestamp={}, "
+            + "maxFilesPerMicroBatch={}, maxRecordsPerMicroBatch={}",
+        branch,
+        caseSensitive,
+        splitSize,
+        splitLookback,
+        splitOpenFileCost,
+        fromTimestamp,
+        maxFilesPerMicroBatch,
+        maxRecordsPerMicroBatch);
+
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOG.debug("Initial offset set to {}", initialOffset);

     this.skipDelete = readConf.streamingSkipDeleteSnapshots();
     this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
+    LOG.debug("Skip delete snapshots={}, skip overwrite snapshots={}", skipDelete, skipOverwrite);
   }

   @Override
   public Offset latestOffset() {
     table.refresh();
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+    LOG.debug(
+        "Refreshed table {}, current snapshot id={}",

Review Comment:
   Suggestion: I would log both the table name and the table UUID.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -105,28 +108,54 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();

+    LOG.info(
+        "Initializing SparkMicroBatchStream with params: branch={}, caseSensitive={}, "
+            + "splitSize={}, splitLookback={}, splitOpenFileCost={}, fromTimestamp={}, "
+            + "maxFilesPerMicroBatch={}, maxRecordsPerMicroBatch={}",

Review Comment:
   Isn't this exactly what we infer from the readConf, SparkContext, etc.? Isn't it easily derivable from the Spark UI / history server?
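   A minimal sketch of the suggested log line. It assumes the Table API's uuid() accessor, which is available in recent Iceberg releases; the wrapper class and method names are hypothetical, only the log call matters:

   ```java
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class RefreshLogExample {
     private static final Logger LOG = LoggerFactory.getLogger(RefreshLogExample.class);

     // Log both the table name and the table UUID after a refresh; the name
     // can change on rename, the UUID uniquely identifies the table.
     static void logRefresh(Table table) {
       Snapshot current = table.currentSnapshot();
       LOG.debug(
           "Refreshed table {} (uuid={}), current snapshot id={}",
           table.name(),
           table.uuid(), // assumed available on this Iceberg version
           current != null ? current.snapshotId() : "null");
     }
   }
   ```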
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -105,28 +108,54 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();

+    LOG.info(
+        "Initializing SparkMicroBatchStream with params: branch={}, caseSensitive={}, "
+            + "splitSize={}, splitLookback={}, splitOpenFileCost={}, fromTimestamp={}, "
+            + "maxFilesPerMicroBatch={}, maxRecordsPerMicroBatch={}",
+        branch,
+        caseSensitive,
+        splitSize,
+        splitLookback,
+        splitOpenFileCost,
+        fromTimestamp,
+        maxFilesPerMicroBatch,
+        maxRecordsPerMicroBatch);
+
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOG.debug("Initial offset set to {}", initialOffset);

     this.skipDelete = readConf.streamingSkipDeleteSnapshots();
     this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
+    LOG.debug("Skip delete snapshots={}, skip overwrite snapshots={}", skipDelete, skipOverwrite);
   }

   @Override
   public Offset latestOffset() {
     table.refresh();
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+    LOG.debug(
+        "Refreshed table {}, current snapshot id={}",
+        table.name(),
+        table.currentSnapshot() != null ? table.currentSnapshot().snapshotId() : "null");
+
+    if (table.currentSnapshot() == null
+        || table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      LOG.debug(
+          "No valid current snapshot or snapshot before fromTimestamp ({}), returning START_OFFSET",
+          fromTimestamp);
       return StreamingOffset.START_OFFSET;
     }

     Snapshot latestSnapshot = table.currentSnapshot();

Review Comment:
   Can you move this line up and use latestSnapshot everywhere, rather than calling table.currentSnapshot() repeatedly?
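   A minimal sketch of the suggested reordering: resolve table.currentSnapshot() into a local once, before the log line and the guard, then reuse it. The helper shape and names are hypothetical (the real method returns StreamingOffset.START_OFFSET inline; this sketch signals that case with null):

   ```java
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class LatestSnapshotExample {
     private static final Logger LOG = LoggerFactory.getLogger(LatestSnapshotExample.class);

     // Reads table.currentSnapshot() exactly once; returns null when the
     // caller should fall back to START_OFFSET.
     static Snapshot resolveLatestSnapshot(Table table, long fromTimestamp) {
       table.refresh();
       Snapshot latestSnapshot = table.currentSnapshot();
       LOG.debug(
           "Refreshed table {}, current snapshot id={}",
           table.name(),
           latestSnapshot != null ? latestSnapshot.snapshotId() : "null");

       if (latestSnapshot == null || latestSnapshot.timestampMillis() < fromTimestamp) {
         LOG.debug(
             "No valid current snapshot or snapshot before fromTimestamp ({}), returning START_OFFSET",
             fromTimestamp);
         return null;
       }

       return latestSnapshot;
     }
   }
   ```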
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -138,24 +167,32 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
         "Invalid start offset: %s is not a StreamingOffset",
         start);

-    if (end.equals(StreamingOffset.START_OFFSET)) {
+    StreamingOffset startOffset = (StreamingOffset) start;
+    StreamingOffset endOffset = (StreamingOffset) end;
+
+    if (endOffset.equals(StreamingOffset.START_OFFSET)) {
+      LOG.debug("End offset is START_OFFSET, returning no partitions");
       return new InputPartition[0];
     }

-    StreamingOffset endOffset = (StreamingOffset) end;
-    StreamingOffset startOffset = (StreamingOffset) start;
-
     List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+    FileScanTaskSummary taskSummary = new FileScanTaskSummary(fileScanTasks);
+    LOG.debug(
+        "planFiles returned {} file scan tasks. total_files={}, total_size_in_bytes={}. Time taken to eval stats {} ms",
+        fileScanTasks.size(),
+        taskSummary.nrOfFiles,
+        taskSummary.sizeInBytes,
+        taskSummary.evalTimeTakenMs);

     CloseableIterable<FileScanTask> splitTasks =
         TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), splitSize);
     List<CombinedScanTask> combinedScanTasks =
         Lists.newArrayList(
             TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
-
-    String[][] locations = computePreferredLocations(combinedScanTasks);
+    LOG.debug("Split into {} combined scan tasks", combinedScanTasks.size());

Review Comment:
   I would use the class name rather than static names, as they can change.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -222,37 +264,74 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
       }

       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-      validateCurrentSnapshotExists(snapshot, currentOffset);

       if (!shouldProcess(snapshot)) {
-        LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
+        LOG.debug(
+            "Skipping processing for snapshot id={} operation={}",
+            snapshot.snapshotId(),
+            snapshot.operation());
         continue;
       }

-      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
-      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
-        endFileIndex = endOffset.position();
-      } else {
-        endFileIndex = addedFilesCount(currentSnapshot);
-      }
+      long endFileIndex =
+          currentOffset.snapshotId() == endOffset.snapshotId()
+              ? endOffset.position()
+              : addedFilesCount(snapshot);

       MicroBatch latestMicroBatch =
-          MicroBatches.from(currentSnapshot, table.io())
+          MicroBatches.from(snapshot, table.io())
              .caseSensitive(caseSensitive)
              .specsById(table.specs())
              .generate(
                  currentOffset.position(),
                  endFileIndex,
                  Long.MAX_VALUE,
                  currentOffset.shouldScanAllFiles());
-
-      fileScanTasks.addAll(latestMicroBatch.tasks());
+      List<FileScanTask> tasks = latestMicroBatch.tasks();
+      Instant snapshotDt = Instant.ofEpochMilli(snapshot.timestampMillis());
+      LOG.debug(
+          "Processing snapshot [id={}, dateTime={}, ageHours={}, startFileIndex={}, endFileIndex={}] generated {} file scan tasks",
+          currentOffset.snapshotId(),
+          DateTimeFormatter.ISO_INSTANT.format(snapshotDt),
+          ChronoUnit.HOURS.between(snapshotDt, Instant.now()),
+          currentOffset.position(),
+          endFileIndex,
+          tasks.size());
+
+      fileScanTasks.addAll(tasks);
     } while (currentOffset.snapshotId() != endOffset.snapshotId());

     return fileScanTasks;
   }

+  public static class FileScanTaskSummary {
+    private long sizeInBytes = 0;
+    private int nrOfFiles = 0;
+    private long evalTimeTakenMs;
+
+    public FileScanTaskSummary(List<FileScanTask> list) {
+      long currentTimeMillis = System.currentTimeMillis();
+      for (var task : list) {
+        nrOfFiles += task.filesCount();
+        sizeInBytes += task.sizeBytes();
+      }
+      this.evalTimeTakenMs = System.currentTimeMillis() - currentTimeMillis;
+    }

Review Comment:
   Can you please move this out of the constructor into individual functions and cache the results? We need this info only when debug logging is enabled, and it might be a bit expensive.
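   A minimal sketch of the requested shape, under the assumption that lazy, cached accessors are what's wanted (accessor names are illustrative): the constructor only captures the task list, and the one pass over the tasks happens on first access.

   ```java
   import java.util.List;
   import org.apache.iceberg.FileScanTask;

   class FileScanTaskSummary {
     private final List<FileScanTask> tasks;
     private boolean computed = false;
     private int nrOfFiles = 0;
     private long sizeInBytes = 0;
     private long evalTimeTakenMs = 0;

     FileScanTaskSummary(List<FileScanTask> tasks) {
       this.tasks = tasks; // no work done in the constructor
     }

     // Computes all three stats in a single pass on first access, then caches.
     private void computeIfNeeded() {
       if (computed) {
         return;
       }
       long start = System.currentTimeMillis();
       for (FileScanTask task : tasks) {
         nrOfFiles += task.filesCount();
         sizeInBytes += task.sizeBytes();
       }
       evalTimeTakenMs = System.currentTimeMillis() - start;
       computed = true;
     }

     int nrOfFiles() {
       computeIfNeeded();
       return nrOfFiles;
     }

     long sizeInBytes() {
       computeIfNeeded();
       return sizeInBytes;
     }

     long evalTimeTakenMs() {
       computeIfNeeded();
       return evalTimeTakenMs;
     }
   }
   ```

   At the call site, wrapping the LOG.debug(...) in if (LOG.isDebugEnabled()) { ... } (or constructing the summary only inside that branch) skips the pass over the tasks entirely when debug logging is off.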
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org