[ 
https://issues.apache.org/jira/browse/FLINK-39062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

featzhang updated FLINK-39062:
------------------------------
    Description: 
h1. 1. Motivation

Currently, watermark definition in Flink SQL is tightly coupled with table DDL. 
This introduces several limitations:
 # Watermark is treated as catalog metadata instead of a logical relational 
property.

 # It cannot be flexibly reused across logical abstractions (e.g., view-based 
pipelines).

 # It complicates modular query design when watermark logic needs to be shared.

During earlier discussion, extending VIEW DDL to support watermark was 
considered. However, this approach raised concerns around:
 * Semantic ambiguity between metadata and logical properties

 * Increased complexity in view expansion and optimization

 * Potential impact on existing DDL semantics

To address these concerns while still enabling logical reuse of watermark 
definitions, this FLIP proposes a function-based approach.
----
h1. 2. Proposed Solution

Introduce a built-in table function:
{code:sql}
APPLY_WATERMARK(
    table,
    DESCRIPTOR(rowtime_column),
    watermark_expression
)
{code}
This function attaches watermark semantics at the logical relational level 
instead of the catalog level.
----
h1. 3. Syntax
{code:sql}
SELECT *
FROM APPLY_WATERMARK(
    Orders,
    DESCRIPTOR(order_time),
    order_time - INTERVAL '5' SECOND
);
{code}
The first argument supports:
 * Base tables

 * Views (including non-materialized views)

 * Subqueries

This enables logical reuse without introducing watermark semantics into catalog 
DDL.
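
As a sketch of the input kinds listed above, the queries below pass a view and a subquery as the first argument. The names {{orders_view}}, {{order_id}} and {{amount}} are hypothetical, and the parenthesized form used for the subquery argument is an assumption; the proposal does not fix it. None of this runs on a current Flink release, since {{APPLY_WATERMARK}} is only proposed here.
{code:sql}
-- Assumed: a plain view with no watermark of its own (hypothetical schema).
CREATE VIEW orders_view AS
SELECT order_id, order_time, amount FROM Orders;

-- View as the first argument.
SELECT *
FROM APPLY_WATERMARK(
    orders_view,
    DESCRIPTOR(order_time),
    order_time - INTERVAL '5' SECOND
);

-- Subquery as the first argument (parenthesized form is an assumption).
SELECT *
FROM APPLY_WATERMARK(
    (SELECT order_id, order_time FROM Orders WHERE amount > 0),
    DESCRIPTOR(order_time),
    order_time - INTERVAL '5' SECOND
);
{code}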
----
h1. 4. Semantics
 * The function returns a relation identical to the input table but with an 
attached watermark definition.

 * The watermark is scoped to the query.

 * It does not modify catalog metadata.

 * It does not alter view definitions.

Watermark propagation rules remain consistent with current planner behavior.
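
To illustrate query scoping, the sketch below applies two different watermark delays to the same relation in two separate queries. Under the semantics listed above this should be possible, because neither call changes catalog metadata. It reuses {{Orders}} and {{order_time}} from the syntax example. The group-window aggregation assumes the planner treats {{order_time}} as an event-time attribute after the call, which the proposal implies but does not spell out.
{code:sql}
-- Query 1: 5-second delay, hourly counts.
SELECT
    TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS order_count
FROM APPLY_WATERMARK(
    Orders,
    DESCRIPTOR(order_time),
    order_time - INTERVAL '5' SECOND
)
GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR);

-- Query 2: same relation, 30-second delay; Orders itself is unchanged.
SELECT
    TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS order_count
FROM APPLY_WATERMARK(
    Orders,
    DESCRIPTOR(order_time),
    order_time - INTERVAL '30' SECOND
)
GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR);
{code}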
----
h1. 5. Why Function-Based Instead of VIEW Extension

The function-based approach:
 * Keeps watermark as a logical property rather than catalog metadata

 * Avoids modifying VIEW DDL semantics

 * Avoids ambiguity between physical and logical properties

 * Aligns with existing function-style relational extensions

 * Minimizes impact on optimizer and catalog layers

This design keeps the scope focused and reduces planner and metadata coupling.
----
h1. 6. Design Evolution

The initial proposal considered allowing watermark definition directly inside 
VIEW DDL to support reuse.

After discussion, the following points were identified:
 * Watermark semantics are a logical property rather than catalog metadata.

 * Embedding watermark in VIEW DDL introduces complexity in view expansion.

 * It may blur the boundary between logical and catalog layers.

Based on community feedback, the proposal pivots to a function-based solution 
that preserves logical reuse while avoiding DDL-level changes.

The view-based design is no longer preferred.
----
h1. 7. Compatibility
 * No changes to existing table DDL behavior.

 * No backward compatibility impact.

 * Purely additive feature.

----
h1. 8. Scope

This FLIP only introduces:
 * The APPLY_WATERMARK function

 * Planner support for logical watermark attachment

It does not:
 * Modify catalog metadata

 * Change existing VIEW DDL syntax

 * Introduce physical materialization behavior

----
h1. 9. Implementation Plan
 # Add APPLY_WATERMARK function definition

 # Extend planner to attach logical watermark trait

 # Add validation rules for descriptor and expression

 # Add unit and integration tests

----
h1. 10. Open Questions
 * Should nested APPLY_WATERMARK calls be allowed?

 * Should conflicting watermark definitions throw validation errors?
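
For concreteness, both questions concern a query shaped like the one below, where an inner call and an outer call attach different watermark expressions to the same column. The nesting through a parenthesized subquery is an assumed form. Whether such a query is rejected, or which definition takes effect, is what remains undecided; the sketch takes no position.
{code:sql}
SELECT *
FROM APPLY_WATERMARK(
    (SELECT *
     FROM APPLY_WATERMARK(
         Orders,
         DESCRIPTOR(order_time),
         order_time - INTERVAL '10' SECOND
     )),
    DESCRIPTOR(order_time),
    order_time - INTERVAL '5' SECOND
);
{code}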

  was:
h3. Summary

Flink SQL views currently do not support watermark definitions, which limits 
their effectiveness in event-time processing. Users cannot define watermarks on 
views, forcing them to expose underlying table structures and duplicate 
temporal logic across queries.

This issue proposes adding watermark support for views through two new SQL 
syntax options:
 # {{CREATE VIEW ... WATERMARK FOR ...}}
 # {{ALTER VIEW SET WATERMARK FOR ...}}

----
h3. Problem Statement

*Current Limitation:*

Views in Flink SQL are logical constructs that do not support watermark 
definitions. This creates several critical limitations:
 # ❌ *Broken Abstraction* - Users must reference underlying table watermarks 
directly
 # ❌ *No Strategy Flexibility* - Cannot define different watermark strategies 
for different downstream use cases
 # ❌ *Code Duplication* - Temporal logic must be repeated in every query
 # ❌ *Limited Architecture Support* - Incompatible with modern layered data 
architectures (Bronze/Silver/Gold)

*Example of Current Workaround:*
{code:sql}
-- Source table with watermark
CREATE TABLE raw_events (
    event_id BIGINT,
    event_time TIMESTAMP(3),
    data STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);

-- View without watermark (watermark inherited implicitly)
CREATE VIEW cleaned_events AS 
SELECT event_id, event_time FROM raw_events WHERE data IS NOT NULL;

-- Query must assume watermark from raw_events
SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
FROM cleaned_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
{code}
Problems:
 * View cannot define its own watermark strategy
 * Cannot create multiple views with different watermark strategies on same 
source
 * Watermark propagation is implicit and unclear

----
h3. Proposed Solution
h4. Syntax 1: CREATE VIEW with WATERMARK
{code:sql}
CREATE VIEW view_name
WATERMARK FOR time_column AS watermark_expression
AS SELECT ...;
{code}
*Example:*
{code:sql}
CREATE VIEW user_activity
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action_type
FROM raw_events
WHERE action_type IN ('click', 'view', 'purchase');
{code}
h4. Syntax 2: ALTER VIEW SET WATERMARK
{code:sql}
ALTER VIEW view_name SET WATERMARK FOR time_column AS watermark_expression;
{code}
*Example:*
{code:sql}
-- Create view first
CREATE VIEW user_activity AS 
SELECT user_id, event_time, action_type FROM raw_events;

-- Add watermark later
ALTER VIEW user_activity 
SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
{code}
----
h3. Use Cases
h4. Use Case 1: Multi-Tenant Scenarios

Different tenants require different lateness tolerance:
{code:sql}
-- Source table (no watermark)
CREATE TABLE all_events (
    event_id BIGINT,
    event_time TIMESTAMP(3),
    tenant_id STRING,
    data STRING
) WITH ('connector' = 'kafka', ...);

-- Tenant A: Low-latency (5-second tolerance)
CREATE VIEW tenant_a_events
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT * FROM all_events WHERE tenant_id = 'tenant_a';

-- Tenant B: Batch-like (30-second tolerance)
CREATE VIEW tenant_b_events
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
AS SELECT * FROM all_events WHERE tenant_id = 'tenant_b';
{code}
h4. Use Case 2: Data Lakehouse Architecture

Bronze/Silver/Gold layers with progressive watermark strategies:
{code:sql}
-- Bronze: Raw data (no watermark)
CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3)) 
WITH (...);

-- Silver: Cleaned data with watermark
CREATE VIEW silver_events
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
AS SELECT 
    CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
    JSON_VALUE(raw_data, '$.user_id') AS user_id
FROM bronze_events
WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

-- Gold: Business aggregations
CREATE VIEW gold_hourly_stats AS
SELECT 
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS hour,
    user_id,
    COUNT(*) AS event_count
FROM silver_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;
{code}
h4. Use Case 3: Data Quality Monitoring

View for detecting late data:
{code:sql}
CREATE VIEW late_data_monitor
WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
AS SELECT 
    data_source,
    event_time,
    processing_time,
    TIMESTAMPDIFF(SECOND, event_time, processing_time) AS latency_seconds
FROM all_events
WHERE TIMESTAMPDIFF(SECOND, event_time, processing_time) > 60;
{code}
----
h2. Implementation Plan
h3. Affected Components
h4. 1. Parser Layer ({{{}flink-sql-parser{}}})
 * Extend {{parserImpls.ftl}} to support WATERMARK clause in CREATE VIEW
 * Add {{SqlAlterViewSetWatermark}} AST node for ALTER VIEW syntax
 * Parse watermark expression as {{SqlNode}}

*Files to Modify:*
 * {{flink-sql-parser/src/main/codegen/includes/parserImpls.ftl}}
 * {{flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterViewSetWatermarkOperation.java}} (new)

h4. 2. Planner Layer ({{{}flink-table-planner{}}})
 * Implement {{SqlAlterViewSetWatermarkConverter}} to convert AST → Operation
 * Add validation logic in {{{}OperationConverterUtils{}}}:
 ** Watermark column exists in view schema
 ** Column type is TIMESTAMP or TIMESTAMP_LTZ
 ** Only one watermark per view
 ** Expression is valid SQL

*Files to Modify:*
 * {{flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlAlterViewSetWatermarkConverter.java}} (new)
 * {{flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala}}

h4. 3. Catalog Layer ({{{}flink-table-common{}}})
 * Store watermark metadata in {{CatalogView}} options:
{code:java}
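import java.util.HashMap;
import java.util.Map;
// Proposed option keys for persisting the view watermark in the catalog.
Map<String, String> options = new HashMap<>();
options.put("watermark.column", "event_time");
options.put("watermark.strategy", "event_time - INTERVAL '5' SECOND");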
{code}

*Files to Modify:*
 * {{flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogView.java}}

h4. 4. Execution Layer ({{{}flink-table-api-java{}}})
 * Modify {{TableEnvironmentImpl}} to apply watermark when resolving views
 * Implement watermark propagation during query planning

*Files to Modify:*
 * {{flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java}}

----
h2. Acceptance Criteria
h3. Functional Requirements
 * Users can create views with watermark using {{CREATE VIEW ... WATERMARK ...}}
 * Users can add/modify watermark using {{ALTER VIEW SET WATERMARK ...}}
 * Watermark column must exist in view's output schema (validated at creation 
time)
 * Watermark column must be TIMESTAMP or TIMESTAMP_LTZ type (validated at 
creation time)
 * Watermark expression is validated at view creation/alteration
 * Time window queries on views with watermark execute correctly
 * Watermark metadata persists in catalog across restarts
 * {{DESCRIBE VIEW}} shows watermark information

h3. Non-Functional Requirements
 * No performance regression on existing views without watermark
 * Backward compatible: old views work without modification
 * Watermark metadata works with all catalog implementations (GenericInMemory, 
Hive, JDBC)
 * Clear error messages for invalid watermark definitions

h3. Test Coverage
 * {*}Parser Tests{*}: SQL syntax parsing for CREATE VIEW and ALTER VIEW
 * {*}Converter Tests{*}: AST to Operation conversion
 * {*}Planner Tests{*}: Query planning with view watermarks
 * {*}Integration Tests{*}: E2E with SQL Client and Table API
 * {*}Negative Tests{*}: Invalid column, wrong type, duplicate watermark
 * {*}Catalog Tests{*}: Persistence across sessions
 * {*}Compatibility Tests{*}: Old views still work

----
h2. Test Cases
h3. Positive Test Cases
{code:sql}
-- TC-01: CREATE VIEW with watermark
CREATE VIEW test_view1
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT event_id, event_time FROM source_table;

-- TC-02: ALTER VIEW to add watermark
CREATE VIEW test_view2 AS SELECT event_id, event_time FROM source_table;
ALTER VIEW test_view2 SET WATERMARK FOR event_time AS event_time - INTERVAL 
'10' SECOND;

-- TC-03: Time window query on view
SELECT 
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS event_count
FROM test_view1
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);

-- TC-04: Complex watermark expression
CREATE VIEW test_view3
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND - INTERVAL '500' 
MILLISECOND
AS SELECT * FROM source_table;

-- TC-05: DESCRIBE VIEW shows watermark
DESCRIBE test_view1;
-- Expected output includes:
-- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
{code}
h3. Negative Test Cases
{code:sql}
-- TC-06: Watermark column doesn't exist (should fail)
CREATE VIEW test_error1
WATERMARK FOR non_existent_col AS non_existent_col - INTERVAL '5' SECOND
AS SELECT event_id FROM source_table;
-- Expected: "Column 'non_existent_col' not found in view schema"

-- TC-07: Watermark column is not TIMESTAMP type (should fail)
CREATE VIEW test_error2
WATERMARK FOR event_id AS event_id - INTERVAL '5' SECOND
AS SELECT event_id, event_time FROM source_table;
-- Expected: "Column 'event_id' must be of type TIMESTAMP or TIMESTAMP_LTZ"

-- TC-08: Invalid watermark expression (should fail)
CREATE VIEW test_error3
WATERMARK FOR event_time AS invalid_expression
AS SELECT event_id, event_time FROM source_table;
-- Expected: "Invalid watermark expression syntax"

-- TC-09: ALTER VIEW on non-existent view (should fail)
ALTER VIEW non_existent_view SET WATERMARK FOR event_time AS event_time - 
INTERVAL '5' SECOND;
-- Expected: "View 'non_existent_view' does not exist"

-- TC-10: Duplicate watermark definition (should fail)
CREATE VIEW test_error4
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT event_id, event_time FROM 
    (SELECT * FROM source_table WATERMARK FOR event_time AS event_time - 
INTERVAL '10' SECOND);
-- Expected: "View already has a watermark definition"
{code}
----
h2. Backward Compatibility
h3. No Breaking Changes

✅ *Fully backward compatible*
 * Existing views without watermark continue to work unchanged
 * Watermark is stored as optional metadata in view options
 * Older Flink versions ignore watermark options (graceful degradation)

h3. Migration Path

Users can adopt this feature gradually:
 # {*}Phase 1{*}: Continue using existing views without watermark
 # {*}Phase 2{*}: Create new views with watermark syntax
 # {*}Phase 3{*}: Use {{ALTER VIEW SET WATERMARK}} to add watermark to existing 
views incrementally

----
h2. Performance Impact
h3. Expected Impact
 * ✅ *No regression* on views without watermark
 * ✅ *Minimal overhead* for views with watermark (same as table watermark 
processing)
 * ✅ *Catalog overhead* negligible (storing 2 strings in options map)

h3. Benchmarking Plan
 * Measure query latency with and without view watermarks
 * Compare catalog read/write performance
 * Validate checkpoint size and recovery time

----
h2. Related Issues
 * FLINK-XXXXX: FLIP-296: Extend watermark-related features for SQL
 * FLINK-XXXXX: Support event time in SQL views (if exists)
 * FLINK-XXXXX: Improve watermark propagation in multi-source queries

----
h2. Documentation Requirements
h3. User Documentation
 * Update SQL {{CREATE VIEW}} documentation page
 * Add new {{ALTER VIEW SET WATERMARK}} documentation
 * Add examples to "Event Time and Watermarks" guide
 * Add troubleshooting section for common errors

h3. Developer Documentation
 * Architecture design document (FLIP or tech doc)
 * Implementation guide for contributors
 * JavaDoc/ScalaDoc for new classes and methods

h2. Open Questions
h3. Q1: Watermark Conflict in Joins

*Question:* What happens when a view with watermark is joined with a table with 
a different watermark?

*Proposed Answer:* Follow existing Flink watermark propagation rules:
 * For inner joins: Use the minimum watermark of both sides
 * For outer joins: Depends on join type (LEFT/RIGHT/FULL)

h3. Q2: Nested View Watermarks

*Question:* How to handle watermark when a view is built on another view with 
watermark?

*Proposed Answer:* Child view's watermark overrides parent's watermark (similar 
to how views override underlying table schemas).
h3. Q3: ALTER VIEW DROP WATERMARK

*Question:* Should we support removing watermark from views?

*Proposed Answer:* Defer to Phase 2 (future enhancement). Initial 
implementation focuses on ADD/UPDATE only.
----
h2. Alternative Approaches Considered
h3. Alternative 1: Query Hints
{code:sql}
SELECT /*+ WATERMARK('event_time', INTERVAL '5' SECOND) */ * FROM my_view;
{code}
❌ {*}Rejected{*}: Not reusable, must be specified in every query
h3. Alternative 2: Table Functions
{code:sql}
SELECT * FROM TABLE(with_watermark(my_view, 'event_time', INTERVAL '5' SECOND));
{code}
❌ {*}Rejected{*}: Verbose, unintuitive, doesn't leverage existing SQL syntax
h3. Alternative 3: Materialized Views with Watermark
{code:sql}
CREATE MATERIALIZED VIEW mv WATERMARK FOR ... AS SELECT ...;
{code}
❌ {*}Rejected{*}: Too heavyweight (requires physical storage), doesn't solve 
abstraction problem

*Conclusion:* View-level watermark definition is the cleanest approach.


> Support logical-level watermark definition via SQL function
> -----------------------------------------------------------
>
>                 Key: FLINK-39062
>                 URL: https://issues.apache.org/jira/browse/FLINK-39062
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Planner
>            Reporter: featzhang
>            Priority: Major
>



