pvary commented on code in PR #9606:
URL: https://github.com/apache/iceberg/pull/9606#discussion_r1502277710


##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -68,6 +73,42 @@ public static Schema convert(TableSchema schema) {
     return freshIdentifierFieldIds(iSchema, schema);
   }
 
+  /** Convert the flink table schema to apache iceberg schema with column comment. */
+  public static Schema convert(TableSchema schema, Map<String, String> columnComments) {

Review Comment:
   I think it would be better not to introduce new methods that use the
   deprecated `TableSchema`.
   
   Maybe we could do something like this instead:
   ```java
     /** Convert the flink table schema to apache iceberg schema with column comment. */
     public static Schema convert(ResolvedCatalogTable catalogTable) {
       List<Column> tableColumns = catalogTable.getResolvedSchema().getColumns();
       // copy from org.apache.flink.table.api.Schema#toRowDataType
       DataTypes.Field[] fields =
           tableColumns.stream()
               .map(
                   column -> {
                     if (column.getComment().isPresent()) {
                       return DataTypes.FIELD(
                           column.getName(), column.getDataType(), column.getComment().get());
                     } else {
                       return DataTypes.FIELD(column.getName(), column.getDataType());
                     }
                   })
               .toArray(DataTypes.Field[]::new);
   
       LogicalType schemaType = DataTypes.ROW(fields).notNull().getLogicalType();
       Preconditions.checkArgument(
           schemaType instanceof RowType, "Schema logical type should be RowType.");
   
       RowType root = (RowType) schemaType;
       Type converted = root.accept(new FlinkTypeToType(root));
       Schema iSchema = new Schema(converted.asStructType().fields());
       return freshIdentifierFieldIds(iSchema, catalogTable.getSchema());
     }
   ```
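
The per-column branch on `getComment()` can also be written with `Optional.map(...).orElseGet(...)`. The following is only a minimal, dependency-free sketch of that pattern: `Column` and `Field` here are hypothetical stand-ins for the Flink `Column` and `DataTypes.Field` types, not the real classes, and the two `Field` constructors merely mirror the two `DataTypes.FIELD` overloads used above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Sketch only: every type in here is a stand-in, not a Flink or Iceberg class. */
class ColumnCommentSketch {

  /** Hypothetical stand-in for a resolved column with an optional comment. */
  static final class Column {
    final String name;
    final String type;
    final Optional<String> comment;

    Column(String name, String type, String comment) {
      this.name = name;
      this.type = type;
      this.comment = Optional.ofNullable(comment);
    }
  }

  /** Hypothetical stand-in for a converted field; doc stays null without a comment. */
  static final class Field {
    final String name;
    final String type;
    final String doc;

    Field(String name, String type) {
      this(name, type, null);
    }

    Field(String name, String type, String doc) {
      this.name = name;
      this.type = type;
      this.doc = doc;
    }
  }

  /** Uses the three-argument constructor only when a comment is present. */
  static Field toField(Column column) {
    return column.comment
        .map(comment -> new Field(column.name, column.type, comment))
        .orElseGet(() -> new Field(column.name, column.type));
  }

  static List<Field> toFields(List<Column> columns) {
    return columns.stream().map(ColumnCommentSketch::toField).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Field> fields =
        toFields(
            Arrays.asList(
                new Column("id", "BIGINT", "primary key"), new Column("data", "STRING", null)));
    for (Field field : fields) {
      System.out.println(field.name + " " + field.type + " doc=" + field.doc);
    }
  }
}
```

Whether this reads better than the explicit `if`/`else` copied from `Schema#toRowDataType` is a matter of taste; the behavior is the same.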
   
   Since the `Catalog.createTable` javadoc says (since Flink 1.13.0 / [FLINK-21396](https://issues.apache.org/jira/browse/FLINK-21396)):
   ```
   Creates a new table or view.
   The framework will make sure to call this method with fully validated
   ResolvedCatalogTable or ResolvedCatalogView. Those instances are 
   easy to serialize for a durable catalog implementation.
   ```
   
   We might be able to change `FlinkCatalog.createTable` to cast the table
   parameter to `ResolvedCatalogTable`:
   ```java
     @Override
     public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
         throws CatalogException, TableAlreadyExistException {
       if (Objects.equals(
           table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
         throw new IllegalArgumentException(
             "Cannot create the table with 'connector'='iceberg' table property in "
                 + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
                 + "create table without 'connector'='iceberg' related properties in an iceberg table.");
       }
   
       Preconditions.
       createIcebergTable(tablePath, table, ignoreIfExists);
     }
   ```
   
   and `FlinkCatalog.createIcebergTable` to accept only `ResolvedCatalogTable`, like:
   ```java
     void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolean ignoreIfExists)
         throws CatalogException, TableAlreadyExistException {
       validateFlinkTable(table);
   ```
   
   In this case we would not need the `instanceof` check, and we would also
   move a step closer to removing the usage of the deprecated `TableSchema`.
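
The narrowing described above could look roughly like the sketch below: the public entry point validates the argument once and casts it, so the internal method only ever sees the resolved type and needs no type checks of its own. This is an assumption about the intended shape, not the actual `FlinkCatalog` code: `BaseTable`, `ResolvedTable`, and `UnresolvedTable` are hypothetical stand-ins for the Flink catalog types, and the error message is made up for illustration.

```java
/** Sketch only: stand-in types, not the Flink catalog interfaces. */
class ResolvedCastSketch {

  /** Hypothetical stand-in for CatalogBaseTable. */
  interface BaseTable {}

  /** Hypothetical stand-in for ResolvedCatalogTable. */
  static final class ResolvedTable implements BaseTable {
    final String comment;

    ResolvedTable(String comment) {
      this.comment = comment;
    }
  }

  /** Hypothetical table that has not been resolved by the framework. */
  static final class UnresolvedTable implements BaseTable {}

  /** Entry point: checks the argument once, then hands the narrowed type on. */
  static String createTable(BaseTable table) {
    if (!(table instanceof ResolvedTable)) {
      // Illustrative message; the real precondition text is not given in the comment above.
      throw new IllegalArgumentException("table should be resolved");
    }
    return createIcebergTable((ResolvedTable) table);
  }

  /** Internal method: accepts only the resolved type, so no further type checks are needed. */
  static String createIcebergTable(ResolvedTable table) {
    return "created table with comment: " + table.comment;
  }

  public static void main(String[] args) {
    System.out.println(createTable(new ResolvedTable("demo")));
    try {
      createTable(new UnresolvedTable());
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```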



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

