stevenzwu commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1395154710


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {

Review Comment:
   I am not very clear about our test strategy to confirm watermark alignment. 
We can probably borrow some ideas from Flink test code on watermark alignment



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import 
org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {

Review Comment:
   `TestBucketPartitionerFlinkIcebergSink` is an example of junit5



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, 
"File-3-3"));

Review Comment:
   `File-3-1` -> `file-3-record-ts-1`.
   
    also change file-1 string value to be consistent with file 2 and 3.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, 
"File-3-3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(windowed);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+
+    assertRecords(resultIterator, expectedRecords);
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "File-1-100"), 
generateRecord(103, "File-1-103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)),

Review Comment:
   we should use this version to set the update interval from default 1s to a 
much smaller value like 1 or 2 ms
   ```
   default WatermarkStrategy<T> withWatermarkAlignment(
               String watermarkGroup, Duration maxAllowedWatermarkDrift, 
Duration updateInterval)
   ```



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import 
org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)

Review Comment:
   why is important to have multiple row groups in the Parquet?
   
   I didn't see we change the target split size during scanning.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));

Review Comment:
   merge with the line above



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, 
"File-3-3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(windowed);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+
+    assertRecords(resultIterator, expectedRecords);
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "File-1-100"), 
generateRecord(103, "File-1-103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(stream);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover 
Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Check that the read the non-blocked data
+    // The first RECORD_NUM_FOR_2_SPLITS should be read
+    // 1 or more from the runaway reader should be arrived depending on thread 
scheduling
+    waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+    // Get the drift metric, wait for it to be created and reach the expected 
state
+    // (100 min - 20 min - 0 min)
+    // Also this validates that the WatermarkAlignment is working
+    Awaitility.await()
+        .atMost(120, TimeUnit.SECONDS)
+        .until(() -> findAlignmentDriftMetric(jobClient.getJobID(), 
4800000L).isPresent());
+    Gauge<Long> drift = findAlignmentDriftMetric(jobClient.getJobID(), 
4800000L).get();
+
+    // Add some old records with 2 splits, so even if the blocked gets one 
split, the other reader
+    // one gets one as well
+    batch =
+        ImmutableList.of(
+            generateRecord(15, "File-3-15"),
+            generateRecord(16, "File-3-16"),
+            generateRecord(17, "File-3-17"));
+    dataAppender.appendToTable(batch);
+    batch =
+        ImmutableList.of(
+            generateRecord(15, "File-4-15"),
+            generateRecord(16, "File-4-16"),
+            generateRecord(17, "File-4-17"));
+    dataAppender.appendToTable(batch);
+    // The records received will highly depend on scheduling
+    // We minimally get 3 records from the non-blocked reader
+    // We might get 1 record from the blocked reader (as part of the previous 
batch - File-1)
+    // We might get 3 records form the non-blocked reader if it gets both new 
splits
+    waitForRecords(resultIterator, 3);
+
+    // Get the drift metric, wait for it to be created and reach the expected 
state (100 min - 20
+    // min - 15 min)
+    Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> 
drift.getValue() == 3900000L);
+
+    // Add some new records which should unblock the throttled reader
+    batch = ImmutableList.of(generateRecord(110, "File-5-110"), 
generateRecord(111, "File-5-111"));
+    dataAppender.appendToTable(batch);
+    // We should get all the records at this point
+    waitForRecords(resultIterator, 6);
+
+    // Wait for the new drift to decrease below the allowed drift to signal 
the normal state
+    Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> 
drift.getValue() < 1200000L);
+  }
+
+  protected IcebergSource.Builder<RowData> sourceBuilder() {
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .watermarkColumn("ts")
+        .project(TestFixtures.TS_SCHEMA)
+        .splitSize(100L);
+  }
+
+  protected Record generateRecord(int minutes, String str) {
+    // Override the ts field to create a more realistic situation for event 
time alignment
+    Record record = GenericRecord.create(TestFixtures.TS_SCHEMA);
+    LocalDateTime ts =
+        LocalDateTime.ofInstant(
+            Instant.ofEpochMilli(Time.of(minutes, 
TimeUnit.MINUTES).toMilliseconds()),
+            ZoneId.of("Z"));
+    record.setField("ts", ts);
+    record.setField("str", str);
+    return record;
+  }
+
+  /**
+   * This override is needed because {@link Comparators} used by {@link 
StructLikeWrapper} retrieves
+   * Timestamp type using Long type as inner class, while the {@link 
RandomGenericData} generates
+   * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This 
method normalizes the
+   * {@link LocalDateTime} to a Long type so that Comparators can continue to 
work.
+   */
+  protected void assertRecords(
+      CollectResultIterator<RowData> iterator, List<Record> expectedRecords) 
throws Exception {
+
+    Set<RowData> received = 
Sets.newHashSetWithExpectedSize(expectedRecords.size());
+
+    assertThat(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  int count = 0;
+                  while (count < expectedRecords.size() && iterator.hasNext()) 
{
+                    received.add(iterator.next());
+                    count++;
+                  }
+                  if (count < expectedRecords.size()) {
+                    throw new IllegalStateException(
+                        String.format("Fail to get %d records.", 
expectedRecords.size()));
+                  }
+                  return true;
+                }))
+        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+
+    Set<RowData> expected =
+        expectedRecords.stream()
+            .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
+            .collect(Collectors.toSet());
+    Assert.assertEquals(expected, received);
+  }
+
+  protected void waitForRecords(CollectResultIterator<RowData> iterator, int 
num) {
+    assertThat(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  int count = 0;
+                  while (count < num && iterator.hasNext()) {
+                    iterator.next();
+                    count++;
+                  }
+                  if (count < num) {
+                    throw new IllegalStateException(String.format("Fail to get 
%d records.", num));
+                  }
+                  return true;
+                }))
+        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+  }
+
+  private CollectResultIterator<RowData> addCollectSink(DataStream<RowData> 
stream) {
+    TypeSerializer<RowData> serializer =
+        stream.getType().createSerializer(stream.getExecutionConfig());
+    String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+    CollectSinkOperatorFactory<RowData> factory =
+        new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+    CollectSinkOperator<RowData> operator = (CollectSinkOperator<RowData>) 
factory.getOperator();
+    CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory);
+    sink.name("Data stream collect sink");
+    stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+    return new CollectResultIterator<>(
+        operator.getOperatorIdFuture(),
+        serializer,
+        accumulatorName,
+        stream.getExecutionEnvironment().getCheckpointConfig());
+  }
+
+  private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long 
withValue) {
+    String metricsName = SOURCE_NAME + ".*" + 
MetricNames.WATERMARK_ALIGNMENT_DRIFT;
+    return reporter.findMetrics(jobID, metricsName).values().stream()
+        .map(m -> (Gauge<Long>) m)
+        .filter(m -> m.getValue() == withValue)
+        .findFirst();
+  }
+
+  private GenericAppenderHelper appender() {
+    // We need to create multiple splits, so we need to generate parquet files 
with multiple offsets

Review Comment:
   why is it necessary to have multiple splits/row groups in one file?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, 
"File-3-3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(windowed);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+
+    assertRecords(resultIterator, expectedRecords);
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "File-1-100"), 
generateRecord(103, "File-1-103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(stream);

Review Comment:
   why don't we use this method from `DataStream`?
   ```
       public CloseableIterator<T> executeAndCollect(String jobExecutionName) 
throws Exception {
           return executeAndCollectWithClient(jobExecutionName).iterator;
       }
   ```



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import 
org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));

Review Comment:
   I am still not sure why we need the mod 5 here. can't we just simply 
increment the timestamp?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to