stevenzwu commented on code in PR #7661:
URL: https://github.com/apache/iceberg/pull/7661#discussion_r1221835400


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+
+/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */

Review Comment:
   nit: Javadoc needs to be changed



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java:
##########
@@ -62,8 +62,15 @@ public static List<IcebergSourceSplit> 
createSplitsFromTransientHadoopTable(
     final String warehouse = "file:" + warehouseFile;
     Configuration hadoopConf = new Configuration();
     final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+    ImmutableMap<String, String> properties = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");

Review Comment:
   this hardcode the format to v2. what about adding a 2nd variation of 
`createSplitsFromTransientHadoopTable` with format version passed in?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -462,15 +475,22 @@ public IcebergSource<T> build() {
         }
       }
 
+      if (splitAssignerFactory == null) {
+        if (splitComparator == null) {
+          splitAssignerFactory = new SimpleSplitAssignerFactory();
+        } else {
+          splitAssignerFactory = new 
OrderedSplitAssignerFactory(splitComparator);
+        }
+      }
+
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid 
double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader, context, readerFunction, splitAssignerFactory, 
splitComparator, table);
     }
 
     private void checkRequired() {
       Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
-      Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is 
required.");

Review Comment:
   why removing this check? it is still valid, right?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FileSequenceNumberBasedComparator
+    implements SerializableComparator<IcebergSourceSplit> {
+  @Override
+  public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
+    Preconditions.checkArgument(
+        o1.task().files().size() == 1 && o2.task().files().size() == 1,
+        "Could not compare combined task. Please use 'split-open-file-cost' to 
prevent combining multiple files to a split");
+
+    Long seq1 = 
o1.task().files().iterator().next().file().fileSequenceNumber();
+    Long seq2 = 
o2.task().files().iterator().next().file().fileSequenceNumber();
+
+    Preconditions.checkNotNull(
+        seq1, "Invalid file sequence number for {}. Only V2 table format is 
supported", o1);
+    Preconditions.checkNotNull(
+        seq2, "Invalid file sequence number for {}. Only V2 table format is 
supported", o2);
+
+    if (o1.splitId().equals(o2.splitId())) {

Review Comment:
   I think this if block is not necessarily



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -55,13 +56,67 @@ public void testReaderMetrics() throws Exception {
     TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
     TestingMetricGroup metricGroup = new TestingMetricGroup();
     TestingReaderContext readerContext = new TestingReaderContext(new 
Configuration(), metricGroup);
-    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, 
null);
     reader.start();
 
     testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
     testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
   }
 
+  @Test
+  public void testReaderOrder() throws Exception {
+    // Create 2 splits
+    List<List<Record>> recordBatchList1 =
+        ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task1 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET, 
appenderFactory);
+
+    List<List<Record>> recordBatchList2 =
+        ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task2 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET, 
appenderFactory);
+
+    // Sort the splits in one way
+    List<RowData> rowData1 =

Review Comment:
   nit: `rowData1` sounds like a single `RowData` object (not a list)



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FileSequenceNumberBasedComparator

Review Comment:
   let's also provide some Javadoc for the factory method.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java:
##########
@@ -23,14 +23,15 @@
 
 /** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
 public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+  public SimpleSplitAssignerFactory() {}
 
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  public DefaultSplitAssigner createAssigner() {

Review Comment:
   just realized the previous mistake. this should just return the 
`SplitAssigner` interface (not the impl class)



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -55,13 +56,67 @@ public void testReaderMetrics() throws Exception {
     TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
     TestingMetricGroup metricGroup = new TestingMetricGroup();
     TestingReaderContext readerContext = new TestingReaderContext(new 
Configuration(), metricGroup);
-    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, 
null);
     reader.start();
 
     testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
     testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
   }
 
+  @Test
+  public void testReaderOrder() throws Exception {
+    // Create 2 splits
+    List<List<Record>> recordBatchList1 =
+        ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task1 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET, 
appenderFactory);
+
+    List<List<Record>> recordBatchList2 =
+        ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task2 =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET, 
appenderFactory);
+
+    // Sort the splits in one way
+    List<RowData> rowData1 =
+        read(
+            Arrays.asList(
+                IcebergSourceSplit.fromCombinedScanTask(task1),
+                IcebergSourceSplit.fromCombinedScanTask(task2)),
+            2);
+
+    // Reverse the splits
+    List<RowData> rowData2 =
+        read(
+            Arrays.asList(
+                IcebergSourceSplit.fromCombinedScanTask(task2),
+                IcebergSourceSplit.fromCombinedScanTask(task1)),
+            2);
+
+    // Check that the order of the elements is not changed
+    Assert.assertEquals(rowData1.get(0), rowData2.get(0));
+    Assert.assertEquals(rowData1.get(1), rowData2.get(1));
+  }
+
+  private List<RowData> read(List<IcebergSourceSplit> splits, long expected) 
throws Exception {
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new 
Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, new 
IdBasedComparator());

Review Comment:
   do we need `IdBasedComparator`? we can just pass in null here right?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FileSequenceNumberBasedComparator

Review Comment:
   Following many of Iceberg code pattern, we can name this class as 
`SplitComparators` and provide an API as `public static 
SerializableComparator<IcebergSourceSplit> fileSequenceNumber()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to