[ 
https://issues.apache.org/jira/browse/FLINK-39062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

featzhang updated FLINK-39062:
------------------------------
    Summary: Support logical-level watermark definition via SQL function (was: Support Watermark Definition in SQL Views)

> Support logical-level watermark definition via SQL function
> -----------------------------------------------------------
>
>                 Key: FLINK-39062
>                 URL: https://issues.apache.org/jira/browse/FLINK-39062
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Planner
>            Reporter: featzhang
>            Priority: Major
>
> h3. Summary
> Flink SQL views currently do not support watermark definitions, which limits 
> their effectiveness in event-time processing. Users cannot define watermarks 
> on views, forcing them to expose underlying table structures and duplicate 
> temporal logic across queries.
> This issue proposes adding watermark support for views through two new SQL 
> syntax options:
>  # {{CREATE VIEW ... WATERMARK FOR ...}}
>  # {{ALTER VIEW SET WATERMARK FOR ...}}
> ----
> h3. Problem Statement
> *Current Limitation:*
> Views in Flink SQL are logical constructs that do not support watermark 
> definitions. This creates several critical limitations:
>  # ❌ *Broken Abstraction* - Users must reference underlying table watermarks directly
>  # ❌ *No Strategy Flexibility* - Cannot define different watermark strategies for different downstream use cases
>  # ❌ *Code Duplication* - Temporal logic must be repeated in every query
>  # ❌ *Limited Architecture Support* - Incompatible with modern layered data architectures (Bronze/Silver/Gold)
> *Example of Current Workaround:*
> {code:sql}
> -- Source table with watermark
> CREATE TABLE raw_events (
>     event_id BIGINT,
>     event_time TIMESTAMP(3),
>     data STRING,
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (...);
> -- View without watermark (watermark inherited implicitly)
> CREATE VIEW cleaned_events AS 
> SELECT event_id, event_time FROM raw_events WHERE data IS NOT NULL;
> -- Query must assume watermark from raw_events
> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
> FROM cleaned_events
> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
> {code}
> Problems:
>  * View cannot define its own watermark strategy
>  * Cannot create multiple views with different watermark strategies on the same source
>  * Watermark propagation is implicit and unclear
> ----
> h3. Proposed Solution
> h4. Syntax 1: CREATE VIEW with WATERMARK
> {code:sql}
> CREATE VIEW view_name
> WATERMARK FOR time_column AS watermark_expression
> AS SELECT ...;
> {code}
> *Example:*
> {code:sql}
> CREATE VIEW user_activity
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> AS SELECT user_id, event_time, action_type
> FROM raw_events
> WHERE action_type IN ('click', 'view', 'purchase');
> {code}
> h4. Syntax 2: ALTER VIEW SET WATERMARK
> {code:sql}
> ALTER VIEW view_name SET WATERMARK FOR time_column AS watermark_expression;
> {code}
> *Example:*
> {code:sql}
> -- Create view first
> CREATE VIEW user_activity AS 
> SELECT user_id, event_time, action_type FROM raw_events;
> -- Add watermark later
> ALTER VIEW user_activity 
> SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
> {code}
> ----
> h3. Use Cases
> h4. Use Case 1: Multi-Tenant Scenarios
> Different tenants require different lateness tolerance:
> {code:sql}
> -- Source table (no watermark)
> CREATE TABLE all_events (
>     event_id BIGINT,
>     event_time TIMESTAMP(3),
>     tenant_id STRING,
>     data STRING
> ) WITH ('connector' = 'kafka', ...);
> -- Tenant A: Low-latency (5-second tolerance)
> CREATE VIEW tenant_a_events
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> AS SELECT * FROM all_events WHERE tenant_id = 'tenant_a';
> -- Tenant B: Batch-like (30-second tolerance)
> CREATE VIEW tenant_b_events
> WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
> AS SELECT * FROM all_events WHERE tenant_id = 'tenant_b';
> {code}
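> To make the different lateness tolerances concrete, here is a minimal Java sketch of how a bounded-delay watermark such as {{event_time - INTERVAL '5' SECOND}} advances, and when an hourly tumbling window can fire. It is a simplified single-input model that ignores periodic watermark emission and parallel sources. {{TenantWatermarkSketch}} and {{firingIndex}} are hypothetical names, not Flink APIs.

```java
import java.util.List;

/**
 * Simplified single-input model of a SQL watermark declared as
 * "event_time - INTERVAL 'd' SECOND" (hypothetical class, not Flink code).
 */
final class TenantWatermarkSketch {

    /**
     * Returns the index of the event after which the tumbling window ending at
     * windowEndMillis can fire, or -1 if the watermark never gets there.
     */
    static int firingIndex(List<Long> eventTimesMillis, long delayMillis, long windowEndMillis) {
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < eventTimesMillis.size(); i++) {
            // The watermark is the largest (event_time - delay) seen so far; it never moves backwards.
            watermark = Math.max(watermark, eventTimesMillis.get(i) - delayMillis);
            // A window [start, end) is complete once the watermark reaches its last millisecond.
            if (watermark >= windowEndMillis - 1) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long oneHour = 3_600_000L;
        // Slightly out-of-order event times (ms) around the end of the first hourly window.
        List<Long> events = List.of(3_590_000L, 3_598_000L, 3_596_000L, 3_606_000L, 3_620_000L);
        System.out.println("tenant_a (5 s): fires after event " + firingIndex(events, 5_000L, oneHour));
        System.out.println("tenant_b (30 s): fires after event " + firingIndex(events, 30_000L, oneHour));
    }
}
```

> With these events, tenant_a's first hourly window can fire after the fourth event. tenant_b's watermark (latest event time minus 30 seconds) has not yet reached the end of the window, so that window stays open.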
> h4. Use Case 2: Data Lakehouse Architecture
> Bronze/Silver/Gold layers, with the watermark introduced at the Silver layer once the event time has been extracted from the raw JSON:
> {code:sql}
> -- Bronze: Raw data (no watermark)
> CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3)) 
> WITH (...);
> -- Silver: Cleaned data with watermark
> CREATE VIEW silver_events
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> AS SELECT 
>     CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
>     JSON_VALUE(raw_data, '$.user_id') AS user_id
> FROM bronze_events
> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
> -- Gold: Business aggregations
> CREATE VIEW gold_hourly_stats AS
> SELECT 
>     TUMBLE_START(event_time, INTERVAL '1' HOUR) AS hour,
>     user_id,
>     COUNT(*) AS event_count
> FROM silver_events
> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;
> {code}
> h4. Use Case 3: Data Quality Monitoring
> View for detecting late-arriving data (events processed more than 60 seconds after their event time):
> {code:sql}
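> -- Assumes all_events also has data_source and processing_time columns
> -- (they are not part of the all_events schema shown in Use Case 1)
> CREATE VIEW late_data_monitor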
> WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
> AS SELECT 
>     data_source,
>     event_time,
>     processing_time,
>     TIMESTAMPDIFF(SECOND, event_time, processing_time) AS latency_seconds
> FROM all_events
> WHERE TIMESTAMPDIFF(SECOND, event_time, processing_time) > 60;
> {code}
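> The view's filter can be illustrated with a small Java sketch that mirrors the {{TIMESTAMPDIFF(SECOND, event_time, processing_time) > 60}} predicate. The {{Event}} record and its fields are hypothetical stand-ins for the view's columns. {{ChronoUnit.SECONDS.between}}, which truncates to whole seconds, is used to approximate {{TIMESTAMPDIFF}}.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;

final class LateDataMonitorSketch {

    /** Hypothetical row type mirroring the columns used by late_data_monitor. */
    record Event(String dataSource, LocalDateTime eventTime, LocalDateTime processingTime) {
        /** Whole seconds from event_time to processing_time, approximating TIMESTAMPDIFF(SECOND, ...). */
        long latencySeconds() {
            return ChronoUnit.SECONDS.between(eventTime, processingTime);
        }
    }

    /** Keeps only rows whose latency exceeds the threshold, like the view's WHERE clause. */
    static List<Event> lateEvents(List<Event> events, long thresholdSeconds) {
        return events.stream()
                .filter(e -> e.latencySeconds() > thresholdSeconds)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        List<Event> events = List.of(
                new Event("app", t, t.plusSeconds(30)),  // 30 s latency: filtered out
                new Event("iot", t, t.plusSeconds(90))); // 90 s latency: kept
        lateEvents(events, 60).forEach(e ->
                System.out.println(e.dataSource() + " latency=" + e.latencySeconds() + "s"));
    }
}
```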
> ----
> h2. Implementation Plan
> h3. Affected Components
> h4. 1. Parser Layer ({{flink-sql-parser}})
>  * Extend {{parserImpls.ftl}} to support WATERMARK clause in CREATE VIEW
>  * Add {{SqlAlterViewSetWatermark}} AST node for ALTER VIEW syntax
>  * Parse watermark expression as {{SqlNode}}
> *Files to Modify:*
>  * {{flink-sql-parser/src/main/codegen/includes/parserImpls.ftl}}
>  * {{flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterViewSetWatermarkOperation.java}} (new)
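> To illustrate what the new AST node needs to capture, the following Java sketch extracts the view name, time column, and watermark expression from an {{ALTER VIEW ... SET WATERMARK}} statement. It is a toy regex-based stand-in that assumes the node carries exactly these three fields. The real rule would be generated from {{parserImpls.ftl}} and would hold the expression as a Calcite {{SqlNode}} rather than a string. The record and class names are hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AlterViewWatermarkSketch {

    /** Fields a SqlAlterViewSetWatermark node would need to carry (hypothetical shape). */
    record AlterViewSetWatermark(String viewName, String timeColumn, String watermarkExpression) {}

    // Toy stand-in for the grammar rule: case-insensitive, expression may span lines.
    private static final Pattern ALTER_VIEW_SET_WATERMARK = Pattern.compile(
            "(?is)^\\s*ALTER\\s+VIEW\\s+([\\w.`]+)\\s+SET\\s+WATERMARK\\s+FOR\\s+(\\w+)\\s+AS\\s+(.+?)\\s*;?\\s*$");

    /** Returns the parsed statement, or empty if the text is not an ALTER VIEW SET WATERMARK. */
    static Optional<AlterViewSetWatermark> parse(String sql) {
        Matcher m = ALTER_VIEW_SET_WATERMARK.matcher(sql);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new AlterViewSetWatermark(m.group(1), m.group(2), m.group(3)));
    }

    public static void main(String[] args) {
        String sql = "ALTER VIEW user_activity\n"
                + "SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;";
        System.out.println(parse(sql));
    }
}
```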
> h4. 2. Planner Layer ({{flink-table-planner}})
>  * Implement {{SqlAlterViewSetWatermarkConverter}} to convert AST → Operation
>  * Add validation logic in {{OperationConverterUtils}}:
>  ** Watermark column exists in the view schema
>  ** Column type is TIMESTAMP or TIMESTAMP_LTZ (as for table watermarks, Flink accepts precision 0 to 3)
>  ** Only one watermark per view
>  ** Expression is a valid SQL expression that returns a TIMESTAMP or TIMESTAMP_LTZ value
> *Files to Modify:*
>  * {{flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlAlterViewSetWatermarkConverter.java}} (new)
>  * {{flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala}}
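> The validation rules listed above can be sketched in plain Java. This is a hypothetical helper, not {{OperationConverterUtils}} code. It assumes the view schema is available as a map from column name to type root, and it reuses the error messages from the negative test cases. Expression validation, which needs Flink's SQL validator, is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class ViewWatermarkValidatorSketch {

    /** One WATERMARK FOR ... AS ... clause from a CREATE/ALTER VIEW statement (hypothetical). */
    record WatermarkClause(String column, String expression) {}

    // Type roots accepted for the time column (precision checks omitted in this sketch).
    private static final Set<String> TIME_TYPE_ROOTS = Set.of("TIMESTAMP", "TIMESTAMP_LTZ");

    /**
     * Checks the clauses against the view's output schema (column name -> type root).
     * Returns an error message, or empty if the definition is valid.
     */
    static Optional<String> validate(Map<String, String> viewSchema, List<WatermarkClause> clauses) {
        if (clauses.size() > 1) {
            return Optional.of("View already has a watermark definition");
        }
        for (WatermarkClause clause : clauses) {
            String typeRoot = viewSchema.get(clause.column());
            if (typeRoot == null) {
                return Optional.of("Column '" + clause.column() + "' not found in view schema");
            }
            if (!TIME_TYPE_ROOTS.contains(typeRoot)) {
                return Optional.of("Column '" + clause.column()
                        + "' must be of type TIMESTAMP or TIMESTAMP_LTZ");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> schema = Map.of("event_id", "BIGINT", "event_time", "TIMESTAMP");
        String wm = "event_time - INTERVAL '5' SECOND";
        System.out.println(validate(schema, List.of(new WatermarkClause("event_time", wm))));
        System.out.println(validate(schema, List.of(new WatermarkClause("event_id", wm))));
    }
}
```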
> h4. 3. Catalog Layer ({{flink-table-common}})
>  * Store watermark metadata in {{CatalogView}} options:
> {code:java}
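> import java.util.HashMap;
> import java.util.Map;
> // Watermark metadata stored as two plain string entries in the view options
> Map<String, String> options = new HashMap<>();
> options.put("watermark.column", "event_time");
> options.put("watermark.strategy", "event_time - INTERVAL '5' SECOND");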
> {code}
> *Files to Modify:*
>  * {{flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogView.java}}
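> Here is a minimal sketch of how {{ALTER VIEW SET WATERMARK}} could write the two option keys shown above, and how view resolution could read them back. It uses plain {{java.util}} maps rather than Flink's {{CatalogView}} API, and the helper names are hypothetical. The sketch shows two properties: other view options are preserved, and views without these keys (for example, views created before this feature) resolve to no watermark.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ViewWatermarkOptionsSketch {

    // Option keys as proposed for the CatalogView options map.
    static final String COLUMN_KEY = "watermark.column";
    static final String STRATEGY_KEY = "watermark.strategy";

    /** Watermark metadata of a view (hypothetical type). */
    record ViewWatermark(String column, String strategy) {}

    /** ALTER VIEW SET WATERMARK: copy the options, then add or replace the two watermark keys. */
    static Map<String, String> withWatermark(Map<String, String> options, ViewWatermark watermark) {
        Map<String, String> updated = new HashMap<>(options);
        updated.put(COLUMN_KEY, watermark.column());
        updated.put(STRATEGY_KEY, watermark.strategy());
        return updated;
    }

    /** View resolution: returns the watermark, or empty when either key is missing. */
    static Optional<ViewWatermark> readWatermark(Map<String, String> options) {
        String column = options.get(COLUMN_KEY);
        String strategy = options.get(STRATEGY_KEY);
        if (column == null || strategy == null) {
            return Optional.empty();
        }
        return Optional.of(new ViewWatermark(column, strategy));
    }

    public static void main(String[] args) {
        // A view created before this feature: no watermark keys (the "owner" key is made up).
        Map<String, String> legacy = Map.of("owner", "analytics");
        System.out.println(readWatermark(legacy));
        Map<String, String> altered = withWatermark(legacy,
                new ViewWatermark("event_time", "event_time - INTERVAL '5' SECOND"));
        System.out.println(readWatermark(altered));
    }
}
```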
> h4. 4. Execution Layer ({{flink-table-api-java}})
>  * Modify {{TableEnvironmentImpl}} to apply watermark when resolving views
>  * Implement watermark propagation during query planning
> *Files to Modify:*
>  * {{flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java}}
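> One way to picture applying the watermark when resolving views is view expansion that wraps the view's query plan in a watermark-assigning node whenever the view carries watermark metadata. The Java sketch uses hypothetical plan records, not the planner's actual relational node classes.

```java
import java.util.Optional;

final class ViewExpansionSketch {

    /** Minimal logical plan nodes; hypothetical stand-ins for the planner's relational nodes. */
    interface Plan {}

    record Scan(String table) implements Plan {}

    record Filter(Plan input, String condition) implements Plan {}

    record WatermarkAssigner(Plan input, String column, String expression) implements Plan {}

    /** Watermark metadata read from the view's catalog entry (hypothetical shape). */
    record ViewWatermark(String column, String expression) {}

    /**
     * Expands a view reference: the view's query plan, wrapped in a watermark assigner
     * when the view declares a watermark, so downstream operators see the view's watermark.
     */
    static Plan expandView(Plan viewQuery, Optional<ViewWatermark> watermark) {
        return watermark
                .<Plan>map(w -> new WatermarkAssigner(viewQuery, w.column(), w.expression()))
                .orElse(viewQuery);
    }

    public static void main(String[] args) {
        // Query of the user_activity view from the proposal: a filtered scan of raw_events.
        Plan viewQuery = new Filter(new Scan("raw_events"),
                "action_type IN ('click', 'view', 'purchase')");
        Plan expanded = expandView(viewQuery,
                Optional.of(new ViewWatermark("event_time", "event_time - INTERVAL '5' SECOND")));
        System.out.println(expanded);
    }
}
```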
> ----
> h2. Acceptance Criteria
> h3. Functional Requirements
>  * Users can create views with a watermark using {{CREATE VIEW ... WATERMARK ...}}
>  * Users can add or modify a watermark using {{ALTER VIEW SET WATERMARK ...}}
>  * Watermark column must exist in the view's output schema (validated at creation and alteration time)
>  * Watermark column must be of type TIMESTAMP or TIMESTAMP_LTZ (validated at creation and alteration time)
>  * Watermark expression is validated at view creation/alteration
>  * Time window queries on views with watermark execute correctly
>  * Watermark metadata persists in catalog across restarts
>  * {{DESCRIBE}} on a view shows watermark information
> h3. Non-Functional Requirements
>  * No performance regression on existing views without watermark
>  * Backward compatible: old views work without modification
>  * Watermark metadata works with all catalog implementations that can persist views (e.g., GenericInMemory, Hive); the JDBC catalogs are read-only and cannot store views
>  * Clear error messages for invalid watermark definitions
> h3. Test Coverage
>  * {*}Parser Tests{*}: SQL syntax parsing for CREATE VIEW and ALTER VIEW
>  * {*}Converter Tests{*}: AST to Operation conversion
>  * {*}Planner Tests{*}: Query planning with view watermarks
>  * {*}Integration Tests{*}: E2E with SQL Client and Table API
>  * {*}Negative Tests{*}: Invalid column, wrong type, duplicate watermark
>  * {*}Catalog Tests{*}: Persistence across sessions
>  * {*}Compatibility Tests{*}: Old views still work
> ----
> h2. Test Cases
> h3. Positive Test Cases
> {code:sql}
> -- TC-01: CREATE VIEW with watermark
> CREATE VIEW test_view1
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> AS SELECT event_id, event_time FROM source_table;
> -- TC-02: ALTER VIEW to add watermark
> CREATE VIEW test_view2 AS SELECT event_id, event_time FROM source_table;
> ALTER VIEW test_view2 SET WATERMARK FOR event_time AS event_time - INTERVAL 
> '10' SECOND;
> -- TC-03: Time window query on view
> SELECT 
>     TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
>     COUNT(*) AS event_count
> FROM test_view1
> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
> -- TC-04: Complex watermark expression
> CREATE VIEW test_view3
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND - INTERVAL '500' 
> MILLISECOND
> AS SELECT * FROM source_table;
> -- TC-05: DESCRIBE shows the view's watermark
> DESCRIBE test_view1;
> -- Expected: the event_time row shows the watermark expression
> -- event_time - INTERVAL '5' SECOND
> {code}
> h3. Negative Test Cases
> {code:sql}
> -- TC-06: Watermark column doesn't exist (should fail)
> CREATE VIEW test_error1
> WATERMARK FOR non_existent_col AS non_existent_col - INTERVAL '5' SECOND
> AS SELECT event_id FROM source_table;
> -- Expected: "Column 'non_existent_col' not found in view schema"
> -- TC-07: Watermark column is not TIMESTAMP type (should fail)
> CREATE VIEW test_error2
> WATERMARK FOR event_id AS event_id - INTERVAL '5' SECOND
> AS SELECT event_id, event_time FROM source_table;
> -- Expected: "Column 'event_id' must be of type TIMESTAMP or TIMESTAMP_LTZ"
> -- TC-08: Invalid watermark expression (should fail)
> CREATE VIEW test_error3
> WATERMARK FOR event_time AS invalid_expression
> AS SELECT event_id, event_time FROM source_table;
> -- Expected: "Invalid watermark expression" ('invalid_expression' does not resolve to a column of the view)
> -- TC-09: ALTER VIEW on non-existent view (should fail)
> ALTER VIEW non_existent_view SET WATERMARK FOR event_time AS event_time - 
> INTERVAL '5' SECOND;
> -- Expected: "View 'non_existent_view' does not exist"
> -- TC-10: Duplicate watermark definition (should fail)
> CREATE VIEW test_error4
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> AS SELECT event_id, event_time FROM source_table;
> -- Expected: "View already has a watermark definition"
> {code}
> ----
> h2. Backward Compatibility
> h3. No Breaking Changes
> ✅ *Fully backward compatible*
>  * Existing views without watermark continue to work unchanged
>  * Watermark is stored as optional metadata in view options
>  * Older Flink versions ignore the watermark options, so such views are read without a view-level watermark (graceful degradation)
> h3. Migration Path
> Users can adopt this feature gradually:
>  # {*}Phase 1{*}: Continue using existing views without watermark
>  # {*}Phase 2{*}: Create new views with watermark syntax
>  # {*}Phase 3{*}: Use {{ALTER VIEW SET WATERMARK}} to add watermark to existing views incrementally
> ----
> h2. Performance Impact
> h3. Expected Impact
>  * ✅ *No regression* on views without watermark
>  * ✅ *Minimal overhead* for views with watermark (same as table watermark processing)
>  * ✅ *Catalog overhead* negligible (storing 2 strings in options map)
> h3. Benchmarking Plan
>  * Measure query latency with and without view watermarks
>  * Compare catalog read/write performance
>  * Validate checkpoint size and recovery time
> ----
> h2. Related Issues
>  * FLINK-XXXXX: FLIP-296: Extend watermark-related features for SQL
>  * FLINK-XXXXX: Support event time in SQL views (if one exists)
>  * FLINK-XXXXX: Improve watermark propagation in multi-source queries
> ----
> h2. Documentation Requirements
> h3. User Documentation
>  * Update SQL {{CREATE VIEW}} documentation page
>  * Add new {{ALTER VIEW SET WATERMARK}} documentation
>  * Add examples to "Event Time and Watermarks" guide
>  * Add troubleshooting section for common errors
> h3. Developer Documentation
>  * Architecture design document (FLIP or tech doc)
>  * Implementation guide for contributors
>  * JavaDoc/ScalaDoc for new classes and methods
> h2. Open Questions
> h3. Q1: Watermark Conflict in Joins
> *Question:* What happens when a view with a watermark is joined with a table that has a different watermark?
> *Proposed Answer:* Follow existing Flink watermark propagation rules:
>  * A two-input operator such as a join forwards the minimum of its two inputs' watermarks, for inner and outer joins alike
>  * The join type (INNER/LEFT/RIGHT/FULL) affects which rows are emitted, not how the input watermarks are combined
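> The minimum-of-both-inputs rule for joins can be sketched in a few lines of Java. The operator keeps the latest watermark per input and emits a new watermark only when the minimum across both inputs advances. This is a simplified, hypothetical model of the behavior, not Flink's operator code.

```java
import java.util.OptionalLong;

/** Tracks the combined watermark of a two-input operator such as a join (hypothetical sketch). */
final class TwoInputWatermarkSketch {
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private long combined = Long.MIN_VALUE;

    /**
     * Records a watermark from input 0 or 1 and returns the new combined watermark
     * if it advanced, or empty if it did not.
     */
    OptionalLong onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Math.min(inputWatermarks[0], inputWatermarks[1]);
        if (min > combined) {
            combined = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch join = new TwoInputWatermarkSketch();
        System.out.println(join.onWatermark(0, 10_000)); // left side only: nothing emitted yet
        System.out.println(join.onWatermark(1, 4_000));  // right side lags: combined = 4000
        System.out.println(join.onWatermark(1, 12_000)); // now limited by the left side: 10000
    }
}
```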
> h3. Q2: Nested View Watermarks
> *Question:* How should the watermark be handled when a view is built on another view that has a watermark?
> *Proposed Answer:* The child view's watermark overrides the parent's watermark (similar to how a view's output schema replaces the schema of the relations it reads from).
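> The override rule proposed for nested views can be sketched as a walk from the queried view down to the source table that stops at the first watermark it finds. Under this rule, a view without its own watermark inherits the nearest one beneath it. The class and record names are hypothetical, and {{null}} stands for "no watermark declared".

```java
import java.util.List;
import java.util.Optional;

final class NestedViewWatermarkSketch {

    record ViewWatermark(String column, String expression) {}

    /** One relation in a view chain; watermark is null when it declares none (hypothetical). */
    record Relation(String name, ViewWatermark watermark) {}

    /**
     * Walks from the queried view down to the source table and returns the first
     * watermark found, so a child view's watermark overrides its parent's.
     */
    static Optional<ViewWatermark> effectiveWatermark(List<Relation> chainFromQueriedView) {
        for (Relation relation : chainFromQueriedView) {
            if (relation.watermark() != null) {
                return Optional.of(relation.watermark());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ViewWatermark source = new ViewWatermark("event_time", "event_time - INTERVAL '10' SECOND");
        ViewWatermark child = new ViewWatermark("event_time", "event_time - INTERVAL '5' SECOND");
        // child_view (5 s) -> parent_view (no watermark) -> source_table (10 s)
        List<Relation> chain = List.of(
                new Relation("child_view", child),
                new Relation("parent_view", null),
                new Relation("source_table", source));
        System.out.println(effectiveWatermark(chain));
    }
}
```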
> h3. Q3: ALTER VIEW DROP WATERMARK
> *Question:* Should we support removing watermark from views?
> *Proposed Answer:* Defer to Phase 2 (future enhancement). The initial implementation focuses on ADD/UPDATE only.
> ----
> h2. Alternative Approaches Considered
> h3. Alternative 1: Query Hints
> {code:sql}
> SELECT /*+ WATERMARK('event_time', INTERVAL '5' SECOND) */ * FROM my_view;
> {code}
> ❌ {*}Rejected{*}: Not reusable, must be specified in every query
> h3. Alternative 2: Table Functions
> {code:sql}
> SELECT * FROM TABLE(with_watermark(my_view, 'event_time', INTERVAL '5' 
> SECOND));
> {code}
> ❌ {*}Rejected{*}: Verbose, unintuitive, doesn't leverage existing SQL syntax
> h3. Alternative 3: Materialized Views with Watermark
> {code:sql}
> CREATE MATERIALIZED VIEW mv WATERMARK FOR ... AS SELECT ...;
> {code}
> ❌ {*}Rejected{*}: Too heavyweight (requires physical storage), doesn't solve the abstraction problem
> *Conclusion:* View-level watermark definition is the cleanest approach.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
