stevenzwu commented on code in PR #12199: URL: https://github.com/apache/iceberg/pull/12199#discussion_r1966252062
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +public class FlinkCreateTableOptions { Review Comment: this can be package private ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -332,7 +335,22 @@ public List<String> listTables(String databaseName) public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { Table table = loadIcebergTable(tablePath); - return toCatalogTable(table); + + // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table. + // Inorder to create such table in non iceberg catalog, we need to send across catalog + // properties also. + // As Flink API accepts only Map<String, String> for props, here we are serializing catalog + // props as json string to distinguish between catalog and table properties in createTable. 
+ String srcCatalogProps = + FlinkCreateTableOptions.toJson( + getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps); + + ImmutableMap.Builder<String, String> mergedProps = ImmutableMap.builder(); + mergedProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER); + mergedProps.put(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + mergedProps.putAll(table.properties()); Review Comment: We should check that there are no name collisions between the two reserved keys and `table.properties()`, and fail explicitly with a proper error message. Otherwise, the error/exception can be misleading and confusing. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java: ########## @@ -162,4 +162,21 @@ public void testWatermarkOptionsDescending() throws Exception { expected, SCHEMA_TS); } + + /** Test create table using LIKE */ + @Test + public void testFlinkTableUsingLIKE() throws Exception { Review Comment: nit: rename to `testCreateTableLikeDiffCatalog` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -332,7 +335,22 @@ public List<String> listTables(String databaseName) public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { Table table = loadIcebergTable(tablePath); - return toCatalogTable(table); + + // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table. + // Inorder to create such table in non iceberg catalog, we need to send across catalog + // properties also. + // As Flink API accepts only Map<String, String> for props, here we are serializing catalog + // props as json string to distinguish between catalog and table properties in createTable.
+ String srcCatalogProps = + FlinkCreateTableOptions.toJson( + getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps); + + ImmutableMap.Builder<String, String> mergedProps = ImmutableMap.builder(); + mergedProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER); Review Comment: for consistency, the connector literal can be defined as a key too. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -97,17 +97,20 @@ public class FlinkCatalog extends AbstractCatalog { private final Namespace baseNamespace; private final SupportsNamespaces asNamespaceCatalog; private final Closeable closeable; + private final Map<String, String> catalogProps; private final boolean cacheEnabled; public FlinkCatalog( String catalogName, String defaultDatabase, Namespace baseNamespace, CatalogLoader catalogLoader, + Map<String, String> catalogProps, Review Comment: On paper, this is a public class and this change could break compatibility. Not sure if this class is used by users directly. I guess we can either mark this as `@Internal` or add a new constructor to keep this backward compatible? what's your take? @pvary @mxm ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +public class FlinkCreateTableOptions { + private final String catalogName; + private final String catalogDb; + private final String catalogTable; + private final Map<String, String> catalogProps; + + private FlinkCreateTableOptions( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { + this.catalogName = catalogName; + this.catalogDb = catalogDb; + this.catalogTable = catalogTable; + this.catalogProps = props; + } + + public static final ConfigOption<String> CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + public static final ConfigOption<String> CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); + + public static final ConfigOption<String> CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + public static final ConfigOption<String> CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + public static 
final ConfigOption<Map<String, String>> CATALOG_PROPS = + ConfigOptions.key("catalog-props") + .mapType() + .noDefaultValue() + .withDescription("Properties for the underlying catalog for iceberg table."); + + public static final String SRC_CATALOG_PROPS_KEY = "src-catalog"; Review Comment: `SRC_TABLE_PROPS_KEY` seems more accurate to me. `catalog props` typically means config like HMS endpoints etc. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -188,6 +188,37 @@ public void testCreateTableLike() throws TableNotExistException { .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); } + @TestTemplate + public void testCreateTableLikeInDifferentCatalog() throws TableNotExistException { + sql("CREATE TABLE tl(id BIGINT)"); + sql("CREATE TABLE tl2 LIKE tl"); + + Table table = table("tl2"); + assertThat(table.schema().asStruct()) + .isEqualTo( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct()); + CatalogTable catalogTable = catalogTable("tl2"); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); + } + + @TestTemplate + public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { Review Comment: is the method name mixed up with the one above? should this method be the `DifferentCatalog` case? ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -188,6 +188,37 @@ public void testCreateTableLike() throws TableNotExistException { .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); } + @TestTemplate + public void testCreateTableLikeInDifferentCatalog() throws TableNotExistException { Review Comment: isn't this the same test as the method above it? 
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +public class FlinkCreateTableOptions { + private final String catalogName; + private final String catalogDb; + private final String catalogTable; + private final Map<String, String> catalogProps; + + private FlinkCreateTableOptions( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { + this.catalogName = catalogName; + this.catalogDb = catalogDb; + this.catalogTable = catalogTable; + this.catalogProps = props; + } + + public static final ConfigOption<String> CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + public static final ConfigOption<String> CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + .withDescription("Catalog type, the optional types are: 
custom, hadoop, hive."); + + public static final ConfigOption<String> CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + public static final ConfigOption<String> CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + public static final ConfigOption<Map<String, String>> CATALOG_PROPS = + ConfigOptions.key("catalog-props") + .mapType() + .noDefaultValue() + .withDescription("Properties for the underlying catalog for iceberg table."); + + public static final String SRC_CATALOG_PROPS_KEY = "src-catalog"; + + static String toJson( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { + return JsonUtil.generate( + gen -> { + gen.writeStartObject(); + gen.writeStringField(CATALOG_NAME.key(), catalogName); + gen.writeStringField(CATALOG_DATABASE.key(), catalogDb); + gen.writeStringField(CATALOG_TABLE.key(), catalogTable); + JsonUtil.writeStringMap(CATALOG_PROPS.key(), props, gen); + gen.writeEndObject(); + }, + false); + } + + static FlinkCreateTableOptions fromJson(String createTableOptions) { + return JsonUtil.parse( + createTableOptions, + node -> { + String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node); + String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node); + String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node); + Map<String, String> catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node); + + return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps); + }); + } + + String getCatalogName() { Review Comment: Iceberg coding style doesn't include `get` in getter method. 
so this should be just `catalogName()` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +public class FlinkCreateTableOptions { + private final String catalogName; + private final String catalogDb; + private final String catalogTable; + private final Map<String, String> catalogProps; + + private FlinkCreateTableOptions( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { + this.catalogName = catalogName; + this.catalogDb = catalogDb; + this.catalogTable = catalogTable; + this.catalogProps = props; + } + + public static final ConfigOption<String> CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + public static final ConfigOption<String> CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + 
.withDescription("Catalog type, the optional types are: custom, hadoop, hive."); + + public static final ConfigOption<String> CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + public static final ConfigOption<String> CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + public static final ConfigOption<Map<String, String>> CATALOG_PROPS = + ConfigOptions.key("catalog-props") + .mapType() + .noDefaultValue() + .withDescription("Properties for the underlying catalog for iceberg table."); + + public static final String SRC_CATALOG_PROPS_KEY = "src-catalog"; + + static String toJson( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { + return JsonUtil.generate( + gen -> { + gen.writeStartObject(); + gen.writeStringField(CATALOG_NAME.key(), catalogName); + gen.writeStringField(CATALOG_DATABASE.key(), catalogDb); + gen.writeStringField(CATALOG_TABLE.key(), catalogTable); + JsonUtil.writeStringMap(CATALOG_PROPS.key(), props, gen); Review Comment: based on the usage, it seems that `CATALOG_PROPS` should be `TABLE_PROPS` as Iceberg table properties ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -625,7 +649,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc return partitionKeysBuilder.build(); } - static CatalogTable toCatalogTable(Table table) { + static CatalogTable toCatalogTableWithCustomProps(Table table, Map<String, String> customProps) { Review Comment: let's just call the 2nd arg `props` or `catalogTableProps`. I think `custom` would add more confusion. Same apply to the method name. 
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -406,7 +427,10 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea for (Map.Entry<String, String> entry : table.getOptions().entrySet()) { if ("location".equalsIgnoreCase(entry.getKey())) { location = entry.getValue(); - } else { + } // Catalog props are added to support CREATE TABLE LIKE in getTable(). Review Comment: nit: put the comment lines inside the `else if` block. Maybe even extract a `boolean isReservedKey(String key)` method? Something like this might be a little easier to read. `location` should also be considered a reserved key. ``` if (!isReservedKey(entry.getKey())) { properties.put(...); } else { // handling reserved keys if ("location".equalsIgnoreCase(entry.getKey())) { location = entry.getValue(); } } ``` ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java: ########## @@ -162,4 +162,21 @@ public void testWatermarkOptionsDescending() throws Exception { expected, SCHEMA_TS); } + + /** Test create table using LIKE */ Review Comment: nit: this Javadoc seems redundant ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +public class FlinkCreateTableOptions { + private final String catalogName; + private final String catalogDb; + private final String catalogTable; + private final Map<String, String> catalogProps; + + private FlinkCreateTableOptions( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { + this.catalogName = catalogName; + this.catalogDb = catalogDb; + this.catalogTable = catalogTable; + this.catalogProps = props; + } + + public static final ConfigOption<String> CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + public static final ConfigOption<String> CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); + + public static final ConfigOption<String> CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + public static final ConfigOption<String> CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + public static 
final ConfigOption<Map<String, String>> CATALOG_PROPS = + ConfigOptions.key("catalog-props") + .mapType() + .noDefaultValue() + .withDescription("Properties for the underlying catalog for iceberg table."); + + public static final String SRC_CATALOG_PROPS_KEY = "src-catalog"; + + static String toJson( + String catalogName, String catalogDb, String catalogTable, Map<String, String> props) { Review Comment: nit: props -> catalogProps -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org