[ 
https://issues.apache.org/jira/browse/FLINK-39088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gustavo de Morais updated FLINK-39088:
--------------------------------------
    Description: 
Currently, when a CAST operation is applied to a primary key or upsert key 
column in a streaming query, Flink's optimizer loses track of the key even when 
the cast is provably injective (one-to-one mapping). This causes the planner to 
insert a SinkUpsertMaterializer operator unnecessarily, adding state overhead 
and reducing performance. For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
);
 
CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING  
);
 
INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
 
In this case, CAST(INT AS STRING) is injective - every distinct integer maps to 
a distinct string representation. The upsert key should be preserved through 
this cast. However, the current implementation treats any cast as potentially 
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key 
before writing to the sink.

Solution:

Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
casts as key-preserving operations. An explicit cast is considered injective 
when every distinct input value maps to a distinct output value.

The following explicit casts to VARCHAR/CHAR will be recognized as injective:

 

- Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)

The following could also be added, but are up for discussion since they are 
more rarely used as unique keys:

- Floating point types: FLOAT, DOUBLE
- BOOLEAN
- DATE
- Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
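
The two lists above can be read as a simple lookup on the source type of the cast. The following is only a rough sketch of that classification, not Flink's planner code: the class InjectiveCastSketch, the SourceType enum, and the preservesKey method are hypothetical names invented for illustration, and the real check in FlinkRelMdUniqueKeys may look quite different.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch only; none of these names exist in Flink. */
final class InjectiveCastSketch {

    /** Illustrative stand-in for the SQL source types named in the issue. */
    enum SourceType {
        TINYINT, SMALLINT, INTEGER, BIGINT,
        FLOAT, DOUBLE, BOOLEAN, DATE,
        TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ,
        OTHER // placeholder for any type that is on neither list
    }

    // Integer types: proposed as injective when cast to VARCHAR/CHAR.
    static final Set<SourceType> PROPOSED = EnumSet.of(
            SourceType.TINYINT, SourceType.SMALLINT,
            SourceType.INTEGER, SourceType.BIGINT);

    // Types the issue lists as still up for discussion.
    static final Set<SourceType> UNDER_DISCUSSION = EnumSet.of(
            SourceType.FLOAT, SourceType.DOUBLE, SourceType.BOOLEAN,
            SourceType.DATE, SourceType.TIMESTAMP,
            SourceType.TIMESTAMP_LTZ, SourceType.TIMESTAMP_TZ);

    /**
     * Returns true if an explicit cast from the given source type to
     * VARCHAR/CHAR would be treated as key-preserving in this sketch.
     */
    static boolean preservesKey(SourceType source, boolean includeDiscussed) {
        return PROPOSED.contains(source)
                || (includeDiscussed && UNDER_DISCUSSION.contains(source));
    }

    public static void main(String[] args) {
        System.out.println(preservesKey(SourceType.INTEGER, false)); // true
        System.out.println(preservesKey(SourceType.DOUBLE, false));  // false
        System.out.println(preservesKey(SourceType.DOUBLE, true));   // true
        System.out.println(preservesKey(SourceType.OTHER, true));    // false
    }
}
```

The includeDiscussed flag is just a way to show both readings of the proposal in one place; whether the second list is adopted is left open by the issue.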

  was:
Currently, when a CAST operation is applied to a primary key or upsert key 
column in a streaming query, Flink's optimizer loses track of the key even when 
the cast is provably injective (one-to-one mapping). This causes the planner to 
insert a SinkUpsertMaterializer operator unnecessarily, adding state overhead 
and reducing performance. For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
);
 
CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING  
);
 
INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
 
In this case, CAST(INT AS STRING) is injective - every distinct integer maps to 
a distinct string representation. The upsert key should be preserved through 
this cast. However, the current implementation treats any cast as potentially 
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key 
before writing to the sink.

Solution:

Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
casts as key-preserving operations. An explicit cast is considered injective 
when every distinct input value maps to a distinct output value.

The following explicit casts to VARCHAR/CHAR will be recognized as injective:

 * Integer types: TINYINT, SMALLINT, INTEGER, BIGINT
 * Floating point types: FLOAT, DOUBLE
 * BOOLEAN
 * DATE
 * Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ


> Preserve upsert keys for explicit (injective) CAST calls to avoid unnecessary 
> SinkUpsertMaterializer
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39088
>                 URL: https://issues.apache.org/jira/browse/FLINK-39088
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.2.0
>            Reporter: Gustavo de Morais
>            Assignee: Gustavo de Morais
>            Priority: Major
>             Fix For: 2.3.0
>
>
> Currently, when a CAST operation is applied to a primary key or upsert key 
> column in a streaming query, Flink's optimizer loses track of the key even 
> when the cast is provably injective (one-to-one mapping). This causes the 
> planner to insert a SinkUpsertMaterializer operator unnecessarily, adding 
> state overhead and reducing performance. For example:
> CREATE TABLE source (
>   id INT PRIMARY KEY NOT ENFORCED,
>   name STRING
> );
>  
> CREATE TABLE sink (
>   id STRING PRIMARY KEY NOT ENFORCED,
>   name STRING  
> );
>  
> INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
>  
> In this case, CAST(INT AS STRING) is injective - every distinct integer maps 
> to a distinct string representation. The upsert key should be preserved 
> through this cast. However, the current implementation treats any cast as 
> potentially key-destroying, requiring a SinkUpsertMaterializer to 
> re-establish the key before writing to the sink.
>
> Solution:
>
> Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
> casts as key-preserving operations. An explicit cast is considered injective 
> when every distinct input value maps to a distinct output value.
>
> The following explicit casts to VARCHAR/CHAR will be recognized as injective:
>
> - Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)
>
> The following could also be added, but are up for discussion since they are 
> more rarely used as unique keys:
>
> - Floating point types: FLOAT, DOUBLE
> - BOOLEAN
> - DATE
> - Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
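
To illustrate the injectivity argument in the description, here is a small sketch that checks on a handful of sample values whether a conversion maps distinct inputs to distinct outputs. It uses plain Java conversions as a stand-in for SQL CAST semantics, which is an assumption: Flink's actual cast rules are not exercised here, and CastInjectivityDemo and isInjectiveOn are hypothetical names. A sample check of this kind can only refute injectivity, never prove it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical demo; Java conversions stand in for SQL CAST. */
final class CastInjectivityDemo {

    /**
     * Returns true if the conversion yields a distinct output for every
     * element of the given list of distinct inputs.
     */
    static <A, B> boolean isInjectiveOn(List<A> distinctInputs, Function<A, B> cast) {
        Set<B> seen = new HashSet<>();
        for (A input : distinctInputs) {
            // add() returns false when the output was already produced
            if (!seen.add(cast.apply(input))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Integer to string: distinct integers give distinct strings.
        List<Integer> ints = Arrays.asList(-1, 0, 1, 10, 100);
        System.out.println(
                isInjectiveOn(ints, (Integer i) -> Integer.toString(i))); // true

        // String to integer: "1", "01" and "+1" all parse to 1, so a key
        // would not survive a cast in this direction.
        List<String> strings = Arrays.asList("1", "01", "+1");
        System.out.println(
                isInjectiveOn(strings, (String s) -> Integer.valueOf(s))); // false
    }
}
```

The second case shows why the direction of the cast matters: the proposal covers casts to VARCHAR/CHAR, not casts from character types back to numeric ones.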



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
