vinothchandar commented on code in PR #18350:
URL: https://github.com/apache/hudi/pull/18350#discussion_r3041295438


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);

Review Comment:
   please write unit tests for this. This is the core new API used by 13+ 
callers. `TestTransactionManager` has no tests for it. 
   
   Suggest adding at minimum:
   1. Lock acquired and released around the action
   2. Instant time generated when `providedInstantTime` is empty vs used when 
present
   3. Lock released when the action throws an exception
   4. Nested/reentrant `executeStateChangeWithInstant` calls



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction

Review Comment:
   this should be a boolean? `boolean allowRetrancy` ? Idk wht nested 
transaction means in this context. lets name this in standard terms?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+      }
+      return instantTimeConsumingAction.apply(requestedInstant);
+    } finally {
+      if (isLockRequired()) {
+        releaseLock();
+        LOG.info("State change ended for {}", requestedInstant);
+      }
+    }
+  }
+
+  public void beginStateChange() {
+    beginStateChange(Option.empty(), Option.empty());
   }
 
   public void beginStateChange(Option<HoodieInstant> changeActionInstant,
                                Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
       LOG.info("State change starting for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
-      lockManager.lock();
+      acquireLock();
       reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
       LOG.info("State change started for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
     }
   }
 
+  public void endStateChange() {
+    endStateChange(Option.empty());
+  }
+
   public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
       LOG.info("State change ending for action instant {}", 
changeActionInstant);
       if (reset(changeActionInstant, Option.empty(), Option.empty())) {
-        lockManager.unlock();
+        releaseLock();
         LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
+  /**
+   * Caution: the {@code hasLock} flag can not be used to skip the `#lock` 
eagerly if the thread switches,
+   * the corner case below can cause deadlock:
+   *
+   * <pre>
+   *   threadA => acquireLock(), {@code hasLock} setup as true and got the 
lock acquired;
+   *   threadB => acquireLock(), check the {@code hasLock} as true, returns 
early;
+   *   threadB => releaseLock(), set up the {@code hasLock} as false;
+   *   threadA => releaseLock(), detect the {@code hasLock} as false and 
returns early.
+   *
+   *   The lock held by threadA will never be released.
+   * </pre>
+   */
+  private void acquireLock() {
+    if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+      LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+      permits++;
+      return;
+    }
+    lockManager.lock();
+    permits++;
+    this.lockHolderId = Thread.currentThread().getId();

Review Comment:
   do we know the ids won't be reused?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
@@ -78,6 +82,11 @@ public static HoodieFlinkTable<?> createTable(
    */
   public static HoodieFlinkTable<?> createTable(Configuration conf) {
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, true, false);
-    return createTableInternal(writeConfig, HoodieFlinkEngineContext.DEFAULT);
+    return createTableInternal(writeConfig, HoodieFlinkEngineContext.DEFAULT, 
Option.empty());
+  }
+
+  public static HoodieFlinkTable<?> createTable(Configuration conf, 
TransactionManager txnManager) {
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, true, false);

Review Comment:
   💬 **SUGGESTION: Naming inconsistency across engines**
   
   Spark has `HoodieSparkTable.createForReads()` for the read-only path, while 
Flink/Java keep `create()` passing `Option.empty()`. Consider aligning so the 
intent is clear from the method name.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java:
##########
@@ -104,7 +104,7 @@ public TimelineArchiverV1(HoodieWriteConfig config, 
HoodieTable<T, I, K, O> tabl
     this.table = table;
     this.metaClient = table.getMetaClient();
     this.archiveFilePath = 
ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath());
-    this.txnManager = new TransactionManager(config, 
table.getMetaClient().getStorage());
+    this.txnManager = table.getTxnManager().get();

Review Comment:
   do we expect this to be absent?  My assumption is - this should always be 
set?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
@@ -53,7 +56,8 @@ public static HoodieFlinkTable<?> createTable(Configuration 
conf, RuntimeContext
         HadoopFSUtils.getStorageConf(getHadoopConf(conf)),
         new FlinkTaskContextSupplier(runtimeContext));
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, true);

Review Comment:
   💬 **SUGGESTION: Track FIXME as follow-up**
   
   `FIXME-vc: we need a txn manager here. for flink clustering/compaction.` — 
Currently safe because callers that need a `TransactionManager` get it from the 
`writeClient`, not the table. But any future code calling 
`table.getTxnManager().get()` on a table created through this path will crash. 
Worth tracking.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+      }
+      return instantTimeConsumingAction.apply(requestedInstant);
+    } finally {
+      if (isLockRequired()) {
+        releaseLock();
+        LOG.info("State change ended for {}", requestedInstant);
+      }
+    }
+  }
+
+  public void beginStateChange() {
+    beginStateChange(Option.empty(), Option.empty());
   }
 
   public void beginStateChange(Option<HoodieInstant> changeActionInstant,
                                Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
       LOG.info("State change starting for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
-      lockManager.lock();
+      acquireLock();
       reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
       LOG.info("State change started for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
     }
   }
 
+  public void endStateChange() {
+    endStateChange(Option.empty());
+  }
+
   public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
       LOG.info("State change ending for action instant {}", 
changeActionInstant);
       if (reset(changeActionInstant, Option.empty(), Option.empty())) {
-        lockManager.unlock();
+        releaseLock();
         LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
+  /**
+   * Caution: the {@code hasLock} flag can not be used to skip the `#lock` 
eagerly if the thread switches,
+   * the corner case below can cause deadlock:
+   *
+   * <pre>
+   *   threadA => acquireLock(), {@code hasLock} setup as true and got the 
lock acquired;
+   *   threadB => acquireLock(), check the {@code hasLock} as true, returns 
early;
+   *   threadB => releaseLock(), set up the {@code hasLock} as false;
+   *   threadA => releaseLock(), detect the {@code hasLock} as false and 
returns early.
+   *
+   *   The lock held by threadA will never be released.
+   * </pre>
+   */
+  private void acquireLock() {
+    if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+      LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+      permits++;
+      return;
+    }
+    lockManager.lock();
+    permits++;
+    this.lockHolderId = Thread.currentThread().getId();
+    LOG.info("{}: Lock acquired for action instant {}", this, 
changeActionInstant);

Review Comment:
   @yihua these seem very raw AI generated?  lets please contextualize these 
for the PR itself.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;

Review Comment:
   pull -1 into constant



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java:
##########
@@ -374,7 +364,8 @@ public HoodieInstant 
transitionLogCompactionRequestedToInflight(HoodieInstant re
   }
 

Review Comment:
   💬 **SUGGESTION: Remove FIXME comment before merge**
   
   `// FIXME-vc: check all these API changes..` — should be resolved or removed 
before merging.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -62,7 +63,7 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
 

Review Comment:
   ⚠️ **IMPORTANT: Unchecked `.get()` on `Option<TransactionManager>`**
   
   `table.getTxnManager().get()` will throw `NoSuchElementException` if a table 
was created via the read path (`Option.empty()` for txnManager). Same pattern 
in `RunIndexActionExecutor`, `BaseRollbackActionExecutor`, 
`BaseRestoreActionExecutor`, `CopyOnWriteRestoreActionExecutor`, 
`MergeOnReadRestoreActionExecutor`, and 
`CompactionUtil.rollbackEarliestCompaction`.
   
   The old code was self-contained (`new TransactionManager(config, 
table.getStorage())`) and always safe. 
   
   This pattern is diff places. Consider a `getOrThrow()` with a clear error 
message, or validation at table creation time. We need to ensure consistent 
behavior -- either we need assume it ll be set at table creation time and throw 
error there or any get() needs to explicitly throw using `getOrThrow()` instead 
of an opaque error



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java:
##########
@@ -42,7 +42,7 @@ public class DirectMarkerTransactionManager extends 
TransactionManager {
   private final String filePath;
 
   public DirectMarkerTransactionManager(HoodieWriteConfig config, 
HoodieStorage storage, String partitionPath, String fileId) {
-    super(new LockManager(config, storage, createUpdatedLockProps(config, 
partitionPath, fileId)), config.isLockRequired());

Review Comment:
   lock is always required now, right? its either in-process or external 
distributed lock



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java:
##########
@@ -104,7 +104,7 @@ public TimelineArchiverV1(HoodieWriteConfig config, 
HoodieTable<T, I, K, O> tabl
     this.table = table;
     this.metaClient = table.getMetaClient();
     this.archiveFilePath = 
ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath());
-    this.txnManager = new TransactionManager(config, 
table.getMetaClient().getStorage());
+    this.txnManager = table.getTxnManager().get();

Review Comment:
   if so, those are bugs, and should fail and not be masked? @danny0405 please 
comment



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+      }
+      return instantTimeConsumingAction.apply(requestedInstant);
+    } finally {
+      if (isLockRequired()) {
+        releaseLock();
+        LOG.info("State change ended for {}", requestedInstant);
+      }
+    }
+  }
+
+  public void beginStateChange() {
+    beginStateChange(Option.empty(), Option.empty());
   }
 
   public void beginStateChange(Option<HoodieInstant> changeActionInstant,
                                Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
       LOG.info("State change starting for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
-      lockManager.lock();
+      acquireLock();
       reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
       LOG.info("State change started for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
     }
   }
 
+  public void endStateChange() {
+    endStateChange(Option.empty());
+  }
+
   public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
       LOG.info("State change ending for action instant {}", 
changeActionInstant);
       if (reset(changeActionInstant, Option.empty(), Option.empty())) {
-        lockManager.unlock();
+        releaseLock();
         LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
+  /**
+   * Caution: the {@code hasLock} flag can not be used to skip the `#lock` 
eagerly if the thread switches,
+   * the corner case below can cause deadlock:
+   *
+   * <pre>
+   *   threadA => acquireLock(), {@code hasLock} setup as true and got the 
lock acquired;
+   *   threadB => acquireLock(), check the {@code hasLock} as true, returns 
early;
+   *   threadB => releaseLock(), set up the {@code hasLock} as false;
+   *   threadA => releaseLock(), detect the {@code hasLock} as false and 
returns early.
+   *
+   *   The lock held by threadA will never be released.
+   * </pre>
+   */
+  private void acquireLock() {
+    if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+      LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+      permits++;
+      return;
+    }
+    lockManager.lock();
+    permits++;

Review Comment:
   What makes the `permits++` immune to races? - `beginStateChange()` is not 
synchronized
   
   are you trying to hand implement sth like a semaphore? This reentrancy part 
is where I got blocked last time as well. You are attaching a lock 
   
   Can we think through this from both angles.. 
   1) There could be table services running in same jvm 
   2) there could be writers in another jvm or process.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java:
##########
@@ -65,6 +66,7 @@ public UpgradeDowngrade.TableConfigChangeSet 
upgrade(HoodieWriteConfig config,
         throw new HoodieException(String.format("Old deprecated \"%s\" 
partition found in hudi table. This needs a migration step before we can 
upgrade ",
             DEPRECATED_DEFAULT_PARTITION_PATH));
       }
+      table.getTxnManager().ifPresent(obj -> ((TransactionManager) 
obj).close());

Review Comment:
   Close the txnManager in a finally block? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));

Review Comment:
   🚨 **BLOCKING: Lock leak if instant time generation throws**
   
   What happens If `acquireLock()` succeeds at L112 but the `orElseGet` at L114 
throws (e.g., `SkewAdjustingTimeGenerator.generateTime()` wraps an 
`InterruptedException` as `HoodieException` during `Thread.sleep`), the lock is 
never released?  



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }
+    return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+  }
+
+  /**
+   * Generates an instant time and executes an action that requires that 
instant time within a lock.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Function<String, T> 
instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(Option.empty(), Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+    return executeStateChangeWithInstant(providedInstantTime, Option.empty(), 
instantTimeConsumingAction);
+  }
+
+  /**
+   * Uses the provided instant if present or else generates an instant time 
and executes an action that requires that instant time within a lock.
+   * @param providedInstantTime an optional instant time provided by the 
caller. If not provided, a new instant time will be generated.
+   * @param lastCompletedActionInstant optional input representing the last 
completed instant, used for logging purposes.
+   * @param instantTimeConsumingAction a function that takes the generated 
instant time and performs some action
+   * @return the result of the action
+   * @param <T> type of the result
+   */
+  public <T> T executeStateChangeWithInstant(Option<String> 
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, 
Function<String, T> instantTimeConsumingAction) {
+    if (isLockRequired()) {
+      acquireLock();
+    }
+    String requestedInstant = providedInstantTime.orElseGet(() -> 
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+    try {
+      if (lastCompletedActionInstant.isEmpty()) {
+        LOG.info("State change starting for {}", changeActionInstant);
+      } else {
+        LOG.info("State change starting for {} with latest completed action 
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+      }
+      return instantTimeConsumingAction.apply(requestedInstant);
+    } finally {
+      if (isLockRequired()) {
+        releaseLock();
+        LOG.info("State change ended for {}", requestedInstant);
+      }
+    }
+  }
+
+  public void beginStateChange() {
+    beginStateChange(Option.empty(), Option.empty());
   }
 
   public void beginStateChange(Option<HoodieInstant> changeActionInstant,
                                Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
       LOG.info("State change starting for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
-      lockManager.lock();
+      acquireLock();
       reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
       LOG.info("State change started for {} with latest completed action 
instant {}",
           changeActionInstant, lastCompletedActionInstant);
     }
   }
 
+  public void endStateChange() {
+    endStateChange(Option.empty());
+  }
+
   public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
       LOG.info("State change ending for action instant {}", 
changeActionInstant);
       if (reset(changeActionInstant, Option.empty(), Option.empty())) {
-        lockManager.unlock();
+        releaseLock();
         LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
+  /**
+   * Caution: the {@code hasLock} flag can not be used to skip the `#lock` 
eagerly if the thread switches,
+   * the corner case below can cause deadlock:
+   *
+   * <pre>
+   *   threadA => acquireLock(), {@code hasLock} setup as true and got the 
lock acquired;
+   *   threadB => acquireLock(), check the {@code hasLock} as true, returns 
early;
+   *   threadB => releaseLock(), set up the {@code hasLock} as false;
+   *   threadA => releaseLock(), detect the {@code hasLock} as false and 
returns early.
+   *
+   *   The lock held by threadA will never be released.
+   * </pre>
+   */
+  private void acquireLock() {
+    if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+      LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+      permits++;
+      return;
+    }
+    lockManager.lock();
+    permits++;
+    this.lockHolderId = Thread.currentThread().getId();
+    LOG.info("{}: Lock acquired for action instant {}", this, 
changeActionInstant);
+  }
+
+  /**
+   * Caution: the `hasLock` flag can not be used to skip the `#unlock` eagerly 
if the thread switches.
+   * see the corner case of {@link #acquireLock()}.
+   */
+  private void releaseLock() {
+    if (isLockHeldByCurrentThread()) {
+      --permits;
+      if (permits == 0) {
+        lockManager.unlock();
+        this.lockHolderId = -1;
+        LOG.info("{}: Lock released for action instant {}", this, 
changeActionInstant);
+      }

Review Comment:
   +1



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, 
AutoCloseable {
   protected final LockManager lockManager;
   @Getter
   protected final boolean isLockRequired;
+  private final transient TimeGenerator timeGenerator;
+  private volatile long lockHolderId; // lock holder ID
+  private int permits;                // allows for nested transaction
   protected Option<HoodieInstant> changeActionInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, storage), config.isLockRequired());
+    this(config, new LockManager(config, storage));
   }
 
-  protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
+  protected TransactionManager(HoodieWriteConfig writeConfig, LockManager 
lockManager) {
+    this(lockManager, writeConfig.isLockRequired(), 
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+  }
+
+  public TransactionManager(LockManager lockManager, boolean isLockRequired, 
TimeGenerator timeGenerator) {
     this.lockManager = lockManager;
     this.isLockRequired = isLockRequired;
+    this.timeGenerator = timeGenerator;
+    this.lockHolderId = -1;
+    this.permits = 0;
+  }
+
+  /**
+   * Caution: the invoker needs to ensure that API called within a lock 
context.
+   */
+  public String generateInstantTime() {
+    if (lockHolderId < 0 && isLockRequired) {
+      throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
+    }

Review Comment:
   🚨 **BLOCKING:  Is this check right?
   
   `lockHolderId < 0` checks whether *any* thread holds the lock, not whether 
the *calling* thread holds it. If thread A holds the lock (`lockHolderId = A's 
id > 0`) and thread B calls `generateInstantTime()`, the check `lockHolderId < 
0` is false, so B is allowed to generate an instant time despite not holding 
the lock.
   
   **Suggestion:**
   ```java
   if (isLockRequired() && !isLockHeldByCurrentThread()) {
     throw new HoodieLockException("Cannot create instant without acquiring a 
lock first.");
   }
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java:
##########
@@ -266,7 +266,7 @@ protected List<HoodieRollbackStat> 
executeRollback(HoodieInstant instantToRollba
   }
 
   protected void finishRollback(HoodieInstant inflightInstant, 
HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
-    boolean enableLocking = (!skipLocking && !skipTimelinePublish);
+    boolean enableLocking = (!skipLocking || !skipTimelinePublish);

Review Comment:
   ⚠️ **IMPORTANT: What's the behavior change due to this? **
   
   Changed from `(!skipLocking && !skipTimelinePublish)` to `(!skipLocking || 
!skipTimelinePublish)`. This means locking now happens when *either* flag is 
false, not only when *both* are false. This is likely a correctness fix (needed 
because `generateInstantTime()` at L290 requires the lock)
   
   Please clarify how its still safe



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
@@ -53,7 +56,8 @@ public static HoodieFlinkTable<?> createTable(Configuration 
conf, RuntimeContext
         HadoopFSUtils.getStorageConf(getHadoopConf(conf)),
         new FlinkTaskContextSupplier(runtimeContext));
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, true);

Review Comment:
   Coment on this?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java:
##########
@@ -374,7 +364,8 @@ public HoodieInstant 
transitionLogCompactionRequestedToInflight(HoodieInstant re
   }
 

Review Comment:
   anything I marked `FIXME-vc` were my notes to myself to resolve before 
landing



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -226,11 +227,11 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, 
O> table, HoodieInstan
       );
       this.txnManager.beginStateChange(Option.of(inflightInstant), 
Option.empty());
       writeTableMetadata(metadata, inflightInstant.requestedTime());
-      table.getActiveTimeline().transitionCleanInflightToComplete(
-          false,
-          inflightInstant,
-          Option.of(metadata),
-          completedInstant -> 
table.getMetaClient().getTableFormat().clean(metadata, completedInstant, 
table.getContext(), table.getMetaClient(), table.getViewManager()));
+      TableFormatCompletionAction formatCompletionAction = completedInstant -> 
table.getMetaClient().getTableFormat()
+          .clean(metadata, completedInstant, table.getContext(), 
table.getMetaClient(), table.getViewManager());
+      HoodieInstant completedInstant = 
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant, 
Option.of(metadata), txnManager.generateInstantTime());
+      // FIXME-vc: this is an one off..
+      formatCompletionAction.execute(completedInstant);
       log.info("Marked clean started on {} as complete", 
inflightInstant.requestedTime());

Review Comment:
   lets think through this more? and find a better solution. This was supposed 
to be fixed in the same PR, once I got tests to pass mostly



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1747,22 +1745,25 @@ public void update(HoodieRestoreMetadata 
restoreMetadata, String instantTime) {
       // We cannot create a deltaCommit at instantTime now because a future 
(rollback) block has already been written to the logFiles.
       // We need to choose a timestamp which would be a validInstantTime for 
MDT. This is either a commit timestamp completed on the dataset
       // or a new timestamp which we use for MDT clean, compaction etc.
-      String syncCommitTime = createRestoreInstantTime();
-      processAndCommit(syncCommitTime, () -> {
-        // For Files partition.
-        Map<String, HoodieData<HoodieRecord>> partitionRecords = new 
HashMap<>();
-        
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
-            partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
-        // For ColumnStats partition if enabled.
-        if 
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
 {
-          partitionRecords.putAll(convertToColumnStatsRecord(
-              partitionFilesToAdd, partitionFilesToDelete, engineContext, 
dataMetaClient,
-              dataWriteConfig.getMetadataConfig(), 
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
-              
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
-        }
-        return partitionRecords;
+      
writeClient.getTransactionManager().executeStateChangeWithInstant(metadataTableCommit
 -> {
+        String syncCommitTime = createRestoreTimestamp(metadataTableCommit);
+        processAndCommit(syncCommitTime, () -> {
+          // For Files partition.
+          Map<String, HoodieData<HoodieRecord>> partitionRecords = new 
HashMap<>();
+          
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+              partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
+          // For ColumnStats partition if enabled.
+          if 
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
 {
+            partitionRecords.putAll(convertToColumnStatsRecord(
+                partitionFilesToAdd, partitionFilesToDelete, engineContext, 
dataMetaClient,
+                dataWriteConfig.getMetadataConfig(), 
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
+                
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
+          }
+          return partitionRecords;
+        });
+        // empty result
+        return Option.empty();
       });
-      closeInternal();
     } catch (IOException e) {
       throw new HoodieMetadataException("IOException during MDT restore sync", 
e);
     }

Review Comment:
   Does not calling `closeInternal()` not leak anything? Please clarify



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java:
##########
@@ -64,6 +66,7 @@ public UpgradeDowngrade.TableConfigChangeSet upgrade(
       // for every pending commit, delete old markers and re-create markers in 
new format
       recreateMarkers(commit, table, context, 
config.getMarkersDeleteParallelism());
     }
+    table.getTxnManager().ifPresent(obj -> ((TransactionManager) obj).close());

Review Comment:
   💅 **NIT:** `((TransactionManager) obj).close()` — the cast is redundant 
since `Option<TransactionManager>` is already typed. Could be 
`TransactionManager::close`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to