yihua commented on code in PR #18350:
URL: https://github.com/apache/hudi/pull/18350#discussion_r3037243002


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -62,7 +63,7 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
 
   public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig 
config, HoodieTable<T, I, K, O> table, String instantTime) {
     super(context, config, table, instantTime);
-    this.txnManager = new TransactionManager(config, table.getStorage());
+    this.txnManager = table.getTxnManager().get();

Review Comment:
   🤖 Calling `.get()` on `table.getTxnManager()` is unsafe and will throw a 
`NoSuchElementException` if the table was instantiated without a 
`TransactionManager`. If we expect callers to always provide it, we should 
explicitly validate and throw a clear exception. Otherwise, we should fall back 
to creating one locally like the previous code.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java:
##########
@@ -245,14 +231,19 @@ public void testTransactionsWithInstantTime() {
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
     // 6. Transactions with no owners should also go through
-    transactionManager.beginStateChange(Option.empty(), Option.empty());
+    transactionManager.beginStateChange();
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
-    transactionManager.endStateChange(Option.empty());
+    transactionManager.endStateChange();
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
   }
 
+  @Test
+  public void testGenerateInstantTimeFailsWithoutLock() {
+    assertThrows(HoodieLockException.class, () -> 
transactionManager.generateInstantTime());

Review Comment:
   🤖 Could you also add a test case where Thread A acquires the lock and Thread 
B concurrently calls `generateInstantTime()`? Currently, 
`TransactionManager#generateInstantTime` checks `if (lockHolderId < 0)`, which 
means if Thread A holds the lock (`lockHolderId > 0`), Thread B will 
incorrectly bypass the check. The implementation should probably use `if 
(isLockRequired && !isLockHeldByCurrentThread())`.



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java:
##########
@@ -59,9 +59,8 @@ public void preCompact(
         ? instantGenerator.getCompactionInflightInstant(instantTime)
         : instantGenerator.getLogCompactionInflightInstant(instantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      try (TransactionManager transactionManager = new 
TransactionManager(table.getConfig(), table.getStorage())) {
-        table.rollbackInflightCompaction(inflightInstant, transactionManager);
-      }
+      TransactionManager transactionManager = (TransactionManager) 
table.getTxnManager().get();

Review Comment:
   🤖 Could this throw a `NoSuchElementException` if the table was instantiated 
via `HoodieFlinkTable.create(config, context)`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+      }
+      return instantTimeConsumingAction.apply(requestedInstant);
+    } finally {
+      if (isLockRequired()) {
+        releaseLock();
+        LOG.info("State change ended for {}", requestedInstant);
+      }
+    }
+  }
+
+  public void beginStateChange() {
+    beginStateChange(Option.empty(), Option.empty());
   }
 
   public void beginStateChange(Option<HoodieInstant> changeActionInstant,
                                Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
       LOG.info("State change starting for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
-      lockManager.lock();
+      acquireLock();
       reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
       LOG.info("State change started for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
     }
   }
 
+  public void endStateChange() {
+    endStateChange(Option.empty());
+  }
+
   public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
       LOG.info("State change ending for action instant {}", 
changeActionInstant);
       if (reset(changeActionInstant, Option.empty(), Option.empty())) {
-        lockManager.unlock();
+        releaseLock();
         LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
+  /**
+   * Caution: the {@code hasLock} flag can not be used to skip the `#lock` 
eagerly if the thread switches,
+   * the corner case below can cause deadlock:
+   *
+   * <pre>
+   *   threadA => acquireLock(), {@code hasLock} setup as true and got the 
lock acquired;
+   *   threadB => acquireLock(), check the {@code hasLock} as true, returns 
early;
+   *   threadB => releaseLock(), set up the {@code hasLock} as false;
+   *   threadA => releaseLock(), detect the {@code hasLock} as false and 
returns early.
+   *
+   *   The lock held by threadA will never be released.
+   * </pre>
+   */
+  private void acquireLock() {
+    if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+      LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+      permits++;
+      return;
+    }
+    lockManager.lock();
+    permits++;
+    this.lockHolderId = Thread.currentThread().getId();
+    LOG.info("{}: Lock acquired for action instant {}", this, 
changeActionInstant);
+  }
+
+  /**
+   * Caution: the `hasLock` flag can not be used to skip the `#unlock` eagerly 
if the thread switches.
+   * see the corner case of {@link #acquireLock()}.
+   */
+  private void releaseLock() {
+    if (isLockHeldByCurrentThread()) {
+      --permits;
+      if (permits == 0) {
+        lockManager.unlock();
+        this.lockHolderId = -1;
+        LOG.info("{}: Lock released for action instant {}", this, 
changeActionInstant);
+      }

Review Comment:
   🤖 This introduces a race condition where a blocked thread can acquire the 
lock and overwrite `lockHolderId` before this thread resets it to `-1`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -231,10 +231,10 @@ protected void commit(HoodieWriteMetadata<O> result, 
List<HoodieWriteStat> write
       writeTableMetadata(metadata, actionType);
       // cannot serialize maps with null values
       metadata.getExtraMetadata().entrySet().removeIf(entry -> 
entry.getValue() == null);
-      activeTimeline.saveAsComplete(false,
-          table.getMetaClient().createNewInstant(State.INFLIGHT, actionType, 
instantTime), Option.of(metadata),
+      activeTimeline.saveAsComplete(
+          table.getMetaClient().createNewInstant(State.INFLIGHT, actionType, 
instantTime), Option.of(metadata), txnManagerOption.get().generateInstantTime(),

Review Comment:
   🤖 Unsafe unwrap. If `txnManagerOption` is empty (see previous comment in 
`completeCommit`), this will throw a `NoSuchElementException`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -184,7 +184,7 @@ protected void 
runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
 
   protected void completeCommit(HoodieWriteMetadata result) {
     if (!this.txnManagerOption.isPresent()) {
-      this.txnManagerOption = Option.of(new TransactionManager(config, 
table.getStorage()));
+      this.txnManagerOption = table.getTxnManager();

Review Comment:
   🤖 If `table.getTxnManager()` is empty, this reassignment acts as a no-op and 
`this.txnManagerOption` will remain empty. We need to handle the absence of a 
transaction manager to prevent a `NoSuchElementException` down the line.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1747,22 +1745,25 @@ public void update(HoodieRestoreMetadata 
restoreMetadata, String instantTime) {
       // We cannot create a deltaCommit at instantTime now because a future 
(rollback) block has already been written to the logFiles.
       // We need to choose a timestamp which would be a validInstantTime for 
MDT. This is either a commit timestamp completed on the dataset
       // or a new timestamp which we use for MDT clean, compaction etc.
-      String syncCommitTime = createRestoreInstantTime();
-      processAndCommit(syncCommitTime, () -> {
-        // For Files partition.
-        Map<String, HoodieData<HoodieRecord>> partitionRecords = new 
HashMap<>();
-        
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
-            partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
-        // For ColumnStats partition if enabled.
-        if 
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
 {
-          partitionRecords.putAll(convertToColumnStatsRecord(
-              partitionFilesToAdd, partitionFilesToDelete, engineContext, 
dataMetaClient,
-              dataWriteConfig.getMetadataConfig(), 
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
-              
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
-        }
-        return partitionRecords;
+      
writeClient.getTransactionManager().executeStateChangeWithInstant(metadataTableCommit
 -> {
+        String syncCommitTime = createRestoreTimestamp(metadataTableCommit);
+        processAndCommit(syncCommitTime, () -> {
+          // For Files partition.
+          Map<String, HoodieData<HoodieRecord>> partitionRecords = new 
HashMap<>();
+          
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+              partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
+          // For ColumnStats partition if enabled.
+          if 
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
 {
+            partitionRecords.putAll(convertToColumnStatsRecord(
+                partitionFilesToAdd, partitionFilesToDelete, engineContext, 
dataMetaClient,
+                dataWriteConfig.getMetadataConfig(), 
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
+                
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
+          }
+          return partitionRecords;
+        });
+        // empty result
+        return Option.empty();

Review Comment:
   🤖 Removing `closeInternal()` here causes a resource leak of the metadata 
write client.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1044,18 +1040,17 @@ public String 
startDeletePartitionCommit(HoodieTableMetaClient metaClient) {
   @VisibleForTesting
   String startCommit(Option<String> providedInstantTime, String actionType, 
HoodieTableMetaClient metaClient) {
     if (needsUpgrade(metaClient)) {
-      // unclear what instant to use, since upgrade does have a given instant.
-      executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, 
Option.empty()));
+      txnManager.executeStateChangeWithInstant((ignored) -> {
+        // unclear what instant to use, since upgrade does have a given 
instant.
+        tryUpgrade(metaClient, Option.empty());
+        return null;
+      });
     }
     runPreWriteCleanerPolicy(metaClient);
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.COMMIT_ACTION, () -> 
tableServiceClient.rollbackFailedWrites(metaClient));
 
-    txnManager.beginStateChange(Option.empty(), 
lastCompletedTxnAndMetadata.map(Pair::getLeft));
-    String instantTime;
-    HoodieInstant instant = null;
-    try {
-      instantTime = providedInstantTime.orElseGet(() -> 
createNewInstantTime(false));
+    return txnManager.executeStateChangeWithInstant(providedInstantTime, 
lastCompletedTxnAndMetadata.map(Pair::getLeft), instantTime -> {

Review Comment:
   🤖 We should reload the active timeline (`metaClient.reloadActiveTimeline()`) 
inside the lambda before checking for inflight restores. Otherwise, we might 
check a stale timeline state from before the lock was acquired, potentially 
missing a concurrent restore.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java:
##########
@@ -47,17 +46,12 @@ protected HoodieRollbackMetadata 
rollbackInstant(HoodieInstant instantToRollback
         && 
!ClusteringUtils.isClusteringOrReplaceCommitAction(instantToRollback.getAction()))
 {
       throw new HoodieRollbackException("Unsupported action in rollback 
instant:" + instantToRollback);
     }
-    String newInstantTime;
-    try (TransactionManager transactionManager = new 
TransactionManager(config, table.getStorage())) {
+    TransactionManager transactionManager = table.getTxnManager().get();

Review Comment:
   🤖 Another unsafe `.get()` unwrap here. Like the other executors, if 
`table.getTxnManager()` is empty, this will throw a `NoSuchElementException`.



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java:
##########
@@ -49,9 +49,8 @@ public void preCompact(
     }
     HoodieInstant inflightInstant = 
table.getInstantGenerator().getCompactionInflightInstant(instantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      try (TransactionManager transactionManager = new 
TransactionManager(table.getConfig(), table.getStorage())) {
-        table.rollbackInflightCompaction(inflightInstant, transactionManager);
-      }
+      TransactionManager transactionManager = (TransactionManager) 
table.getTxnManager().get();

Review Comment:
   🤖 Calling `.get()` on the `Option` here will throw a raw 
`NoSuchElementException` if the table was created without a 
`TransactionManager` (e.g., via the convenience `HoodieJavaTable.create(config, 
context)`).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+      }
+      return instantTimeConsumingAction.apply(requestedInstant);
+    } finally {
+      if (isLockRequired()) {
+        releaseLock();
+        LOG.info("State change ended for {}", requestedInstant);
+      }
+    }
+  }
+
+  public void beginStateChange() {
+    beginStateChange(Option.empty(), Option.empty());
   }
 
   public void beginStateChange(Option<HoodieInstant> changeActionInstant,
                                Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
       LOG.info("State change starting for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
-      lockManager.lock();
+      acquireLock();
       reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
       LOG.info("State change started for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
     }
   }
 
+  public void endStateChange() {
+    endStateChange(Option.empty());
+  }
+
   public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
       LOG.info("State change ending for action instant {}", 
changeActionInstant);
       if (reset(changeActionInstant, Option.empty(), Option.empty())) {
-        lockManager.unlock();
+        releaseLock();
         LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
+  /**
+   * Caution: the {@code hasLock} flag can not be used to skip the `#lock` 
eagerly if the thread switches,
+   * the corner case below can cause deadlock:
+   *
+   * <pre>
+   *   threadA => acquireLock(), {@code hasLock} setup as true and got the 
lock acquired;
+   *   threadB => acquireLock(), check the {@code hasLock} as true, returns 
early;
+   *   threadB => releaseLock(), set up the {@code hasLock} as false;
+   *   threadA => releaseLock(), detect the {@code hasLock} as false and 
returns early.
+   *
+   *   The lock held by threadA will never be released.
+   * </pre>
+   */
+  private void acquireLock() {
+    if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+      LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+      permits++;
+      return;
+    }
+    lockManager.lock();
+    permits++;
+    this.lockHolderId = Thread.currentThread().getId();
+    LOG.info("{}: Lock acquired for action instant {}", this, 
changeActionInstant);

Review Comment:
   🤖 Using `permits++` here is vulnerable to memory visibility issues across 
threads if `lockManager` doesn't enforce JVM happen-before boundaries.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java:
##########
@@ -282,4 +284,17 @@ protected void rollbackFailedWrites(Map<String, 
Option<HoodiePendingRollbackInfo
       assertEquals(expectedRollbackInfo, instantsToRollback);
     }
   }
+
+  private TransactionManager createTransactionManager(List<String> 
instantTimes) {
+    try {
+      TimeGenerator timeGenerator = mock(TimeGenerator.class);
+      for (String instantTime : instantTimes) {

Review Comment:
   🤖 This loop replaces previous Mockito stubbings, causing `generateTime()` to 
always return the parsed value of the last item in `instantTimes`.



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java:
##########
@@ -45,7 +47,8 @@ public static JavaUpgradeDowngradeHelper getInstance() {
 
   @Override
   public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext 
context) {
-    return HoodieJavaTable.create(config, context);
+    TransactionManager txnManager = new TransactionManager(config, 
HoodieStorageUtils.getStorage(config.getBasePath(), context.getStorageConf()));

Review Comment:
   🤖 Instantiating `TransactionManager` here without closing it creates a 
resource leak (e.g., Zookeeper connections, lock providers, metrics).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());

Review Comment:
   🤖 `changeActionInstant` is never updated in this method, so this logs an 
empty or stale value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to