nastra commented on code in PR #9852:
URL: https://github.com/apache/iceberg/pull/9852#discussion_r1516369706
##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -166,168 +159,33 @@ protected void doRefresh() {
refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
}
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean newTable = base == null;
String newMetadataLocation = writeNewMetadataIfRequired(newTable,
metadata);
- boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
- boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS,
false);
-
- CommitStatus commitStatus = CommitStatus.FAILURE;
- boolean updateHiveTable = false;
-
- HiveLock lock = lockObject(metadata);
- try {
- lock.lock();
-
- Table tbl = loadHmsTable();
-
- if (tbl != null) {
- // If we try to create the table but the metadata location is already
set, then we had a
- // concurrent commit
- if (newTable
- &&
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
- != null) {
- throw new AlreadyExistsException("Table already exists: %s.%s",
database, tableName);
- }
-
- updateHiveTable = true;
- LOG.debug("Committing existing table: {}", fullName);
- } else {
- tbl =
- newHmsTable(
- metadata.property(HiveCatalog.HMS_TABLE_OWNER,
HiveHadoopUtil.currentUser()));
- LOG.debug("Committing new table: {}", fullName);
- }
-
- tbl.setSd(
- HiveOperationsBase.storageDescriptor(
- metadata, hiveEngineEnabled)); // set to pickup any schema
changes
-
- String metadataLocation =
tbl.getParameters().get(METADATA_LOCATION_PROP);
- String baseMetadataLocation = base != null ? base.metadataFileLocation()
: null;
- if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
- throw new CommitFailedException(
- "Cannot commit: Base metadata location '%s' is not same as the
current table metadata location '%s' for %s.%s",
- baseMetadataLocation, metadataLocation, database, tableName);
- }
-
- // get Iceberg props that have been removed
- Set<String> removedProps = Collections.emptySet();
- if (base != null) {
- removedProps =
- base.properties().keySet().stream()
- .filter(key -> !metadata.properties().containsKey(key))
- .collect(Collectors.toSet());
- }
-
- Map<String, String> summary =
- Optional.ofNullable(metadata.currentSnapshot())
- .map(Snapshot::summary)
- .orElseGet(ImmutableMap::of);
- setHmsTableParameters(
- newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled,
summary);
-
- if (!keepHiveStats) {
- tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
- }
-
- lock.ensureActive();
-
- try {
- persistTable(
- tbl, updateHiveTable, hiveLockEnabled(metadata, conf) ? null :
baseMetadataLocation);
- lock.ensureActive();
-
- commitStatus = CommitStatus.SUCCESS;
- } catch (LockException le) {
- commitStatus = CommitStatus.UNKNOWN;
- throw new CommitStateUnknownException(
- "Failed to heartbeat for hive lock while "
- + "committing changes. This can lead to a concurrent commit
attempt be able to overwrite this commit. "
- + "Please check the commit history. If you are running into
this issue, try reducing "
- + "iceberg.hive.lock-heartbeat-interval-ms.",
- le);
- } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
- throw new AlreadyExistsException(e, "Table already exists: %s.%s",
database, tableName);
-
- } catch (InvalidObjectException e) {
- throw new ValidationException(e, "Invalid Hive object for %s.%s",
database, tableName);
-
- } catch (CommitFailedException | CommitStateUnknownException e) {
- throw e;
-
- } catch (Throwable e) {
- if (e.getMessage()
- .contains(
- "The table has been modified. The parameter value for key '"
- + HiveTableOperations.METADATA_LOCATION_PROP
- + "' is")) {
- throw new CommitFailedException(
- e, "The table %s.%s has been modified concurrently", database,
tableName);
- }
-
- if (e.getMessage() != null
- && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not
exist")) {
- throw new RuntimeException(
- "Failed to acquire locks from metastore because the underlying
metastore "
- + "table 'HIVE_LOCKS' does not exist. This can occur when
using an embedded metastore which does not "
- + "support transactions. To fix this use an alternative
metastore.",
- e);
- }
-
- LOG.error(
- "Cannot tell if commit to {}.{} succeeded, attempting to reconnect
and check.",
- database,
- tableName,
- e);
- commitStatus = checkCommitStatus(newMetadataLocation, metadata);
- switch (commitStatus) {
- case SUCCESS:
- break;
- case FAILURE:
- throw e;
- case UNKNOWN:
- throw new CommitStateUnknownException(e);
- }
- }
- } catch (TException e) {
- throw new RuntimeException(
- String.format("Metastore operation failed for %s.%s", database,
tableName), e);
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted during commit", e);
-
- } catch (LockException e) {
- throw new CommitFailedException(e);
-
- } finally {
- cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock);
- }
+ String baseMetadataLocation = base != null ? base.metadataFileLocation() :
null;
+ commitWithLocking(conf, base, metadata, baseMetadataLocation,
newMetadataLocation, fileIO);
LOG.info(
"Committed to table {} with the new metadata location {}", fullName,
newMetadataLocation);
}
- @VisibleForTesting
- Table loadHmsTable() throws TException, InterruptedException {
- try {
- return metaClients.run(client -> client.getTable(database, tableName));
- } catch (NoSuchObjectException nte) {
- LOG.trace("Table not found {}", fullName, nte);
- return null;
- }
+ @Override
+ public CommitStatus validateNewLocationAndReturnCommitStatus(
+ BaseMetadata metadata, String newMetadataLocation) {
Review Comment:
I'm not convinced that using `BaseMetadata` here is the best approach just to
pass properties or the metadata location(s).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]