nastra commented on code in PR #9852:
URL: https://github.com/apache/iceberg/pull/9852#discussion_r1516394903


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java:
##########
@@ -181,4 +279,230 @@ default Table newHmsTable(String hmsTableOwner) {
 
     return newTable;
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  default void commitWithLocking(
+      Configuration conf,
+      BaseMetadata base,
+      BaseMetadata metadata,
+      String baseMetadataLocation,
+      String newMetadataLocation,
+      FileIO io) {
+    boolean newTable = base == null;
+    boolean hiveEngineEnabled = hiveEngineEnabled(conf, metadata);
+    boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, 
false);
+
+    BaseMetastoreOperations.CommitStatus commitStatus =
+        BaseMetastoreOperations.CommitStatus.FAILURE;
+    boolean updateHiveTable = false;
+    HiveLock lock = lockObject(metadata, conf, catalogName());
+    try {
+      lock.lock();
+      Table tbl = loadHmsTable();
+
+      if (tbl != null) {
+        String tableType = tbl.getTableType();
+        if (!tableType.equalsIgnoreCase(tableType().name())) {
+          throw new AlreadyExistsException(
+              "%s with same name already exists: %s.%s",
+              tableType.equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? 
"View" : "Table",
+              tbl.getDbName(),
+              tbl.getTableName());
+        }
+
+        // If we try to create the table but the metadata location is already 
set, then we had a
+        // concurrent commit
+        if (newTable
+            && 
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
+                != null) {
+          throw new AlreadyExistsException(
+              "%s already exists: %s.%s", entityType(), database(), table());
+        }
+
+        updateHiveTable = true;
+        LOG.debug("Committing existing {}: {}", entityType().toLowerCase(), 
fullName());
+      } else {
+        tbl =
+            newHmsTable(
+                metadata
+                    .properties()
+                    .getOrDefault(HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser()));
+        LOG.debug("Committing new {}: {}", entityType().toLowerCase(), 
fullName());
+      }
+
+      tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to 
pickup any schema changes
+
+      String metadataLocation =
+          
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+      if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+        throw new CommitFailedException(
+            "Cannot commit: Base metadata location '%s' is not same as the 
current %s metadata location '%s' for %s.%s",
+            baseMetadataLocation,
+            entityType().toLowerCase(),
+            metadataLocation,
+            database(),
+            table());
+      }
+
+      setHmsParameters(
+          metadata,
+          tbl,
+          newMetadataLocation,
+          obsoleteProps(conf, base, metadata),
+          hiveEngineEnabled);
+
+      if (!keepHiveStats) {
+        tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+      }
+
+      lock.ensureActive();
+
+      try {
+        persistTable(
+            tbl, updateHiveTable, hiveLockEnabled(conf, metadata) ? null : 
baseMetadataLocation);
+        lock.ensureActive();
+
+        commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
+      } catch (LockException le) {
+        commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
+        throw new CommitStateUnknownException(
+            "Failed to heartbeat for hive lock while "
+                + "committing changes. This can lead to a concurrent commit 
attempt be able to overwrite this commit. "
+                + "Please check the commit history. If you are running into 
this issue, try reducing "
+                + "iceberg.hive.lock-heartbeat-interval-ms.",
+            le);
+      } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+        throw new AlreadyExistsException(
+            "%s already exists: %s.%s", entityType(), tbl.getDbName(), 
tbl.getTableName());
+      } catch (InvalidObjectException e) {
+        throw new ValidationException(e, "Invalid Hive object for %s.%s", 
database(), table());
+      } catch (CommitFailedException | CommitStateUnknownException e) {
+        throw e;
+      } catch (Throwable e) {
+        if (e.getMessage()
+            .contains(
+                "The table has been modified. The parameter value for key '"
+                    + BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+                    + "' is")) {
+          throw new CommitFailedException(
+              e, "The table %s.%s has been modified concurrently", database(), 
table());
+        }
+
+        if (e.getMessage() != null
+            && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not 
exist")) {
+          throw new RuntimeException(
+              "Failed to acquire locks from metastore because the underlying 
metastore "
+                  + "table 'HIVE_LOCKS' does not exist. This can occur when 
using an embedded metastore which does not "
+                  + "support transactions. To fix this use an alternative 
metastore.",
+              e);
+        }
+
+        LOG.error(
+            "Cannot tell if commit to {}.{} succeeded, attempting to reconnect 
and check.",
+            database(),
+            table(),
+            e);
+        commitStatus = validateNewLocationAndReturnCommitStatus(metadata, 
newMetadataLocation);
+
+        switch (commitStatus) {
+          case SUCCESS:
+            break;
+          case FAILURE:
+            throw e;
+          case UNKNOWN:
+            throw new CommitStateUnknownException(e);
+        }
+      }
+    } catch (TException e) {
+      throw new RuntimeException(
+          String.format("Metastore operation failed for %s.%s", database(), 
table()), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during commit", e);
+
+    } catch (LockException e) {
+      throw new CommitFailedException(e);
+    } finally {
+      cleanupMetadataAndUnlock(lock, io, commitStatus, newMetadataLocation);
+    }
+  }
+
+  default String fullName() {
+    return catalogName() + "." + database() + "." + table();
+  }
+
+  default StorageDescriptor storageDescriptor(BaseMetadata metadata, boolean 
hiveEngineEnabled) {
+    return HiveOperationsBase.storageDescriptor(
+        metadata.schema(), metadata.location(), hiveEngineEnabled);
+  }
+
+  default Set<String> obsoleteProps(Configuration conf, BaseMetadata base, 
BaseMetadata metadata) {
+    Set<String> obsoleteProps = Sets.newHashSet();
+    if (base != null) {
+      obsoleteProps =
+          base.properties().keySet().stream()
+              .filter(key -> !metadata.properties().containsKey(key))
+              .collect(Collectors.toSet());
+    }
+
+    if (!conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false)) {
+      obsoleteProps.add(StatsSetupConst.COLUMN_STATS_ACCURATE);
+    }
+
+    return obsoleteProps;
+  }
+
+  /**
+   * Returns if the hive engine related values should be enabled on the table, 
or not.
+   *
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml 
property value {@link
+   *       ConfigProperties#ENGINE_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+   * </ol>
+   *
+   * @param metadata Table metadata to use
+   * @return if the hive engine related values should be enabled or not
+   */
+  default boolean hiveEngineEnabled(Configuration conf, BaseMetadata metadata) 
{

Review Comment:
   Why do we pass `BaseMetadata` here if this method is only applicable to tables? `TableProperties.ENGINE_HIVE_ENABLED` won't be set on a view.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java:
##########
@@ -181,4 +279,230 @@ default Table newHmsTable(String hmsTableOwner) {
 
     return newTable;
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  default void commitWithLocking(
+      Configuration conf,
+      BaseMetadata base,
+      BaseMetadata metadata,
+      String baseMetadataLocation,
+      String newMetadataLocation,
+      FileIO io) {
+    boolean newTable = base == null;
+    boolean hiveEngineEnabled = hiveEngineEnabled(conf, metadata);
+    boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, 
false);
+
+    BaseMetastoreOperations.CommitStatus commitStatus =
+        BaseMetastoreOperations.CommitStatus.FAILURE;
+    boolean updateHiveTable = false;
+    HiveLock lock = lockObject(metadata, conf, catalogName());
+    try {
+      lock.lock();
+      Table tbl = loadHmsTable();
+
+      if (tbl != null) {
+        String tableType = tbl.getTableType();
+        if (!tableType.equalsIgnoreCase(tableType().name())) {
+          throw new AlreadyExistsException(
+              "%s with same name already exists: %s.%s",
+              tableType.equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? 
"View" : "Table",
+              tbl.getDbName(),
+              tbl.getTableName());
+        }
+
+        // If we try to create the table but the metadata location is already 
set, then we had a
+        // concurrent commit
+        if (newTable
+            && 
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
+                != null) {
+          throw new AlreadyExistsException(
+              "%s already exists: %s.%s", entityType(), database(), table());
+        }
+
+        updateHiveTable = true;
+        LOG.debug("Committing existing {}: {}", entityType().toLowerCase(), 
fullName());
+      } else {
+        tbl =
+            newHmsTable(
+                metadata
+                    .properties()
+                    .getOrDefault(HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser()));
+        LOG.debug("Committing new {}: {}", entityType().toLowerCase(), 
fullName());
+      }
+
+      tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to 
pickup any schema changes
+
+      String metadataLocation =
+          
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+      if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+        throw new CommitFailedException(
+            "Cannot commit: Base metadata location '%s' is not same as the 
current %s metadata location '%s' for %s.%s",
+            baseMetadataLocation,
+            entityType().toLowerCase(),
+            metadataLocation,
+            database(),
+            table());
+      }
+
+      setHmsParameters(
+          metadata,
+          tbl,
+          newMetadataLocation,
+          obsoleteProps(conf, base, metadata),
+          hiveEngineEnabled);
+
+      if (!keepHiveStats) {
+        tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+      }
+
+      lock.ensureActive();
+
+      try {
+        persistTable(
+            tbl, updateHiveTable, hiveLockEnabled(conf, metadata) ? null : 
baseMetadataLocation);
+        lock.ensureActive();
+
+        commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
+      } catch (LockException le) {
+        commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
+        throw new CommitStateUnknownException(
+            "Failed to heartbeat for hive lock while "
+                + "committing changes. This can lead to a concurrent commit 
attempt be able to overwrite this commit. "
+                + "Please check the commit history. If you are running into 
this issue, try reducing "
+                + "iceberg.hive.lock-heartbeat-interval-ms.",
+            le);
+      } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+        throw new AlreadyExistsException(
+            "%s already exists: %s.%s", entityType(), tbl.getDbName(), 
tbl.getTableName());
+      } catch (InvalidObjectException e) {
+        throw new ValidationException(e, "Invalid Hive object for %s.%s", 
database(), table());
+      } catch (CommitFailedException | CommitStateUnknownException e) {
+        throw e;
+      } catch (Throwable e) {
+        if (e.getMessage()
+            .contains(
+                "The table has been modified. The parameter value for key '"
+                    + BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+                    + "' is")) {
+          throw new CommitFailedException(
+              e, "The table %s.%s has been modified concurrently", database(), 
table());
+        }
+
+        if (e.getMessage() != null
+            && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not 
exist")) {
+          throw new RuntimeException(
+              "Failed to acquire locks from metastore because the underlying 
metastore "
+                  + "table 'HIVE_LOCKS' does not exist. This can occur when 
using an embedded metastore which does not "
+                  + "support transactions. To fix this use an alternative 
metastore.",
+              e);
+        }
+
+        LOG.error(
+            "Cannot tell if commit to {}.{} succeeded, attempting to reconnect 
and check.",
+            database(),
+            table(),
+            e);
+        commitStatus = validateNewLocationAndReturnCommitStatus(metadata, 
newMetadataLocation);
+
+        switch (commitStatus) {
+          case SUCCESS:
+            break;
+          case FAILURE:
+            throw e;
+          case UNKNOWN:
+            throw new CommitStateUnknownException(e);
+        }
+      }
+    } catch (TException e) {
+      throw new RuntimeException(
+          String.format("Metastore operation failed for %s.%s", database(), 
table()), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during commit", e);
+
+    } catch (LockException e) {
+      throw new CommitFailedException(e);
+    } finally {
+      cleanupMetadataAndUnlock(lock, io, commitStatus, newMetadataLocation);
+    }
+  }
+
+  default String fullName() {
+    return catalogName() + "." + database() + "." + table();
+  }
+
+  default StorageDescriptor storageDescriptor(BaseMetadata metadata, boolean 
hiveEngineEnabled) {
+    return HiveOperationsBase.storageDescriptor(
+        metadata.schema(), metadata.location(), hiveEngineEnabled);
+  }
+
+  default Set<String> obsoleteProps(Configuration conf, BaseMetadata base, 
BaseMetadata metadata) {
+    Set<String> obsoleteProps = Sets.newHashSet();
+    if (base != null) {
+      obsoleteProps =
+          base.properties().keySet().stream()
+              .filter(key -> !metadata.properties().containsKey(key))
+              .collect(Collectors.toSet());
+    }
+
+    if (!conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false)) {
+      obsoleteProps.add(StatsSetupConst.COLUMN_STATS_ACCURATE);
+    }
+
+    return obsoleteProps;
+  }
+
+  /**
+   * Returns if the hive engine related values should be enabled on the table, 
or not.
+   *
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml 
property value {@link
+   *       ConfigProperties#ENGINE_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+   * </ol>
+   *
+   * @param metadata Table metadata to use
+   * @return if the hive engine related values should be enabled or not
+   */
+  default boolean hiveEngineEnabled(Configuration conf, BaseMetadata metadata) 
{
+    if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != 
null) {
+      // We know that the property is set, so default value will not be used,
+      return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, 
false);
+    }
+
+    return conf.getBoolean(
+        ConfigProperties.ENGINE_HIVE_ENABLED, 
TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Returns if the hive locking should be enabled on the table, or not.
+   *
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#HIVE_LOCK_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml 
property value {@link
+   *       ConfigProperties#LOCK_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#HIVE_LOCK_ENABLED_DEFAULT}
+   * </ol>
+   *
+   * @param metadata Table metadata to use
+   * @return if the hive engine related values should be enabled or not
+   */
+  default boolean hiveLockEnabled(Configuration conf, BaseMetadata metadata) {
+    if (metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) {

Review Comment:
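   Same as above: `TableProperties.HIVE_LOCK_ENABLED` is a table property and won't be set on a view, so why do we pass `BaseMetadata` here?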



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to