pvary commented on code in PR #9346:
URL: https://github.com/apache/iceberg/pull/9346#discussion_r1445734747


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -18,25 +18,136 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
 
 /** Use the IcebergSource (FLIP-27) */
 public class TestIcebergSourceSql extends TestSqlBase {
+  private static final Schema SCHEMA_TS =
+      new Schema(
+          required(1, "t1", Types.TimestampType.withoutZone()),
+          required(2, "t2", Types.LongType.get()));
+
   @Override
   public void before() throws IOException {
-    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    TableEnvironment tableEnvironment = getTableEnv();
+    Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
     
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
 true);
+
+    
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", 
"1");
     SqlHelpers.sql(
-        getTableEnv(),
+        tableEnvironment,
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
         catalogResource.warehouse());
-    SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
-    getTableEnv()
-        .getConfig()
-        .getConfiguration()
-        .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+    SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog");
+
+    tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, 
true);
+  }
+
+  private Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /** Generates the records in the expected order, with respect to their 
datafile */
+  private List<Record> generateExpectedRecords(boolean ascending) throws 
Exception {
+    Table table = 
catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, new longs
+    Record file1Record1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 
60 * 24 * 30L));
+    Record file1Record2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 
* 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(file1Record1);
+    recordsDataFile1.add(file1Record2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+    // File 2 - old timestamps, old longs

Review Comment:
   This code comment is not correct: File 2 holds the late timestamps, not old ones. It should read:
   ```
   // File 2 - late timestamps, old longs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to