pvary commented on code in PR #6451: URL: https://github.com/apache/iceberg/pull/6451#discussion_r1063372178
########## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java: ########## @@ -759,6 +812,98 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT); } + @SuppressWarnings("ReverseDnsLookup") + private void tryLock( + String agentInfo, AtomicReference<Long> lockId, AtomicReference<LockState> state) + throws UnknownHostException, TException { + final LockComponent lockComponent = + new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database); + lockComponent.setTablename(tableName); + final LockRequest lockRequest = + new LockRequest( + Lists.newArrayList(lockComponent), + System.getProperty("user.name"), + InetAddress.getLocalHost().getHostName()); + + // Only works in Hive 2 or later. + if (MetastoreClientVersion.min(MetastoreClientVersion.HIVE_2)) { + lockRequest.setAgentInfo(agentInfo); + } + + Tasks.foreach(lockRequest) + .retry(Integer.MAX_VALUE - 100) + .exponentialBackoff( + lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0) + .shouldRetryTest( + e -> + e instanceof TException + && MetastoreClientVersion.min(MetastoreClientVersion.HIVE_2)) + .throwFailureWhenFinished() + .run( + request -> { + try { + LockResponse lockResponse = metaClients.run(client -> client.lock(request)); + lockId.set(lockResponse.getLockid()); + state.set(lockResponse.getState()); + } catch (TException te) { + LOG.warn("Failed to acquire lock {}", request, te); + try { + // If we can not check for lock, or we do not find it, then rethrow the exception + // Otherwise we are happy as the findLock sets the lockId and the state correctly + if (!MetastoreClientVersion.min(MetastoreClientVersion.HIVE_2) + || !findLock(agentInfo, lockId, state)) { + throw te; + } else { + LOG.info( + "Found lock by agentInfo {} with id {} and state {}", + agentInfo, + lockId, + state); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn( + "Interrupted while checking for lock on table {}.{}", database, tableName, e); + throw new RuntimeException("Interrupted while checking for lock", e); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while acquiring lock on table {}.{}", database, tableName, e); + throw new RuntimeException("Interrupted while acquiring lock", e); + } + }, + TException.class); + } + + /** + * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is + * there, then the lockId and the state is set based on the result. + * + * @param agentInfo The key for searching the locks + * @param lockId The lockId if the lock has found + * @param state The state if the lock has found + * @return <code>true</code> if the the lock is there <code>false</code> if the lock is missing + */ + private boolean findLock( + String agentInfo, AtomicReference<Long> lockId, AtomicReference<LockState> state) + throws TException, InterruptedException { + ShowLocksRequest showLocksRequest = new ShowLocksRequest(); + showLocksRequest.setDbname(database); + showLocksRequest.setTablename(tableName); + ShowLocksResponse response = metaClients.run(client -> client.showLocks(showLocksRequest)); + for (ShowLocksResponseElement lock : response.getLocks()) { + if (lock.getAgentInfo().equals(agentInfo)) { Review Comment: Added the precondition check to the method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org