pvary commented on code in PR #9346:
URL: https://github.com/apache/iceberg/pull/9346#discussion_r1445741694


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -18,25 +18,136 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
 
 /** Use the IcebergSource (FLIP-27) */
 public class TestIcebergSourceSql extends TestSqlBase {
+  private static final Schema SCHEMA_TS =
+      new Schema(
+          required(1, "t1", Types.TimestampType.withoutZone()),
+          required(2, "t2", Types.LongType.get()));
+
   @Override
   public void before() throws IOException {
-    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    TableEnvironment tableEnvironment = getTableEnv();
+    Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
     tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+
+    tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
     SqlHelpers.sql(
-        getTableEnv(),
+        tableEnvironment,
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
         catalogResource.warehouse());
-    SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
-    getTableEnv()
-        .getConfig()
-        .getConfiguration()
-        .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+    SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog");
+
+    tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+  }
+
+  private Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /** Generates the records in the expected order, with respect to their data files. */
+  private List<Record> generateExpectedRecords(boolean ascending) throws Exception {
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, new longs
+    Record file1Record1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+    Record file1Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(file1Record1);
+    recordsDataFile1.add(file1Record2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+    // File 2 - late timestamps, old longs
+    Record file2Record1 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+    Record file2Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(file2Record1);
+    recordsDataFile2.add(file2Record2);
+
+    // early1 - early2 -- late1 late 2

Review Comment:
   I do not get this comment.
   Maybe something like this - feel free to reword if you prefer:
   ```
   // Expected records if the splits are ordered:
   //     - ascending (watermark from t1) - records from the split with early timestamps, then records from the split with late timestamps
   //     - descending (watermark from t2) - records from the split with old longs, then records from the split with new longs
   ```
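
   For illustration, a minimal sketch of how the expected list could then be assembled under that reading (the `ascending` flag and the `recordsDataFile1`/`recordsDataFile2` lists come from the diff above; the exact assembly here is my assumption, not the PR's code):
   ```java
   // Sketch only: order the expected records by which split the reader should emit first.
   List<Record> expected = Lists.newArrayList();
   if (ascending) {
     // Watermark from t1: the split with early timestamps (file 1) is read first.
     expected.addAll(recordsDataFile1);
     expected.addAll(recordsDataFile2);
   } else {
     // Watermark from t2: the split with old longs (file 2) is read first.
     expected.addAll(recordsDataFile2);
     expected.addAll(recordsDataFile1);
   }
   return expected;
   ```
   Spelling the branches out like this keeps the comment and the assertion order visibly in sync, which is the point of the suggested rewording.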



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
