alexprosak commented on code in PR #13824:
URL: https://github.com/apache/iceberg/pull/13824#discussion_r2319862220


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -214,6 +240,122 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
             SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
             SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws Exception {
+    File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was not running
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      try {
+        StreamingQuery query = querySource.start();
+
+        // Query should terminate on its own after processing all available data
+        assertThat(query.awaitTermination(60000)).isTrue();
+
+        // Check output
+        List<SimpleRecord> actual =
+            spark
+                .read()
+                .load(output.getPath())
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList();
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+
+        // Restarting immediately should not reprocess data
+        query = querySource.start();
+        assertThat(query.awaitTermination(60000)).isTrue();
+        assertThat(query.recentProgress().length).isEqualTo(1);
+        assertThat(query.recentProgress()[0].sources()[0].startOffset())
+            .isEqualTo(query.recentProgress()[0].sources()[0].endOffset());
+      } finally {
+        stopStreams();
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() throws Exception {
+    List<List<SimpleRecord>> expectedData = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expectedData);
+
+    long expectedRecordCount = expectedData.stream().mapToLong(List::size).sum();
+
+    table.refresh();
+    long expectedSnapshotId = table.currentSnapshot().snapshotId();
+
+    String sinkTable = "availablenow_sink";
+    StreamingQuery query =
+        spark
+            .readStream()
+            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .format("memory")
+            .queryName(sinkTable)
+            .trigger(Trigger.AvailableNow())
+            .start();
+
+    Thread.sleep(500);

Review Comment:
   Not required, forgot to remove it myself - good catch!
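For context on why the sleep is removable: with Trigger.AvailableNow() the query fixes its end offset at start and terminates on its own once that data is processed, so a subsequent awaitTermination(timeout) already synchronizes the test; a fixed sleep only adds latency. A minimal sketch of that pattern, assuming the `spark` session and `tableName` from the test fixture (the sink name and timeout are illustrative, not from the PR):

```java
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

class AvailableNowDrainSketch {
  // Runs one AvailableNow pass over the Iceberg table and blocks until the
  // query has processed everything that was available when it started.
  static void drainOnce(SparkSession spark, String tableName)
      throws TimeoutException, StreamingQueryException {
    StreamingQuery query =
        spark
            .readStream()
            .format("iceberg")
            .load(tableName)
            .writeStream()
            .format("memory")
            .queryName("availablenow_sink") // illustrative sink name
            .trigger(Trigger.AvailableNow())
            .start();

    // AvailableNow pins the end offset at start and stops the query once it
    // is reached, so awaitTermination() is the synchronization point; no
    // Thread.sleep() is needed before asserting on the sink contents.
    if (!query.awaitTermination(60_000)) {
      throw new IllegalStateException("query did not drain available data in time");
    }
  }
}
```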



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to