stevenzwu commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1396577355


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks, records and updates the split position.
+ *
+ * <p>The Emitter emits watermarks at the beginning of every split provided by 
the {@link
+ * SplitWatermarkExtractor}.
+ */
+class WatermarkExtractorRecordEmitter<T> implements 
SerializableRecordEmitter<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+  private final SplitWatermarkExtractor timeExtractor;
+  private String lastSplitId = null;
+  private long watermark;
+
+  WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+    this.timeExtractor = timeExtractor;
+  }
+
+  @Override
+  public void emitRecord(
+      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit 
split) {
+    if (!split.splitId().equals(lastSplitId)) {
+      long newWatermark = timeExtractor.extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "previous watermark = {}, current watermark = {}, previous split = 
{}, current split = {}",

Review Comment:
   We need a summary at the beginning of the log message, e.g. "Received a new split with lower 
watermark. "
   
   the logic also seems incorrect. we don't emit watermark when it is backwards.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "ts", Types.TimestampType.withoutZone()),
+          required(2, "tstz", Types.TimestampType.withZone()),
+          required(3, "l", Types.LongType.get()),
+          required(4, "s", Types.StringType.get()));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, 
TestFixtures.TABLE, SCHEMA);
+
+  private GenericAppenderHelper dataAppender;
+  private long minTs = Long.MAX_VALUE;
+  private long minTsTz = Long.MAX_VALUE;
+  private long minL = Long.MAX_VALUE;
+
+  @Before
+  public void initTable() throws IOException {
+    dataAppender =
+        new GenericAppenderHelper(
+            sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 2L);
+    dataAppender.appendToTable(batch);
+
+    for (Record r : batch) {
+      LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+      minTs = Math.min(minTs, 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli());
+
+      OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+      minTsTz = Math.min(minTsTz, offsetDateTime.toInstant().toEpochMilli());
+
+      minL = Math.min(minL, (Long) r.get(2));
+    }
+  }
+
+  @Test
+  public void testTimestamp() {
+    DataFile dataFile =

Review Comment:
   Should we move lines 94-100 to the `@Before` method?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergWatermarkExtractor.java:
##########
@@ -18,19 +18,11 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import java.io.Serializable;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 
-final class IcebergSourceRecordEmitter<T>
-    implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
-  IcebergSourceRecordEmitter() {}
-
-  @Override
-  public void emitRecord(
-      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit 
split) {
-    output.collect(element.record());
-    split.updatePosition(element.fileOffset(), element.recordOffset());
-  }
+/** The interface used to extract watermarks from splits. */
+public interface IcebergWatermarkExtractor extends Serializable {
+  /** Get the watermark for a split. */

Review Comment:
   I don't know how common it is, but there are definitely some Iceberg classes 
that document `@throws`.
   
   E.g. `BatchScan` from core
   ```
     /**
      * Create a new {@link BatchScan} from this scan's configuration that will 
use a snapshot with the
      * given ID.
      *
      * @param snapshotId a snapshot ID
      * @return a new scan based on this with the given snapshot ID
      * @throws IllegalArgumentException if the snapshot cannot be found
      */
     BatchScan useSnapshot(long snapshotId);
   ```



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -429,6 +444,30 @@ public Builder<T> setAll(Map<String, String> properties) {
       return this;
     }
 
+    /**
+     * Emits watermarks once per split based on the file statistics for the 
given split. The
+     * generated watermarks are also used for ordering the splits for read. 
Accepted column types
+     * are timestamp/timestamptz/long. For long columns consider setting {@link
+     * #watermarkTimeUnit(TimeUnit)}.
+     */
+    public Builder<T> watermarkColumn(String columnName) {
+      Preconditions.checkArgument(
+          splitAssignerFactory == null,
+          "Watermark column and SplitAssigner should not be set in the same 
source");
+      this.watermarkColumn = columnName;
+      return this;
+    }
+
+    /**
+     * When the type of the {@link #watermarkColumn} is {@link
+     * org.apache.iceberg.types.Types.LongType}, then sets the {@link 
TimeUnit} to convert the
+     * value. The default value is {@link TimeUnit#MICROSECONDS}.
+     */
+    public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {

Review Comment:
   I'm wondering if it would be better to merge this with the method above. That is what 
I see more often.
   
   e.g. Iceberg `DefaultMetricsContext`
   ```
     public Timer timer(String name, TimeUnit unit) 
   ```
   
   For SQL, these will be two separate configs/hints. `IcebergTableSource` can 
handle the Java API.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends 
TestIcebergSourceFailover {

Review Comment:
   Is there any value in testing this? We didn't assert anything related to 
watermark alignment behavior.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "ts", Types.TimestampType.withoutZone()),
+          required(2, "tstz", Types.TimestampType.withZone()),
+          required(3, "l", Types.LongType.get()),
+          required(4, "s", Types.StringType.get()));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, 
TestFixtures.TABLE, SCHEMA);
+
+  private GenericAppenderHelper dataAppender;
+  private long minTs = Long.MAX_VALUE;
+  private long minTsTz = Long.MAX_VALUE;
+  private long minL = Long.MAX_VALUE;
+
+  @Before
+  public void initTable() throws IOException {
+    dataAppender =
+        new GenericAppenderHelper(
+            sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 2L);
+    dataAppender.appendToTable(batch);
+
+    for (Record r : batch) {
+      LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+      minTs = Math.min(minTs, 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli());
+
+      OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+      minTsTz = Math.min(minTsTz, offsetDateTime.toInstant().toEpochMilli());
+
+      minL = Math.min(minL, (Long) r.get(2));
+    }
+  }
+
+  @Test
+  public void testTimestamp() {
+    DataFile dataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor tsExtractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "ts", null);
+
+    Assert.assertEquals(
+        minTs,
+        tsExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+  }
+
+  @Test
+  public void testTimestampWithTz() {
+    DataFile dataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor tsTzExtractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "tstz", null);
+
+    Assert.assertEquals(
+        minTsTz,
+        tsTzExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+  }
+
+  @Test
+  public void testLong() {
+    DataFile dataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor longExtractorMilliSeconds =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MILLISECONDS);
+    ColumnStatsWatermarkExtractor longExtractorMicroSeconds =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MICROSECONDS);
+
+    Assert.assertEquals(
+        minL,
+        longExtractorMilliSeconds.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+    Assert.assertEquals(
+        minL / 1000L,
+        longExtractorMicroSeconds.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+  }
+
+  @Test
+  public void testMultipleFiles() throws IOException {
+    DataFile oldDataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 19L);
+    dataAppender.appendToTable(batch);
+
+    long minTsNew = Long.MAX_VALUE;
+    for (Record r : batch) {
+      LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+      minTsNew = Math.min(minTsNew, 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli());
+    }
+
+    DataFile newDataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor tsExtractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "ts", null);
+
+    Assert.assertEquals(
+        minTsNew,
+        tsExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new 
DummyTask(newDataFile))));
+    Assert.assertEquals(
+        minTs,
+        tsExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new 
DummyTask(oldDataFile))));
+    Assert.assertEquals(
+        Math.min(minTsNew, minTs),
+        tsExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(newDataFile, 
oldDataFile))));
+  }
+
+  @Test
+  public void testWrongColumn() {
+    Assertions.assertThatThrownBy(() -> new 
ColumnStatsWatermarkExtractor(SCHEMA, "s", null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Found STRING, expected a LONG or TIMESTAMP column for watermark 
generation.");
+  }
+
+  private static class DummyTask extends BaseCombinedScanTask {

Review Comment:
   why not just a util method? maybe add to `SplitHelpers`?
   
   We can refer to `createFileTask` or `createCombinedScanTask` from the 
`ReaderUtil` class for constructing tasks. 
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg 
timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is 
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, 
Serializable {
+  private final int tsFieldId;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param tsFieldName The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(Schema schema, String tsFieldName, 
TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(tsFieldName);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.tsFieldId = field.fieldId();
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : 
TimeUnit.MICROSECONDS;

Review Comment:
   After this PR is merged, the time unit can be part of the type. 
   
   
https://github.com/apache/iceberg/pull/9008/files#diff-78fdb1926e230d4d08b0c5baccdbd2357c38110a5d2d63d278e517f1975cebdf
   
   I agree with using time unit only for long type. 
   



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, 
"file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, 
"file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), 
generateRecord(103, "103"));

Review Comment:
   this doesn't follow the style of file 2 and 3



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import 
org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {

Review Comment:
   sounds reasonable



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "ts", Types.TimestampType.withoutZone()),
+          required(2, "tstz", Types.TimestampType.withZone()),
+          required(3, "l", Types.LongType.get()),
+          required(4, "s", Types.StringType.get()));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, 
TestFixtures.TABLE, SCHEMA);
+
+  private GenericAppenderHelper dataAppender;
+  private long minTs = Long.MAX_VALUE;
+  private long minTsTz = Long.MAX_VALUE;
+  private long minL = Long.MAX_VALUE;

Review Comment:
   Use a more descriptive variable name. `minL` is not intuitive; maybe 
something like `longFieldMinValue`.
   
   This also applies to the two fields above (`minTs` and `minTsTz`).



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "ts", Types.TimestampType.withoutZone()),
+          required(2, "tstz", Types.TimestampType.withZone()),
+          required(3, "l", Types.LongType.get()),
+          required(4, "s", Types.StringType.get()));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, 
TestFixtures.TABLE, SCHEMA);
+
+  private GenericAppenderHelper dataAppender;
+  private long minTs = Long.MAX_VALUE;
+  private long minTsTz = Long.MAX_VALUE;
+  private long minL = Long.MAX_VALUE;
+
+  @Before
+  public void initTable() throws IOException {
+    dataAppender =
+        new GenericAppenderHelper(
+            sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 2L);
+    dataAppender.appendToTable(batch);
+
+    for (Record r : batch) {
+      LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+      minTs = Math.min(minTs, 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli());
+
+      OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+      minTsTz = Math.min(minTsTz, offsetDateTime.toInstant().toEpochMilli());
+
+      minL = Math.min(minL, (Long) r.get(2));
+    }
+  }
+
+  @Test
+  public void testTimestamp() {
+    DataFile dataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor tsExtractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "ts", null);
+
+    Assert.assertEquals(
+        minTs,
+        tsExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+  }
+
+  @Test
+  public void testTimestampWithTz() {
+    DataFile dataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor tsTzExtractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "tstz", null);
+
+    Assert.assertEquals(
+        minTsTz,
+        tsTzExtractor.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+  }
+
+  @Test
+  public void testLong() {
+    DataFile dataFile =
+        sourceTableResource
+            .table()
+            .currentSnapshot()
+            .addedDataFiles(sourceTableResource.table().io())
+            .iterator()
+            .next();
+    ColumnStatsWatermarkExtractor longExtractorMilliSeconds =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MILLISECONDS);
+    ColumnStatsWatermarkExtractor longExtractorMicroSeconds =
+        new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MICROSECONDS);
+
+    Assert.assertEquals(
+        minL,
+        longExtractorMilliSeconds.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+    Assert.assertEquals(
+        minL / 1000L,
+        longExtractorMicroSeconds.extractWatermark(
+            IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile))));
+  }
+
+  @Test
+  public void testMultipleFiles() throws IOException {

Review Comment:
   Following this style, maybe the previous 3 methods can be merged into one 
test method `testSingleFile`?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, 
"file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, 
"file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), 
generateRecord(103, "103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));

Review Comment:
   I am not sure why `% 5` is used here.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, 
"file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, 
"file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), 
generateRecord(103, "103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch =
+        ImmutableList.of(
+            generateRecord(1, "file_3-recordTs_1"), generateRecord(3, 
"file_3-recordTs_3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) {
+      env.executeAsync("Iceberg Source Windowing Test");
+
+      // Write data so the windows containing test data are closed

Review Comment:
   I am not sure how the window and these last records validate watermark 
alignment. If watermark alignment doesn't work, would it affect the result and 
the assertion?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/TimestampBasedWatermarkExtractor.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link IcebergWatermarkExtractor} implementation which uses an Iceberg 
timestamp column
+ * statistics to get the watermarks for the {@link IcebergSourceSplit}. This 
watermark is emitted by
+ * the {@link WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+public class TimestampBasedWatermarkExtractor implements 
IcebergWatermarkExtractor, Serializable {
+  private final int tsFieldId;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param tsFieldName The timestamp column which should be used as an event 
time
+   */
+  public TimestampBasedWatermarkExtractor(Schema schema, String tsFieldName) {
+    Types.NestedField field = schema.findField(tsFieldName);
+    Preconditions.checkArgument(
+        field.type().typeId().equals(Type.TypeID.TIMESTAMP), "Type should be 
timestamp");
+    this.tsFieldId = field.fieldId();
+  }
+
+  @Override
+  public long extractWatermark(IcebergSourceSplit split) {
+    return split.task().files().stream()
+        .map(
+            scanTask ->
+                (long)
+                        Conversions.fromByteBuffer(
+                            Types.LongType.get(), 
scanTask.file().lowerBounds().get(tsFieldId))

Review Comment:
   `read.split.open-file-cost` doesn't affect splitting big files. It just 
avoids bundling multiple smaller files in one split.
   
   > should check if lowerBounds have stats and throws IllegalArgumentException 
if not
   
   also please address the first comment



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg 
timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is 
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, 
Serializable {
+  private final int tsFieldId;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param tsFieldName The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(Schema schema, String tsFieldName, 
TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(tsFieldName);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.tsFieldId = field.fieldId();
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : 
TimeUnit.MICROSECONDS;

Review Comment:
   Also, we need to handle the difference between timestamps with and without TZ.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, 
"file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, 
"file_3-recordTs_3")
+    List<Record> batch = ImmutableList.of(generateRecord(100, "100"), 
generateRecord(103, "103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch =
+        ImmutableList.of(
+            generateRecord(1, "file_3-recordTs_1"), generateRecord(3, 
"file_3-recordTs_3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) {
+      env.executeAsync("Iceberg Source Windowing Test");
+
+      // Write data so the windows containing test data are closed
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+      dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+
+      assertRecords(resultIterator, expectedRecords);
+    }
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, 
"file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, 
"file_2-recordTs_1"),...
+    List<Record> batch;
+    batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"), generateRecord(103, 
"file_1-recordTs_103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "file_2-recordTs_" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+      JobClient jobClient = env.executeAsync("Continuous Iceberg Source 
Failover Test");
+
+      // Check that the read the non-blocked data
+      // The first RECORD_NUM_FOR_2_SPLITS should be read
+      // 1 or more from the runaway reader should be arrived depending on 
thread scheduling
+      waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+      // Get the drift metric, wait for it to be created and reach the 
expected state
+      // (100 min - 20 min - 0 min)
+      // Also this validates that the WatermarkAlignment is working
+      Awaitility.await()
+          .atMost(120, TimeUnit.SECONDS)
+          .until(() -> findAlignmentDriftMetric(jobClient.getJobID(), 
4800000L).isPresent());
+      Gauge<Long> drift = findAlignmentDriftMetric(jobClient.getJobID(), 
4800000L).get();

Review Comment:
   Why would the drift be 80 minutes? I thought the max allowed drift config is 
20 mins.
   
   nit: maybe change the large numeric value to something more readable like 
`TimeUnit.MINUTES.toMillis(80)`.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, 
"File-3-3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(windowed);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+
+    assertRecords(resultIterator, expectedRecords);
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "File-1-100"), 
generateRecord(103, "File-1-103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)),

Review Comment:
   I understand the timestamp values are arbitrary. I am talking about the 
watermark update/refresh interval, so that watermark changes can propagate faster.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, 
TestFixtures.TS_SCHEMA);
+
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    // - File 3 - Parallel write for the first records (Watermark 60000)
+    //    - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3")
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, 
"103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, 
"File-3-3"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+    DataStream<RowData> windowed =
+        stream
+            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+            .apply(
+                new AllWindowFunction<RowData, RowData, TimeWindow>() {
+                  @Override
+                  public void apply(
+                      TimeWindow window, Iterable<RowData> values, 
Collector<RowData> out) {
+                    // Just print all the data to confirm everything has 
arrived
+                    values.forEach(out::collect);
+                  }
+                });
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(windowed);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+    dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, 
"last-record")));
+
+    assertRecords(resultIterator, expectedRecords);
+  }
+
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    //    - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),...
+    List<Record> batch;
+    batch = ImmutableList.of(generateRecord(100, "File-1-100"), 
generateRecord(103, "File-1-103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(i % 5, "File-2-" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder()
+                .streaming(true)
+                .monitorInterval(Duration.ofMillis(10))
+                
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+                .build(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    CollectResultIterator<RowData> resultIterator = addCollectSink(stream);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover 
Test");
+    resultIterator.setJobClient(jobClient);
+
+    // Check that the read the non-blocked data
+    // The first RECORD_NUM_FOR_2_SPLITS should be read
+    // 1 or more from the runaway reader should be arrived depending on thread 
scheduling
+    waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+    // Get the drift metric, wait for it to be created and reach the expected 
state
+    // (100 min - 20 min - 0 min)
+    // Also this validates that the WatermarkAlignment is working
+    Awaitility.await()
+        .atMost(120, TimeUnit.SECONDS)
+        .until(() -> findAlignmentDriftMetric(jobClient.getJobID(), 
4800000L).isPresent());
+    Gauge<Long> drift = findAlignmentDriftMetric(jobClient.getJobID(), 
4800000L).get();
+
+    // Add some old records with 2 splits, so even if the blocked gets one 
split, the other reader
+    // one gets one as well
+    batch =
+        ImmutableList.of(
+            generateRecord(15, "File-3-15"),
+            generateRecord(16, "File-3-16"),
+            generateRecord(17, "File-3-17"));
+    dataAppender.appendToTable(batch);
+    batch =
+        ImmutableList.of(
+            generateRecord(15, "File-4-15"),
+            generateRecord(16, "File-4-16"),
+            generateRecord(17, "File-4-17"));
+    dataAppender.appendToTable(batch);
+    // The records received will highly depend on scheduling
+    // We minimally get 3 records from the non-blocked reader
+    // We might get 1 record from the blocked reader (as part of the previous 
batch - File-1)
+    // We might get 3 records form the non-blocked reader if it gets both new 
splits
+    waitForRecords(resultIterator, 3);
+
+    // Get the drift metric, wait for it to be created and reach the expected 
state (100 min - 20
+    // min - 15 min)
+    Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> 
drift.getValue() == 3900000L);
+
+    // Add some new records which should unblock the throttled reader
+    batch = ImmutableList.of(generateRecord(110, "File-5-110"), 
generateRecord(111, "File-5-111"));
+    dataAppender.appendToTable(batch);
+    // We should get all the records at this point
+    waitForRecords(resultIterator, 6);
+
+    // Wait for the new drift to decrease below the allowed drift to signal 
the normal state
+    Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> 
drift.getValue() < 1200000L);
+  }
+
+  protected IcebergSource.Builder<RowData> sourceBuilder() {
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .watermarkColumn("ts")
+        .project(TestFixtures.TS_SCHEMA)
+        .splitSize(100L);
+  }
+
+  protected Record generateRecord(int minutes, String str) {
+    // Override the ts field to create a more realistic situation for event 
time alignment
+    Record record = GenericRecord.create(TestFixtures.TS_SCHEMA);
+    LocalDateTime ts =
+        LocalDateTime.ofInstant(
+            Instant.ofEpochMilli(Time.of(minutes, 
TimeUnit.MINUTES).toMilliseconds()),
+            ZoneId.of("Z"));
+    record.setField("ts", ts);
+    record.setField("str", str);
+    return record;
+  }
+
+  /**
+   * This override is needed because {@link Comparators} used by {@link 
StructLikeWrapper} retrieves
+   * Timestamp type using Long type as inner class, while the {@link 
RandomGenericData} generates
+   * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This 
method normalizes the
+   * {@link LocalDateTime} to a Long type so that Comparators can continue to 
work.
+   */
+  protected void assertRecords(
+      CollectResultIterator<RowData> iterator, List<Record> expectedRecords) 
throws Exception {
+
+    Set<RowData> received = 
Sets.newHashSetWithExpectedSize(expectedRecords.size());
+
+    assertThat(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  int count = 0;
+                  while (count < expectedRecords.size() && iterator.hasNext()) 
{
+                    received.add(iterator.next());
+                    count++;
+                  }
+                  if (count < expectedRecords.size()) {
+                    throw new IllegalStateException(
+                        String.format("Fail to get %d records.", 
expectedRecords.size()));
+                  }
+                  return true;
+                }))
+        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+
+    Set<RowData> expected =
+        expectedRecords.stream()
+            .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
+            .collect(Collectors.toSet());
+    Assert.assertEquals(expected, received);
+  }
+
+  protected void waitForRecords(CollectResultIterator<RowData> iterator, int 
num) {
+    assertThat(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  int count = 0;
+                  while (count < num && iterator.hasNext()) {
+                    iterator.next();
+                    count++;
+                  }
+                  if (count < num) {
+                    throw new IllegalStateException(String.format("Fail to get 
%d records.", num));
+                  }
+                  return true;
+                }))
+        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+  }
+
+  private CollectResultIterator<RowData> addCollectSink(DataStream<RowData> 
stream) {
+    TypeSerializer<RowData> serializer =
+        stream.getType().createSerializer(stream.getExecutionConfig());
+    String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+    CollectSinkOperatorFactory<RowData> factory =
+        new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+    CollectSinkOperator<RowData> operator = (CollectSinkOperator<RowData>) 
factory.getOperator();
+    CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory);
+    sink.name("Data stream collect sink");
+    stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+    return new CollectResultIterator<>(
+        operator.getOperatorIdFuture(),
+        serializer,
+        accumulatorName,
+        stream.getExecutionEnvironment().getCheckpointConfig());
+  }
+
+  private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long 
withValue) {
+    String metricsName = SOURCE_NAME + ".*" + 
MetricNames.WATERMARK_ALIGNMENT_DRIFT;
+    return reporter.findMetrics(jobID, metricsName).values().stream()
+        .map(m -> (Gauge<Long>) m)
+        .filter(m -> m.getValue() == withValue)
+        .findFirst();
+  }
+
+  private GenericAppenderHelper appender() {
+    // We need to create multiple splits, so we need to generate parquet files 
with multiple offsets

Review Comment:
   oh. I missed that we set the `splitSize` to 100 in the source builder



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to