pvary commented on code in PR #9606:
URL: https://github.com/apache/iceberg/pull/9606#discussion_r1502277710
##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -68,6 +73,42 @@ public static Schema convert(TableSchema schema) {
     return freshIdentifierFieldIds(iSchema, schema);
   }
 
+  /** Convert the flink table schema to apache iceberg schema with column comment. */
+  public static Schema convert(TableSchema schema, Map<String, String> columnComments) {

Review Comment:
I think it would be better not to introduce new methods that use the deprecated `TableSchema`. Maybe we could do something like this instead:
```java
/** Convert the Flink table schema to an Apache Iceberg schema, keeping column comments. */
public static Schema convert(ResolvedCatalogTable catalogTable) {
  List<Column> tableColumns = catalogTable.getResolvedSchema().getColumns();
  // Copied from org.apache.flink.table.api.Schema#toRowDataType: attach each column's
  // comment to the corresponding row field so it survives the conversion.
  DataTypes.Field[] fields =
      tableColumns.stream()
          .map(
              column -> {
                if (column.getComment().isPresent()) {
                  return DataTypes.FIELD(
                      column.getName(), column.getDataType(), column.getComment().get());
                } else {
                  return DataTypes.FIELD(column.getName(), column.getDataType());
                }
              })
          .toArray(DataTypes.Field[]::new);

  LogicalType schemaType = DataTypes.ROW(fields).notNull().getLogicalType();
  Preconditions.checkArgument(
      schemaType instanceof RowType, "Schema logical type should be RowType.");

  RowType root = (RowType) schemaType;
  Type converted = root.accept(new FlinkTypeToType(root));
  Schema iSchema = new Schema(converted.asStructType().fields());
  return freshIdentifierFieldIds(iSchema, catalogTable.getSchema());
}
```

Since the `Catalog.createTable` javadoc says (since Flink 1.13.0 / [FLINK-21396](https://issues.apache.org/jira/browse/FLINK-21396)):
```
Creates a new table or view. The framework will make sure to call this method with fully
validated ResolvedCatalogTable or ResolvedCatalogView. Those instances are easy to serialize
for a durable catalog implementation.
```
we might be able to change `FlinkCatalog.createTable` to cast the table parameter to `ResolvedCatalogTable`:
```java
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
    throws CatalogException, TableAlreadyExistException {
  if (Objects.equals(
      table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
    throw new IllegalArgumentException(
        "Cannot create the table with 'connector'='iceberg' table property in "
            + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
            + "create table without 'connector'='iceberg' related properties in an iceberg table.");
  }

  // The framework guarantees a fully validated ResolvedCatalogTable here, so the cast is safe.
  createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
}
```
and change `FlinkCatalog.createIcebergTable` to accept only a `ResolvedCatalogTable`, like:
```java
void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolean ignoreIfExists)
    throws CatalogException, TableAlreadyExistException {
  validateFlinkTable(table);
  // ...
}
```
With this approach we would not need an `instanceof` check, and we would also move a step closer to removing the usage of the deprecated `TableSchema`.
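For illustration, here is a minimal sketch of how the proposed `convert(ResolvedCatalogTable)` overload could be exercised, assuming the overload above is in place; the `ConvertWithCommentsSketch` class name and the sample column definitions are hypothetical, used only to show column comments flowing through to the Iceberg field docs:
```java
import java.util.Collections;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

public class ConvertWithCommentsSketch {
  public static void main(String[] args) {
    // Resolved columns carrying comments, as the framework would hand them to createTable.
    ResolvedSchema resolvedSchema =
        ResolvedSchema.of(
            Column.physical("id", DataTypes.INT().notNull()).withComment("primary identifier"),
            Column.physical("data", DataTypes.STRING()).withComment("free-form payload"));

    CatalogTable origin =
        CatalogTable.of(
            Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
            "demo table",
            Collections.emptyList(),
            Collections.emptyMap());

    ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(origin, resolvedSchema);

    // The proposed overload would map each Flink column comment onto the Iceberg field doc.
    org.apache.iceberg.Schema icebergSchema = FlinkSchemaUtil.convert(catalogTable);
    icebergSchema
        .columns()
        .forEach(field -> System.out.println(field.name() + " -> " + field.doc()));
  }
}
```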
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org