Gustavo de Morais created FLINK-39088:
-----------------------------------------
Summary: Preserve upsert keys for explicit (injective) CAST calls
to avoid unnecessary SinkUpsertMaterializer
Key: FLINK-39088
URL: https://issues.apache.org/jira/browse/FLINK-39088
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 2.2.0
Reporter: Gustavo de Morais
Assignee: Gustavo de Morais
Fix For: 2.3.0
Currently, when a CAST operation is applied to a primary key or upsert key
column in a streaming query, Flink's optimizer loses track of the key even when
the cast is provably injective (a one-to-one mapping). This causes the planner to
insert an unnecessary SinkUpsertMaterializer operator, which adds state overhead
and reduces performance.

For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH ('connector' = 'kafka', ...);

CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH ('connector' = 'jdbc', ...);

INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;

In this case, CAST(INT AS STRING) is injective: every distinct integer maps to
a distinct string representation, so the upsert key should be preserved through
this cast. However, the current implementation treats any cast as potentially
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key
before writing to the sink.

Solution:
Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective
casts as key-preserving operations.
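
One way to picture the key-preserving rule: after a projection, every upsert-key
column must still appear in the output either unchanged or through an injective
cast; otherwise the key is lost. The Java sketch below is a minimal model under
stated assumptions. Expr, projectKey and the single-key representation are
hypothetical simplifications for illustration only, not the FlinkRelMdUniqueKeys
or Calcite API, which work on the planner's own expression and key types.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UpsertKeyProjection {

    /**
     * Hypothetical, simplified stand-in for a projected expression (not a real
     * planner type): a plain column reference, an explicit CAST of a column
     * reference, or any other expression.
     */
    public static final class Expr {
        final int inputIndex;     // referenced input column, or -1 for "other"
        final boolean isCast;     // true if the reference is wrapped in an explicit CAST
        final boolean injective;  // whether that CAST is known to be injective

        private Expr(int inputIndex, boolean isCast, boolean injective) {
            this.inputIndex = inputIndex;
            this.isCast = isCast;
            this.injective = injective;
        }

        public static Expr ref(int index) { return new Expr(index, false, false); }
        public static Expr cast(int index, boolean injective) { return new Expr(index, true, injective); }
        public static Expr other() { return new Expr(-1, false, false); }

        /** Input column whose distinct values stay distinct in this output column, or -1. */
        int keyPreservingSource() {
            if (inputIndex < 0 || (isCast && !injective)) {
                return -1;
            }
            return inputIndex;
        }
    }

    /**
     * Maps an input upsert key (a set of input column indices) onto output
     * column indices. Returns null when some key column is dropped or only
     * survives through a non-injective expression, i.e. the key is lost.
     */
    public static Set<Integer> projectKey(Set<Integer> inputKey, Expr[] projection) {
        Set<Integer> outputKey = new HashSet<>();
        for (int keyColumn : inputKey) {
            int mapped = -1;
            for (int out = 0; out < projection.length; out++) {
                if (projection[out].keyPreservingSource() == keyColumn) {
                    mapped = out;
                    break;
                }
            }
            if (mapped < 0) {
                return null;
            }
            outputKey.add(mapped);
        }
        return outputKey;
    }

    public static void main(String[] args) {
        // Source (id INT, name STRING) with upsert key {id} = {0}.
        Set<Integer> sourceKey = Collections.singleton(0);

        // SELECT CAST(id AS STRING), name: the injective cast keeps the key.
        Expr[] injective = {Expr.cast(0, true), Expr.ref(1)};
        System.out.println(projectKey(sourceKey, injective)); // [0]

        // A cast that could map two different ids to the same value loses the key.
        Expr[] lossy = {Expr.cast(0, false), Expr.ref(1)};
        System.out.println(projectKey(sourceKey, lossy)); // null
    }
}
```

In this model a plain column reference and an injective cast are treated the
same way, which mirrors the proposal in this issue; only the injectivity flag
decides whether a cast keeps the key.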
An explicit cast is considered injective when every distinct input value maps
to a distinct output value.

Explicit casts to VARCHAR/CHAR from the following source types will be
recognized as injective:
* Integer types: TINYINT, SMALLINT, INTEGER, BIGINT
* Floating point types: FLOAT, DOUBLE
* BOOLEAN
* DATE
* Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
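
The whitelist above, and why it is safe, can be sketched as follows. A
formatting function is injective whenever a parser recovers every input from its
output, because a left inverse rules out two inputs sharing the same string. This
Java sketch is illustrative only: InjectiveCastCheck, isInjectiveCast and the
string type names are hypothetical, and Java's own toString methods stand in for
Flink's CAST-to-string formatting, which may differ in detail. Checking samples
demonstrates the property; it does not prove it.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class InjectiveCastCheck {

    // Hypothetical whitelist mirroring the source types listed above; type
    // names are plain strings here, not a real Flink type enum.
    static final Set<String> INJECTIVE_TO_STRING_SOURCES = new HashSet<>(Arrays.asList(
            "TINYINT", "SMALLINT", "INTEGER", "BIGINT",
            "FLOAT", "DOUBLE",
            "BOOLEAN",
            "DATE",
            "TIMESTAMP", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"));

    /**
     * True if an explicit CAST(sourceType AS targetType) would be treated as
     * key-preserving. Assumes the CHAR/VARCHAR target is long enough that no
     * value is truncated.
     */
    public static boolean isInjectiveCast(String sourceType, String targetType) {
        boolean toCharacterString = targetType.equals("VARCHAR") || targetType.equals("CHAR");
        return toCharacterString && INJECTIVE_TO_STRING_SOURCES.contains(sourceType);
    }

    /**
     * Returns true if parse(format(v)) equals v for every sample. A left
     * inverse means no two samples can share a formatted string.
     */
    public static <T> boolean roundTrips(List<T> samples,
                                         Function<T, String> format,
                                         Function<String, T> parse) {
        for (T value : samples) {
            if (!value.equals(parse.apply(format.apply(value)))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isInjectiveCast("INTEGER", "VARCHAR")); // true
        System.out.println(isInjectiveCast("DOUBLE", "INTEGER"));  // false: target is not a string type

        List<Integer> ints = Arrays.asList(Integer.MIN_VALUE, -1, 0, 42, Integer.MAX_VALUE);
        System.out.println(roundTrips(ints, i -> Integer.toString(i), Integer::valueOf)); // true

        // -0.0 and 0.0 are distinct values and format to distinct strings.
        List<Double> doubles = Arrays.asList(-0.0, 0.0, 0.1, 1.0 / 3, Double.MIN_VALUE, Double.NaN);
        System.out.println(roundTrips(doubles, d -> Double.toString(d), Double::valueOf)); // true

        List<Boolean> bools = Arrays.asList(true, false);
        System.out.println(roundTrips(bools, b -> Boolean.toString(b), Boolean::valueOf)); // true

        List<LocalDate> dates = Arrays.asList(LocalDate.of(1970, 1, 1), LocalDate.of(2024, 2, 29));
        System.out.println(roundTrips(dates, LocalDate::toString, LocalDate::parse)); // true
    }
}
```

The length assumption matters: if a cast to a short CHAR(n) or VARCHAR(n) were
to truncate values, distinct inputs could produce the same string and the cast
would no longer be injective.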