szehon-ho commented on code in PR #9852:
URL: https://github.com/apache/iceberg/pull/9852#discussion_r1511970931


##########
core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseMetastoreOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);
+
+  /**
+   * Attempt to load the table and see if any current or past metadata location matches the one we
+   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+   * may indicate the commit has failed but don't have proof that this is the case. Note that all
+   * the previous locations must also be searched on the chance that a second committer was able to
+   * successfully commit on top of our commit.
+   *
+   * @param tableName full name of the table
+   * @param newMetadataLocation the path of the new commit file
+   * @param properties properties for retry
+   * @param loadMetadataLocations supply all the metadata locations
+   * @return Commit Status of Success, Failure or Unknown
+   */
+  protected BaseMetastoreTableOperations.CommitStatus checkCommitStatus(

Review Comment:
   The variable name and comments use 'table'. They should be more generic,
   like 'entity'.



##########
core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java:
##########
@@ -0,0 +1,114 @@
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseMetastoreOperations {

Review Comment:
   I like this code reuse. Later we can move more code up here, such as
   refreshMetadata(), now that we have the common interface.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java:
##########
@@ -0,0 +1,190 @@
+package org.apache.iceberg.hive;
+
+import java.util.Locale;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetadata;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Hive implementation of Iceberg ViewOperations. */
+final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class);
+
+  private final String fullName;
+  private final String database;
+  private final String viewName;
+  private final FileIO fileIO;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final long maxHiveTablePropertySize;
+  private final Configuration conf;
+  private final String catalogName;
+
+  HiveViewOperations(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier) {
+    String dbName = viewIdentifier.namespace().level(0);
+    this.conf = conf;
+    this.catalogName = catalogName;
+    this.metaClients = metaClients;
+    this.fileIO = fileIO;
+    this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier);
+    this.database = dbName;
+    this.viewName = viewIdentifier.name();
+    this.maxHiveTablePropertySize =
+        conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+  }
+
+  @Override
+  public void doRefresh() {
+    String metadataLocation = null;
+    Table table;
+
+    try {
+      table = metaClients.run(client -> client.getTable(database, viewName));
+      HiveViewOperations.validateTableIsIcebergView(table, fullName);
+
+      metadataLocation =
+          table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+    } catch (NoSuchObjectException | NoSuchIcebergViewException e) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchViewException("View does not exist: %s.%s", database, viewName);
+      }
+    } catch (TException e) {
+      String errMsg =
+          String.format("Failed to get view info from metastore %s.%s", database, viewName);
+      throw new RuntimeException(errMsg, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during refresh", e);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+    commitWithLocking(conf, base, metadata, baseMetadataLocation, newMetadataLocation, fileIO);
+
+    LOG.info(
+        "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation);
+  }
+
+  @Override
+  protected String viewName() {
+    return fullName;
+  }
+
+  @Override
+  public TableType tableType() {
+    return TableType.VIRTUAL_VIEW;
+  }
+
+  @Override
+  public ClientPool<IMetaStoreClient, TException> metaClients() {
+    return metaClients;
+  }
+
+  @Override
+  public long maxHiveTablePropertySize() {
+    return maxHiveTablePropertySize;
+  }
+
+  @Override
+  public String database() {
+    return database;
+  }
+
+  @Override
+  public String table() {
+    return viewName;
+  }
+
+  @Override
+  public String catalogName() {
+    return catalogName;
+  }
+
+  @Override
+  public String opName() {
+    return "View";
+  }
+
+  @Override
+  public BaseMetastoreTableOperations.CommitStatus validateNewLocationAndReturnCommitStatus(
+      BaseMetadata metadata, String newMetadataLocation) {
+    return null;
+  }
+
+  @Override
+  public void setHmsParameters(
+      BaseMetadata metadata,
+      Table tbl,
+      String newMetadataLocation,
+      Set<String> obsoleteProps,
+      boolean hiveEngineEnabled) {
+    setCommonHmsParameters(
+        tbl,
+        BaseMetastoreTableOperations.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+        newMetadataLocation,
+        metadata.schema(),
+        ((ViewMetadata) metadata).uuid(),
+        obsoleteProps,
+        this::currentMetadataLocation);
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  static void validateTableIsIcebergView(Table table, String fullName) {
+    String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);

Review Comment:
   Nit: call it tableTypeProp to reduce confusion with table.getTableType()?



##########
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java:
##########
@@ -309,65 +304,20 @@ protected enum CommitStatus {
    * @return Commit Status of Success, Failure or Unknown
    */
   protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
-    int maxAttempts =
-        PropertyUtil.propertyAsInt(
-            config.properties(), COMMIT_NUM_STATUS_CHECKS, 
COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    long minWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
-    long maxWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
-    long totalRetryMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
-
-    AtomicReference<CommitStatus> status = new 
AtomicReference<>(CommitStatus.UNKNOWN);
-
-    Tasks.foreach(newMetadataLocation)
-        .retry(maxAttempts)
-        .suppressFailureWhenFinished()
-        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
-        .onFailure(
-            (location, checkException) ->
-                LOG.error("Cannot check if commit to {} exists.", tableName(), 
checkException))
-        .run(
-            location -> {
-              TableMetadata metadata = refresh();
-              String currentMetadataFileLocation = 
metadata.metadataFileLocation();
-              boolean commitSuccess =
-                  currentMetadataFileLocation.equals(newMetadataLocation)
-                      || metadata.previousFiles().stream()
-                          .anyMatch(log -> 
log.file().equals(newMetadataLocation));
-              if (commitSuccess) {
-                LOG.info(
-                    "Commit status check: Commit to {} of {} succeeded",
-                    tableName(),
-                    newMetadataLocation);
-                status.set(CommitStatus.SUCCESS);
-              } else {
-                LOG.warn(
-                    "Commit status check: Commit to {} of {} unknown, new 
metadata location is not current "
-                        + "or in history",
-                    tableName(),
-                    newMetadataLocation);
-              }
-            });
-
-    if (status.get() == CommitStatus.UNKNOWN) {
-      LOG.error(
-          "Cannot determine commit state to {}. Failed during checking {} 
times. "
-              + "Treating commit state as unknown.",
-          tableName(),
-          maxAttempts);
-    }
-    return status.get();
+    return checkCommitStatus(
+        tableName(), newMetadataLocation, config.properties(), this::loadMetadataLocations);
+  }
+
+  protected List<String> loadMetadataLocations() {
+    TableMetadata metadata = refresh();
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    return builder
+        .add(metadata.metadataFileLocation())
+        .addAll(
+            metadata.previousFiles().stream()

Review Comment:
   I think we need to store and check previousFiles in ViewMetadata, just like
   we are doing for TableMetadata here.
   
   In Hive, unless we enable an atomic getAndSet, like the mode provided by
   [HIVE-26882](https://issues.apache.org/jira/browse/HIVE-26882) /
   https://github.com/apache/iceberg/pull/6570, it is always possible that an
   intermediate commit gets in between the commit and the check, and the check
   would then fail with UNKNOWN status. (I believe UNKNOWN is still the correct
   behavior, i.e. it will not delete the metadata.json, but this is a case where
   we could still return successfully for users, as the commits are serialized.)
   
   Can we fix this? It looks like there is already somewhere to store
   ViewHistoryEntry.
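
   To make the concern concrete, here is a rough sketch of the check, not the
   PR's code. `CommitLocationCheck` and `commitSucceeded` are hypothetical
   names, and the sketch assumes the view metadata can expose its earlier
   metadata locations as a plain list, which is exactly what is being asked for
   above.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical helper, not part of Iceberg: shows why the status check has to
// look at previous metadata locations as well as the current one.
final class CommitLocationCheck {
  private CommitLocationCheck() {}

  /**
   * Returns true if newLocation is the current location or appears in the history.
   *
   * @param newLocation the metadata location our commit tried to set
   * @param currentLocation the location the metastore reports now (may be null)
   * @param previousLocations earlier metadata locations of the same entity
   */
  static boolean commitSucceeded(
      String newLocation, String currentLocation, List<String> previousLocations) {
    Objects.requireNonNull(newLocation, "newLocation");
    if (newLocation.equals(currentLocation)) {
      return true;
    }
    // A second committer may already have committed on top of ours. Our
    // location is then no longer current, but it is still in the history.
    return previousLocations.contains(newLocation);
  }
}
```

   Without the history lookup, the intermediate-commit case always falls
   through to UNKNOWN even though our commit did land.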



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java:
##########
@@ -82,6 +90,11 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
 
   public HiveCatalog() {}
 
+  @Override

Review Comment:
   Nit: let's put this near newTableOps()



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java:
##########
@@ -0,0 +1,190 @@
+package org.apache.iceberg.hive;
+
+import java.util.Locale;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetadata;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Hive implementation of Iceberg ViewOperations. */

Review Comment:
   Nit: use a Javadoc `{@link}` for ViewOperations here



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -166,168 +161,33 @@ protected void doRefresh() {
     refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean newTable = base == null;
     String newMetadataLocation = writeNewMetadataIfRequired(newTable, 
metadata);
-    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
-    boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, 
false);
-
-    CommitStatus commitStatus = CommitStatus.FAILURE;
-    boolean updateHiveTable = false;
-
-    HiveLock lock = lockObject(metadata);
-    try {
-      lock.lock();
-
-      Table tbl = loadHmsTable();
-
-      if (tbl != null) {
-        // If we try to create the table but the metadata location is already 
set, then we had a
-        // concurrent commit
-        if (newTable
-            && 
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
-                != null) {
-          throw new AlreadyExistsException("Table already exists: %s.%s", 
database, tableName);
-        }
-
-        updateHiveTable = true;
-        LOG.debug("Committing existing table: {}", fullName);
-      } else {
-        tbl =
-            newHmsTable(
-                metadata.property(HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser()));
-        LOG.debug("Committing new table: {}", fullName);
-      }
-
-      tbl.setSd(
-          HiveOperationsBase.storageDescriptor(
-              metadata, hiveEngineEnabled)); // set to pickup any schema 
changes
-
-      String metadataLocation = 
tbl.getParameters().get(METADATA_LOCATION_PROP);
-      String baseMetadataLocation = base != null ? base.metadataFileLocation() 
: null;
-      if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
-        throw new CommitFailedException(
-            "Cannot commit: Base metadata location '%s' is not same as the 
current table metadata location '%s' for %s.%s",
-            baseMetadataLocation, metadataLocation, database, tableName);
-      }
-
-      // get Iceberg props that have been removed
-      Set<String> removedProps = Collections.emptySet();
-      if (base != null) {
-        removedProps =
-            base.properties().keySet().stream()
-                .filter(key -> !metadata.properties().containsKey(key))
-                .collect(Collectors.toSet());
-      }
-
-      Map<String, String> summary =
-          Optional.ofNullable(metadata.currentSnapshot())
-              .map(Snapshot::summary)
-              .orElseGet(ImmutableMap::of);
-      setHmsTableParameters(
-          newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, 
summary);
-
-      if (!keepHiveStats) {
-        tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-      }
-
-      lock.ensureActive();
-
-      try {
-        persistTable(
-            tbl, updateHiveTable, hiveLockEnabled(metadata, conf) ? null : 
baseMetadataLocation);
-        lock.ensureActive();
-
-        commitStatus = CommitStatus.SUCCESS;
-      } catch (LockException le) {
-        commitStatus = CommitStatus.UNKNOWN;
-        throw new CommitStateUnknownException(
-            "Failed to heartbeat for hive lock while "
-                + "committing changes. This can lead to a concurrent commit 
attempt be able to overwrite this commit. "
-                + "Please check the commit history. If you are running into 
this issue, try reducing "
-                + "iceberg.hive.lock-heartbeat-interval-ms.",
-            le);
-      } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
-        throw new AlreadyExistsException(e, "Table already exists: %s.%s", 
database, tableName);
-
-      } catch (InvalidObjectException e) {
-        throw new ValidationException(e, "Invalid Hive object for %s.%s", 
database, tableName);
-
-      } catch (CommitFailedException | CommitStateUnknownException e) {
-        throw e;
-
-      } catch (Throwable e) {
-        if (e.getMessage()
-            .contains(
-                "The table has been modified. The parameter value for key '"
-                    + HiveTableOperations.METADATA_LOCATION_PROP
-                    + "' is")) {
-          throw new CommitFailedException(
-              e, "The table %s.%s has been modified concurrently", database, 
tableName);
-        }
-
-        if (e.getMessage() != null
-            && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not 
exist")) {
-          throw new RuntimeException(
-              "Failed to acquire locks from metastore because the underlying 
metastore "
-                  + "table 'HIVE_LOCKS' does not exist. This can occur when 
using an embedded metastore which does not "
-                  + "support transactions. To fix this use an alternative 
metastore.",
-              e);
-        }
-
-        LOG.error(
-            "Cannot tell if commit to {}.{} succeeded, attempting to reconnect 
and check.",
-            database,
-            tableName,
-            e);
-        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
-        switch (commitStatus) {
-          case SUCCESS:
-            break;
-          case FAILURE:
-            throw e;
-          case UNKNOWN:
-            throw new CommitStateUnknownException(e);
-        }
-      }
-    } catch (TException e) {
-      throw new RuntimeException(
-          String.format("Metastore operation failed for %s.%s", database, 
tableName), e);
-
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted during commit", e);
-
-    } catch (LockException e) {
-      throw new CommitFailedException(e);
-
-    } finally {
-      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock);
-    }
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : 
null;
+    commitWithLocking(conf, base, metadata, baseMetadataLocation, 
newMetadataLocation, fileIO);
 
     LOG.info(
         "Committed to table {} with the new metadata location {}", fullName, 
newMetadataLocation);
   }
 
-  @VisibleForTesting
-  Table loadHmsTable() throws TException, InterruptedException {
-    try {
-      return metaClients.run(client -> client.getTable(database, tableName));
-    } catch (NoSuchObjectException nte) {
-      LOG.trace("Table not found {}", fullName, nte);
-      return null;
-    }
+  @Override
+  public BaseMetastoreTableOperations.CommitStatus validateNewLocationAndReturnCommitStatus(

Review Comment:
   What is the point of this method?  It just wraps another method.



##########
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java:
##########
@@ -42,18 +35,20 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.LocationUtil;
-import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class BaseMetastoreTableOperations implements TableOperations {
+public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
+    implements TableOperations {
   private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);
 
   public static final String TABLE_TYPE_PROP = "table_type";
   public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
+  public static final String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view";

Review Comment:
   This seems to belong in HiveViewOperations, or somewhere not tied to tables.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java:
##########
@@ -181,4 +264,220 @@ default Table newHmsTable(String hmsTableOwner) {
 
     return newTable;
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  default void commitWithLocking(
+      Configuration conf,
+      BaseMetadata base,
+      BaseMetadata metadata,
+      String baseMetadataLocation,
+      String newMetadataLocation,
+      FileIO io) {
+    boolean newTable = base == null;
+    boolean hiveEngineEnabled = hiveEngineEnabled(conf, metadata);

Review Comment:
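   What about keepHiveStats? The old HiveTableOperations.doCommit read
   ConfigProperties.KEEP_HIVE_STATS here and removed COLUMN_STATS_ACCURATE
   when it was false.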



##########
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java:
##########
@@ -291,7 +286,7 @@ public long newSnapshotId() {
     };
   }
 
-  protected enum CommitStatus {
+  public enum CommitStatus {

Review Comment:
   Why not just move the CommitStatus up to BaseMetastoreOperations?



##########
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java:
##########
@@ -309,65 +304,20 @@ protected enum CommitStatus {
    * @return Commit Status of Success, Failure or Unknown
    */
   protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
-    int maxAttempts =
-        PropertyUtil.propertyAsInt(
-            config.properties(), COMMIT_NUM_STATUS_CHECKS, 
COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    long minWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
-    long maxWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
-    long totalRetryMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
-
-    AtomicReference<CommitStatus> status = new 
AtomicReference<>(CommitStatus.UNKNOWN);
-
-    Tasks.foreach(newMetadataLocation)
-        .retry(maxAttempts)
-        .suppressFailureWhenFinished()
-        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
-        .onFailure(
-            (location, checkException) ->
-                LOG.error("Cannot check if commit to {} exists.", tableName(), 
checkException))
-        .run(
-            location -> {
-              TableMetadata metadata = refresh();
-              String currentMetadataFileLocation = 
metadata.metadataFileLocation();
-              boolean commitSuccess =
-                  currentMetadataFileLocation.equals(newMetadataLocation)
-                      || metadata.previousFiles().stream()
-                          .anyMatch(log -> 
log.file().equals(newMetadataLocation));
-              if (commitSuccess) {
-                LOG.info(
-                    "Commit status check: Commit to {} of {} succeeded",
-                    tableName(),
-                    newMetadataLocation);
-                status.set(CommitStatus.SUCCESS);
-              } else {
-                LOG.warn(
-                    "Commit status check: Commit to {} of {} unknown, new 
metadata location is not current "
-                        + "or in history",
-                    tableName(),
-                    newMetadataLocation);
-              }
-            });
-
-    if (status.get() == CommitStatus.UNKNOWN) {
-      LOG.error(
-          "Cannot determine commit state to {}. Failed during checking {} 
times. "
-              + "Treating commit state as unknown.",
-          tableName(),
-          maxAttempts);
-    }
-    return status.get();
+    return checkCommitStatus(
+        tableName(), newMetadataLocation, config.properties(), this::loadMetadataLocations);
+  }
+
+  protected List<String> loadMetadataLocations() {
+    TableMetadata metadata = refresh();
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    return builder
+        .add(metadata.metadataFileLocation())

Review Comment:
   Check if null? ImmutableList.Builder.add throws a NullPointerException on a
   null element.
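
   One possible shape for the guard, as a sketch only. `MetadataLocations` is a
   hypothetical stand-in for loadMetadataLocations(), and it uses plain
   java.util collections instead of the relocated Guava ImmutableList so that it
   compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for loadMetadataLocations(), not the PR's code.
final class MetadataLocations {
  private MetadataLocations() {}

  /** Collects the current location (if any) followed by the previous locations. */
  static List<String> of(String currentLocation, List<String> previousLocations) {
    List<String> locations = new ArrayList<>();
    // Guard against a null current location instead of letting the list
    // builder throw while we are already handling a commit failure.
    if (currentLocation != null) {
      locations.add(currentLocation);
    }
    if (previousLocations != null) {
      locations.addAll(previousLocations);
    }
    return Collections.unmodifiableList(locations);
  }
}
```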



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java:
##########
@@ -0,0 +1,190 @@
+package org.apache.iceberg.hive;
+
+import java.util.Locale;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetadata;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Hive implementation of Iceberg ViewOperations. */
+final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class);
+
+  private final String fullName;
+  private final String database;
+  private final String viewName;
+  private final FileIO fileIO;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final long maxHiveTablePropertySize;
+  private final Configuration conf;
+  private final String catalogName;
+
+  HiveViewOperations(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier) {
+    String dbName = viewIdentifier.namespace().level(0);
+    this.conf = conf;
+    this.catalogName = catalogName;
+    this.metaClients = metaClients;
+    this.fileIO = fileIO;
+    this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier);
+    this.database = dbName;
+    this.viewName = viewIdentifier.name();
+    this.maxHiveTablePropertySize =
+        conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+  }
+
+  @Override
+  public void doRefresh() {
+    String metadataLocation = null;
+    Table table;
+
+    try {
+      table = metaClients.run(client -> client.getTable(database, viewName));
+      HiveViewOperations.validateTableIsIcebergView(table, fullName);
+
+      metadataLocation =
+          table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+    } catch (NoSuchObjectException | NoSuchIcebergViewException e) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchViewException("View does not exist: %s.%s", database, viewName);
+      }
+    } catch (TException e) {
+      String errMsg =
+          String.format("Failed to get view info from metastore %s.%s", database, viewName);
+      throw new RuntimeException(errMsg, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during refresh", e);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+    commitWithLocking(conf, base, metadata, baseMetadataLocation, newMetadataLocation, fileIO);
+
+    LOG.info(
+        "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation);
+  }
+
+  @Override
+  protected String viewName() {
+    return fullName;
+  }
+
+  @Override
+  public TableType tableType() {
+    return TableType.VIRTUAL_VIEW;
+  }
+
+  @Override
+  public ClientPool<IMetaStoreClient, TException> metaClients() {
+    return metaClients;
+  }
+
+  @Override
+  public long maxHiveTablePropertySize() {
+    return maxHiveTablePropertySize;
+  }
+
+  @Override
+  public String database() {
+    return database;
+  }
+
+  @Override
+  public String table() {
+    return viewName;
+  }
+
+  @Override
+  public String catalogName() {
+    return catalogName;
+  }
+
+  @Override
+  public String opName() {
+    return "View";
+  }
+
+  @Override
+  public BaseMetastoreTableOperations.CommitStatus validateNewLocationAndReturnCommitStatus(
+      BaseMetadata metadata, String newMetadataLocation) {
+    return null;
+  }
+
+  @Override
+  public void setHmsParameters(
+      BaseMetadata metadata,
+      Table tbl,
+      String newMetadataLocation,
+      Set<String> obsoleteProps,
+      boolean hiveEngineEnabled) {
+    setCommonHmsParameters(
+        tbl,
+        BaseMetastoreTableOperations.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+        newMetadataLocation,
+        metadata.schema(),
+        ((ViewMetadata) metadata).uuid(),
+        obsoleteProps,
+        this::currentMetadataLocation);
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  static void validateTableIsIcebergView(Table table, String fullName) {

Review Comment:
   nit: let's move it to `HiveOperationsBase` so that it sits next to
`validateTableIsIceberg`. It is used by `HiveCatalog` and `HiveViewOperations`,
just as `validateTableIsIceberg` is used by `HiveCatalog` and `HiveTableOperations`,
so it may make more sense there.
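
As a rough sketch of the layout being suggested: Java interfaces can hold static methods, so both validators could sit side by side on the shared interface. Everything below is a hypothetical stand-in (`HmsTable`, `OperationsBaseSketch`, `requireType`, the property name and the expected type strings); the real method bodies are not shown in this hunk, and the checks here are placeholders.

```java
import java.util.Map;

// Stand-in for the Hive metastore Table class; only what the sketch needs.
final class HmsTable {
  final Map<String, String> parameters;

  HmsTable(Map<String, String> parameters) {
    this.parameters = parameters;
  }
}

// Hypothetical shape of a shared interface holding both validators.
interface OperationsBaseSketch {
  String TABLE_TYPE_PROP = "table_type"; // placeholder property name

  static void validateTableIsIceberg(HmsTable table, String fullName) {
    requireType(table, fullName, "ICEBERG");
  }

  static void validateTableIsIcebergView(HmsTable table, String fullName) {
    requireType(table, fullName, "ICEBERG-VIEW");
  }

  // Shared placeholder check: compares the type parameter case-insensitively.
  static void requireType(HmsTable table, String fullName, String expected) {
    String actual = table.parameters.get(TABLE_TYPE_PROP);
    if (actual == null || !actual.equalsIgnoreCase(expected)) {
      throw new IllegalArgumentException(
          String.format("Not of type %s: %s (type=%s)", expected, fullName, actual));
    }
  }
}
```

Callers in either operations class would then reference the validator through the interface, for example `OperationsBaseSketch.validateTableIsIcebergView(table, fullName)`.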



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import java.util.Locale;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetadata;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Hive implementation of Iceberg ViewOperations. */
+final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class);
+
+  private final String fullName;
+  private final String database;
+  private final String viewName;
+  private final FileIO fileIO;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final long maxHiveTablePropertySize;
+  private final Configuration conf;
+  private final String catalogName;
+
+  HiveViewOperations(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier) {
+    String dbName = viewIdentifier.namespace().level(0);
+    this.conf = conf;
+    this.catalogName = catalogName;
+    this.metaClients = metaClients;
+    this.fileIO = fileIO;
+    this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier);
+    this.database = dbName;
+    this.viewName = viewIdentifier.name();
+    this.maxHiveTablePropertySize =
+        conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+  }
+
+  @Override
+  public void doRefresh() {
+    String metadataLocation = null;
+    Table table;
+
+    try {
+      table = metaClients.run(client -> client.getTable(database, viewName));
+      HiveViewOperations.validateTableIsIcebergView(table, fullName);
+
+      metadataLocation =
+          table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+    } catch (NoSuchObjectException | NoSuchIcebergViewException e) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchViewException("View does not exist: %s.%s", database, viewName);
+      }
+    } catch (TException e) {
+      String errMsg =
+          String.format("Failed to get view info from metastore %s.%s", database, viewName);
+      throw new RuntimeException(errMsg, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during refresh", e);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+    commitWithLocking(conf, base, metadata, baseMetadataLocation, newMetadataLocation, fileIO);
+
+    LOG.info(
+        "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation);
+  }
+
+  @Override
+  protected String viewName() {
+    return fullName;
+  }
+
+  @Override
+  public TableType tableType() {
+    return TableType.VIRTUAL_VIEW;
+  }
+
+  @Override
+  public ClientPool<IMetaStoreClient, TException> metaClients() {
+    return metaClients;
+  }
+
+  @Override
+  public long maxHiveTablePropertySize() {
+    return maxHiveTablePropertySize;
+  }
+
+  @Override
+  public String database() {
+    return database;
+  }
+
+  @Override
+  public String table() {
+    return viewName;
+  }
+
+  @Override
+  public String catalogName() {
+    return catalogName;
+  }
+
+  @Override
+  public String opName() {
+    return "View";
+  }
+
+  @Override
+  public BaseMetastoreTableOperations.CommitStatus validateNewLocationAndReturnCommitStatus(
+      BaseMetadata metadata, String newMetadataLocation) {
+    return null;

Review Comment:
   This looks like a bug: the method always returns null, so something seems not to be wired up.
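
To illustrate why a null status is risky, here is a small sketch that assumes the caller switches on the returned value (the calling code is not shown in this hunk). The `CommitStatus` enum and `CommitStatusDemo` class below are local stand-ins, not the Iceberg classes. A classic Java `switch` on a null enum reference throws `NullPointerException`.

```java
// Local stand-in for the commit status enum; the names are illustrative.
enum CommitStatus {
  FAILURE,
  SUCCESS,
  UNKNOWN
}

final class CommitStatusDemo {
  private CommitStatusDemo() {}

  // Mimics a caller that branches on the status after a commit attempt.
  static String describe(CommitStatus status) {
    switch (status) { // throws NullPointerException when status is null
      case SUCCESS:
        return "committed";
      case FAILURE:
        return "failed";
      default:
        return "unknown";
    }
  }
}
```

Returning an explicit value such as `UNKNOWN` from a not-yet-implemented check would keep a caller like this on a defined path.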



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -142,13 +135,15 @@ public FileIO io() {
   @Override
   protected void doRefresh() {
     String metadataLocation = null;
+    Table table;

Review Comment:
   This change looks unnecessary.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java:
##########
@@ -181,4 +264,220 @@ default Table newHmsTable(String hmsTableOwner) {
 
     return newTable;
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  default void commitWithLocking(
+      Configuration conf,
+      BaseMetadata base,
+      BaseMetadata metadata,
+      String baseMetadataLocation,
+      String newMetadataLocation,
+      FileIO io) {
+    boolean newTable = base == null;
+    boolean hiveEngineEnabled = hiveEngineEnabled(conf, metadata);
+    BaseMetastoreTableOperations.CommitStatus commitStatus =
+        BaseMetastoreTableOperations.CommitStatus.FAILURE;
+    boolean updateHiveTable = false;
+    HiveLock lock = lockObject(metadata, conf, catalogName());
+    try {
+      lock.lock();
+      Table tbl = loadHmsTable();
+
+      if (tbl != null) {
+        String tableType = tbl.getTableType();
+        if (!tableType.equalsIgnoreCase(tableType().name())) {

Review Comment:
   Why do we need this table type check? I feel we should keep the existing
logic as much as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

