deniskuzZ commented on code in PR #6570:
URL: https://github.com/apache/iceberg/pull/6570#discussion_r1071409985


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -809,168 +603,38 @@ private static boolean hiveEngineEnabled(TableMetadata 
metadata, Configuration c
   }
 
   /**
-   * Tries to create a lock. If the lock creation fails, and it is possible 
then retries the lock
-   * creation a few times. If the lock creation is successful then a {@link 
LockInfo} is returned,
-   * otherwise an appropriate exception is thrown.
+   * Returns if the hive locking should be enabled on the table, or not.
    *
-   * @param agentInfo The agentInfo which should be used during lock creation
-   * @return The created lock
-   * @throws UnknownHostException When we are not able to fill the hostname 
for lock creation
-   * @throws TException When there is an error during lock creation
-   */
-  @SuppressWarnings("ReverseDnsLookup")
-  private LockInfo tryLock(String agentInfo) throws UnknownHostException, 
TException {
-    LockInfo lockInfo = new LockInfo();
-
-    final LockComponent lockComponent =
-        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest =
-        new LockRequest(
-            Lists.newArrayList(lockComponent),
-            System.getProperty("user.name"),
-            InetAddress.getLocalHost().getHostName());
-
-    // Only works in Hive 2 or later.
-    if (HiveVersion.min(HiveVersion.HIVE_2)) {
-      lockRequest.setAgentInfo(agentInfo);
-    }
-
-    Tasks.foreach(lockRequest)
-        .retry(Integer.MAX_VALUE - 100)
-        .exponentialBackoff(
-            lockCreationMinWaitTime, lockCreationMaxWaitTime, 
lockCreationTimeout, 2.0)
-        .shouldRetryTest(e -> e instanceof TException && 
HiveVersion.min(HiveVersion.HIVE_2))
-        .throwFailureWhenFinished()
-        .run(
-            request -> {
-              try {
-                LockResponse lockResponse = metaClients.run(client -> 
client.lock(request));
-                lockInfo.lockId = lockResponse.getLockid();
-                lockInfo.lockState = lockResponse.getState();
-              } catch (TException te) {
-                LOG.warn("Failed to acquire lock {}", request, te);
-                try {
-                  // If we can not check for lock, or we do not find it, then 
rethrow the exception
-                  // Otherwise we are happy as the findLock sets the lockId 
and the state correctly
-                  if (!HiveVersion.min(HiveVersion.HIVE_2)) {
-                    LockInfo lockFound = findLock(agentInfo);
-                    if (lockFound != null) {
-                      lockInfo.lockId = lockFound.lockId;
-                      lockInfo.lockState = lockFound.lockState;
-                      LOG.info("Found lock {} by agentInfo {}", lockInfo, 
agentInfo);
-                      return;
-                    }
-                  }
-
-                  throw te;
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  LOG.warn(
-                      "Interrupted while checking for lock on table {}.{}", 
database, tableName, e);
-                  throw new RuntimeException("Interrupted while checking for 
lock", e);
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Interrupted while acquiring lock on table {}.{}", 
database, tableName, e);
-                throw new RuntimeException("Interrupted while acquiring lock", 
e);
-              }
-            },
-            TException.class);
-
-    // This should be initialized always, or exception should be thrown.
-    LOG.debug("Lock {} created for table {}.{}", lockInfo, database, 
tableName);
-    return lockInfo;
-  }
-
-  /**
-   * Search for the locks using HMSClient.showLocks identified by the 
agentInfo. If the lock is
-   * there, then a {@link LockInfo} object is returned. If the lock is not 
found <code>null</code>
-   * is returned.
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#LOCK_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml 
property value {@link
+   *       ConfigProperties#LOCK_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#LOCK_HIVE_ENABLED_DEFAULT}
+   * </ol>
    *
-   * @param agentInfo The key for searching the locks
-   * @return The {@link LockInfo} for the found lock, or <code>null</code> if 
nothing found
+   * @param metadata Table metadata to use
+   * @param conf The hive configuration to use
+   * @return if the hive engine related values should be enabled or not
    */
-  private LockInfo findLock(String agentInfo) throws TException, 
InterruptedException {
-    Preconditions.checkArgument(
-        HiveVersion.min(HiveVersion.HIVE_2),
-        "Minimally Hive 2 HMS client is needed to find the Lock using the 
showLocks API call");
-    ShowLocksRequest showLocksRequest = new ShowLocksRequest();
-    showLocksRequest.setDbname(database);
-    showLocksRequest.setTablename(tableName);
-    ShowLocksResponse response = metaClients.run(client -> 
client.showLocks(showLocksRequest));
-    for (ShowLocksResponseElement lock : response.getLocks()) {
-      if (lock.getAgentInfo().equals(agentInfo)) {
-        // We found our lock
-        return new LockInfo(lock.getLockid(), lock.getState());
-      }
-    }
-
-    // Not found anything
-    return null;
-  }
-
-  private static class HiveLockHeartbeat implements Runnable {
-    private final ClientPool<IMetaStoreClient, TException> hmsClients;
-    private final long lockId;
-    private final long intervalMs;
-    private ScheduledFuture<?> future;
-    private volatile Exception encounteredException = null;
-
-    HiveLockHeartbeat(
-        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long 
intervalMs) {
-      this.hmsClients = hmsClients;
-      this.lockId = lockId;
-      this.intervalMs = intervalMs;
-      this.future = null;
-    }
-
-    @Override
-    public void run() {
-      try {
-        hmsClients.run(
-            client -> {
-              client.heartbeat(0, lockId);
-              return null;
-            });
-      } catch (TException | InterruptedException e) {
-        this.encounteredException = e;
-        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", 
lockId);
-      }
-    }
-
-    public void schedule(ScheduledExecutorService scheduler) {
-      future =
-          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, 
TimeUnit.MILLISECONDS);
+  private static boolean hiveLockEnabled(TableMetadata metadata, Configuration 
conf) {
+    if (metadata.properties().get(TableProperties.LOCK_HIVE_ENABLED) != null) {
+      // We know that the property is set, so default value will not be used,
+      return metadata.propertyAsBoolean(TableProperties.LOCK_HIVE_ENABLED, 
false);
     }
 
-    public void cancel() {
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
+    return conf.getBoolean(
+        ConfigProperties.LOCK_HIVE_ENABLED, 
TableProperties.LOCK_HIVE_ENABLED_DEFAULT);
   }
 
-  private static class LockInfo {
-    private long lockId;
-    private LockState lockState;
-
-    private LockInfo() {
-      this.lockId = -1;
-      this.lockState = null;
-    }
-
-    private LockInfo(long lockId, LockState lockState) {
-      this.lockId = lockId;
-      this.lockState = lockState;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("lockId", lockId)
-          .add("lockState", lockState)
-          .toString();
+  @VisibleForTesting
+  HiveLock lockObject(TableMetadata metadata) {
+    if (hiveLockEnabled(metadata, conf)) {
+      return new MetastoreLock(conf, metaClients, catalogName, database, 
tableName);

Review Comment:
   The previous locking strategy was migrated to MetastoreLock, right?



##########
docs/configuration.md:
##########
@@ -175,8 +175,13 @@ The HMS table locking is a 2-step process:
 | iceberg.hive.lock-heartbeat-interval-ms   | 240000 (4 min)  | The heartbeat 
interval for the HMS locks.                                    |
 | iceberg.hive.metadata-refresh-max-retries | 2               | Maximum number 
of retries when the metadata file is missing                  |
 | iceberg.hive.table-level-lock-evict-ms    | 600000 (10 min) | The timeout 
for the JVM table lock is                                        |
+| iceberg.lock.hive.enabled                 | true            | If enabled HMS 
locks will be used to ensure of the atomicity of the commits  | 
 
 Note: `iceberg.hive.lock-check-max-wait-ms` and 
`iceberg.hive.lock-heartbeat-interval-ms` should be less than the [transaction 
timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
 
 of the Hive Metastore (`hive.txn.timeout` or `metastore.txn.timeout` in the 
newer versions). Otherwise, the heartbeats on the lock (which happens during 
the lock checks) would end up expiring in the 
 Hive Metastore before the lock is retried from Iceberg.
 
+Note: `iceberg.lock.hive.enabled` should only be set to `false` if 
[HIVE-26882](https://issues.apache.org/jira/browse/HIVE-26882)

Review Comment:
   Could this cause issues in an HA deployment where the HMS configs are not 
consistent across the instances? 



##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -303,6 +303,9 @@ private TableProperties() {}
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String LOCK_HIVE_ENABLED = "lock.hive.enabled";

Review Comment:
   minor, why not `hive.lock.enabled`?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -809,168 +603,38 @@ private static boolean hiveEngineEnabled(TableMetadata 
metadata, Configuration c
   }
 
   /**
-   * Tries to create a lock. If the lock creation fails, and it is possible 
then retries the lock
-   * creation a few times. If the lock creation is successful then a {@link 
LockInfo} is returned,
-   * otherwise an appropriate exception is thrown.
+   * Returns if the hive locking should be enabled on the table, or not.
    *
-   * @param agentInfo The agentInfo which should be used during lock creation
-   * @return The created lock
-   * @throws UnknownHostException When we are not able to fill the hostname 
for lock creation
-   * @throws TException When there is an error during lock creation
-   */
-  @SuppressWarnings("ReverseDnsLookup")
-  private LockInfo tryLock(String agentInfo) throws UnknownHostException, 
TException {
-    LockInfo lockInfo = new LockInfo();
-
-    final LockComponent lockComponent =
-        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest =
-        new LockRequest(
-            Lists.newArrayList(lockComponent),
-            System.getProperty("user.name"),
-            InetAddress.getLocalHost().getHostName());
-
-    // Only works in Hive 2 or later.
-    if (HiveVersion.min(HiveVersion.HIVE_2)) {
-      lockRequest.setAgentInfo(agentInfo);
-    }
-
-    Tasks.foreach(lockRequest)
-        .retry(Integer.MAX_VALUE - 100)
-        .exponentialBackoff(
-            lockCreationMinWaitTime, lockCreationMaxWaitTime, 
lockCreationTimeout, 2.0)
-        .shouldRetryTest(e -> e instanceof TException && 
HiveVersion.min(HiveVersion.HIVE_2))
-        .throwFailureWhenFinished()
-        .run(
-            request -> {
-              try {
-                LockResponse lockResponse = metaClients.run(client -> 
client.lock(request));
-                lockInfo.lockId = lockResponse.getLockid();
-                lockInfo.lockState = lockResponse.getState();
-              } catch (TException te) {
-                LOG.warn("Failed to acquire lock {}", request, te);
-                try {
-                  // If we can not check for lock, or we do not find it, then 
rethrow the exception
-                  // Otherwise we are happy as the findLock sets the lockId 
and the state correctly
-                  if (!HiveVersion.min(HiveVersion.HIVE_2)) {
-                    LockInfo lockFound = findLock(agentInfo);
-                    if (lockFound != null) {
-                      lockInfo.lockId = lockFound.lockId;
-                      lockInfo.lockState = lockFound.lockState;
-                      LOG.info("Found lock {} by agentInfo {}", lockInfo, 
agentInfo);
-                      return;
-                    }
-                  }
-
-                  throw te;
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  LOG.warn(
-                      "Interrupted while checking for lock on table {}.{}", 
database, tableName, e);
-                  throw new RuntimeException("Interrupted while checking for 
lock", e);
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Interrupted while acquiring lock on table {}.{}", 
database, tableName, e);
-                throw new RuntimeException("Interrupted while acquiring lock", 
e);
-              }
-            },
-            TException.class);
-
-    // This should be initialized always, or exception should be thrown.
-    LOG.debug("Lock {} created for table {}.{}", lockInfo, database, 
tableName);
-    return lockInfo;
-  }
-
-  /**
-   * Search for the locks using HMSClient.showLocks identified by the 
agentInfo. If the lock is
-   * there, then a {@link LockInfo} object is returned. If the lock is not 
found <code>null</code>
-   * is returned.
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#LOCK_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml 
property value {@link
+   *       ConfigProperties#LOCK_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#LOCK_HIVE_ENABLED_DEFAULT}
+   * </ol>
    *
-   * @param agentInfo The key for searching the locks
-   * @return The {@link LockInfo} for the found lock, or <code>null</code> if 
nothing found
+   * @param metadata Table metadata to use
+   * @param conf The hive configuration to use
+   * @return if the hive engine related values should be enabled or not
    */
-  private LockInfo findLock(String agentInfo) throws TException, 
InterruptedException {
-    Preconditions.checkArgument(
-        HiveVersion.min(HiveVersion.HIVE_2),
-        "Minimally Hive 2 HMS client is needed to find the Lock using the 
showLocks API call");
-    ShowLocksRequest showLocksRequest = new ShowLocksRequest();
-    showLocksRequest.setDbname(database);
-    showLocksRequest.setTablename(tableName);
-    ShowLocksResponse response = metaClients.run(client -> 
client.showLocks(showLocksRequest));
-    for (ShowLocksResponseElement lock : response.getLocks()) {
-      if (lock.getAgentInfo().equals(agentInfo)) {
-        // We found our lock
-        return new LockInfo(lock.getLockid(), lock.getState());
-      }
-    }
-
-    // Not found anything
-    return null;
-  }
-
-  private static class HiveLockHeartbeat implements Runnable {
-    private final ClientPool<IMetaStoreClient, TException> hmsClients;
-    private final long lockId;
-    private final long intervalMs;
-    private ScheduledFuture<?> future;
-    private volatile Exception encounteredException = null;
-
-    HiveLockHeartbeat(
-        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long 
intervalMs) {
-      this.hmsClients = hmsClients;
-      this.lockId = lockId;
-      this.intervalMs = intervalMs;
-      this.future = null;
-    }
-
-    @Override
-    public void run() {
-      try {
-        hmsClients.run(
-            client -> {
-              client.heartbeat(0, lockId);
-              return null;
-            });
-      } catch (TException | InterruptedException e) {
-        this.encounteredException = e;
-        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", 
lockId);
-      }
-    }
-
-    public void schedule(ScheduledExecutorService scheduler) {
-      future =
-          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, 
TimeUnit.MILLISECONDS);
+  private static boolean hiveLockEnabled(TableMetadata metadata, Configuration 
conf) {
+    if (metadata.properties().get(TableProperties.LOCK_HIVE_ENABLED) != null) {
+      // We know that the property is set, so default value will not be used,
+      return metadata.propertyAsBoolean(TableProperties.LOCK_HIVE_ENABLED, 
false);
     }
 
-    public void cancel() {
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
+    return conf.getBoolean(
+        ConfigProperties.LOCK_HIVE_ENABLED, 
TableProperties.LOCK_HIVE_ENABLED_DEFAULT);
   }
 
-  private static class LockInfo {
-    private long lockId;
-    private LockState lockState;
-
-    private LockInfo() {
-      this.lockId = -1;
-      this.lockState = null;
-    }
-
-    private LockInfo(long lockId, LockState lockState) {
-      this.lockId = lockId;
-      this.lockState = lockState;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("lockId", lockId)
-          .add("lockState", lockState)
-          .toString();
+  @VisibleForTesting
+  HiveLock lockObject(TableMetadata metadata) {

Review Comment:
   Could we delegate to LockManager to decide what locks need to be created? I 
think that would be more convenient to work with in the future. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to