singhpk234 commented on code in PR #13824:
URL: https://github.com/apache/iceberg/pull/13824#discussion_r2319704364


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -523,6 +530,16 @@ public ReadLimit getDefaultReadLimit() {
     }
   }
 
+  @Override
+  public void prepareForTriggerAvailableNow() {
+    LOG.info("The streaming query reports to use Trigger.AvailableNow");
+
+    lastOffsetForTriggerAvailableNow =
+        (StreamingOffset) latestOffset(initialOffset, 
ReadLimit.allAvailable());
+
+    LOG.info("lastOffset for Trigger.AvailableNow is {}", 
lastOffsetForTriggerAvailableNow.json());

Review Comment:
   Why call `.json()` here? Is `toString()` not sufficient?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -384,6 +385,12 @@ public Offset latestOffset(Offset startOffset, ReadLimit 
limit) {
     Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
     validateCurrentSnapshotExists(curSnapshot, startingOffset);
 
+    // Use the pre-fetched snapshotId when Trigger.AvailableNow is enabled.

Review Comment:
   nit:
   ```suggestion
       // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
   ```



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -214,6 +240,122 @@ public void testReadStreamWithCompositeReadLimit() throws 
Exception {
             SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
             SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws 
Exception {
+    File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was not running
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      
expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      try {
+        StreamingQuery query = querySource.start();
+
+        // Query should terminate on its own after processing all available 
data
+        assertThat(query.awaitTermination(60000)).isTrue();
+
+        // Check output
+        List<SimpleRecord> actual =
+            spark
+                .read()
+                .load(output.getPath())
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList();
+        
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+
+        // Restarting immediately should not reprocess data
+        query = querySource.start();
+        assertThat(query.awaitTermination(60000)).isTrue();
+        assertThat(query.recentProgress().length).isEqualTo(1);
+        assertThat(query.recentProgress()[0].sources()[0].startOffset())
+            .isEqualTo(query.recentProgress()[0].sources()[0].endOffset());
+      } finally {
+        stopStreams();
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() 
throws Exception {
+    List<List<SimpleRecord>> expectedData = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expectedData);
+
+    long expectedRecordCount = 
expectedData.stream().mapToLong(List::size).sum();
+
+    table.refresh();
+    long expectedSnapshotId = table.currentSnapshot().snapshotId();
+
+    String sinkTable = "availablenow_sink";
+    StreamingQuery query =
+        spark
+            .readStream()
+            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .format("memory")
+            .queryName(sinkTable)
+            .trigger(Trigger.AvailableNow())
+            .start();
+
+    Thread.sleep(500);

Review Comment:
   Why is this sleep required?



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -214,6 +240,122 @@ public void testReadStreamWithCompositeReadLimit() throws 
Exception {
             SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
             SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws 
Exception {
+    File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was not running
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      
expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      try {
+        StreamingQuery query = querySource.start();
+
+        // Query should terminate on its own after processing all available 
data
+        assertThat(query.awaitTermination(60000)).isTrue();
+
+        // Check output
+        List<SimpleRecord> actual =
+            spark
+                .read()
+                .load(output.getPath())
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList();
+        
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+
+        // Restarting immediately should not reprocess data
+        query = querySource.start();
+        assertThat(query.awaitTermination(60000)).isTrue();
+        assertThat(query.recentProgress().length).isEqualTo(1);
+        assertThat(query.recentProgress()[0].sources()[0].startOffset())
+            .isEqualTo(query.recentProgress()[0].sources()[0].endOffset());
+      } finally {
+        stopStreams();
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() 
throws Exception {
+    List<List<SimpleRecord>> expectedData = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expectedData);
+
+    long expectedRecordCount = 
expectedData.stream().mapToLong(List::size).sum();
+
+    table.refresh();
+    long expectedSnapshotId = table.currentSnapshot().snapshotId();
+
+    String sinkTable = "availablenow_sink";
+    StreamingQuery query =
+        spark
+            .readStream()
+            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .format("memory")
+            .queryName(sinkTable)
+            .trigger(Trigger.AvailableNow())
+            .start();
+
+    Thread.sleep(500);
+    assertThat(query.isActive()).isTrue();
+
+    // Add new data while the stream is running
+    List<SimpleRecord> newDataDuringStreamSnap1 =
+        Lists.newArrayList(
+            new SimpleRecord(100, "hundred"),
+            new SimpleRecord(101, "hundred-one"),
+            new SimpleRecord(102, "hundred-two"));
+    List<SimpleRecord> newDataDuringStreamSnap2 =
+        Lists.newArrayList(
+            new SimpleRecord(200, "two-hundred"), new SimpleRecord(201, 
"two-hundred-one"));
+    appendData(newDataDuringStreamSnap1);
+    appendData(newDataDuringStreamSnap2);
+
+    // Query should terminate on its own after processing all available data
+    assertThat(query.awaitTermination(60000)).isTrue();

Review Comment:
   ```suggestion
       // Query should terminate on its own after processing all available data
       // up to expectedSnapshotId
       assertThat(query.awaitTermination(60000)).isTrue();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to