stevenzwu commented on code in PR #7628:
URL: https://github.com/apache/iceberg/pull/7628#discussion_r1248843159


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -472,11 +482,16 @@ public void alterTable(ObjectPath tablePath, 
CatalogBaseTable newTable, boolean
 
     CatalogTable table = toCatalogTable(icebergTable);
 
-    // Currently, Flink SQL only support altering table properties.
+    // This alterTable API only supports altering table properties.
 
-    // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by
+    // Support for adding/removing/renaming columns cannot be done by
     // comparing
     // CatalogTable instances, unless the Flink schema contains Iceberg column 
IDs.
+
+    // To alter columns, use the other alterTable API and provide a list of 
TableChange's.
+    LOG.warn(

Review Comment:
   what is the purpose of this warning msg?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Adding computed columns is not supported yet: %s",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+
+      } else if (change instanceof TableChange.ModifyColumn) {
+        TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
+        applyModifyColumn(pendingUpdate, modifyColumn);
+
+      } else if (change instanceof TableChange.DropColumn) {
+        TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
+        pendingUpdate.deleteColumn(dropColumn.getColumnName());
+
+      } else if (change instanceof TableChange.AddWatermark) {
+        throw new UnsupportedOperationException("Adding watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.ModifyWatermark) {
+        throw new UnsupportedOperationException("Modifying watermark specs is 
not supported yet. ");
+
+      } else if (change instanceof TableChange.DropWatermark) {
+        throw new UnsupportedOperationException("Watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.AddUniqueConstraint) {
+        TableChange.AddUniqueConstraint addPk = 
(TableChange.AddUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, addPk.getConstraint());
+
+      } else if (change instanceof TableChange.ModifyUniqueConstraint) {
+        TableChange.ModifyUniqueConstraint modifyPk = 
(TableChange.ModifyUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint());
+
+      } else if (change instanceof TableChange.DropConstraint) {
+        throw new UnsupportedOperationException("Dropping constraints is not 
supported yet. ");
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateProperties} 
operation.

Review Comment:
   nit: javadoc should be updated for property change



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Adding computed columns is not supported yet: %s",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+

Review Comment:
   nit: this empty line doesn't seem to following coding style. please check 
all places.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Adding computed columns is not supported yet: %s",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+
+      } else if (change instanceof TableChange.ModifyColumn) {
+        TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
+        applyModifyColumn(pendingUpdate, modifyColumn);
+
+      } else if (change instanceof TableChange.DropColumn) {
+        TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
+        pendingUpdate.deleteColumn(dropColumn.getColumnName());
+
+      } else if (change instanceof TableChange.AddWatermark) {
+        throw new UnsupportedOperationException("Adding watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.ModifyWatermark) {
+        throw new UnsupportedOperationException("Modifying watermark specs is 
not supported yet. ");
+
+      } else if (change instanceof TableChange.DropWatermark) {
+        throw new UnsupportedOperationException("Watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.AddUniqueConstraint) {
+        TableChange.AddUniqueConstraint addPk = 
(TableChange.AddUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, addPk.getConstraint());
+
+      } else if (change instanceof TableChange.ModifyUniqueConstraint) {
+        TableChange.ModifyUniqueConstraint modifyPk = 
(TableChange.ModifyUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint());
+
+      } else if (change instanceof TableChange.DropConstraint) {
+        throw new UnsupportedOperationException("Dropping constraints is not 
supported yet. ");
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateProperties} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperty operation to configure
+   * @param propertyChanges a list of Flink table changes
+   */
+  public static void applyPropertyChanges(
+      UpdateProperties pendingUpdate, List<TableChange> propertyChanges) {
+    for (TableChange change : propertyChanges) {
+      if (change instanceof TableChange.SetOption) {
+        TableChange.SetOption setOption = (TableChange.SetOption) change;
+        pendingUpdate.set(setOption.getKey(), setOption.getValue());
+
+      } else if (change instanceof TableChange.ResetOption) {
+        TableChange.ResetOption resetOption = (TableChange.ResetOption) change;
+        pendingUpdate.remove(resetOption.getKey());
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  private static void applyModifyColumn(
+      UpdateSchema pendingUpdate, TableChange.ModifyColumn modifyColumn) {
+    if (modifyColumn instanceof TableChange.ModifyColumnName) {
+      TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) 
modifyColumn;
+      pendingUpdate.renameColumn(modifyName.getOldColumnName(), 
modifyName.getNewColumnName());
+
+    } else if (modifyColumn instanceof TableChange.ModifyColumnPosition) {
+      TableChange.ModifyColumnPosition modifyPosition =
+          (TableChange.ModifyColumnPosition) modifyColumn;
+      applyModifyColumnPosition(pendingUpdate, modifyPosition);
+
+    } else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) {
+      TableChange.ModifyPhysicalColumnType modifyType =
+          (TableChange.ModifyPhysicalColumnType) modifyColumn;
+      Type type = 
FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType());
+      String columnName = modifyType.getOldColumn().getName();
+      pendingUpdate.updateColumn(columnName, type.asPrimitiveType());
+      if 
(modifyType.getNewColumn().getDataType().getLogicalType().isNullable()) {
+        pendingUpdate.makeColumnOptional(columnName);
+      } else {
+        pendingUpdate.requireColumn(columnName);
+      }
+
+    } else if (modifyColumn instanceof TableChange.ModifyColumnComment) {
+      TableChange.ModifyColumnComment modifyComment =
+          (TableChange.ModifyColumnComment) modifyColumn;
+      pendingUpdate.updateColumnDoc(
+          modifyComment.getOldColumn().getName(), 
modifyComment.getNewComment());
+
+    } else {
+      throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + modifyColumn);
+    }
+  }
+
+  private static void applyModifyColumnPosition(
+      UpdateSchema pendingUpdate, TableChange.ModifyColumnPosition 
modifyColumnPosition) {
+    TableChange.ColumnPosition newPosition = 
modifyColumnPosition.getNewPosition();
+    if (newPosition instanceof TableChange.First) {
+      pendingUpdate.moveFirst(modifyColumnPosition.getOldColumn().getName());
+
+    } else if (newPosition instanceof TableChange.After) {
+      TableChange.After after = (TableChange.After) newPosition;
+      pendingUpdate.moveAfter(modifyColumnPosition.getOldColumn().getName(), 
after.column());
+
+    } else {
+      throw new UnsupportedOperationException(
+          "Cannot apply unknown table change: " + modifyColumnPosition);

Review Comment:
   nit: more specific error msg



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Adding computed columns is not supported yet: %s",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+
+      } else if (change instanceof TableChange.ModifyColumn) {
+        TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
+        applyModifyColumn(pendingUpdate, modifyColumn);
+
+      } else if (change instanceof TableChange.DropColumn) {
+        TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
+        pendingUpdate.deleteColumn(dropColumn.getColumnName());
+
+      } else if (change instanceof TableChange.AddWatermark) {
+        throw new UnsupportedOperationException("Adding watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.ModifyWatermark) {
+        throw new UnsupportedOperationException("Modifying watermark specs is 
not supported yet. ");
+
+      } else if (change instanceof TableChange.DropWatermark) {
+        throw new UnsupportedOperationException("Watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.AddUniqueConstraint) {
+        TableChange.AddUniqueConstraint addPk = 
(TableChange.AddUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, addPk.getConstraint());
+
+      } else if (change instanceof TableChange.ModifyUniqueConstraint) {
+        TableChange.ModifyUniqueConstraint modifyPk = 
(TableChange.ModifyUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint());
+
+      } else if (change instanceof TableChange.DropConstraint) {
+        throw new UnsupportedOperationException("Dropping constraints is not 
supported yet. ");
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateProperties} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperty operation to configure
+   * @param propertyChanges a list of Flink table changes
+   */
+  public static void applyPropertyChanges(
+      UpdateProperties pendingUpdate, List<TableChange> propertyChanges) {
+    for (TableChange change : propertyChanges) {
+      if (change instanceof TableChange.SetOption) {
+        TableChange.SetOption setOption = (TableChange.SetOption) change;
+        pendingUpdate.set(setOption.getKey(), setOption.getValue());
+
+      } else if (change instanceof TableChange.ResetOption) {
+        TableChange.ResetOption resetOption = (TableChange.ResetOption) change;
+        pendingUpdate.remove(resetOption.getKey());
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  private static void applyModifyColumn(
+      UpdateSchema pendingUpdate, TableChange.ModifyColumn modifyColumn) {
+    if (modifyColumn instanceof TableChange.ModifyColumnName) {
+      TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) 
modifyColumn;
+      pendingUpdate.renameColumn(modifyName.getOldColumnName(), 
modifyName.getNewColumnName());
+
+    } else if (modifyColumn instanceof TableChange.ModifyColumnPosition) {
+      TableChange.ModifyColumnPosition modifyPosition =
+          (TableChange.ModifyColumnPosition) modifyColumn;
+      applyModifyColumnPosition(pendingUpdate, modifyPosition);
+
+    } else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) {
+      TableChange.ModifyPhysicalColumnType modifyType =
+          (TableChange.ModifyPhysicalColumnType) modifyColumn;
+      Type type = 
FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType());
+      String columnName = modifyType.getOldColumn().getName();
+      pendingUpdate.updateColumn(columnName, type.asPrimitiveType());
+      if 
(modifyType.getNewColumn().getDataType().getLogicalType().isNullable()) {
+        pendingUpdate.makeColumnOptional(columnName);
+      } else {
+        pendingUpdate.requireColumn(columnName);
+      }
+
+    } else if (modifyColumn instanceof TableChange.ModifyColumnComment) {
+      TableChange.ModifyColumnComment modifyComment =
+          (TableChange.ModifyColumnComment) modifyColumn;
+      pendingUpdate.updateColumnDoc(
+          modifyComment.getOldColumn().getName(), 
modifyComment.getNewComment());
+
+    } else {
+      throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + modifyColumn);

Review Comment:
   nit: more specific error msg



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -605,6 +659,56 @@ private static void commitChanges(
     transaction.commitTransaction();
   }
 
+  private static void commitChanges(
+      Table table,
+      String setLocation,
+      String setSnapshotId,
+      String pickSnapshotId,
+      List<TableChange> schemaChanges,
+      List<TableChange> propertyChanges) {
+    commitChanges(table, setSnapshotId, pickSnapshotId);
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation().setLocation(setLocation).commit();
+    }
+
+    if (!schemaChanges.isEmpty()) {
+      UpdateSchema updateSchema = transaction.updateSchema();
+      FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges);
+      updateSchema.commit();
+    }
+
+    if (!propertyChanges.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      FlinkAlterTableUtil.applyPropertyChanges(updateProperties, 
propertyChanges);
+      updateProperties.commit();
+    }
+
+    transaction.commitTransaction();
+  }
+
+  private static void commitChanges(Table table, String setSnapshotId, String 
pickSnapshotId) {

Review Comment:
   this method name is a little misleading and confusing. maybe 
`applyManageSnapshots` would be more intuitive.
   
   also curious why this method doesn't live inside `FlinkAlterTableUtil`?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -472,11 +482,16 @@ public void alterTable(ObjectPath tablePath, 
CatalogBaseTable newTable, boolean
 
     CatalogTable table = toCatalogTable(icebergTable);
 
-    // Currently, Flink SQL only support altering table properties.
+    // This alterTable API only supports altering table properties.
 
-    // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by
+    // Support for adding/removing/renaming columns cannot be done by
     // comparing

Review Comment:
   nit: fix the line break from auto formatting. this can be combined with the 
other comment of moving to method Javadoc



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Adding computed columns is not supported yet: %s",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+
+      } else if (change instanceof TableChange.ModifyColumn) {
+        TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
+        applyModifyColumn(pendingUpdate, modifyColumn);
+
+      } else if (change instanceof TableChange.DropColumn) {
+        TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
+        pendingUpdate.deleteColumn(dropColumn.getColumnName());
+
+      } else if (change instanceof TableChange.AddWatermark) {
+        throw new UnsupportedOperationException("Adding watermark specs is not 
supported yet. ");

Review Comment:
   nit: empty space in the end is not necessary. please check all places



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -605,6 +659,56 @@ private static void commitChanges(
     transaction.commitTransaction();
   }
 
+  private static void commitChanges(
+      Table table,
+      String setLocation,
+      String setSnapshotId,
+      String pickSnapshotId,
+      List<TableChange> schemaChanges,
+      List<TableChange> propertyChanges) {
+    commitChanges(table, setSnapshotId, pickSnapshotId);
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation().setLocation(setLocation).commit();
+    }
+
+    if (!schemaChanges.isEmpty()) {
+      UpdateSchema updateSchema = transaction.updateSchema();
+      FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges);
+      updateSchema.commit();
+    }
+
+    if (!propertyChanges.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      FlinkAlterTableUtil.applyPropertyChanges(updateProperties, 
propertyChanges);
+      updateProperties.commit();
+    }
+
+    transaction.commitTransaction();
+  }
+
+  private static void commitChanges(Table table, String setSnapshotId, String 
pickSnapshotId) {
+    // don't allow setting the snapshot and picking a commit at the same time 
because order is
+    // ambiguous and choosing

Review Comment:
   nit: fix line break



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Adding computed columns is not supported yet: %s",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+
+      } else if (change instanceof TableChange.ModifyColumn) {
+        TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
+        applyModifyColumn(pendingUpdate, modifyColumn);
+
+      } else if (change instanceof TableChange.DropColumn) {
+        TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
+        pendingUpdate.deleteColumn(dropColumn.getColumnName());
+
+      } else if (change instanceof TableChange.AddWatermark) {
+        throw new UnsupportedOperationException("Adding watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.ModifyWatermark) {
+        throw new UnsupportedOperationException("Modifying watermark specs is 
not supported yet. ");
+
+      } else if (change instanceof TableChange.DropWatermark) {
+        throw new UnsupportedOperationException("Watermark specs is not 
supported yet. ");
+
+      } else if (change instanceof TableChange.AddUniqueConstraint) {
+        TableChange.AddUniqueConstraint addPk = 
(TableChange.AddUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, addPk.getConstraint());
+
+      } else if (change instanceof TableChange.ModifyUniqueConstraint) {
+        TableChange.ModifyUniqueConstraint modifyPk = 
(TableChange.ModifyUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint());
+
+      } else if (change instanceof TableChange.DropConstraint) {
+        throw new UnsupportedOperationException("Dropping constraints is not 
supported yet. ");
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateProperties} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperty operation to configure
+   * @param propertyChanges a list of Flink table changes
+   */
+  public static void applyPropertyChanges(
+      UpdateProperties pendingUpdate, List<TableChange> propertyChanges) {
+    for (TableChange change : propertyChanges) {
+      if (change instanceof TableChange.SetOption) {
+        TableChange.SetOption setOption = (TableChange.SetOption) change;
+        pendingUpdate.set(setOption.getKey(), setOption.getValue());
+
+      } else if (change instanceof TableChange.ResetOption) {
+        TableChange.ResetOption resetOption = (TableChange.ResetOption) change;
+        pendingUpdate.remove(resetOption.getKey());
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);

Review Comment:
   nit: error msg can be more specific to property changes.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -472,11 +482,16 @@ public void alterTable(ObjectPath tablePath, 
CatalogBaseTable newTable, boolean
 
     CatalogTable table = toCatalogTable(icebergTable);
 
-    // Currently, Flink SQL only support altering table properties.
+    // This alterTable API only supports altering table properties.
 
-    // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by
+    // Support for adding/removing/renaming columns cannot be done by
     // comparing

Review Comment:
   also this comment probably should be moved as method Javadoc



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -605,6 +659,56 @@ private static void commitChanges(
     transaction.commitTransaction();
   }
 
+  private static void commitChanges(
+      Table table,
+      String setLocation,
+      String setSnapshotId,
+      String pickSnapshotId,
+      List<TableChange> schemaChanges,
+      List<TableChange> propertyChanges) {
+    commitChanges(table, setSnapshotId, pickSnapshotId);
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation().setLocation(setLocation).commit();
+    }
+
+    if (!schemaChanges.isEmpty()) {
+      UpdateSchema updateSchema = transaction.updateSchema();
+      FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges);
+      updateSchema.commit();

Review Comment:
   maybe `commit` action can be moved inside `FlinkAlterTableUtil`? then it 
would be more natural to move `applyManageSnapshots` into `FlinkAlterTableUtil` 
too as mentioned in a comment below.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -517,6 +532,61 @@ public void alterTable(ObjectPath tablePath, 
CatalogBaseTable newTable, boolean
     commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
   }
 
+  @Override
+  public void alterTable(
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      List<TableChange> tableChanges,
+      boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    validateFlinkTable(newTable);
+
+    Table icebergTable;
+    try {
+      icebergTable = loadIcebergTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      } else {
+        return;
+      }
+    }
+
+    // Does not support altering partition yet.
+    validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) 
newTable);
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    List<TableChange> propertyChanges = Lists.newArrayList();
+    List<TableChange> schemaChanges = Lists.newArrayList();
+    for (TableChange change : tableChanges) {
+      if (change instanceof TableChange.SetOption) {
+        TableChange.SetOption set = (TableChange.SetOption) change;
+
+        if ("location".equalsIgnoreCase(set.getKey())) {
+          setLocation = set.getValue();
+        } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
+          setSnapshotId = set.getValue();
+        } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) {
+          pickSnapshotId = set.getValue();

Review Comment:
   nit: `cherrypickSnapshotId` would be easier to understand



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to