geyanggang opened a new issue, #10920:
URL: https://github.com/apache/gravitino/issues/10920
### Version
main branch
### Describe what's wrong
When a Paimon table contains a column with `MULTISET<STRING>` type, writing
to the table through the Gravitino Flink Connector fails with a schema mismatch
error. The same operation works correctly when using the native Paimon Flink
Catalog directly (without Gravitino).
Root cause: Gravitino's type system does not have a `MULTISET` type. In the
Paimon catalog backend, `MULTISET<T>` is converted to `MapType<T, INTEGER>` (in
`PaimonToGravitinoTypeVisitor.visit(MultisetType)`). When the Flink Connector
reads the table back, it converts this `MapType` to Flink's `MAP<T, INT NOT
NULL>` instead of restoring it to `MULTISET<T>`. Paimon's `FlinkTableFactory`
then compares the Flink schema (`MAP<STRING, INT NOT NULL>`) against the store
schema (`MULTISET<STRING>`) and throws an `IllegalArgumentException`.
Additionally, `TypeUtils.toGravitinoType()` in the Flink Connector throws
`UnsupportedOperationException` for `MULTISET` type, which also blocks table
creation with MULTISET columns.
### Error message and/or stacktrace
Caused by: java.lang.IllegalArgumentException: Flink schema and store schema
are not the same,
store schema is ROW<..., `field_multiset` MULTISET<STRING> COMMENT '测试字段',
...>,
Flink schema is ROW<..., `field_multiset` MAP<STRING, INT NOT NULL> '...',
...> NOT NULL
at
org.apache.paimon.utils.Preconditions.checkArgument(Preconditions.java:149)
at
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable(AbstractFlinkTableFactory.java:246)
at
org.apache.paimon.flink.AbstractFlinkTableFactory.createDynamicTableSink(AbstractFlinkTableFactory.java:127)
at
org.apache.paimon.flink.FlinkTableFactory.createDynamicTableSink(FlinkTableFactory.java:66)
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:335)
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:450)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
### How to reproduce
1. Create a Paimon table with a `MULTISET<STRING>` column (either directly
via Paimon or via Flink SQL without Gravitino):
```sql
CREATE TABLE test_multiset (
id BIGINT,
field_multiset MULTISET<STRING>,
PRIMARY KEY (id) NOT ENFORCED
);
```
1. Register the Paimon catalog in Gravitino
2. Use the Gravitino Flink Connector (GravitinoCatalogStore) to access the
table
3. Attempt to write data to the table:
```sql
INSERT INTO test_multiset VALUES (1, MULTISET['a', 'b', 'a']);
```
1. The INSERT fails with the schema mismatch error above
Environment: Flink 1.20.3, Paimon 1.3.1, Gravitino main branch
### Additional context
The type conversion chain that causes this issue:
Paimon store: MULTISET<STRING>
→ PaimonToGravitinoTypeVisitor.visit(MultisetType): MapType(STRING,
INTEGER, valueNullable=false)
→ Flink Connector TypeUtils.toFlinkType(MAP): MAP<STRING, INT NOT NULL>
→ Paimon FlinkTableFactory compares: MAP<STRING, INT NOT NULL> ≠
MULTISET<STRING> → ERROR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]