[
https://issues.apache.org/jira/browse/FLINK-39088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gustavo de Morais updated FLINK-39088:
--------------------------------------
Description:
Currently, when a CAST operation is applied to a primary key or upsert key
column in a streaming query, Flink's optimizer loses track of the key even when
the cast is provably injective (one-to-one mapping). This causes the planner to
insert a SinkUpsertMaterializer operator unnecessarily, adding state overhead
and reducing performance. For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
);

CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
);

INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;

In this case, CAST(INT AS STRING) is injective - every distinct integer maps to
a distinct string representation. The upsert key should be preserved through
this cast. However, the current implementation treats any cast as potentially
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key
before writing to the sink.

Solution:
Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective
casts as key-preserving operations. An explicit cast is considered injective
when every distinct input value maps to a distinct output value.

The following explicit casts to VARCHAR/CHAR will be recognized as injective:
- Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)
The following could also be added, but are open to discussion since they are
more rarely used as unique keys:
- Floating point types: FLOAT, DOUBLE
- BOOLEAN
- DATE
- Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
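
To make the proposed rule concrete, the following is a minimal Python sketch of
how unique keys could be carried through a projection that contains injective
casts. It is not the FlinkRelMdUniqueKeys implementation: the names InputRef,
Cast, is_injective_cast and project_unique_keys are hypothetical, types are
plain strings, and the whitelist covers only the integer types listed above.
It also assumes the target string type is long enough to hold every value
without truncation.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Set, Union

# Assumption: only the integer source types from the proposal are whitelisted.
INJECTIVE_TO_STRING = frozenset({"TINYINT", "SMALLINT", "INTEGER", "BIGINT"})
STRING_TYPES = frozenset({"CHAR", "VARCHAR"})


@dataclass(frozen=True)
class InputRef:
    """Hypothetical stand-in for a plain reference to an input column."""
    index: int


@dataclass(frozen=True)
class Cast:
    """Hypothetical stand-in for an explicit CAST over an input column."""
    operand: InputRef
    source_type: str
    target_type: str


Expr = Union[InputRef, Cast]


def is_injective_cast(cast: Cast) -> bool:
    # Length limits on the target type are ignored here (assumed sufficient).
    return (cast.source_type in INJECTIVE_TO_STRING
            and cast.target_type in STRING_TYPES)


def project_unique_keys(input_keys: Set[FrozenSet[int]],
                        projections: List[Expr]) -> Set[FrozenSet[int]]:
    """Translate input unique keys into output column positions.

    A key survives only if every one of its columns is either forwarded
    unchanged or wrapped in an injective cast.
    """
    preserved = {}  # input column index -> output column index
    for out_idx, expr in enumerate(projections):
        if isinstance(expr, InputRef):
            preserved.setdefault(expr.index, out_idx)
        elif isinstance(expr, Cast) and is_injective_cast(expr):
            preserved.setdefault(expr.operand.index, out_idx)

    output_keys = set()
    for key in input_keys:
        if all(col in preserved for col in key):
            output_keys.add(frozenset(preserved[col] for col in key))
    return output_keys


# Mirrors the example query: SELECT CAST(id AS STRING), name FROM source
source_keys = {frozenset({0})}
projections = [Cast(InputRef(0), "INTEGER", "VARCHAR"), InputRef(1)]
print(project_unique_keys(source_keys, projections))
```

With the whitelist in place, the key on input column 0 is reported for output
column 0. A cast that is not whitelisted (for example VARCHAR to INTEGER, where
'1' and '01' collapse to the same value) drops the key, which corresponds to
the current behaviour for all casts.
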
was:
Currently, when a CAST operation is applied to a primary key or upsert key
column in a streaming query, Flink's optimizer loses track of the key even when
the cast is provably injective (one-to-one mapping). This causes the planner to
insert a SinkUpsertMaterializer operator unnecessarily, adding state overhead
and reducing performance. For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
);

CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
);

INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;

In this case, CAST(INT AS STRING) is injective - every distinct integer maps to
a distinct string representation. The upsert key should be preserved through
this cast. However, the current implementation treats any cast as potentially
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key
before writing to the sink.

Solution:
Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective
casts as key-preserving operations. An explicit cast is considered injective
when every distinct input value maps to a distinct output value.

The following explicit casts to VARCHAR/CHAR will be recognized as injective:
* Integer types: TINYINT, SMALLINT, INTEGER, BIGINT
* Floating point types: FLOAT, DOUBLE
* BOOLEAN
* DATE
* Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
> Preserve upsert keys for explicit (injective) CAST calls to avoid unnecessary
> SinkUpsertMaterializer
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-39088
> URL: https://issues.apache.org/jira/browse/FLINK-39088
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 2.2.0
> Reporter: Gustavo de Morais
> Assignee: Gustavo de Morais
> Priority: Major
> Fix For: 2.3.0
>
>
> Currently, when a CAST operation is applied to a primary key or upsert key
> column in a streaming query, Flink's optimizer loses track of the key even
> when the cast is provably injective (one-to-one mapping). This causes the
> planner to insert a SinkUpsertMaterializer operator unnecessarily, adding
> state overhead and reducing performance. For example:
>
> CREATE TABLE source (
>   id INT PRIMARY KEY NOT ENFORCED,
>   name STRING
> );
>
> CREATE TABLE sink (
>   id STRING PRIMARY KEY NOT ENFORCED,
>   name STRING
> );
>
> INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
>
> In this case, CAST(INT AS STRING) is injective - every distinct integer maps
> to a distinct string representation. The upsert key should be preserved
> through this cast. However, the current implementation treats any cast as
> potentially key-destroying, requiring a SinkUpsertMaterializer to
> re-establish the key before writing to the sink.
>
> Solution:
> Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective
> casts as key-preserving operations. An explicit cast is considered injective
> when every distinct input value maps to a distinct output value.
>
> The following explicit casts to VARCHAR/CHAR will be recognized as injective:
>
> - Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)
> The following could also be added, but are open to discussion since they are
> more rarely used as unique keys:
> - Floating point types: FLOAT, DOUBLE
> - BOOLEAN
> - DATE
> - Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ