singhpk234 commented on code in PR #8980: URL: https://github.com/apache/iceberg/pull/8980#discussion_r1382019127
########## spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -392,8 +405,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { // if everything was OK and we consumed complete snapshot then move to next snapshot if (shouldContinueReading) { + Snapshot nextValid = nextSnapshotSkippingOverNoneProcessable(curSnapshot); + if (nextValid == null) { Review Comment: can you please add a comment explaining what null in nextValid implies. Also, can we rename this to nextValidSnapshot? ########## spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java: ########## @@ -497,6 +500,67 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception .hasMessageStartingWith("Cannot process overwrite snapshot"); } + @Test + public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception { + // fill table with some data + List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(expected); + + makeRewriteDataFiles(); + + Iterable<Snapshot> snapshots = table.snapshots(); + for (Snapshot s : snapshots) { + System.out.println(s.snapshotId()); + } + + Assert.assertEquals( + 6, + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"))); Review Comment: I would test this with both limits together and with each limit individually ########## core/src/main/java/org/apache/iceberg/MicroBatches.java: ########## @@ -92,7 +92,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests( for (ManifestFile manifest : manifestFiles) { manifestIndexes.add(Pair.of(manifest, currentFileIndex)); - currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount(); + currentFileIndex += manifest.addedFilesCount(); Review Comment: can we please add a unit test for this? 
########## spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -309,6 +309,19 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim } } + private Snapshot nextSnapshotSkippingOverNoneProcessable(Snapshot curSnapshot) { Review Comment: I would put this private function below the public function. ########## spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java: ########## @@ -497,6 +500,67 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception .hasMessageStartingWith("Cannot process overwrite snapshot"); } + @Test + public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception { + // fill table with some data + List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(expected); + + makeRewriteDataFiles(); + + Iterable<Snapshot> snapshots = table.snapshots(); + for (Snapshot s : snapshots) { + System.out.println(s.snapshotId()); Review Comment: is this for debugging, and is it required? ########## spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -309,6 +309,19 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim } } + private Snapshot nextSnapshotSkippingOverNoneProcessable(Snapshot curSnapshot) { + curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId()); + while (!shouldProcess(curSnapshot)) { Review Comment: shouldn't we have a null check for curSnapshot? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org