Copilot commented on code in PR #2972:
URL: https://github.com/apache/fluss/pull/2972#discussion_r3020092547
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java:
##########
@@ -149,6 +150,7 @@ public class FlinkTestBase extends AbstractTestBase {
@BeforeAll
protected static void beforeAll() {
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
Review Comment:
Setting CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER to ROUND_ROBIN in the shared
FlinkTestBase affects all Flink tests that use a table without bucket keys
(e.g., distributedBy(n) with no key). That can unintentionally change
behavior/assumptions in unrelated tests. Prefer scoping this config override to
only the tests that require round-robin distribution (or document why this
global change is safe), e.g., set it in the specific ITCase(s) before writing
rows and restore afterward.
```suggestion
Configuration baseConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
clientConf = new Configuration(baseConf);
```
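To make the leak risk concrete, here is a minimal, framework-free sketch of the defensive-copy idea the suggestion applies: copy the shared config before applying a test-local override so the base object is untouched. A plain `Map` stands in for Fluss's `Configuration`, and the key name is illustrative, not the real option string.

```java
import java.util.HashMap;
import java.util.Map;

public class ScopedOverrideSketch {

    // Returns a copy of 'base' with one key overridden; 'base' itself is never mutated,
    // so tests sharing the base config do not see each other's overrides.
    public static Map<String, String> copyWithOverride(
            Map<String, String> base, String key, String value) {
        Map<String, String> copy = new HashMap<>(base);
        copy.put(key, value);
        return copy;
    }

    public static void main(String[] args) {
        // Shared base configuration (hypothetical key name for illustration).
        Map<String, String> base = new HashMap<>();
        base.put("bucket.no-key-assigner", "STICKY");

        // Test-scoped copy with the round-robin override.
        Map<String, String> testConf = copyWithOverride(base, "bucket.no-key-assigner", "ROUND_ROBIN");

        System.out.println(base.get("bucket.no-key-assigner"));     // prints STICKY
        System.out.println(testConf.get("bucket.no-key-assigner")); // prints ROUND_ROBIN
    }
}
```

The same shape works with Fluss's `Configuration` copy constructor shown in the suggestion: overrides land on the per-test copy, never on the shared extension config.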
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1150,6 +1150,52 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}
+ @Test
+ void testStreamingReadPartitionPushDownWithWatermark() throws Exception {
+ tEnv.executeSql(
+ "create table watermark_partitioned_table"
+ + " (a int not null, b varchar, ts timestamp(3),"
+ + " c string,"
+ + " primary key (a, c) NOT ENFORCED,"
+ + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)"
+ + " partitioned by (c) ");
+ TablePath tablePath = TablePath.of(DEFAULT_DB, "watermark_partitioned_table");
+ tEnv.executeSql("alter table watermark_partitioned_table add partition (c=2025)");
+ tEnv.executeSql("alter table watermark_partitioned_table add partition (c=2026)");
+
+ // write data with 4 columns (a, b, ts, c), ts is nullable
+ List<InternalRow> rows = new ArrayList<>();
+ List<String> expectedRowValues = new ArrayList<>();
+ for (String partition : Arrays.asList("2025", "2026")) {
+ for (int i = 0; i < 10; i++) {
+ rows.add(row(i, "v1", null, partition));
+ if (partition.equals("2025")) {
+ expectedRowValues.add(String.format("+I[%d, v1, %s]", i, partition));
+ }
+ }
+ }
+ writeRows(conn, tablePath, rows, false);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+ // verify partition filter is pushed down in the execution plan
+ String plan =
+ tEnv.explainSql("select a, b, c from watermark_partitioned_table where c ='2025'");
+ assertThat(plan)
+ .contains(
+ "TableSourceScan(table=[[testcatalog, defaultdb, watermark_partitioned_table, "
+ + "watermark=[-(ts, 5000:INTERVAL SECOND)], "
+ + "watermarkEmitStrategy=[on-periodic], "
+ + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+ + "fields=[a, b, ts, c])");
Review Comment:
This execution-plan assertion is very brittle because it matches Flink's
exact string rendering (including charset/UTF-16LE literal formatting and
watermarkEmitStrategy wording). Minor planner/formatting changes across Flink
patch versions can break the test while behavior is still correct. Consider
asserting smaller, stable substrings separately (e.g., that the scan contains
the table name, a watermark spec, and the partition filter in the
TableSourceScan) rather than the full formatted node string.
```suggestion
// Assert on stable substrings rather than the full, formatter-dependent plan string
assertThat(plan)
.contains("TableSourceScan(table=[[testcatalog, defaultdb, watermark_partitioned_table")
.contains("watermark=[-(ts")
.contains("filter=[=(c")
.contains("2025")
.contains("fields=[a, b, ts, c]");
```
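As an illustration of why independent substring checks are more robust than one exact match, here is a framework-free sketch; the plan string below is a hand-abridged stand-in, not real Flink output, and the helper name is invented for illustration.

```java
public class PlanAssertionSketch {

    // Passes only if every expected fragment occurs in the plan. Each fragment is
    // checked independently, so the test survives changes to Flink's exact node
    // formatting (charset literal rendering, watermark strategy wording, etc.).
    public static boolean containsAll(String plan, String... fragments) {
        for (String fragment : fragments) {
            if (!plan.contains(fragment)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hand-abridged stand-in for an EXPLAIN result.
        String plan = "TableSourceScan(table=[[testcatalog, defaultdb, watermark_partitioned_table, "
                + "watermark=[-(ts, 5000:INTERVAL SECOND)], filter=[=(c, '2025')]]], fields=[a, b, ts, c])";
        System.out.println(containsAll(plan,
                "watermark_partitioned_table", "watermark=[-(ts", "filter=[=(c", "2025")); // prints true
    }
}
```

In the actual test, AssertJ's chained `.contains(...)` calls give the same per-fragment independence plus a descriptive failure message for the first missing fragment.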
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java:
##########
@@ -266,6 +276,149 @@ public void testTableLogSourceWithProjectionPushdown() throws Exception {
assertThat(collectedElements).hasSameElementsAs(expectedOutput);
}
+ /** Verifies that event-time timestamps are correctly assigned via WatermarkStrategy. */
+ @Test
+ void testTimestamp() throws Exception {
+ // 1. Create Fluss log table
+ String tableName = "wm_timestamp_test";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("event_time", DataTypes.BIGINT())
+ .build();
+ createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(1).build());
+
+ // 2. Write 3 records with known event_time values
+ final long currentTimestamp = System.currentTimeMillis();
+ List<InternalRow> rows =
+ Arrays.asList(
+ row(1, "name1", currentTimestamp + 1L),
+ row(2, "name2", currentTimestamp + 2L),
+ row(3, "name3", currentTimestamp + 3L));
+ writeRows(conn, tablePath, rows, true);
+
+ // 3. Build FlussSource and apply WatermarkStrategy with TimestampAssigner
+ FlussSource<RowData> source =
+ FlussSource.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(tableName)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setDeserializationSchema(new RowDataDeserializationSchema())
+ .build();
+
+ env.setParallelism(1);
+ DataStreamSource<RowData> stream =
+ env.fromSource(
+ source,
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withTimestampAssigner(
+ (rowData, ts) -> rowData.getLong(2)), // event_time column
+ "testTimestamp");
+
+ // Verify that the timestamp and watermark are working fine.
Review Comment:
The comment "Verify that the timestamp and watermark are working fine" is
misleading here because the strategy is WatermarkStrategy.noWatermarks() (with
only a TimestampAssigner), so no watermarks are actually generated. Update the
wording to reflect that this test is validating event-time timestamp assignment
(not watermark emission).
```suggestion
// Verify that event-time timestamps are correctly assigned from the event_time column.
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java:
##########
@@ -266,6 +276,149 @@ public void testTableLogSourceWithProjectionPushdown() throws Exception {
assertThat(collectedElements).hasSameElementsAs(expectedOutput);
}
+ /** Verifies that event-time timestamps are correctly assigned via WatermarkStrategy. */
+ @Test
+ void testTimestamp() throws Exception {
+ // 1. Create Fluss log table
+ String tableName = "wm_timestamp_test";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("event_time", DataTypes.BIGINT())
+ .build();
+ createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(1).build());
+
+ // 2. Write 3 records with known event_time values
+ final long currentTimestamp = System.currentTimeMillis();
+ List<InternalRow> rows =
+ Arrays.asList(
+ row(1, "name1", currentTimestamp + 1L),
+ row(2, "name2", currentTimestamp + 2L),
+ row(3, "name3", currentTimestamp + 3L));
+ writeRows(conn, tablePath, rows, true);
+
+ // 3. Build FlussSource and apply WatermarkStrategy with TimestampAssigner
+ FlussSource<RowData> source =
+ FlussSource.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(tableName)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setDeserializationSchema(new RowDataDeserializationSchema())
+ .build();
+
+ env.setParallelism(1);
+ DataStreamSource<RowData> stream =
+ env.fromSource(
+ source,
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withTimestampAssigner(
+ (rowData, ts) -> rowData.getLong(2)), // event_time column
+ "testTimestamp");
+
+ // Verify that the timestamp and watermark are working fine.
+ List<Long> result =
+ stream.transform(
+ "timestampVerifier",
+ TypeInformation.of(Long.class),
+ new WatermarkVerifyingOperator(v -> v.getLong(2)))
+ .executeAndCollect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ currentTimestamp + 1L, currentTimestamp + 2L, currentTimestamp + 3L);
+ }
+
+ /** Verifies per-bucket (per-split) watermark multiplexing correctness. */
+ @Test
+ void testPerBucketWatermark() throws Exception {
+ // 1. Create 2-bucket Fluss log table
+ String tableName = "wm_per_bucket_test";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("ts", DataTypes.BIGINT())
+ .build();
+ createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(2).build());
+
+ // 2. Write 6 records with interleaved timestamps
+ List<InternalRow> rows =
+ Arrays.asList(
+ row(1, "a", 100L),
+ row(2, "b", 150L),
+ row(3, "c", 200L),
+ row(4, "d", 250L),
+ row(5, "e", 300L),
+ row(6, "f", 350L));
+ writeRows(conn, tablePath, rows, true);
+
+ // 3. Build FlussSource and apply per-split WatermarkStrategy
+ FlussSource<RowData> source =
+ FlussSource.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(tableName)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setDeserializationSchema(new RowDataDeserializationSchema())
+ .build();
+
+ env.setParallelism(1);
+
+ // 4. Assert per-split watermark ordering via ProcessFunction
+ env.fromSource(
+ source,
+ WatermarkStrategy.forGenerator(ctx -> new OnEventWatermarkGenerator())
+ .withTimestampAssigner(
+ (rowData, ts) -> rowData.getLong(2)), // ts column
+ "testPerPartitionWatermark")
+ .process(
+ new ProcessFunction<RowData, Object>() {
+ @Override
+ public void processElement(
+ RowData value,
+ ProcessFunction<RowData, Object>.Context ctx,
+ Collector<Object> out) {
+ assertThat(ctx.timestamp())
+ .as(
+ "Event time should never be behind the watermark "
+ + "because of per-split watermark multiplexing logic")
+ .isGreaterThanOrEqualTo(
+ ctx.timerService().currentWatermark());
+ out.collect(ctx.timestamp());
+ }
+ })
+ .executeAndCollect(6);
+ }
+
+ /** A StreamMap that verifies the watermark logic. */
+ private static class WatermarkVerifyingOperator extends StreamMap<RowData, Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ public WatermarkVerifyingOperator(MapFunction<RowData, Long> mapper) {
+ super(mapper);
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) {
Review Comment:
WatermarkVerifyingOperator extends StreamMap but overrides processElement
without invoking the parent implementation, so the provided MapFunction is
never used. This makes the operator misleading and harder to
understand/maintain as a test utility. Consider replacing it with a minimal
custom OneInputStreamOperator (or a ProcessFunction/map that explicitly emits
element.getTimestamp()) to avoid carrying an unused mapper.
```suggestion
public void processElement(StreamRecord<RowData> element) throws Exception {
// Invoke the user-provided MapFunction to ensure it is not ignored.
userFunction.map(element.getValue());
// Emit the element timestamp as the output for watermark verification.
```
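To show the maintenance hazard without any Flink dependencies, here is a sketch contrasting the misleading shape (a constructor argument the processing method never uses) with a minimal operator that carries only what it needs; all class and method names are invented for illustration.

```java
import java.util.function.Function;

public class OperatorShapeSketch {

    // Anti-pattern mirroring the review finding: the operator accepts a mapper
    // but its processing method never invokes it, so the argument is dead weight
    // and misleads readers about what the operator actually does.
    static class MisleadingTimestampOperator {
        private final Function<String, Long> unusedMapper;

        MisleadingTimestampOperator(Function<String, Long> mapper) {
            this.unusedMapper = mapper; // stored, never called
        }

        long process(String element, long timestamp) {
            return timestamp; // mapper silently ignored
        }
    }

    // Minimal alternative: no mapper to mislead readers; it plainly emits the
    // record timestamp, which is all the watermark-verification test needs.
    static class TimestampEmittingOperator {
        long process(String element, long timestamp) {
            return timestamp;
        }
    }

    public static void main(String[] args) {
        System.out.println(new TimestampEmittingOperator().process("row-1", 42L)); // prints 42
    }
}
```

The same trade-off applies in Flink: a small custom `OneInputStreamOperator` (or a ProcessFunction emitting `ctx.timestamp()`) states its intent directly, instead of inheriting `StreamMap` behavior it then discards.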
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]