singhpk234 commented on code in PR #8980:
URL: https://github.com/apache/iceberg/pull/8980#discussion_r1382019127


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -392,8 +405,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
 
       // if everything was OK and we consumed complete snapshot then move to next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextSnapshotSkippingOverNoneProcessable(curSnapshot);
+        if (nextValid == null) {

Review Comment:
   can you please add a comment explaining what a null `nextValid` implies?
   Also, can you rename this to `nextValidSnapshot`?



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -497,6 +500,67 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
         .hasMessageStartingWith("Cannot process overwrite snapshot");
   }
 
+  @Test
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    Iterable<Snapshot> snapshots = table.snapshots();
+    for (Snapshot s : snapshots) {
+      System.out.println(s.snapshotId());
+    }
+
+    Assert.assertEquals(
+        6,
+        microBatchCount(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));

Review Comment:
   I would test this with both limits together and with each limit individually.
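A minimal, self-contained sketch of the batching arithmetic such tests would exercise: a per-batch cap on files, on rows, or both at once, with the tighter cap winning. This is an illustrative greedy split only; `microBatchCount`, the caps, and the splitting rule here are assumptions, not Iceberg's actual streaming semantics.

```java
public class MicroBatchLimitSketch {
  // Greedy split: close the current batch once adding another file would
  // exceed either cap. A single oversized file still forms its own batch.
  static int microBatchCount(int[] rowsPerFile, int maxFiles, long maxRows) {
    int batches = 0;
    int files = 0;
    long rows = 0;
    for (int r : rowsPerFile) {
      if (files > 0 && (files + 1 > maxFiles || rows + r > maxRows)) {
        batches++;
        files = 0;
        rows = 0;
      }
      files++;
      rows += r;
    }
    return files > 0 ? batches + 1 : batches;
  }
}
```

With six 10-row files, a `maxFiles` of 1 alone gives six batches, a `maxRows` of 25 alone gives three, and both together give six, which is the kind of three-way coverage the comment asks for.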



##########
core/src/main/java/org/apache/iceberg/MicroBatches.java:
##########
@@ -92,7 +92,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(
 
     for (ManifestFile manifest : manifestFiles) {
       manifestIndexes.add(Pair.of(manifest, currentFileIndex));
-      currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+      currentFileIndex += manifest.addedFilesCount();

Review Comment:
   can we please add a unit test for this?
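A self-contained sketch of what such a unit test could assert: with the fix, each manifest's starting file index advances by added files only, so existing files no longer shift later manifests' start indexes. The `Manifest` record here is a hypothetical stand-in for `ManifestFile`, not Iceberg's actual type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-in for ManifestFile: only the counts matter here.
record Manifest(int addedFilesCount, int existingFilesCount) {}

public class IndexManifestsSketch {
  // Mirrors the fixed logic: the start index of each manifest advances by
  // added files only, since existing files are not re-read by the stream.
  static List<Integer> indexManifests(List<Manifest> manifests) {
    List<Integer> indexes = new ArrayList<>();
    int currentFileIndex = 0;
    for (Manifest m : manifests) {
      indexes.add(currentFileIndex);
      currentFileIndex += m.addedFilesCount();
    }
    return indexes;
  }

  public static void main(String[] args) {
    List<Integer> idx = indexManifests(List.of(
        new Manifest(2, 5), new Manifest(3, 0), new Manifest(1, 4)));
    System.out.println(idx); // [0, 2, 5]
  }
}
```

Under the old logic the same input would yield [0, 7, 10], so a test on mixed added/existing counts pins the behavior down.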



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -309,6 +309,19 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     }
   }
 
+  private Snapshot nextSnapshotSkippingOverNoneProcessable(Snapshot curSnapshot) {

Review Comment:
   I would put this private function below the public function that calls it.



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -497,6 +500,67 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
         .hasMessageStartingWith("Cannot process overwrite snapshot");
   }
 
+  @Test
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    Iterable<Snapshot> snapshots = table.snapshots();
+    for (Snapshot s : snapshots) {
+      System.out.println(s.snapshotId());

Review Comment:
   is this for debugging? Is it still required?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -309,6 +309,19 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     }
   }
 
+  private Snapshot nextSnapshotSkippingOverNoneProcessable(Snapshot curSnapshot) {
+    curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+    while (!shouldProcess(curSnapshot)) {

Review Comment:
   should we have a null check for curSnapshot here?

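The combined shape of these suggestions (rename, null check, skip loop) could look like the sketch below. The `Snap` record and list-based `snapshotAfter` are hypothetical stand-ins so the example compiles on its own; Iceberg's real `Snapshot` and `SnapshotUtil.snapshotAfter` APIs differ.

```java
import java.util.List;

// Hypothetical stand-in for a snapshot: just an id and a processability flag.
record Snap(long snapshotId, boolean processable) {}

public class NextValidSnapshotSketch {
  // Returns the snapshot after curId in the given history, or null when
  // curId is already the latest snapshot.
  static Snap snapshotAfter(List<Snap> history, long curId) {
    for (int i = 0; i < history.size() - 1; i++) {
      if (history.get(i).snapshotId() == curId) {
        return history.get(i + 1);
      }
    }
    return null;
  }

  // Skips over non-processable snapshots (e.g. rewrite/replace ones).
  // A null return means the stream has caught up to the latest snapshot.
  static Snap nextValidSnapshot(List<Snap> history, Snap cur) {
    Snap next = snapshotAfter(history, cur.snapshotId());
    while (next != null && !next.processable()) {
      next = snapshotAfter(history, next.snapshotId());
    }
    return next;
  }
}
```

The `next != null` guard in the loop condition is the null check the comment asks for: without it, walking past the end of the snapshot history would throw a `NullPointerException` instead of signaling "caught up".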


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

