Copilot commented on code in PR #2972:
URL: https://github.com/apache/fluss/pull/2972#discussion_r3020092547


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java:
##########
@@ -149,6 +150,7 @@ public class FlinkTestBase extends AbstractTestBase {
     @BeforeAll
     protected static void beforeAll() {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();

Review Comment:
   Setting CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER to ROUND_ROBIN in the shared 
FlinkTestBase affects all Flink tests that use a table without bucket keys 
(e.g., distributedBy(n) with no key). That can unintentionally change 
behavior/assumptions in unrelated tests. Prefer scoping this config override to 
only the tests that require round-robin distribution (or document why this 
global change is safe), e.g., set it in the specific ITCase(s) before writing 
rows and restore afterward.
   ```suggestion
           Configuration baseConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
           clientConf = new Configuration(baseConf);
   ```
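A defensive copy keeps a per-test override from leaking into the shared configuration. The pattern can be sketched with a plain map-backed stand-in; `MiniConf` below is hypothetical (not Fluss's `Configuration`), and the key/value strings are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the scoped-override pattern; MiniConf is a hypothetical stand-in. */
public class ScopedOverrideSketch {

    /** Minimal stand-in for a mutable client configuration. */
    static class MiniConf {
        final Map<String, String> values = new HashMap<>();

        MiniConf() {}

        /** Copy constructor: the defensive copy the suggestion asks for. */
        MiniConf(MiniConf other) {
            values.putAll(other.values);
        }

        void set(String key, String value) {
            values.put(key, value);
        }

        String get(String key) {
            return values.get(key);
        }
    }

    public static void main(String[] args) {
        // Shared config owned by the test base; unrelated tests read it as-is.
        MiniConf shared = new MiniConf();
        shared.set("no-key-assigner", "DEFAULT");

        // A test that needs round-robin mutates only its own copy...
        MiniConf local = new MiniConf(shared);
        local.set("no-key-assigner", "ROUND_ROBIN");

        // ...so the shared configuration stays untouched for everyone else.
        System.out.println(shared.get("no-key-assigner")); // prints DEFAULT
        System.out.println(local.get("no-key-assigner"));  // prints ROUND_ROBIN
    }
}
```

In the actual ITCase this corresponds to copying `FLUSS_CLUSTER_EXTENSION.getClientConfig()` before setting the round-robin assigner, which is exactly what the suggestion above does.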



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1150,6 +1150,52 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
+    @Test
+    void testStreamingReadPartitionPushDownWithWatermark() throws Exception {
+        tEnv.executeSql(
+                "create table watermark_partitioned_table"
+                        + " (a int not null, b varchar, ts timestamp(3),"
+                        + " c string,"
+                        + " primary key (a, c) NOT ENFORCED,"
+                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)"
+                        + " partitioned by (c) ");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "watermark_partitioned_table");
+        tEnv.executeSql("alter table watermark_partitioned_table add partition (c=2025)");
+        tEnv.executeSql("alter table watermark_partitioned_table add partition (c=2026)");
+
+        // write data with 4 columns (a, b, ts, c), ts is nullable
+        List<InternalRow> rows = new ArrayList<>();
+        List<String> expectedRowValues = new ArrayList<>();
+        for (String partition : Arrays.asList("2025", "2026")) {
+            for (int i = 0; i < 10; i++) {
+                rows.add(row(i, "v1", null, partition));
+                if (partition.equals("2025")) {
+                    expectedRowValues.add(String.format("+I[%d, v1, %s]", i, partition));
+                }
+            }
+        }
+        writeRows(conn, tablePath, rows, false);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+        // verify partition filter is pushed down in the execution plan
+        String plan =
+                tEnv.explainSql("select a, b, c from watermark_partitioned_table where c ='2025'");
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, watermark_partitioned_table, "
+                                + "watermark=[-(ts, 5000:INTERVAL SECOND)], "
+                                + "watermarkEmitStrategy=[on-periodic], "
+                                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+                                + "fields=[a, b, ts, c])");

Review Comment:
   This execution-plan assertion is very brittle because it matches Flink's 
exact string rendering (including charset/UTF-16LE literal formatting and 
watermarkEmitStrategy wording). Minor planner/formatting changes across Flink 
patch versions can break the test while behavior is still correct. Consider 
asserting smaller, stable substrings separately (e.g., that the scan contains 
the table name, a watermark spec, and the partition filter in the 
TableSourceScan) rather than the full formatted node string.
   ```suggestion
        // Assert on stable substrings rather than the full, formatter-dependent plan string
           assertThat(plan)
                .contains("TableSourceScan(table=[[testcatalog, defaultdb, watermark_partitioned_table")
                   .contains("watermark=[-(ts")
                   .contains("filter=[=(c")
                   .contains("2025")
                   .contains("fields=[a, b, ts, c]");
   ```
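The trade-off between exact-match and stable-substring assertions can be shown with plain strings; the two "renderings" below are invented for illustration, not actual Flink planner output:

```java
/** Sketch: why stable-substring checks outlive formatter changes (invented plan strings). */
public class PlanAssertionSketch {
    public static void main(String[] args) {
        // The same logical scan as two hypothetical formatter versions might render it.
        String v1 = "TableSourceScan(table=[[cat, db, t]], "
                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647))], fields=[a, b, c])";
        String v2 = "TableSourceScan(table=[[cat, db, t]], "
                + "filter=[=(c, '2025')], fields=[a, b, c])";

        // An exact-match assertion pinned to v1 breaks as soon as the rendering shifts.
        boolean exactStillPasses = v2.equals(v1);

        // Substring assertions on stable fragments pass for both renderings.
        boolean stableStillPasses = v2.contains("TableSourceScan(table=[[cat, db, t]")
                && v2.contains("filter=[=(c")
                && v2.contains("2025")
                && v2.contains("fields=[a, b, c]");

        System.out.println("exact=" + exactStillPasses + ", stable=" + stableStillPasses);
        // prints: exact=false, stable=true
    }
}
```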



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java:
##########
@@ -266,6 +276,149 @@ public void testTableLogSourceWithProjectionPushdown() throws Exception {
         assertThat(collectedElements).hasSameElementsAs(expectedOutput);
     }
 
+    /** Verifies that event-time timestamps are correctly assigned via WatermarkStrategy. */
+    @Test
+    void testTimestamp() throws Exception {
+        // 1. Create Fluss log table
+        String tableName = "wm_timestamp_test";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("event_time", DataTypes.BIGINT())
+                        .build();
+        createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(1).build());
+
+        // 2. Write 3 records with known event_time values
+        final long currentTimestamp = System.currentTimeMillis();
+        List<InternalRow> rows =
+                Arrays.asList(
+                        row(1, "name1", currentTimestamp + 1L),
+                        row(2, "name2", currentTimestamp + 2L),
+                        row(3, "name3", currentTimestamp + 3L));
+        writeRows(conn, tablePath, rows, true);
+
+        // 3. Build FlussSource and apply WatermarkStrategy with TimestampAssigner
+        FlussSource<RowData> source =
+                FlussSource.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(tableName)
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setDeserializationSchema(new RowDataDeserializationSchema())
+                        .build();
+
+        env.setParallelism(1);
+        DataStreamSource<RowData> stream =
+                env.fromSource(
+                        source,
+                        WatermarkStrategy.<RowData>noWatermarks()
+                                .withTimestampAssigner(
+                                        (rowData, ts) -> rowData.getLong(2)), // event_time column
+                        "testTimestamp");
+
+        // Verify that the timestamp and watermark are working fine.

Review Comment:
   The comment "Verify that the timestamp and watermark are working fine" is 
misleading here because the strategy is WatermarkStrategy.noWatermarks() (with 
only a TimestampAssigner), so no watermarks are actually generated. Update the 
wording to reflect that this test is validating event-time timestamp assignment 
(not watermark emission).
   ```suggestion
        // Verify that event-time timestamps are correctly assigned from the event_time column.
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java:
##########
@@ -266,6 +276,149 @@ public void testTableLogSourceWithProjectionPushdown() throws Exception {
         assertThat(collectedElements).hasSameElementsAs(expectedOutput);
     }
 
+    /** Verifies that event-time timestamps are correctly assigned via WatermarkStrategy. */
+    @Test
+    void testTimestamp() throws Exception {
+        // 1. Create Fluss log table
+        String tableName = "wm_timestamp_test";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("event_time", DataTypes.BIGINT())
+                        .build();
+        createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(1).build());
+
+        // 2. Write 3 records with known event_time values
+        final long currentTimestamp = System.currentTimeMillis();
+        List<InternalRow> rows =
+                Arrays.asList(
+                        row(1, "name1", currentTimestamp + 1L),
+                        row(2, "name2", currentTimestamp + 2L),
+                        row(3, "name3", currentTimestamp + 3L));
+        writeRows(conn, tablePath, rows, true);
+
+        // 3. Build FlussSource and apply WatermarkStrategy with TimestampAssigner
+        FlussSource<RowData> source =
+                FlussSource.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(tableName)
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setDeserializationSchema(new RowDataDeserializationSchema())
+                        .build();
+
+        env.setParallelism(1);
+        DataStreamSource<RowData> stream =
+                env.fromSource(
+                        source,
+                        WatermarkStrategy.<RowData>noWatermarks()
+                                .withTimestampAssigner(
+                                        (rowData, ts) -> rowData.getLong(2)), // event_time column
+                        "testTimestamp");
+
+        // Verify that the timestamp and watermark are working fine.
+        List<Long> result =
+                stream.transform(
+                                "timestampVerifier",
+                                TypeInformation.of(Long.class),
+                                new WatermarkVerifyingOperator(v -> v.getLong(2)))
+                        .executeAndCollect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        currentTimestamp + 1L, currentTimestamp + 2L, currentTimestamp + 3L);
+    }
+
+    /** Verifies per-bucket (per-split) watermark multiplexing correctness. */
+    @Test
+    void testPerBucketWatermark() throws Exception {
+        // 1. Create 2-bucket Fluss log table
+        String tableName = "wm_per_bucket_test";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("ts", DataTypes.BIGINT())
+                        .build();
+        createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(2).build());
+
+        // 2. Write 6 records with interleaved timestamps
+        List<InternalRow> rows =
+                Arrays.asList(
+                        row(1, "a", 100L),
+                        row(2, "b", 150L),
+                        row(3, "c", 200L),
+                        row(4, "d", 250L),
+                        row(5, "e", 300L),
+                        row(6, "f", 350L));
+        writeRows(conn, tablePath, rows, true);
+
+        // 3. Build FlussSource and apply per-split WatermarkStrategy
+        FlussSource<RowData> source =
+                FlussSource.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(tableName)
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setDeserializationSchema(new RowDataDeserializationSchema())
+                        .build();
+
+        env.setParallelism(1);
+
+        // 4. Assert per-split watermark ordering via ProcessFunction
+        env.fromSource(
+                        source,
+                        WatermarkStrategy.forGenerator(ctx -> new OnEventWatermarkGenerator())
+                                .withTimestampAssigner(
+                                        (rowData, ts) -> rowData.getLong(2)), // ts column
+                        "testPerPartitionWatermark")
+                .process(
+                        new ProcessFunction<RowData, Object>() {
+                            @Override
+                            public void processElement(
+                                    RowData value,
+                                    ProcessFunction<RowData, Object>.Context ctx,
+                                    Collector<Object> out) {
+                                assertThat(ctx.timestamp())
+                                        .as(
+                                                "Event time should never be behind the watermark "
+                                                        + "because of per-split watermark multiplexing logic")
+                                        .isGreaterThanOrEqualTo(
+                                                ctx.timerService().currentWatermark());
+                                out.collect(ctx.timestamp());
+                            }
+                        })
+                .executeAndCollect(6);
+    }
+
+    /** A StreamMap that verifies the watermark logic. */
+    private static class WatermarkVerifyingOperator extends StreamMap<RowData, Long> {
+
+        private static final long serialVersionUID = 1L;
+
+        public WatermarkVerifyingOperator(MapFunction<RowData, Long> mapper) {
+            super(mapper);
+        }
+
+        @Override
+        public void processElement(StreamRecord<RowData> element) {

Review Comment:
   WatermarkVerifyingOperator extends StreamMap but overrides processElement 
without invoking the parent implementation, so the provided MapFunction is 
never used. This makes the operator misleading and harder to 
understand/maintain as a test utility. Consider replacing it with a minimal 
custom OneInputStreamOperator (or a ProcessFunction/map that explicitly emits 
element.getTimestamp()) to avoid carrying an unused mapper.
   ```suggestion
        public void processElement(StreamRecord<RowData> element) throws Exception {
            // Invoke the user-provided MapFunction to ensure it is not ignored.
            userFunction.map(element.getValue());
            // Emit the element timestamp as the output for watermark verification.
   ```
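The underlying pitfall, a subclass that accepts a function but overrides the only method that would invoke it, can be reproduced without Flink. `MapOperator` and `MisleadingOperator` below are hypothetical stand-ins for `StreamMap` and `WatermarkVerifyingOperator`, not Flink types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java sketch of the override-without-delegation pitfall (no Flink types). */
public class OperatorSketch {

    /** Stand-in for StreamMap: applies the injected mapper and emits the result. */
    static class MapOperator<I, O> {
        protected final Function<I, O> mapper;
        final List<Object> output = new ArrayList<>();

        MapOperator(Function<I, O> mapper) {
            this.mapper = mapper;
        }

        void processElement(I element) {
            output.add(mapper.apply(element));
        }
    }

    /** Mirrors WatermarkVerifyingOperator: overrides processElement, never calls the mapper. */
    static class MisleadingOperator extends MapOperator<Long, Long> {
        MisleadingOperator(Function<Long, Long> mapper) {
            super(mapper);
        }

        @Override
        void processElement(Long element) {
            output.add(element); // the constructor-injected mapper is silently dropped
        }
    }

    public static void main(String[] args) {
        // The caller believes the mapper doubles every element...
        MisleadingOperator op = new MisleadingOperator(x -> x * 2);
        op.processElement(10L);
        // ...but the override bypasses it entirely.
        System.out.println(op.output); // prints [10], not [20]
    }
}
```

A minimal operator that takes no mapper at all (as the review suggests) removes the mismatch between what the constructor advertises and what the operator actually does.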



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
