szehon-ho commented on code in PR #6648:
URL: https://github.com/apache/iceberg/pull/6648#discussion_r1091303197


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -135,13 +92,6 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
 
   private static Cache<String, ReentrantLock> commitLockCache;

Review Comment:
   can this be removed now?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreLock implements HiveLock {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreLock.class);
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+      "iceberg.hive.lock-creation-timeout-ms";
+  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+      "iceberg.hive.lock-creation-min-wait-ms";
+  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+      "iceberg.hive.lock-creation-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; 
// 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 
1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
+  private static volatile Cache<String, ReentrantLock> commitLockCache;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockCreationTimeout;
+  private final long lockCreationMinWaitTime;
+  private final long lockCreationMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+  private final String agentInfo;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private ReentrantLock jvmLock = null;
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public MetastoreLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, 
HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockCreationTimeout =
+        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, 
HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+    this.lockCreationMinWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, 
HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+    this.lockCreationMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, 
HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, 
HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + 
"-%d")
+                .build());
+
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table
+    // from the same JVM process, which would result in unnecessary HMS lock 
acquisition requests
+    acquireJvmLock();
+
+    // Getting HMS lock
+    hmsLockId = Optional.of(acquireLock());
+
+    // Starting heartbeat for the HMS lock
+    hiveLockHeartbeat =
+        new HiveLockHeartbeat(metaClients, hmsLockId.get(), 
lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  @Override
+  public void ensureActive() throws LockException {
+    if (hiveLockHeartbeat != null && hiveLockHeartbeat.encounteredException != 
null) {
+      throw new LockException(
+          hiveLockHeartbeat.encounteredException,
+          "Failed to heartbeat for hive lock. %s",
+          hiveLockHeartbeat.encounteredException.getMessage());
+    }
+    if (hiveLockHeartbeat == null

Review Comment:
   Nit: would be clearer to have this logic encapsulated in HiveLockHeartbeat.
   
   ```
   // true while the heartbeat task is scheduled and not cancelled
   boolean active() {
      return future != null && !future.isCancelled();
   }
   ```
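
   For illustration only, the HiveLockHeartbeat side could own that check. The future and
   encounteredException fields already exist in this patch; the rest of the shape below is
   just a sketch:
   
   ```
   private static class HiveLockHeartbeat {           // sketch only, not the full class
     private ScheduledFuture<?> future;               // set by schedule(), cancelled by cancel()
     private Exception encounteredException;          // set when a heartbeat call fails
   
     // True while the heartbeat task is still scheduled and has not been cancelled.
     boolean active() {
       return future != null && !future.isCancelled();
     }
   }
   ```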



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreLock implements HiveLock {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreLock.class);
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+      "iceberg.hive.lock-creation-timeout-ms";
+  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+      "iceberg.hive.lock-creation-min-wait-ms";
+  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+      "iceberg.hive.lock-creation-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; 
// 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 
1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
+  private static volatile Cache<String, ReentrantLock> commitLockCache;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockCreationTimeout;
+  private final long lockCreationMinWaitTime;
+  private final long lockCreationMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+  private final String agentInfo;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private ReentrantLock jvmLock = null;
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public MetastoreLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, 
HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockCreationTimeout =
+        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, 
HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+    this.lockCreationMinWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, 
HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+    this.lockCreationMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, 
HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, 
HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + 
"-%d")
+                .build());
+
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table
+    // from the same JVM process, which would result in unnecessary HMS lock 
acquisition requests
+    acquireJvmLock();
+
+    // Getting HMS lock
+    hmsLockId = Optional.of(acquireLock());
+
+    // Starting heartbeat for the HMS lock
+    hiveLockHeartbeat =
+        new HiveLockHeartbeat(metaClients, hmsLockId.get(), 
lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  @Override
+  public void ensureActive() throws LockException {
+    if (hiveLockHeartbeat != null && hiveLockHeartbeat.encounteredException != 
null) {
+      throw new LockException(
+          hiveLockHeartbeat.encounteredException,
+          "Failed to heartbeat for hive lock. %s",
+          hiveLockHeartbeat.encounteredException.getMessage());
+    }
+    if (hiveLockHeartbeat == null
+        || hiveLockHeartbeat.future == null
+        || hiveLockHeartbeat.future.isCancelled()) {
+      throw new LockException("Lock is not active");
+    }
+  }
+
+  @Override
+  public void unlock() {
+    if (hiveLockHeartbeat != null) {
+      hiveLockHeartbeat.cancel();
+      exitingScheduledExecutorService.shutdown();
+    }
+
+    try {
+      unlock(hmsLockId);
+    } finally {
+      releaseJvmLock();
+    }
+  }
+
+  private long acquireLock() throws LockException {
+    LockInfo lockInfo = tryLock();
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+    TException error = null;
+
+    try {
+      if (lockInfo.lockState.equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for Tasks.run() 
function. In fact,
+        // the maximum number of
+        // attempts the Tasks.run() would try is `retries + 1`. Here, for 
checking locks, we use
+        // timeout as the
+        // upper bound of retries. So it is just reasonable to set a large 
retry count. However, if
+        // we set
+        // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow 
into
+        // Integer.MIN_VALUE. Hence,
+        // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it 
doesn't hit any
+        // boundary issues.
+        Tasks.foreach(lockInfo.lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, 
lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> 
client.checkLock(id));
+                    LockState newState = response.getState();
+                    lockInfo.lockState = newState;
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, 
tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    Thread.interrupted(); // Clear the interrupt status flag
+                    LOG.warn(
+                        "Interrupted while waiting for lock on table {}.{}",
+                        databaseName,
+                        tableName,
+                        e);
+                  }
+                },
+                TException.class);
+      }
+    } catch (WaitingForLockException e) {
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } catch (TException e) {
+      error = e;
+    } finally {
+      if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+        unlock(Optional.of(lockInfo.lockId));
+      }
+    }
+
+    if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+      // timeout and do not have lock acquired

Review Comment:
   Nit: up to you, but I feel we can remove these inline comments now that the code is 
refactored to be relatively self-documenting?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreLock implements HiveLock {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreLock.class);
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+      "iceberg.hive.lock-creation-timeout-ms";
+  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+      "iceberg.hive.lock-creation-min-wait-ms";
+  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+      "iceberg.hive.lock-creation-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; 
// 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 
1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
+  private static volatile Cache<String, ReentrantLock> commitLockCache;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockCreationTimeout;
+  private final long lockCreationMinWaitTime;
+  private final long lockCreationMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+  private final String agentInfo;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private ReentrantLock jvmLock = null;
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public MetastoreLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, 
HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockCreationTimeout =
+        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, 
HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+    this.lockCreationMinWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, 
HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+    this.lockCreationMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, 
HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, 
HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + 
"-%d")
+                .build());
+
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table
+    // from the same JVM process, which would result in unnecessary HMS lock 
acquisition requests
+    acquireJvmLock();
+
+    // Getting HMS lock
+    hmsLockId = Optional.of(acquireLock());
+
+    // Starting heartbeat for the HMS lock
+    hiveLockHeartbeat =
+        new HiveLockHeartbeat(metaClients, hmsLockId.get(), 
lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  @Override
+  public void ensureActive() throws LockException {
+    if (hiveLockHeartbeat != null && hiveLockHeartbeat.encounteredException != 
null) {
+      throw new LockException(
+          hiveLockHeartbeat.encounteredException,
+          "Failed to heartbeat for hive lock. %s",
+          hiveLockHeartbeat.encounteredException.getMessage());
+    }
+    if (hiveLockHeartbeat == null
+        || hiveLockHeartbeat.future == null
+        || hiveLockHeartbeat.future.isCancelled()) {
+      throw new LockException("Lock is not active");
+    }
+  }
+
+  @Override
+  public void unlock() {
+    if (hiveLockHeartbeat != null) {
+      hiveLockHeartbeat.cancel();
+      exitingScheduledExecutorService.shutdown();
+    }
+
+    try {
+      unlock(hmsLockId);
+    } finally {
+      releaseJvmLock();
+    }
+  }
+
+  private long acquireLock() throws LockException {
+    LockInfo lockInfo = tryLock();
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+    TException error = null;
+
+    try {
+      if (lockInfo.lockState.equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for Tasks.run() 
function. In fact,
+        // the maximum number of
+        // attempts the Tasks.run() would try is `retries + 1`. Here, for 
checking locks, we use
+        // timeout as the
+        // upper bound of retries. So it is just reasonable to set a large 
retry count. However, if
+        // we set
+        // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow 
into
+        // Integer.MIN_VALUE. Hence,
+        // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it 
doesn't hit any
+        // boundary issues.
+        Tasks.foreach(lockInfo.lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, 
lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> 
client.checkLock(id));
+                    LockState newState = response.getState();
+                    lockInfo.lockState = newState;
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, 
tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    Thread.interrupted(); // Clear the interrupt status flag
+                    LOG.warn(
+                        "Interrupted while waiting for lock on table {}.{}",
+                        databaseName,
+                        tableName,
+                        e);
+                  }
+                },
+                TException.class);
+      }
+    } catch (WaitingForLockException e) {
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } catch (TException e) {
+      error = e;
+    } finally {
+      if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+        unlock(Optional.of(lockInfo.lockId));
+      }
+    }
+
+    if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+      // timeout and do not have lock acquired
+      if (timeout) {
+        throw new LockException(
+            "Timed out after %s ms waiting for lock on %s.%s", duration, 
databaseName, tableName);
+      }
+
+      // On thrift error and do not have lock acquired
+      if (error != null) {
+        throw new LockException(
+            error, "Metastore operation failed for %s.%s", databaseName, 
tableName);
+      }
+
+      // Just for safety. We should not get here.
+      throw new LockException(
+          "Could not acquire the lock on %s.%s, lock request ended in state 
%s",
+          databaseName, tableName, lockInfo.lockState);
+    } else {
+      return lockInfo.lockId;
+    }
+  }
+
+  /**
+   * Creates a lock, retrying if possible on failure.
+   *
+   * @return The {@link LockInfo} object for the successfully created lock
+   * @throws LockException When we are not able to fill the hostname for lock 
creation, or there is
+   *     an error during lock creation
+   */
+  @SuppressWarnings("ReverseDnsLookup")
+  private LockInfo tryLock() throws LockException {

Review Comment:
   Can we just rename this to 'createLock' to go with the terminology we are using 
elsewhere?  It took me a little while to remember what this method does when reading the 
other parts of the code.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreLock implements HiveLock {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreLock.class);
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+      "iceberg.hive.lock-creation-timeout-ms";
+  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+      "iceberg.hive.lock-creation-min-wait-ms";
+  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+      "iceberg.hive.lock-creation-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; 
// 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 
1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
+  private static volatile Cache<String, ReentrantLock> commitLockCache;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockCreationTimeout;
+  private final long lockCreationMinWaitTime;
+  private final long lockCreationMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+  private final String agentInfo;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private ReentrantLock jvmLock = null;
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public MetastoreLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, 
HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockCreationTimeout =
+        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, 
HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+    this.lockCreationMinWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, 
HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+    this.lockCreationMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, 
HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, 
HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + 
"-%d")
+                .build());
+
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table
+    // from the same JVM process, which would result in unnecessary HMS lock 
acquisition requests
+    acquireJvmLock();
+
+    // Getting HMS lock
+    hmsLockId = Optional.of(acquireLock());
+
+    // Starting heartbeat for the HMS lock
+    hiveLockHeartbeat =
+        new HiveLockHeartbeat(metaClients, hmsLockId.get(), 
lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  @Override
+  public void ensureActive() throws LockException {
+    if (hiveLockHeartbeat != null && hiveLockHeartbeat.encounteredException != 
null) {

Review Comment:
   I think this would be cleaner (via adding appropriate methods to the HiveLockHeartbeat 
inner class, as suggested in the other comment).  What do you think?
   
   ```
   if (heartbeat == null) {
      throw new LockException("Lock is not active");
   }
   
   if (!heartbeat.active()) {
      throw new LockException("Hive lock heartbeat thread not active");
   }
   
   if (heartbeat.exceptionThrown != null) {
      throw new LockException("Failed to heartbeat");
   }
   ```
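
   Mapped onto the names already used in this patch (active() being the new helper,
   encounteredException the existing field), ensureActive() could then read roughly as:
   
   ```
   @Override
   public void ensureActive() throws LockException {
     if (hiveLockHeartbeat == null) {
       throw new LockException("Lock is not active");
     }
   
     if (!hiveLockHeartbeat.active()) {
       throw new LockException("Hive lock heartbeat thread not active");
     }
   
     Exception error = hiveLockHeartbeat.encounteredException;
     if (error != null) {
       throw new LockException(error, "Failed to heartbeat for hive lock. %s", error.getMessage());
     }
   }
   ```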



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

