stevenzwu commented on code in PR #9346:
URL: https://github.com/apache/iceberg/pull/9346#discussion_r1441115126


##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -131,16 +131,17 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
   private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
+    IcebergSource.Builder builder =

Review Comment:
   is this change necessary?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java:
##########
@@ -56,7 +56,7 @@ public abstract class TestSqlBase {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   @Rule
-  public final HadoopCatalogResource catalogResource =

Review Comment:
   is this change necessary?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -146,6 +159,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
     Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+    Assertions.assertThat(results).containsExactly(expected.toArray(new Row[0]));

Review Comment:
   why convert the `expected` list to an array?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -77,4 +102,88 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);
+
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, new longs
+    Record early1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));

Review Comment:
   now I remember what I meant by this comment. fields `t1` and `t2` aren't
used at the same time. they cover different field types (timestamp vs long)
for watermark extraction, so it would probably be more intuitive if they were
set to the same value.
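
   One way to read this suggestion, as a minimal sketch using only `java.time`: derive both values from a single `Instant`, so the timestamp field and the long field describe the same moment. `TsFields` and `fromInstant` are hypothetical names for illustration and are not part of the PR; the zone is passed in as a parameter here, whereas the test itself uses `ZoneId.systemDefault()`.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical holder for the two watermark candidates; not a class from the PR.
final class TsFields {
  final LocalDateTime t1; // timestamp-typed field
  final long t2; // long-typed field, in epoch milliseconds

  private TsFields(LocalDateTime t1, long t2) {
    this.t1 = t1;
    this.t2 = t2;
  }

  // Builds both fields from the same instant so they always agree.
  static TsFields fromInstant(Instant ts, ZoneId zone) {
    return new TsFields(ts.atZone(zone).toLocalDateTime(), ts.toEpochMilli());
  }
}
```

   With a helper shaped like this, a test record would need only one time argument, and a reader would not have to work out why `t2` is offset from `t1`.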



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -77,4 +102,88 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);

Review Comment:
   I am saying that we don't need to add the tests in this class. only add them
to `TestIcebergSourceSql`, which tests some advanced features/options (like
residual filtering, expose locality). we can't add them to the base class
`TestSqlBase`, because it is used for both the old and the new FLIP-27 sources.



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java:
##########
@@ -122,7 +127,8 @@ private ScanContext(
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
     this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
-

Review Comment:
   nit: unnecessary style change. please keep the empty line before the
`validate` method call.



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -126,7 +126,9 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
         .collect(Collectors.toList());
   }
 
-  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {

Review Comment:
   it is better to keep this line unchanged to minimize the diff and make the
method-extraction refactoring easier to see.



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -126,7 +126,9 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
         .collect(Collectors.toList());
   }
 
-  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+  public static List<Row> assertRecordsExpected(

Review Comment:
   this should be private. also, this method does NOT `assert` anything. it is
more like a `convert`.
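
   A rough sketch of the shape this points at, assuming plain JDK types as stand-ins (strings in place of `Row`, integers in place of `Record`): the conversion lives in a private helper named for what it does, and only the public methods assert. `RowChecks`, `convertToRows`, and `assertRecordsWithOrder` are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the test helper class; not the PR's TestHelpers.
final class RowChecks {
  private RowChecks() {}

  // Pure conversion with no assertion, so it is private and named "convert".
  private static List<String> convertToRows(List<Integer> expectedRecords) {
    return expectedRecords.stream().map(r -> "row-" + r).collect(Collectors.toList());
  }

  // Order-insensitive check: compares sorted copies of both lists.
  static void assertRecords(List<String> results, List<Integer> expectedRecords) {
    List<String> expected = new ArrayList<>(convertToRows(expectedRecords));
    List<String> actual = new ArrayList<>(results);
    Collections.sort(expected);
    Collections.sort(actual);
    if (!actual.equals(expected)) {
      throw new AssertionError("rows differ: " + actual + " vs " + expected);
    }
  }

  // Order-sensitive check: List.equals compares element by element, in order.
  static void assertRecordsWithOrder(List<String> results, List<Integer> expectedRecords) {
    List<String> expected = convertToRows(expectedRecords);
    if (!results.equals(expected)) {
      throw new AssertionError("rows differ in content or order: " + results + " vs " + expected);
    }
  }
}
```

   Both public methods reuse the same private conversion, so the extraction stays an internal detail of the helper class.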



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -39,4 +63,95 @@ public void before() throws IOException {
         .getConfiguration()
         .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  private List<Record> testWatermarkOptionsInternal(boolean early) throws Exception {

Review Comment:
   the name `early` is not intuitive. I think you meant the order (ascending or
descending).
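
   As an illustration of the naming point only, a small JDK-only sketch in which the flag states the sort direction directly. `ExpectedOrder` and `sortedTimestamps` are hypothetical names, and sorting plain `Long` values is an assumption standing in for however the test builds its expected records.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper; not taken from the PR.
final class ExpectedOrder {
  private ExpectedOrder() {}

  // Returns a sorted copy of the input; `ascending` names the direction explicitly.
  static List<Long> sortedTimestamps(List<Long> timestamps, boolean ascending) {
    Comparator<Long> order =
        ascending ? Comparator.<Long>naturalOrder() : Comparator.<Long>reverseOrder();
    List<Long> sorted = new ArrayList<>(timestamps);
    sorted.sort(order);
    return sorted;
  }
}
```

   A call site such as `sortedTimestamps(values, true)` then reads as "ascending", which `early` does not convey.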



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

