stevenzwu commented on code in PR #8553: URL: https://github.com/apache/iceberg/pull/8553#discussion_r1396577355
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emitter which emits the watermarks, records and updates the split position. + * + * <p>The Emitter emits watermarks at the beginning of every split provided by the {@link + * SplitWatermarkExtractor}. + */ +class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> { + private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); + private final SplitWatermarkExtractor timeExtractor; + private String lastSplitId = null; + private long watermark; + + WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { + this.timeExtractor = timeExtractor; + } + + @Override + public void emitRecord( + RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) { + if (!split.splitId().equals(lastSplitId)) { + long newWatermark = timeExtractor.extractWatermark(split); + if (newWatermark < watermark) { + LOG.info( + "previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", Review Comment: we need a summary in the beginning. "Received a new split with lower watermark. " the logic also seems incorrect. we don't emit watermark when it is backwards. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestColumnStatsWatermarkExtractor { + public static final Schema SCHEMA = + new Schema( + required(1, "ts", Types.TimestampType.withoutZone()), + required(2, "tstz", Types.TimestampType.withZone()), + required(3, "l", Types.LongType.get()), + required(4, "s", Types.StringType.get())); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + + private GenericAppenderHelper dataAppender; + private long minTs = Long.MAX_VALUE; + private long minTsTz = Long.MAX_VALUE; + private long minL = Long.MAX_VALUE; + + @Before + public void initTable() throws IOException { + dataAppender = + new GenericAppenderHelper( + sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER); + + List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 2L); + dataAppender.appendToTable(batch); + + for (Record r : batch) { + LocalDateTime localDateTime = (LocalDateTime) r.get(0); + minTs = Math.min(minTs, localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli()); + + OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1); + minTsTz = Math.min(minTsTz, offsetDateTime.toInstant().toEpochMilli()); + + minL = Math.min(minL, (Long) r.get(2)); + } + } + + @Test + public void testTimestamp() { + DataFile dataFile = Review Comment: should we move line 94-100 to the `before` method? ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergWatermarkExtractor.java: ########## @@ -18,19 +18,11 @@ */ package org.apache.iceberg.flink.source.reader; -import org.apache.flink.api.connector.source.SourceOutput; -import org.apache.flink.connector.base.source.reader.RecordEmitter; +import java.io.Serializable; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -final class IcebergSourceRecordEmitter<T> - implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> { - - IcebergSourceRecordEmitter() {} - - @Override - public void emitRecord( - RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) { - output.collect(element.record()); - split.updatePosition(element.fileOffset(), element.recordOffset()); - } +/** The interface used to extract watermarks from splits. */ +public interface IcebergWatermarkExtractor extends Serializable { + /** Get the watermark for a split. */ Review Comment: I don't know how common it is. but there are definitely some Iceberg classes document throws E.g. `BatchScan` from core ``` /** * Create a new {@link BatchScan} from this scan's configuration that will use a snapshot with the * given ID. * * @param snapshotId a snapshot ID * @return a new scan based on this with the given snapshot ID * @throws IllegalArgumentException if the snapshot cannot be found */ BatchScan useSnapshot(long snapshotId); ``` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ########## @@ -429,6 +444,30 @@ public Builder<T> setAll(Map<String, String> properties) { return this; } + /** + * Emits watermarks once per split based on the file statistics for the given split. The + * generated watermarks are also used for ordering the splits for read. Accepted column types + * are timestamp/timestamptz/long. For long columns consider setting {@link + * #watermarkTimeUnit(TimeUnit)}. + */ + public Builder<T> watermarkColumn(String columnName) { + Preconditions.checkArgument( + splitAssignerFactory == null, + "Watermark column and SplitAssigner should not be set in the same source"); + this.watermarkColumn = columnName; + return this; + } + + /** + * When the type of the {@link #watermarkColumn} is {@link + * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the + * value. The default value is {@link TimeUnit#MICROSECONDS}. + */ + public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) { Review Comment: wondering if it is better to merge this with the method above. That is what I see more often e.g. Iceberg `DefaultMetricsContext` ``` public Timer timer(String name, TimeUnit unit) ``` For SQL, these will be two separate configs/hints. `IcebergTableSource` can handle the Java API. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; + +public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover { Review Comment: is there any value testing this? we didn't assert anything related to watermark alignment behavior. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestColumnStatsWatermarkExtractor { + public static final Schema SCHEMA = + new Schema( + required(1, "ts", Types.TimestampType.withoutZone()), + required(2, "tstz", Types.TimestampType.withZone()), + required(3, "l", Types.LongType.get()), + required(4, "s", Types.StringType.get())); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + + private GenericAppenderHelper dataAppender; + private long minTs = Long.MAX_VALUE; + private long minTsTz = Long.MAX_VALUE; + private long minL = Long.MAX_VALUE; + + @Before + public void initTable() throws IOException { + dataAppender = + new GenericAppenderHelper( + sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER); + + List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 2L); + dataAppender.appendToTable(batch); + + for (Record r : batch) { + LocalDateTime localDateTime = (LocalDateTime) r.get(0); + minTs = Math.min(minTs, localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli()); + + OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1); + minTsTz = Math.min(minTsTz, offsetDateTime.toInstant().toEpochMilli()); + + minL = Math.min(minL, (Long) r.get(2)); + } + } + + @Test + public void testTimestamp() { + DataFile dataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor tsExtractor = + new ColumnStatsWatermarkExtractor(SCHEMA, "ts", null); + + Assert.assertEquals( + minTs, + tsExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + } + + @Test + public void testTimestampWithTz() { + DataFile dataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor tsTzExtractor = + new ColumnStatsWatermarkExtractor(SCHEMA, "tstz", null); + + Assert.assertEquals( + minTsTz, + tsTzExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + } + + @Test + public void testLong() { + DataFile dataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor longExtractorMilliSeconds = + new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MILLISECONDS); + ColumnStatsWatermarkExtractor longExtractorMicroSeconds = + new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MICROSECONDS); + + Assert.assertEquals( + minL, + longExtractorMilliSeconds.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + Assert.assertEquals( + minL / 1000L, + longExtractorMicroSeconds.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + } + + @Test + public void testMultipleFiles() throws IOException { + DataFile oldDataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 19L); + dataAppender.appendToTable(batch); + + long minTsNew = Long.MAX_VALUE; + for (Record r : batch) { + LocalDateTime localDateTime = (LocalDateTime) r.get(0); + minTsNew = Math.min(minTsNew, localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli()); + } + + DataFile newDataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor tsExtractor = + new ColumnStatsWatermarkExtractor(SCHEMA, "ts", null); + + Assert.assertEquals( + minTsNew, + tsExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(newDataFile)))); + Assert.assertEquals( + minTs, + tsExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(oldDataFile)))); + Assert.assertEquals( + Math.min(minTsNew, minTs), + tsExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(newDataFile, oldDataFile)))); + } + + @Test + public void testWrongColumn() { + Assertions.assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, "s", null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Found STRING, expected a LONG or TIMESTAMP column for watermark generation."); + } + + private static class DummyTask extends BaseCombinedScanTask { Review Comment: why not just a util method? maybe add to `SplitHelpers`? we can refer to `createFileTask` or `createCombinedScanTask` from `ReaderUtil` class on construction tasks. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int tsFieldId; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param tsFieldName The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor(Schema schema, String tsFieldName, TimeUnit timeUnit) { + Types.NestedField field = schema.findField(tsFieldName); + TypeID typeID = field.type().typeId(); + Preconditions.checkArgument( + typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP), + "Found %s, expected a LONG or TIMESTAMP column for watermark generation.", + typeID); + this.tsFieldId = field.fieldId(); + // Use the timeUnit only for Long columns. + this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS; Review Comment: after this PR merged, time unit can be part of the type. https://github.com/apache/iceberg/pull/9008/files#diff-78fdb1926e230d4d08b0c5baccdbd2357c38110a5d2d63d278e517f1975cebdf I agree with using time unit only for long type. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); Review Comment: this doesn't follow the style of file 2 and 3 ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { Review Comment: sounds reasonable ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestColumnStatsWatermarkExtractor { + public static final Schema SCHEMA = + new Schema( + required(1, "ts", Types.TimestampType.withoutZone()), + required(2, "tstz", Types.TimestampType.withZone()), + required(3, "l", Types.LongType.get()), + required(4, "s", Types.StringType.get())); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + + private GenericAppenderHelper dataAppender; + private long minTs = Long.MAX_VALUE; + private long minTsTz = Long.MAX_VALUE; + private long minL = Long.MAX_VALUE; Review Comment: use a little more descriptive variable name. `minL` is not intuitive. maybe sth like `longFieldMinValue`. also applies to the 2 above ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestColumnStatsWatermarkExtractor { + public static final Schema SCHEMA = + new Schema( + required(1, "ts", Types.TimestampType.withoutZone()), + required(2, "tstz", Types.TimestampType.withZone()), + required(3, "l", Types.LongType.get()), + required(4, "s", Types.StringType.get())); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + + private GenericAppenderHelper dataAppender; + private long minTs = Long.MAX_VALUE; + private long minTsTz = Long.MAX_VALUE; + private long minL = Long.MAX_VALUE; + + @Before + public void initTable() throws IOException { + dataAppender = + new GenericAppenderHelper( + sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER); + + List<Record> batch = RandomGenericData.generate(SCHEMA, 3, 2L); + dataAppender.appendToTable(batch); + + for (Record r : batch) { + LocalDateTime localDateTime = (LocalDateTime) r.get(0); + minTs = Math.min(minTs, localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli()); + + OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1); + minTsTz = Math.min(minTsTz, offsetDateTime.toInstant().toEpochMilli()); + + minL = Math.min(minL, (Long) r.get(2)); + } + } + + @Test + public void testTimestamp() { + DataFile dataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor tsExtractor = + new ColumnStatsWatermarkExtractor(SCHEMA, "ts", null); + + Assert.assertEquals( + minTs, + tsExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + } + + @Test + public void testTimestampWithTz() { + DataFile dataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor tsTzExtractor = + new ColumnStatsWatermarkExtractor(SCHEMA, "tstz", null); + + Assert.assertEquals( + minTsTz, + tsTzExtractor.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + } + + @Test + public void testLong() { + DataFile dataFile = + sourceTableResource + .table() + .currentSnapshot() + .addedDataFiles(sourceTableResource.table().io()) + .iterator() + .next(); + ColumnStatsWatermarkExtractor longExtractorMilliSeconds = + new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MILLISECONDS); + ColumnStatsWatermarkExtractor longExtractorMicroSeconds = + new ColumnStatsWatermarkExtractor(SCHEMA, "l", TimeUnit.MICROSECONDS); + + Assert.assertEquals( + minL, + longExtractorMilliSeconds.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + Assert.assertEquals( + minL / 1000L, + longExtractorMicroSeconds.extractWatermark( + IcebergSourceSplit.fromCombinedScanTask(new DummyTask(dataFile)))); + } + + @Test + public void testMultipleFiles() throws IOException { Review Comment: following this style, maybe the previous 3 methods can be merged into one test method `testSingleFile`? ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); Review Comment: I am not sure why `% 5`. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = + ImmutableList.of( + generateRecord(1, "file_3-recordTs_1"), generateRecord(3, "file_3-recordTs_3")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + DataStream<RowData> windowed = + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction<RowData, RowData, TimeWindow>() { + @Override + public void apply( + TimeWindow window, Iterable<RowData> values, Collector<RowData> out) { + // Just print all the data to confirm everything has arrived + values.forEach(out::collect); + } + }); + + try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) { + env.executeAsync("Iceberg Source Windowing Test"); + + // Write data so the windows containing test data are closed Review Comment: I am not sure how the window and this last records validate watermark alignment. if watermark alignment doesn't work, would it affect the result and assertion. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/TimestampBasedWatermarkExtractor.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * {@link IcebergWatermarkExtractor} implementation which uses an Iceberg timestamp column + * statistics to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by + * the {@link WatermarkExtractorRecordEmitter} along with the actual records. + */ +public class TimestampBasedWatermarkExtractor implements IcebergWatermarkExtractor, Serializable { + private final int tsFieldId; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param tsFieldName The timestamp column which should be used as an event time + */ + public TimestampBasedWatermarkExtractor(Schema schema, String tsFieldName) { + Types.NestedField field = schema.findField(tsFieldName); + Preconditions.checkArgument( + field.type().typeId().equals(Type.TypeID.TIMESTAMP), "Type should be timestamp"); + this.tsFieldId = field.fieldId(); + } + + @Override + public long extractWatermark(IcebergSourceSplit split) { + return split.task().files().stream() + .map( + scanTask -> + (long) + Conversions.fromByteBuffer( + Types.LongType.get(), scanTask.file().lowerBounds().get(tsFieldId)) Review Comment: `read.split.open-file-cost` doesn't affect splitting big files. it just avoid bundling multiple smaller files in one split. > should check if lowerBounds have stats and throws IllegalArgumentException if not also please address the first comment ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int tsFieldId; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param tsFieldName The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor(Schema schema, String tsFieldName, TimeUnit timeUnit) { + Types.NestedField field = schema.findField(tsFieldName); + TypeID typeID = field.type().typeId(); + Preconditions.checkArgument( + typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP), + "Found %s, expected a LONG or TIMESTAMP column for watermark generation.", + typeID); + this.tsFieldId = field.fieldId(); + // Use the timeUnit only for Long columns. + this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS; Review Comment: also we need to handle the difference of with and without TZ. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "file_3-recordTs_1"), (3, "file_3-recordTs_3") + List<Record> batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = + ImmutableList.of( + generateRecord(1, "file_3-recordTs_1"), generateRecord(3, "file_3-recordTs_3")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + DataStream<RowData> windowed = + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction<RowData, RowData, TimeWindow>() { + @Override + public void apply( + TimeWindow window, Iterable<RowData> values, Collector<RowData> out) { + // Just print all the data to confirm everything has arrived + values.forEach(out::collect); + } + }); + + try (CloseableIterator<RowData> resultIterator = windowed.collectAsync()) { + env.executeAsync("Iceberg Source Windowing Test"); + + // Write data so the windows containing test data are closed + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + + assertRecords(resultIterator, expectedRecords); + } + } + + @Test + public void testThrottling() throws Exception { + GenericAppenderHelper dataAppender = appender(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + List<Record> batch; + batch = + ImmutableList.of( + generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103")); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "file_2-recordTs_" + i)); + } + + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + + try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) { + JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test"); + + // Check that the read the non-blocked data + // The first RECORD_NUM_FOR_2_SPLITS should be read + // 1 or more from the runaway reader should be arrived depending on thread scheduling + waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); + + // Get the drift metric, wait for it to be created and reach the expected state + // (100 min - 20 min - 0 min) + // Also this validates that the WatermarkAlignment is working + Awaitility.await() + .atMost(120, TimeUnit.SECONDS) + .until(() -> findAlignmentDriftMetric(jobClient.getJobID(), 4800000L).isPresent()); + Gauge<Long> drift = findAlignmentDriftMetric(jobClient.getJobID(), 4800000L).get(); Review Comment: why the drift would be 80 minutes? I thought max allowed drift config is 20 mins. nit: maybe change the large numeric value to sth more readable like `TimeUnit.MINUTES.toMilli(80)`. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + // - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3") + List<Record> batch; + batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "File-2-" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, "File-3-3")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + DataStream<RowData> windowed = + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction<RowData, RowData, TimeWindow>() { + @Override + public void apply( + TimeWindow window, Iterable<RowData> values, Collector<RowData> out) { + // Just print all the data to confirm everything has arrived + values.forEach(out::collect); + } + }); + + CollectResultIterator<RowData> resultIterator = addCollectSink(windowed); + + // Start the job + JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test"); + resultIterator.setJobClient(jobClient); + + // Write data so the windows containing test data are closed + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + + assertRecords(resultIterator, expectedRecords); + } + + @Test + public void testThrottling() throws Exception { + GenericAppenderHelper dataAppender = appender(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + // - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + List<Record> batch; + batch = ImmutableList.of(generateRecord(100, "File-1-100"), generateRecord(103, "File-1-103")); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "File-2-" + i)); + } + + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)), Review Comment: I understand the timestamp values are arbitrary. I am talking about watermark update/refresh internal so that watermark change can propagate faster. ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List<Record> expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + // - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + // - File 3 - Parallel write for the first records (Watermark 60000) + // - Split 1 - 2 records (1, "File-3-1"), (3, "File-3-3") + List<Record> batch; + batch = ImmutableList.of(generateRecord(100, "100"), generateRecord(103, "103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "File-2-" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = ImmutableList.of(generateRecord(1, "File-3-1"), generateRecord(3, "File-3-3")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + DataStream<RowData> windowed = + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction<RowData, RowData, TimeWindow>() { + @Override + public void apply( + TimeWindow window, Iterable<RowData> values, Collector<RowData> out) { + // Just print all the data to confirm everything has arrived + values.forEach(out::collect); + } + }); + + CollectResultIterator<RowData> resultIterator = addCollectSink(windowed); + + // Start the job + JobClient jobClient = env.executeAsync("Iceberg Source Windowing Test"); + resultIterator.setJobClient(jobClient); + + // Write data so the windows containing test data are closed + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + dataAppender.appendToTable(ImmutableList.of(generateRecord(1500, "last-record"))); + + assertRecords(resultIterator, expectedRecords); + } + + @Test + public void testThrottling() throws Exception { + GenericAppenderHelper dataAppender = appender(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "File-1-100"), (103, "File-1-103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + // - Split 2 - 100 records (0, "File-2-0"), (1, "File-2-1"),... + List<Record> batch; + batch = ImmutableList.of(generateRecord(100, "File-1-100"), generateRecord(103, "File-1-103")); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(i % 5, "File-2-" + i)); + } + + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<RowData> stream = + env.fromSource( + sourceBuilder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(), + WatermarkStrategy.<RowData>noWatermarks() + .withWatermarkAlignment("iceberg", Duration.ofMinutes(20)), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + + CollectResultIterator<RowData> resultIterator = addCollectSink(stream); + + // Start the job + JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test"); + resultIterator.setJobClient(jobClient); + + // Check that the read the non-blocked data + // The first RECORD_NUM_FOR_2_SPLITS should be read + // 1 or more from the runaway reader should be arrived depending on thread scheduling + waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); + + // Get the drift metric, wait for it to be created and reach the expected state + // (100 min - 20 min - 0 min) + // Also this validates that the WatermarkAlignment is working + Awaitility.await() + .atMost(120, TimeUnit.SECONDS) + .until(() -> findAlignmentDriftMetric(jobClient.getJobID(), 4800000L).isPresent()); + Gauge<Long> drift = findAlignmentDriftMetric(jobClient.getJobID(), 4800000L).get(); + + // Add some old records with 2 splits, so even if the blocked gets one split, the other reader + // one gets one as well + batch = + ImmutableList.of( + generateRecord(15, "File-3-15"), + generateRecord(16, "File-3-16"), + generateRecord(17, "File-3-17")); + dataAppender.appendToTable(batch); + batch = + ImmutableList.of( + generateRecord(15, "File-4-15"), + generateRecord(16, "File-4-16"), + generateRecord(17, "File-4-17")); + dataAppender.appendToTable(batch); + // The records received will highly depend on scheduling + // We minimally get 3 records from the non-blocked reader + // We might get 1 record from the blocked reader (as part of the previous batch - File-1) + // We might get 3 records form the non-blocked reader if it gets both new splits + waitForRecords(resultIterator, 3); + + // Get the drift metric, wait for it to be created and reach the expected state (100 min - 20 + // min - 15 min) + Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> drift.getValue() == 3900000L); + + // Add some new records which should unblock the throttled reader + batch = ImmutableList.of(generateRecord(110, "File-5-110"), generateRecord(111, "File-5-111")); + dataAppender.appendToTable(batch); + // We should get all the records at this point + waitForRecords(resultIterator, 6); + + // Wait for the new drift to decrease below the allowed drift to signal the normal state + Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> drift.getValue() < 1200000L); + } + + protected IcebergSource.Builder<RowData> sourceBuilder() { + return IcebergSource.<RowData>builder() + .tableLoader(sourceTableResource.tableLoader()) + .watermarkColumn("ts") + .project(TestFixtures.TS_SCHEMA) + .splitSize(100L); + } + + protected Record generateRecord(int minutes, String str) { + // Override the ts field to create a more realistic situation for event time alignment + Record record = GenericRecord.create(TestFixtures.TS_SCHEMA); + LocalDateTime ts = + LocalDateTime.ofInstant( + Instant.ofEpochMilli(Time.of(minutes, TimeUnit.MINUTES).toMilliseconds()), + ZoneId.of("Z")); + record.setField("ts", ts); + record.setField("str", str); + return record; + } + + /** + * This override is needed because {@link Comparators} used by {@link StructLikeWrapper} retrieves + * Timestamp type using Long type as inner class, while the {@link RandomGenericData} generates + * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This method normalizes the + * {@link LocalDateTime} to a Long type so that Comparators can continue to work. + */ + protected void assertRecords( + CollectResultIterator<RowData> iterator, List<Record> expectedRecords) throws Exception { + + Set<RowData> received = Sets.newHashSetWithExpectedSize(expectedRecords.size()); + + assertThat( + CompletableFuture.supplyAsync( + () -> { + int count = 0; + while (count < expectedRecords.size() && iterator.hasNext()) { + received.add(iterator.next()); + count++; + } + if (count < expectedRecords.size()) { + throw new IllegalStateException( + String.format("Fail to get %d records.", expectedRecords.size())); + } + return true; + })) + .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); + + Set<RowData> expected = + expectedRecords.stream() + .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e)) + .collect(Collectors.toSet()); + Assert.assertEquals(expected, received); + } + + protected void waitForRecords(CollectResultIterator<RowData> iterator, int num) { + assertThat( + CompletableFuture.supplyAsync( + () -> { + int count = 0; + while (count < num && iterator.hasNext()) { + iterator.next(); + count++; + } + if (count < num) { + throw new IllegalStateException(String.format("Fail to get %d records.", num)); + } + return true; + })) + .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); + } + + private CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) { + TypeSerializer<RowData> serializer = + stream.getType().createSerializer(stream.getExecutionConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory<RowData> factory = + new CollectSinkOperatorFactory<>(serializer, accumulatorName); + CollectSinkOperator<RowData> operator = (CollectSinkOperator<RowData>) factory.getOperator(); + CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory); + sink.name("Data stream collect sink"); + stream.getExecutionEnvironment().addOperator(sink.getTransformation()); + return new CollectResultIterator<>( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + stream.getExecutionEnvironment().getCheckpointConfig()); + } + + private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) { + String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT; + return reporter.findMetrics(jobID, metricsName).values().stream() + .map(m -> (Gauge<Long>) m) + .filter(m -> m.getValue() == withValue) + .findFirst(); + } + + private GenericAppenderHelper appender() { + // We need to create multiple splits, so we need to generate parquet files with multiple offsets Review Comment: oh. I missed that we set the `splitSize` to 100 in the source builder -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org