nastra commented on code in PR #9852: URL: https://github.com/apache/iceberg/pull/9852#discussion_r1528265184
########## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java: ########## @@ -181,4 +279,230 @@ default Table newHmsTable(String hmsTableOwner) { return newTable; } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + default void commitWithLocking( + Configuration conf, + BaseMetadata base, + BaseMetadata metadata, + String baseMetadataLocation, + String newMetadataLocation, + FileIO io) { + boolean newTable = base == null; + boolean hiveEngineEnabled = hiveEngineEnabled(conf, metadata); + boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); + + BaseMetastoreOperations.CommitStatus commitStatus = + BaseMetastoreOperations.CommitStatus.FAILURE; + boolean updateHiveTable = false; + HiveLock lock = lockObject(metadata, conf, catalogName()); + try { + lock.lock(); + Table tbl = loadHmsTable(); + + if (tbl != null) { + String tableType = tbl.getTableType(); + if (!tableType.equalsIgnoreCase(tableType().name())) { + throw new AlreadyExistsException( + "%s with same name already exists: %s.%s", + tableType.equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? 
"View" : "Table", + tbl.getDbName(), + tbl.getTableName()); + } + + // If we try to create the table but the metadata location is already set, then we had a + // concurrent commit + if (newTable + && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + != null) { + throw new AlreadyExistsException( + "%s already exists: %s.%s", entityType(), database(), table()); + } + + updateHiveTable = true; + LOG.debug("Committing existing {}: {}", entityType().toLowerCase(), fullName()); + } else { + tbl = + newHmsTable( + metadata + .properties() + .getOrDefault(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); + LOG.debug("Committing new {}: {}", entityType().toLowerCase(), fullName()); + } + + tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes + + String metadataLocation = + tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + + if (!Objects.equals(baseMetadataLocation, metadataLocation)) { + throw new CommitFailedException( + "Cannot commit: Base metadata location '%s' is not same as the current %s metadata location '%s' for %s.%s", + baseMetadataLocation, + entityType().toLowerCase(), + metadataLocation, + database(), + table()); + } + + setHmsParameters( + metadata, + tbl, + newMetadataLocation, + obsoleteProps(conf, base, metadata), + hiveEngineEnabled); + + if (!keepHiveStats) { + tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); + } + + lock.ensureActive(); + + try { + persistTable( + tbl, updateHiveTable, hiveLockEnabled(conf, metadata) ? null : baseMetadataLocation); + lock.ensureActive(); + + commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS; + } catch (LockException le) { + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; + throw new CommitStateUnknownException( + "Failed to heartbeat for hive lock while " + + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. 
" + + "Please check the commit history. If you are running into this issue, try reducing " + + "iceberg.hive.lock-heartbeat-interval-ms.", + le); + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { + throw new AlreadyExistsException( + "%s already exists: %s.%s", entityType(), tbl.getDbName(), tbl.getTableName()); + } catch (InvalidObjectException e) { + throw new ValidationException(e, "Invalid Hive object for %s.%s", database(), table()); + } catch (CommitFailedException | CommitStateUnknownException e) { + throw e; + } catch (Throwable e) { + if (e.getMessage() + .contains( + "The table has been modified. The parameter value for key '" + + BaseMetastoreTableOperations.METADATA_LOCATION_PROP + + "' is")) { + throw new CommitFailedException( + e, "The table %s.%s has been modified concurrently", database(), table()); + } + + if (e.getMessage() != null + && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { + throw new RuntimeException( + "Failed to acquire locks from metastore because the underlying metastore " + + "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " + + "support transactions. 
To fix this use an alternative metastore.", + e); + } + + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database(), + table(), + e); + commitStatus = validateNewLocationAndReturnCommitStatus(metadata, newMetadataLocation); + + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw e; + case UNKNOWN: + throw new CommitStateUnknownException(e); + } + } + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s.%s", database(), table()), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + + } catch (LockException e) { + throw new CommitFailedException(e); + } finally { + cleanupMetadataAndUnlock(lock, io, commitStatus, newMetadataLocation); + } + } + + default String fullName() { + return catalogName() + "." + database() + "." + table(); + } + + default StorageDescriptor storageDescriptor(BaseMetadata metadata, boolean hiveEngineEnabled) { + return HiveOperationsBase.storageDescriptor( + metadata.schema(), metadata.location(), hiveEngineEnabled); + } + + default Set<String> obsoleteProps(Configuration conf, BaseMetadata base, BaseMetadata metadata) { + Set<String> obsoleteProps = Sets.newHashSet(); + if (base != null) { + obsoleteProps = + base.properties().keySet().stream() + .filter(key -> !metadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } + + if (!conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false)) { + obsoleteProps.add(StatsSetupConst.COLUMN_STATS_ACCURATE); + } + + return obsoleteProps; + } + + /** + * Returns if the hive engine related values should be enabled on the table, or not. 
+ * + * <p>The decision is made like this: + * + * <ol> + * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED} + * <li>If the table property is not set then check the hive-site.xml property value {@link + * ConfigProperties#ENGINE_HIVE_ENABLED} + * <li>If none of the above is enabled then use the default value {@link + * TableProperties#ENGINE_HIVE_ENABLED_DEFAULT} + * </ol> + * + * @param metadata Table metadata to use + * @return if the hive engine related values should be enabled or not + */ + default boolean hiveEngineEnabled(Configuration conf, BaseMetadata metadata) { + if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT); + } + + /** + * Returns if the hive locking should be enabled on the table, or not. + * + * <p>The decision is made like this: + * + * <ol> + * <li>Table property value {@link TableProperties#HIVE_LOCK_ENABLED} + * <li>If the table property is not set then check the hive-site.xml property value {@link + * ConfigProperties#LOCK_HIVE_ENABLED} + * <li>If none of the above is enabled then use the default value {@link + * TableProperties#HIVE_LOCK_ENABLED_DEFAULT} + * </ol> + * + * @param metadata Table metadata to use + * @return if the hive engine related values should be enabled or not + */ + default boolean hiveLockEnabled(Configuration conf, BaseMetadata metadata) { + if (metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) { Review Comment: @nk1506 I understand that `engine.hive.lock-enabled` will be available for views.
My point is that there's no reason to check `if (metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null)` on a view, because a view won't have `TableProperties.HIVE_LOCK_ENABLED` set in its properties. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org