alexprosak commented on code in PR #13824:
URL: https://github.com/apache/iceberg/pull/13824#discussion_r2303227079


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -216,6 +217,87 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxRows4() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+        List.of(4L, 3L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws Exception {
+    File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was down

Review Comment:
   Reworded this to "not running".



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -216,6 +217,87 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxRows4() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+        List.of(4L, 3L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }

Review Comment:
   Sure, updated the above tests to include this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

