singhpk234 commented on code in PR #8980:
URL: https://github.com/apache/iceberg/pull/8980#discussion_r1393422572

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -392,8 +392,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
       // if everything was OK and we consumed complete snapshot then move to next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // nextValide is null, this implies all the remaining snapshots should be skipped.

Review Comment:
```suggestion
          // nextValid null implies all the remaining snapshots should be skipped.
```

##########
core/src/main/java/org/apache/iceberg/MicroBatches.java:
##########
@@ -92,7 +92,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(
     for (ManifestFile manifest : manifestFiles) {
       manifestIndexes.add(Pair.of(manifest, currentFileIndex));
-      currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+      currentFileIndex += manifest.addedFilesCount();

Review Comment:
IMHO we should not remove this line until we have coverage for this. Can you please revert this change?

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -392,8 +392,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
       // if everything was OK and we consumed complete snapshot then move to next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // nextValide is null, this implies all the remaining snapshots should be skipped.
+          shouldContinueReading = false;
+          break;
+        }
+        // we found the next available snapshot, continue from there.

Review Comment:
Is this comment required?
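
To make the contract in the hunk above concrete (a null result means every remaining snapshot should be skipped), here is a minimal, self-contained sketch of a null-safe skip loop. It does not use Iceberg's classes: `Snap`, `snapshotAfter`, `shouldProcess`, and the rule that only `append` snapshots are processed are all hypothetical stand-ins, and the real `SnapshotUtil.snapshotAfter` may behave differently at the end of the history.

```java
import java.util.List;

public class SnapshotSkipSketch {
  // Hypothetical stand-in for a table snapshot; not Iceberg's Snapshot interface.
  public static final class Snap {
    public final long id;
    public final String operation;

    public Snap(long id, String operation) {
      this.id = id;
      this.operation = operation;
    }
  }

  // Assumption for this sketch: only "append" snapshots are processed,
  // so rewrite ("replace") and "delete" snapshots are skipped.
  static boolean shouldProcess(Snap snap) {
    return "append".equals(snap.operation);
  }

  // Hypothetical lookup over a linear history: returns the snapshot that
  // follows `current`, or null when `current` is the latest one.
  static Snap snapshotAfter(List<Snap> history, Snap current) {
    int idx = history.indexOf(current);
    if (idx < 0) {
      throw new IllegalArgumentException("snapshot is not part of the history");
    }
    return idx + 1 < history.size() ? history.get(idx + 1) : null;
  }

  // Returns the next snapshot that should be processed, or null if all
  // remaining snapshots should be skipped.
  static Snap nextValid(List<Snap> history, Snap current) {
    if (current == null) {
      throw new IllegalArgumentException("current should not be null");
    }
    Snap next = snapshotAfter(history, current);
    // The explicit null check keeps shouldProcess from ever seeing null.
    while (next != null && !shouldProcess(next)) {
      next = snapshotAfter(history, next);
    }
    return next;
  }

  public static void main(String[] args) {
    List<Snap> history = List.of(
        new Snap(1, "append"),
        new Snap(2, "replace"),
        new Snap(3, "delete"),
        new Snap(4, "append"));

    Snap fromFirst = nextValid(history, history.get(0));
    Snap fromLast = nextValid(history, history.get(3));
    System.out.println(fromFirst == null ? "none" : "next id = " + fromFirst.id);
    System.out.println(fromLast == null ? "none" : "next id = " + fromLast.id);
  }
}
```

Putting the null check in the loop condition means the caller only needs the single `nextValid == null` branch shown in the hunk, and `shouldProcess` does not have to be null-tolerant.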

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -406,6 +413,30 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
   }
 
+  /**
+   * Get the next snapshot skiping over rewrite and delete snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all
+   *     remaining snapshots should be skipped.
+   */
+  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Preconditions.checkArgument(
+        curSnapshot != null, "Sanity check, curSnapshot should not be null");

Review Comment:
I would remove the `Sanity check` text here, as `Preconditions.checkArgument` already implies the same thing.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -406,6 +413,30 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
   }
 
+  /**
+   * Get the next snapshot skiping over rewrite and delete snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all
+   *     remaining snapshots should be skipped.
+   */
+  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Preconditions.checkArgument(
+        curSnapshot != null, "Sanity check, curSnapshot should not be null");
+
+    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+    // skip over rewrite and delete snapshots
+    while (!shouldProcess(nextSnapshot)) {

Review Comment:
Does `shouldProcess` handle null?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

For additional commands, e-mail: issues-h...@iceberg.apache.org