stevenzwu commented on code in PR #12199:
URL: https://github.com/apache/iceberg/pull/12199#discussion_r1966252062


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FlinkCreateTableOptions {

Review Comment:
   this can be package private



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -332,7 +335,22 @@ public List<String> listTables(String databaseName)
   public CatalogTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     Table table = loadIcebergTable(tablePath);
-    return toCatalogTable(table);
+
+    // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table.
+    // Inorder to create such table in non iceberg catalog, we need to send across catalog
+    // properties also.
+    // As Flink API accepts only Map<String, String> for props, here we are serializing catalog
+    // props as json string to distinguish between catalog and table properties in createTable.
+    String srcCatalogProps =
+        FlinkCreateTableOptions.toJson(
+            getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps);
+
+    ImmutableMap.Builder<String, String> mergedProps = ImmutableMap.builder();
+    mergedProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
+    mergedProps.put(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
+    mergedProps.putAll(table.properties());

Review Comment:
   we should check there are no name collisions for the 2 reserved keys with `table.properties()` and fail explicitly with proper error msg. Otherwise, the error/exception can be misleading and confusing.
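   
   e.g. something along these lines (just a sketch; the exact reserved-key constants and the message wording are placeholders):
   ```
   // Sketch only: fail fast before merging the maps in getTable(). Assumes the two
   // reserved keys are the "connector" literal and SRC_CATALOG_PROPS_KEY, and uses
   // Iceberg's relocated Guava Preconditions.
   Map<String, String> tableProps = table.properties();
   Preconditions.checkArgument(
       !tableProps.containsKey("connector")
           && !tableProps.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY),
       "Table properties must not contain reserved keys 'connector' or '%s'",
       FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);
   ```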



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -162,4 +162,21 @@ public void testWatermarkOptionsDescending() throws Exception {
         expected,
         SCHEMA_TS);
   }
+
+  /** Test create table using LIKE */
+  @Test
+  public void testFlinkTableUsingLIKE() throws Exception {

Review Comment:
   nit: rename to `testCreateTableLikeDiffCatalog`



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -332,7 +335,22 @@ public List<String> listTables(String databaseName)
   public CatalogTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     Table table = loadIcebergTable(tablePath);
-    return toCatalogTable(table);
+
+    // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table.
+    // Inorder to create such table in non iceberg catalog, we need to send across catalog
+    // properties also.
+    // As Flink API accepts only Map<String, String> for props, here we are serializing catalog
+    // props as json string to distinguish between catalog and table properties in createTable.
+    String srcCatalogProps =
+        FlinkCreateTableOptions.toJson(
+            getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps);
+
+    ImmutableMap.Builder<String, String> mergedProps = ImmutableMap.builder();
+    mergedProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);

Review Comment:
   for consistency, the connector literal can be defined as a key too.
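   
   e.g. (sketch; the constant name is just a suggestion):
   ```
   // In FlinkCreateTableOptions (name is illustrative):
   public static final String CONNECTOR_PROPS_KEY = "connector";

   // Then in FlinkCatalog#getTable:
   mergedProps.put(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
   ```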



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -97,17 +97,20 @@ public class FlinkCatalog extends AbstractCatalog {
   private final Namespace baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
+  private final Map<String, String> catalogProps;
   private final boolean cacheEnabled;
 
   public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
+      Map<String, String> catalogProps,

Review Comment:
   On paper, this is a public class and this change could break compatibility. Not sure if this class is used by users directly.
   
   I guess we can either mark this as `@Internal` or add a new constructor to keep this backward compatible?
   
   what's your take? @pvary @mxm
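   
   If we go the extra-constructor route, it could just delegate with empty catalog props, e.g. (sketch; assuming the rest of the current parameter list stays unchanged):
   ```
   // Backward-compatible constructor that forwards to the new one with no catalog props.
   public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
       boolean cacheEnabled,
       long cacheExpirationIntervalMs) {
     this(
         catalogName,
         defaultDatabase,
         baseNamespace,
         catalogLoader,
         ImmutableMap.of(),
         cacheEnabled,
         cacheExpirationIntervalMs);
   }
   ```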



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FlinkCreateTableOptions {
+  private final String catalogName;
+  private final String catalogDb;
+  private final String catalogTable;
+  private final Map<String, String> catalogProps;
+
+  private FlinkCreateTableOptions(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    this.catalogName = catalogName;
+    this.catalogDb = catalogDb;
+    this.catalogTable = catalogTable;
+    this.catalogProps = props;
+  }
+
+  public static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  public static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  public static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  public static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+  public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
+      ConfigOptions.key("catalog-props")
+          .mapType()
+          .noDefaultValue()
+          .withDescription("Properties for the underlying catalog for iceberg table.");
+
+  public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";

Review Comment:
   `SRC_TABLE_PROPS_KEY` seems more accurate to me. `catalog props` typically means config like HMS endpoints etc.



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java:
##########
@@ -188,6 +188,37 @@ public void testCreateTableLike() throws TableNotExistException {
        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
+  @TestTemplate
+  public void testCreateTableLikeInDifferentCatalog() throws TableNotExistException {
+    sql("CREATE TABLE tl(id BIGINT)");
+    sql("CREATE TABLE tl2 LIKE tl");
+
+    Table table = table("tl2");
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
+    CatalogTable catalogTable = catalogTable("tl2");
+    assertThat(catalogTable.getSchema())
+        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
+  }
+
+  @TestTemplate
+  public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {

Review Comment:
   is the method name mixed up with the one above? should this method be the `DifferentCatalog` case?



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java:
##########
@@ -188,6 +188,37 @@ public void testCreateTableLike() throws TableNotExistException {
        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
+  @TestTemplate
+  public void testCreateTableLikeInDifferentCatalog() throws TableNotExistException {

Review Comment:
   isn't this the same test as the method above it?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FlinkCreateTableOptions {
+  private final String catalogName;
+  private final String catalogDb;
+  private final String catalogTable;
+  private final Map<String, String> catalogProps;
+
+  private FlinkCreateTableOptions(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    this.catalogName = catalogName;
+    this.catalogDb = catalogDb;
+    this.catalogTable = catalogTable;
+    this.catalogProps = props;
+  }
+
+  public static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  public static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  public static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  public static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+  public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
+      ConfigOptions.key("catalog-props")
+          .mapType()
+          .noDefaultValue()
+          .withDescription("Properties for the underlying catalog for iceberg table.");
+
+  public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
+
+  static String toJson(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    return JsonUtil.generate(
+        gen -> {
+          gen.writeStartObject();
+          gen.writeStringField(CATALOG_NAME.key(), catalogName);
+          gen.writeStringField(CATALOG_DATABASE.key(), catalogDb);
+          gen.writeStringField(CATALOG_TABLE.key(), catalogTable);
+          JsonUtil.writeStringMap(CATALOG_PROPS.key(), props, gen);
+          gen.writeEndObject();
+        },
+        false);
+  }
+
+  static FlinkCreateTableOptions fromJson(String createTableOptions) {
+    return JsonUtil.parse(
+        createTableOptions,
+        node -> {
+          String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node);
+          String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node);
+          String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node);
+          Map<String, String> catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node);
+
+          return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps);
+        });
+  }
+
+  String getCatalogName() {

Review Comment:
   Iceberg coding style doesn't include `get` in getter method names, so this should be just `catalogName()`.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FlinkCreateTableOptions {
+  private final String catalogName;
+  private final String catalogDb;
+  private final String catalogTable;
+  private final Map<String, String> catalogProps;
+
+  private FlinkCreateTableOptions(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    this.catalogName = catalogName;
+    this.catalogDb = catalogDb;
+    this.catalogTable = catalogTable;
+    this.catalogProps = props;
+  }
+
+  public static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  public static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  public static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  public static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+  public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
+      ConfigOptions.key("catalog-props")
+          .mapType()
+          .noDefaultValue()
+          .withDescription("Properties for the underlying catalog for iceberg table.");
+
+  public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
+
+  static String toJson(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    return JsonUtil.generate(
+        gen -> {
+          gen.writeStartObject();
+          gen.writeStringField(CATALOG_NAME.key(), catalogName);
+          gen.writeStringField(CATALOG_DATABASE.key(), catalogDb);
+          gen.writeStringField(CATALOG_TABLE.key(), catalogTable);
+          JsonUtil.writeStringMap(CATALOG_PROPS.key(), props, gen);

Review Comment:
   based on the usage, it seems that `CATALOG_PROPS` should be `TABLE_PROPS`, as in Iceberg table properties.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -625,7 +649,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
     return partitionKeysBuilder.build();
   }
 
-  static CatalogTable toCatalogTable(Table table) {
+  static CatalogTable toCatalogTableWithCustomProps(Table table, Map<String, String> customProps) {

Review Comment:
   let's just call the 2nd arg `props` or `catalogTableProps`. I think `custom` would add more confusion. Same applies to the method name.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -406,7 +427,10 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea
     for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
       if ("location".equalsIgnoreCase(entry.getKey())) {
         location = entry.getValue();
-      } else {
+      } // Catalog props are added to support CREATE TABLE LIKE in getTable().

Review Comment:
   nit: put the comment lines inside the `else if` block.
   
   maybe even extract a `boolean isReservedKey(String key)` method? so sth like this might be a little easier to read. `location` should also be considered a reserved key.
   ```
   if (!isReservedKey(entry.getKey())) {
       properties.put(...);
   } else {
       // handling reserved keys
       if ("location".equalsIgnoreCase(entry.getKey())) {
           location = entry.getValue();
       }
   }
   ```
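   
   For completeness, the helper could be as small as this (sketch; the reserved-key set is my assumption based on this PR):
   ```
   // Keys handled specially in createIcebergTable()/getTable() rather than copied
   // into Iceberg table properties.
   private static boolean isReservedKey(String key) {
     return "location".equalsIgnoreCase(key)
         || "connector".equalsIgnoreCase(key)
         || FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equalsIgnoreCase(key);
   }
   ```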



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -162,4 +162,21 @@ public void testWatermarkOptionsDescending() throws Exception {
         expected,
         SCHEMA_TS);
   }
+
+  /** Test create table using LIKE */

Review Comment:
   nit: this Javadoc seems redundant



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FlinkCreateTableOptions {
+  private final String catalogName;
+  private final String catalogDb;
+  private final String catalogTable;
+  private final Map<String, String> catalogProps;
+
+  private FlinkCreateTableOptions(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    this.catalogName = catalogName;
+    this.catalogDb = catalogDb;
+    this.catalogTable = catalogTable;
+    this.catalogProps = props;
+  }
+
+  public static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  public static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  public static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  public static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+  public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
+      ConfigOptions.key("catalog-props")
+          .mapType()
+          .noDefaultValue()
+          .withDescription("Properties for the underlying catalog for iceberg table.");
+
+  public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
+
+  static String toJson(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {

Review Comment:
   nit: props -> catalogProps
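   
   Also, just for reference, the serialized payload this produces would look roughly like the following (values are made up; shape inferred from the writer code above):
   ```
   String json =
       FlinkCreateTableOptions.toJson(
           "iceberg_catalog", "db", "tbl", ImmutableMap.of("type", "hive"));
   // json is roughly:
   // {"catalog-name":"iceberg_catalog","catalog-database":"db",
   //  "catalog-table":"tbl","catalog-props":{"type":"hive"}}
   ```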



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

