singhpk234 commented on code in PR #8980:
URL: https://github.com/apache/iceberg/pull/8980#discussion_r1393422572


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -392,8 +392,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
 
       // if everything was OK and we consumed complete snapshot then move to next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // nextValide is null, this implies all the remaining snapshots should be skipped.

Review Comment:
   ```suggestion
             // nextValid null implies all the remaining snapshots should be skipped.
   ```



##########
core/src/main/java/org/apache/iceberg/MicroBatches.java:
##########
@@ -92,7 +92,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(
 
     for (ManifestFile manifest : manifestFiles) {
       manifestIndexes.add(Pair.of(manifest, currentFileIndex));
-      currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+      currentFileIndex += manifest.addedFilesCount();

Review Comment:
   IMHO, then, we should not remove this line until we have test coverage for it. Can you please revert this change?
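   To make the effect of this change concrete, here is a minimal sketch that mirrors the loop in `indexManifests` and computes each manifest's starting file index with and without `existingFilesCount()`. `ManifestCounts` is a hypothetical stand-in for `ManifestFile` that keeps only the two counts the diff uses, and the counts in `main` are made-up illustrative values.

```java
import java.util.ArrayList;
import java.util.List;

public class ManifestIndexSketch {

  // Hypothetical stand-in for ManifestFile, keeping only the two counts used in the diff.
  record ManifestCounts(String name, int addedFilesCount, int existingFilesCount) {}

  // Mirrors the indexManifests loop: each manifest is paired with the running index at which
  // it starts. includeExisting selects the original formula (true) or the changed one (false).
  static List<Integer> startIndexes(List<ManifestCounts> manifests, boolean includeExisting) {
    List<Integer> starts = new ArrayList<>();
    int currentFileIndex = 0;
    for (ManifestCounts manifest : manifests) {
      starts.add(currentFileIndex);
      currentFileIndex += manifest.addedFilesCount();
      if (includeExisting) {
        currentFileIndex += manifest.existingFilesCount();
      }
    }
    return starts;
  }

  public static void main(String[] args) {
    // Illustrative counts only: m1 and m3 carry existing files, m2 does not.
    List<ManifestCounts> manifests = List.of(
        new ManifestCounts("m1", 3, 2),
        new ManifestCounts("m2", 4, 0),
        new ManifestCounts("m3", 1, 5));
    System.out.println(startIndexes(manifests, true));  // [0, 5, 9]
    System.out.println(startIndexes(manifests, false)); // [0, 3, 7]
  }
}
```

   The two formulas agree until a manifest with existing files appears; after that, every later manifest's starting index shifts. A test that includes such manifests would show whether dropping the term is safe.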



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -392,8 +392,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
 
       // if everything was OK and we consumed complete snapshot then move to next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // nextValide is null, this implies all the remaining snapshots should be skipped.
+          shouldContinueReading = false;
+          break;
+        }
+        // we found the next available snapshot, continue from there.

Review Comment:
   Is this comment required?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -406,6 +413,30 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
   }
 
+  /**
+   * Get the next snapshot skiping over rewrite and delete snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all
+   *     remaining snapshots should be skipped.
+   */
+  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Preconditions.checkArgument(
+        curSnapshot != null, "Sanity check, curSnapshot should not be null");

Review Comment:
   I would remove the `Sanity check` text from here, since the Precondition already conveys the same thing.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -406,6 +413,30 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
   }
 
+  /**
+   * Get the next snapshot skiping over rewrite and delete snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all
+   *     remaining snapshots should be skipped.
+   */
+  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Preconditions.checkArgument(
+        curSnapshot != null, "Sanity check, curSnapshot should not be null");
+
+    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+    // skip over rewrite and delete snapshots
+    while (!shouldProcess(nextSnapshot)) {

Review Comment:
   Does `shouldProcess` handle null?
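   One way to keep `shouldProcess` from ever seeing null is to check for it in the loop condition. The sketch below is a simplified, self-contained model, not Iceberg code: `Snap`, the list-based `snapshotAfter`, and the set of skipped operation names are hypothetical, and this `snapshotAfter` returns null when no later snapshot exists, which may differ from how `SnapshotUtil.snapshotAfter` actually behaves.

```java
import java.util.List;
import java.util.Set;

public class NextValidSnapshotSketch {

  // Hypothetical stand-in for org.apache.iceberg.Snapshot: just an id and an operation name.
  record Snap(long id, String operation) {}

  // Assumption: "replace" (rewrite) and "delete" snapshots are the ones to skip.
  static final Set<String> SKIPPED = Set.of("replace", "delete");

  // Hypothetical helper over a linear history: the snapshot after `id`, or null at the end.
  static Snap snapshotAfter(List<Snap> history, long id) {
    for (int i = 0; i + 1 < history.size(); i++) {
      if (history.get(i).id() == id) {
        return history.get(i + 1);
      }
    }
    return null;
  }

  // Expects a non-null snapshot; null is handled by the caller's loop condition.
  static boolean shouldProcess(Snap snapshot) {
    return !SKIPPED.contains(snapshot.operation());
  }

  // Walks forward past skippable snapshots; returns null if every remaining one is skipped.
  static Snap nextValidSnapshot(List<Snap> history, Snap cur) {
    if (cur == null) {
      throw new IllegalArgumentException("curSnapshot should not be null");
    }
    Snap next = snapshotAfter(history, cur.id());
    // Test for null first so shouldProcess is never called with null.
    while (next != null && !shouldProcess(next)) {
      next = snapshotAfter(history, next.id());
    }
    return next;
  }

  public static void main(String[] args) {
    List<Snap> history = List.of(
        new Snap(1, "append"), new Snap(2, "replace"), new Snap(3, "delete"), new Snap(4, "append"));
    System.out.println(nextValidSnapshot(history, history.get(0))); // Snap[id=4, operation=append]
    System.out.println(nextValidSnapshot(history, history.get(3))); // null
  }
}
```

   Checking `next != null` before calling `shouldProcess` lets the loop end with null when every remaining snapshot is skipped, which matches the `@return` contract in the javadoc of the diff.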



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-h...@iceberg.apache.org
