geyanggang opened a new issue, #10920:
URL: https://github.com/apache/gravitino/issues/10920

   ### Version
   
   main branch
   
   ### Describe what's wrong
   
   When a Paimon table contains a column with `MULTISET<STRING>` type, writing 
to the table through the Gravitino Flink Connector fails with a schema mismatch 
error. The same operation works correctly when using the native Paimon Flink 
Catalog directly (without Gravitino).
   
   Root cause: Gravitino's type system does not have a `MULTISET` type. In the
   Paimon catalog backend, `MULTISET<T>` is converted to `MapType<T, INTEGER>`
   (in `PaimonToGravitinoTypeVisitor.visit(MultisetType)`). When the Flink
   Connector reads the table back, it converts this `MapType` to Flink's
   `MAP<T, INT NOT NULL>` instead of restoring it to `MULTISET<T>`. Paimon's
   `FlinkTableFactory` then compares the Flink schema
   (`MAP<STRING, INT NOT NULL>`) against the store schema (`MULTISET<STRING>`)
   and throws an `IllegalArgumentException`.
   
   Additionally, `TypeUtils.toGravitinoType()` in the Flink Connector throws
   `UnsupportedOperationException` for the `MULTISET` type, which also blocks
   creating tables with `MULTISET` columns.
   
   ### Error message and/or stacktrace
   
   Caused by: java.lang.IllegalArgumentException: Flink schema and store schema are not the same,
   store schema is ROW<..., `field_multiset` MULTISET<STRING> COMMENT '测试字段', ...>,
   Flink schema is ROW<..., `field_multiset` MAP<STRING, INT NOT NULL> '...', ...> NOT NULL
       at org.apache.paimon.utils.Preconditions.checkArgument(Preconditions.java:149)
       at org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable(AbstractFlinkTableFactory.java:246)
       at org.apache.paimon.flink.AbstractFlinkTableFactory.createDynamicTableSink(AbstractFlinkTableFactory.java:127)
       at org.apache.paimon.flink.FlinkTableFactory.createDynamicTableSink(FlinkTableFactory.java:66)
       at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:335)
       at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:450)
       at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
   
   
   ### How to reproduce
   
   1. Create a Paimon table with a `MULTISET<STRING>` column (either directly 
via Paimon or via Flink SQL without Gravitino):
   
   ```sql
   CREATE TABLE test_multiset (
       id BIGINT,
       field_multiset MULTISET<STRING>,
       PRIMARY KEY (id) NOT ENFORCED
   );
   ```
   
   2. Register the Paimon catalog in Gravitino.
   3. Use the Gravitino Flink Connector (`GravitinoCatalogStore`) to access the
      table.
   4. Attempt to write data to the table:
   
   ```sql
   INSERT INTO test_multiset VALUES (1, MULTISET['a', 'b', 'a']);
   ```
   
   5. The INSERT fails with the schema mismatch error above.
   
   Environment: Flink 1.20.3, Paimon 1.3.1, Gravitino main branch
   
   ### Additional context
   
   The type conversion chain that causes this issue:
   Paimon store: MULTISET<STRING>
     → PaimonToGravitinoTypeVisitor.visit(MultisetType): MapType(STRING, INTEGER, valueNullable=false)
     → Flink Connector TypeUtils.toFlinkType(MAP): MAP<STRING, INT NOT NULL>
     → Paimon FlinkTableFactory compares: MAP<STRING, INT NOT NULL> ≠ MULTISET<STRING> → ERROR
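
The chain above can be sketched as a small self-contained toy model. It is only an illustration of why the round trip loses information, not the actual Gravitino, Flink, or Paimon code: `EngineType`, `GravitinoIntMap`, `toGravitino`, `toFlink`, and `checkSchemas` are hypothetical stand-ins that mimic the behavior described in this issue.

```java
/** Toy model of the lossy MULTISET round trip. Not the real Gravitino, Flink, or Paimon classes. */
class MultisetRoundTrip {

    /** Hypothetical stand-in for a Paimon or Flink type: MULTISET<T> or MAP<T, INT>. */
    static final class EngineType {
        final boolean multiset;      // true for MULTISET<element>, false for MAP<element, INT>
        final String element;        // MULTISET element type, or MAP key type
        final boolean valueNullable; // only meaningful for MAP

        private EngineType(boolean multiset, String element, boolean valueNullable) {
            this.multiset = multiset;
            this.element = element;
            this.valueNullable = valueNullable;
        }

        static EngineType multiset(String element) {
            return new EngineType(true, element, false);
        }

        static EngineType intMap(String key, boolean valueNullable) {
            return new EngineType(false, key, valueNullable);
        }

        @Override
        public String toString() {
            if (multiset) {
                return "MULTISET<" + element + ">";
            }
            return "MAP<" + element + ", INT" + (valueNullable ? "" : " NOT NULL") + ">";
        }
    }

    /** Hypothetical stand-in for Gravitino's MapType with an INTEGER value; it has no multiset flag. */
    static final class GravitinoIntMap {
        final String key;
        final boolean valueNullable;

        GravitinoIntMap(String key, boolean valueNullable) {
            this.key = key;
            this.valueNullable = valueNullable;
        }
    }

    /** Mimics the Paimon-to-Gravitino step: MULTISET<T> collapses to MapType(T, INTEGER, valueNullable=false). */
    static GravitinoIntMap toGravitino(EngineType type) {
        return new GravitinoIntMap(type.element, type.multiset ? false : type.valueNullable);
    }

    /** Mimics the Gravitino-to-Flink step: nothing in the map type says it was once a MULTISET. */
    static EngineType toFlink(GravitinoIntMap map) {
        return EngineType.intMap(map.key, map.valueNullable);
    }

    /** Mimics the schema comparison that fails in the stack trace above. */
    static void checkSchemas(EngineType flinkSchema, EngineType storeSchema) {
        if (!flinkSchema.toString().equals(storeSchema.toString())) {
            throw new IllegalArgumentException(
                "Flink schema and store schema are not the same, store schema is "
                    + storeSchema + ", Flink schema is " + flinkSchema);
        }
    }

    public static void main(String[] args) {
        EngineType store = EngineType.multiset("STRING");
        EngineType flink = toFlink(toGravitino(store));
        System.out.println(store + " -> " + flink);
        try {
            checkSchemas(flink, store);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model a genuine `MAP<STRING, INT NOT NULL>` column and a `MULTISET<STRING>` column produce the same intermediate value, so the reverse conversion has no way to tell them apart unless the original type is recorded somewhere.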
   

