dchristle commented on code in PR #8553: URL: https://github.com/apache/iceberg/pull/8553#discussion_r1323415114
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/IcebergTimestampEventTimeExtractor.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import java.io.Serializable; +import java.util.Comparator; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * {@link IcebergEventTimeExtractor} implementation which uses an Iceberg timestamp column to get + * the watermarks and the event times for the {@link RowData} read by the reader function. 
+ */ +public class IcebergTimestampEventTimeExtractor + implements IcebergEventTimeExtractor<RowData>, Serializable { + private final int tsFieldId; + private final int tsFiledPos; Review Comment: `tsFiledPos` -> `tsFieldPos` ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverEventTimeExtractor.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.eventtimeextractor.IcebergTimestampEventTimeExtractor; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; + +public class TestIcebergSourceFailoverEventTimeExtractor extends TestIcebergSourceFailover { + // increment ts by 60 minutes for each generateRecords batch Review Comment: nit: `increment` -> `Increment` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ########## @@ -237,6 +248,9 @@ public Builder<T> table(Table newTable) { } public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) { + Preconditions.checkArgument( Review Comment: nit: The `checkRequired` call at the end of the build method already performs validations. Instead of adding these checks in `assignerFactory` and `eventTimeExtractor`, you could put them alongside `checkRequired`, so that all validations are kept in a single place. That might make the code easier to read and reason about in the future. Also, `this.splitAssignerFactory` is modified elsewhere, near the end of `build()`, so doing the validations at the end would also catch mistakes made by any other code path that modifies these two fields.
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ########## @@ -216,6 +225,8 @@ public static class Builder<T> { private Table table; private SplitAssignerFactory splitAssignerFactory; private SerializableComparator<IcebergSourceSplit> splitComparator; + private IcebergEventTimeExtractor timeExtractor; Review Comment: nit: I would rename `timeExtractor` -> `eventTimeExtractor` to be more in line with the class names. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java: ########## @@ -56,4 +57,20 @@ public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() { } }; } + + /** Comparator which orders the splits based on watermark of the splits */ + public static SerializableComparator<IcebergSourceSplit> watermarkComparator( + IcebergEventTimeExtractor<?> eventTimeExtractor) { + return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> { + long seq1 = eventTimeExtractor.extractWatermark(o1); + long seq2 = eventTimeExtractor.extractWatermark(o2); + + int temp = Long.compare(seq1, seq2); + if (temp != 0) { + return temp; + } else { + return o1.splitId().compareTo(o2.splitId()); + } Review Comment: nit: `seq` is a little vague, since these values are watermarks rather than sequence numbers. Suggested naming: ``` long watermark1 = eventTimeExtractor.extractWatermark(o1); long watermark2 = eventTimeExtractor.extractWatermark(o2); int watermarkComparison = Long.compare(watermark1, watermark2); if (watermarkComparison != 0) { return watermarkComparison; } else { return o1.splitId().compareTo(o2.splitId()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
