stevenzwu commented on code in PR #9308:
URL: https://github.com/apache/iceberg/pull/9308#discussion_r1428899523


##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java:
##########
@@ -113,7 +139,13 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
   }
 
   @Override
-  public void wakeUp() {}
+  public void wakeUp() {
+    LOG.info("WakeUp called");
+    pausedSplits.clear();

Review Comment:
   The `fetch` method needs to be unblocked upon `wakeUp`, but is it correct to clear `pausedSplits` upon `wakeUp`?
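One way to unblock `fetch` without discarding pause state is a separate wake-up flag that the waiting thread consumes. The following is a minimal sketch under stated assumptions. The class `PauseAwareReader` and its methods are hypothetical and are not the PR's code. It assumes `fetch` may return early (for example, with an empty batch) when woken while its split is still paused.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not the PR's code: wakeUp() unblocks a waiting fetch
// without clearing the set of paused splits.
class PauseAwareReader {
  private final Object lock = new Object();
  private final Set<String> pausedSplits = new HashSet<>(); // guarded by lock
  private boolean wakeUpRequested = false; // guarded by lock

  void pause(String splitId) {
    synchronized (lock) {
      pausedSplits.add(splitId);
    }
  }

  void resume(String splitId) {
    synchronized (lock) {
      pausedSplits.remove(splitId);
      lock.notifyAll();
    }
  }

  boolean isPaused(String splitId) {
    synchronized (lock) {
      return pausedSplits.contains(splitId);
    }
  }

  /**
   * Blocks while splitId is paused. Returns true if the split is readable, and
   * false if the wait ended because of wakeUp() (or an interrupt) while the
   * split is still paused, in which case the caller could return an empty batch.
   */
  boolean awaitUnpaused(String splitId) {
    synchronized (lock) {
      try {
        while (pausedSplits.contains(splitId) && !wakeUpRequested) {
          // Timed wait as a safety net, similar to the PR's "wake every second".
          lock.wait(1000L);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status
      }
      // Consume the request: the caller returns from fetch either way.
      wakeUpRequested = false;
      return !pausedSplits.contains(splitId);
    }
  }

  void wakeUp() {
    synchronized (lock) {
      wakeUpRequested = true; // pausedSplits is deliberately left untouched
      lock.notifyAll();
    }
  }
}
```

With this shape, a paused split stays paused across a `wakeUp`, and only an explicit resume makes it readable again.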



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java:
##########
@@ -123,6 +155,19 @@ public void close() throws Exception {
     }
   }
 
+  @Override
+  public void pauseOrResumeSplits(
+      Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
+    LOG.info("Pause splits: {} and resume splits: {}", splitsToPause, splitsToResume);
+    pausedSplits.addAll(

Review Comment:
   All accesses to `pausedSplits` probably also need to be protected by synchronization.
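The reason the check and the wait must share one lock: if `fetch` evaluates `pausedSplits.contains(...)` outside the monitor it waits on, a resume plus notify can land between the check and the `wait`, and that notification is lost (a timed wait only hides this by retrying later). The following is a minimal sketch with a hypothetical class name and `String` split IDs in place of `IcebergSourceSplit`. It keeps every read and write of the set under the same monitor the reader blocks on.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (names are illustrative, not from the PR): every access to
// pausedSplits holds the same monitor the reader waits on, so a resume cannot
// slip in between the "is it paused?" check and the wait.
class GuardedPausedSplits {
  private final Object lock = new Object();
  private final Set<String> pausedSplits = new HashSet<>(); // guarded by lock

  void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
    synchronized (lock) {
      pausedSplits.addAll(toPause);
      pausedSplits.removeAll(toResume);
      lock.notifyAll(); // wake any reader blocked on a split that was just resumed
    }
  }

  /** Blocks until splitId is not paused; returns false if interrupted. */
  boolean awaitResumed(String splitId) {
    synchronized (lock) {
      try {
        while (pausedSplits.contains(splitId)) {
          lock.wait(); // releases lock while waiting, reacquires before re-checking
        }
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  boolean isPaused(String splitId) {
    synchronized (lock) {
      return pausedSplits.contains(splitId);
    }
  }
}
```

An alternative such as `ConcurrentHashMap.newKeySet()` makes individual operations thread-safe, but it does not by itself make the check-then-wait sequence atomic, which is why the sketch uses a single monitor.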



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java:
##########
@@ -80,6 +87,25 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
     }
 
     if (currentReader.hasNext()) {
+      // Wait until the reader is blocked. Wake every second in case this missed a signal
+      boolean first = true;
+      while (pausedSplits.contains(currentSplitId)) {
+        if (first) {
+          LOG.info("Paused reading {}", currentSplitId);

Review Comment:
   All logs should include the subtask index as important context.
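A minimal sketch of carrying the subtask index into every message, using only `java.util.logging` so it runs standalone. The class `SubtaskAwareLogging` is hypothetical. In the Flink connector the index could plausibly be obtained once from `SourceReaderContext#getIndexOfSubtask()` and passed into the split reader's constructor, but how the PR wires it is an assumption here.

```java
import java.util.logging.Logger;

// Hypothetical sketch: capture the subtask index once and prefix every log line
// with it, so messages from parallel readers can be told apart.
class SubtaskAwareLogging {
  private static final Logger LOG = Logger.getLogger(SubtaskAwareLogging.class.getName());

  private final int indexOfSubtask; // assumed to be supplied by the caller

  SubtaskAwareLogging(int indexOfSubtask) {
    this.indexOfSubtask = indexOfSubtask;
  }

  /** Builds the message separately so the format is easy to check. */
  String pausedMessage(String splitId) {
    return String.format("Subtask %d: paused reading %s", indexOfSubtask, splitId);
  }

  void logPaused(String splitId) {
    LOG.info(pausedMessage(splitId));
  }
}
```

With SLF4J, as used in the PR, the same idea is simply an extra placeholder argument for the index in each `LOG.info(...)` call.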



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java:
##########
@@ -113,7 +139,13 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
   }
 
   @Override
-  public void wakeUp() {}
+  public void wakeUp() {
+    LOG.info("WakeUp called");
+    pausedSplits.clear();
+    synchronized (signal) {
+      signal.notify();

Review Comment:
   This probably doesn't matter, but is it safer to use `notifyAll` here?
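For context on the question: `notify()` wakes a single arbitrary thread waiting on the monitor, while `notifyAll()` wakes all of them so each re-checks its own condition. With only one fetcher thread waiting on `signal` the two behave the same, but `notifyAll` stays correct if more waiters are ever added. The following is a minimal sketch with a hypothetical `GenerationSignal` class, not the PR's code. Waiters block until a generation counter moves past the value they observed, and one `signalAll()` releases every waiter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: waiters block until the generation changes from the value
// they observed; signalAll() uses notifyAll() so every waiter re-checks.
class GenerationSignal {
  private final Object monitor = new Object();
  private long generation = 0; // guarded by monitor

  long currentGeneration() {
    synchronized (monitor) {
      return generation;
    }
  }

  void signalAll() {
    synchronized (monitor) {
      generation++;
      monitor.notifyAll(); // notify() would wake only one arbitrary waiter
    }
  }

  /** Waits until the generation differs from seen; returns false if interrupted. */
  boolean awaitChange(long seen) {
    synchronized (monitor) {
      try {
        while (generation == seen) {
          monitor.wait();
        }
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  /** Starts n waiters, signals once, and counts how many finished within timeoutMs each. */
  static int countWokenWaiters(int n, long timeoutMs) {
    GenerationSignal signal = new GenerationSignal();
    long seen = signal.currentGeneration(); // captured first, so no signal can be missed
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      Thread t = new Thread(() -> signal.awaitChange(seen));
      t.setDaemon(true); // do not keep the JVM alive if a waiter is never woken
      t.start();
      threads.add(t);
    }
    signal.signalAll();
    int finished = 0;
    for (Thread t : threads) {
      try {
        t.join(timeoutMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      if (!t.isAlive()) {
        finished++;
      }
    }
    return finished;
  }
}
```

Replacing `notifyAll()` with `notify()` in this sketch would leave some waiters blocked whenever more than one thread was already waiting at the time of the signal.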



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -273,23 +271,40 @@ public void apply(
   public void testThrottling() throws Exception {
     GenericAppenderHelper dataAppender = appender();
 
-    // Generate records with the following pattern:
-    // - File 1 - Later records (Watermark 6000000)
-    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
-    // - File 2 - First records (Watermark 0)
-    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
-    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
-    List<Record> batch =
+    // Generate records in advance

Review Comment:
   What's the reason for changing the unit test code here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

