stevenzwu commented on code in PR #7661:
URL: https://github.com/apache/iceberg/pull/7661#discussion_r1221835400
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+
+/** Create simple assigner that hands out splits without any guarantee in order or locality. */

Review Comment:
   nit: Javadoc needs to be changed

##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java:
##########
@@ -62,8 +62,15 @@ public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
     final String warehouse = "file:" + warehouseFile;
     Configuration hadoopConf = new Configuration();
     final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+    ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");

Review Comment:
   this hardcodes the format to v2. What about adding a 2nd variation of `createSplitsFromTransientHadoopTable` with the format version passed in?
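   Something like this, as a rough sketch (the delegation and the `formatVersion` parameter are illustrative assumptions, not the final API):

   ```java
   /** Existing entry point: keeps today's behavior by delegating with format v2. */
   public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
       TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception {
     return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount, filesPerSplit, "2");
   }

   /** New variation: the caller picks the table format version. */
   public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
       TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit, String formatVersion)
       throws Exception {
     // Same body as today, except the table properties are built from the argument:
     //   ImmutableMap.of(TableProperties.FORMAT_VERSION, formatVersion)
     throw new UnsupportedOperationException("body elided in this sketch");
   }
   ```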
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -462,15 +475,22 @@ public IcebergSource<T> build() {
       }
     }
 
+    if (splitAssignerFactory == null) {
+      if (splitComparator == null) {
+        splitAssignerFactory = new SimpleSplitAssignerFactory();
+      } else {
+        splitAssignerFactory = new OrderedSplitAssignerFactory(splitComparator);
+      }
+    }
+
     checkRequired();
     // Since builder already load the table, pass it to the source to avoid double loading
     return new IcebergSource<T>(
-        tableLoader, context, readerFunction, splitAssignerFactory, table);
+        tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
   }
 
   private void checkRequired() {
     Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
-    Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");

Review Comment:
   why remove this check? It is still valid, right?

##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FileSequenceNumberBasedComparator
+    implements SerializableComparator<IcebergSourceSplit> {
+  @Override
+  public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
+    Preconditions.checkArgument(
+        o1.task().files().size() == 1 && o2.task().files().size() == 1,
+        "Could not compare combined task. Please use 'split-open-file-cost' to prevent combining multiple files to a split");
+
+    Long seq1 = o1.task().files().iterator().next().file().fileSequenceNumber();
+    Long seq2 = o2.task().files().iterator().next().file().fileSequenceNumber();
+
+    Preconditions.checkNotNull(
+        seq1, "Invalid file sequence number for {}. Only V2 table format is supported", o1);
+    Preconditions.checkNotNull(
+        seq2, "Invalid file sequence number for {}. Only V2 table format is supported", o2);
+
+    if (o1.splitId().equals(o2.splitId())) {

Review Comment:
   I think this if block is not necessary.
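   Without it, `compare` can read as a plain two-level comparison; as a sketch (the `splitId` tie-breaker is an assumption about the part of the method not shown in this hunk):

   ```java
   @Override
   public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
     // ... same size and null preconditions as in the PR ...
     Long seq1 = o1.task().files().iterator().next().file().fileSequenceNumber();
     Long seq2 = o2.task().files().iterator().next().file().fileSequenceNumber();

     // Order by file sequence number first; fall back to splitId only to break
     // ties, so equal splits compare as 0 without a dedicated equality branch.
     int ret = Long.compare(seq1, seq2);
     return ret != 0 ? ret : o1.splitId().compareTo(o2.splitId());
   }
   ```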
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -55,13 +56,67 @@ public void testReaderMetrics() throws Exception {
     TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
     TestingMetricGroup metricGroup = new TestingMetricGroup();
     TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
-    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, null);
     reader.start();
 
     testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
     testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
   }
 
+  @Test
+  public void testReaderOrder() throws Exception {
+    // Create 2 splits
+    List<List<Record>> recordBatchList1 =
+        ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task1 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+
+    List<List<Record>> recordBatchList2 =
+        ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task2 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+
+    // Sort the splits in one way
+    List<RowData> rowData1 =

Review Comment:
   nit: `rowData1` sounds like a single `RowData` object (not a list)

##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FileSequenceNumberBasedComparator

Review Comment:
   let's also provide some Javadoc for the factory method.

##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java:
##########
@@ -23,14 +23,15 @@
 /** Create simple assigner that hands out splits without any guarantee in order or locality. */
 public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+  public SimpleSplitAssignerFactory() {}
 
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  public DefaultSplitAssigner createAssigner() {

Review Comment:
   Just realized the previous mistake: this should just return the `SplitAssigner` interface (not the impl class).
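   As a sketch, with the interface as the declared return type (the `DefaultSplitAssigner` constructor arguments below are assumptions, since the hunk does not show the new return statements; the second method assumes the state-restoring variant that the imports in `OrderedSplitAssignerFactory` suggest):

   ```java
   @Override
   public SplitAssigner createAssigner() {
     // Callers only see the SplitAssigner interface; the concrete
     // DefaultSplitAssigner stays an implementation detail of the factory.
     return new DefaultSplitAssigner();
   }

   @Override
   public SplitAssigner createAssigner(Collection<IcebergSourceSplitState> assignerState) {
     return new DefaultSplitAssigner(assignerState);
   }
   ```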
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -55,13 +56,67 @@ public void testReaderMetrics() throws Exception {
     TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
     TestingMetricGroup metricGroup = new TestingMetricGroup();
     TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
-    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, null);
     reader.start();
 
     testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
     testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
   }
 
+  @Test
+  public void testReaderOrder() throws Exception {
+    // Create 2 splits
+    List<List<Record>> recordBatchList1 =
+        ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task1 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+
+    List<List<Record>> recordBatchList2 =
+        ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task2 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+
+    // Sort the splits in one way
+    List<RowData> rowData1 =
+        read(
+            Arrays.asList(
+                IcebergSourceSplit.fromCombinedScanTask(task1),
+                IcebergSourceSplit.fromCombinedScanTask(task2)),
+            2);
+
+    // Reverse the splits
+    List<RowData> rowData2 =
+        read(
+            Arrays.asList(
+                IcebergSourceSplit.fromCombinedScanTask(task2),
+                IcebergSourceSplit.fromCombinedScanTask(task1)),
+            2);
+
+    // Check that the order of the elements is not changed
+    Assert.assertEquals(rowData1.get(0), rowData2.get(0));
+    Assert.assertEquals(rowData1.get(1), rowData2.get(1));
+  }
+
+  private List<RowData> read(List<IcebergSourceSplit> splits, long expected) throws Exception {
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, new IdBasedComparator());

Review Comment:
   do we need `IdBasedComparator`? We can just pass in `null` here, right?

##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FileSequenceNumberBasedComparator

Review Comment:
   Following the common Iceberg code pattern, we can name this class `SplitComparators` and provide an API like `public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber()`
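   A sketch of that shape (whether `fileSequenceNumber()` wraps the existing comparator, as below, or inlines the comparison is an open choice):

   ```java
   package org.apache.iceberg.flink.source.split;

   /** Factory methods for {@link SerializableComparator}s of {@link IcebergSourceSplit}. */
   public class SplitComparators {
     private SplitComparators() {}

     /** Orders splits by the file sequence number of their single data file (v2 tables only). */
     public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() {
       return new FileSequenceNumberBasedComparator();
     }
   }
   ```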
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]