yihua commented on code in PR #18350:
URL: https://github.com/apache/hudi/pull/18350#discussion_r3037243002
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -62,7 +63,7 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
super(context, config, table, instantTime);
- this.txnManager = new TransactionManager(config, table.getStorage());
+ this.txnManager = table.getTxnManager().get();
Review Comment:
🤖 Calling `.get()` on `table.getTxnManager()` is unsafe and will throw a
`NoSuchElementException` if the table was instantiated without a
`TransactionManager`. If we expect callers to always provide it, we should
explicitly validate and throw a clear exception. Otherwise, we should fall back
to creating one locally like the previous code.
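
A minimal sketch of the validate-and-throw option. It assumes `java.util.Optional` as a stand-in for Hudi's `Option`, and `requireTxnManager` is a hypothetical helper, not an existing Hudi API:

```java
import java.util.Optional;

class TxnManagerLookup {
  // Hypothetical helper: unwraps the optional or fails with a descriptive message
  // instead of a bare NoSuchElementException.
  static <M> M requireTxnManager(Optional<M> txnManager, String caller) {
    return txnManager.orElseThrow(() -> new IllegalStateException(
        caller + " requires a table that was created with a TransactionManager"));
  }

  public static void main(String[] args) {
    // Present case: the wrapped value is returned unchanged.
    System.out.println(requireTxnManager(Optional.of("txnManager"), "CleanActionExecutor"));
    // Empty case: the caller gets a message that names the missing dependency.
    try {
      requireTxnManager(Optional.empty(), "CleanActionExecutor");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The same check could sit in one shared place so that each executor does not repeat it.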
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java:
##########
@@ -245,14 +231,19 @@ public void testTransactionsWithInstantTime() {
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
// 6. Transactions with no owners should also go through
- transactionManager.beginStateChange(Option.empty(), Option.empty());
+ transactionManager.beginStateChange();
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
- transactionManager.endStateChange(Option.empty());
+ transactionManager.endStateChange();
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
}
+ @Test
+ public void testGenerateInstantTimeFailsWithoutLock() {
+ assertThrows(HoodieLockException.class, () ->
transactionManager.generateInstantTime());
Review Comment:
🤖 Could you also add a test case where Thread A acquires the lock and Thread
B concurrently calls `generateInstantTime()`? Currently,
`TransactionManager#generateInstantTime` checks `if (lockHolderId < 0 &&
isLockRequired)`, which only verifies that some thread holds the lock. If
Thread A holds it (`lockHolderId > 0`), Thread B incorrectly passes the check.
The implementation should probably use `if (isLockRequired &&
!isLockHeldByCurrentThread())`.
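
A rough sketch of the requested scenario. It assumes a `ReentrantLock` as a stand-in for Hudi's `LockManager`, and `OwnerCheckedGenerator` is a hypothetical class rather than the real `TransactionManager`. The check compares the recorded holder with the calling thread, so a second thread is rejected while the first holds the lock:

```java
import java.util.concurrent.locks.ReentrantLock;

class OwnerCheckedGenerator {
  private final ReentrantLock lock = new ReentrantLock(); // stand-in for LockManager
  private volatile long lockHolderId = -1;

  void begin() {
    lock.lock();
    lockHolderId = Thread.currentThread().getId();
  }

  void end() {
    lockHolderId = -1;
    lock.unlock();
  }

  String generateInstantTime() {
    // Reject any caller that is not the thread currently recorded as the holder.
    if (lockHolderId != Thread.currentThread().getId()) {
      throw new IllegalStateException("Cannot create instant without acquiring a lock first.");
    }
    return Long.toString(System.currentTimeMillis());
  }

  // Returns true if a second thread is rejected while the calling thread holds the lock.
  static boolean otherThreadIsRejected() {
    OwnerCheckedGenerator generator = new OwnerCheckedGenerator();
    boolean[] rejected = {false};
    generator.begin();
    try {
      Thread other = new Thread(() -> {
        try {
          generator.generateInstantTime();
        } catch (IllegalStateException e) {
          rejected[0] = true;
        }
      });
      other.start();
      other.join(); // join() makes the write to rejected[0] visible to this thread
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      generator.end();
    }
    return rejected[0];
  }

  public static void main(String[] args) {
    System.out.println("other thread rejected: " + otherThreadIsRejected());
  }
}
```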
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java:
##########
@@ -59,9 +59,8 @@ public void preCompact(
? instantGenerator.getCompactionInflightInstant(instantTime)
: instantGenerator.getLogCompactionInflightInstant(instantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- try (TransactionManager transactionManager = new
TransactionManager(table.getConfig(), table.getStorage())) {
- table.rollbackInflightCompaction(inflightInstant, transactionManager);
- }
+ TransactionManager transactionManager = (TransactionManager)
table.getTxnManager().get();
Review Comment:
🤖 Could this throw a `NoSuchElementException` if the table was instantiated
via `HoodieFlinkTable.create(config, context)`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant,
Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
LOG.info("State change ending for action instant {}",
changeActionInstant);
if (reset(changeActionInstant, Option.empty(), Option.empty())) {
- lockManager.unlock();
+ releaseLock();
LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
+ /**
+ * Caution: the {@code hasLock} flag can not be used to skip the `#lock`
eagerly if the thread switches,
+ * the corner case below can cause deadlock:
+ *
+ * <pre>
+ * threadA => acquireLock(), {@code hasLock} setup as true and got the
lock acquired;
+ * threadB => acquireLock(), check the {@code hasLock} as true, returns
early;
+ * threadB => releaseLock(), set up the {@code hasLock} as false;
+ * threadA => releaseLock(), detect the {@code hasLock} as false and
returns early.
+ *
+ * The lock held by threadA will never be released.
+ * </pre>
+ */
+ private void acquireLock() {
+ if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+ LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+ permits++;
+ return;
+ }
+ lockManager.lock();
+ permits++;
+ this.lockHolderId = Thread.currentThread().getId();
+ LOG.info("{}: Lock acquired for action instant {}", this,
changeActionInstant);
+ }
+
+ /**
+ * Caution: the `hasLock` flag can not be used to skip the `#unlock` eagerly
if the thread switches.
+ * see the corner case of {@link #acquireLock()}.
+ */
+ private void releaseLock() {
+ if (isLockHeldByCurrentThread()) {
+ --permits;
+ if (permits == 0) {
+ lockManager.unlock();
+ this.lockHolderId = -1;
+ LOG.info("{}: Lock released for action instant {}", this,
changeActionInstant);
+ }
Review Comment:
🤖 This introduces a race condition: `lockManager.unlock()` runs before
`lockHolderId` is reset, so a blocked thread can acquire the lock and record
its own ID, which this thread then overwrites with `-1`.
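
One way to close the window described above is to clear the holder ID while the lock is still held and only then unlock. A minimal sketch under that assumption, with `ReentrantLock` standing in for `LockManager` and `OrderedRelease` as a hypothetical class:

```java
import java.util.concurrent.locks.ReentrantLock;

class OrderedRelease {
  private final ReentrantLock lock = new ReentrantLock(); // stand-in for LockManager
  private volatile long lockHolderId = -1;
  private int permits = 0; // only read or written by the thread that holds the lock

  void acquire() {
    if (lockHolderId == Thread.currentThread().getId()) {
      permits++; // nested acquisition by the current holder
      return;
    }
    lock.lock();
    lockHolderId = Thread.currentThread().getId();
    permits = 1;
  }

  void release() {
    if (lockHolderId != Thread.currentThread().getId()) {
      return; // not the holder, nothing to release
    }
    if (--permits == 0) {
      lockHolderId = -1; // clear ownership first, while still holding the lock
      lock.unlock();     // only now can a waiting thread acquire and record its own ID
    }
  }

  boolean isHeld() {
    return lockHolderId != -1;
  }

  public static void main(String[] args) {
    OrderedRelease txn = new OrderedRelease();
    txn.acquire();
    txn.acquire();  // nested
    txn.release();  // still held by the outer acquisition
    System.out.println("held after inner release: " + txn.isHeld());
    txn.release();
    System.out.println("held after outer release: " + txn.isHeld());
  }
}
```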
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -231,10 +231,10 @@ protected void commit(HoodieWriteMetadata<O> result,
List<HoodieWriteStat> write
writeTableMetadata(metadata, actionType);
// cannot serialize maps with null values
metadata.getExtraMetadata().entrySet().removeIf(entry ->
entry.getValue() == null);
- activeTimeline.saveAsComplete(false,
- table.getMetaClient().createNewInstant(State.INFLIGHT, actionType,
instantTime), Option.of(metadata),
+ activeTimeline.saveAsComplete(
+ table.getMetaClient().createNewInstant(State.INFLIGHT, actionType,
instantTime), Option.of(metadata), txnManagerOption.get().generateInstantTime(),
Review Comment:
🤖 Unsafe unwrap. If `txnManagerOption` is empty (see the comment on
`completeCommit`), this will throw a `NoSuchElementException`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -184,7 +184,7 @@ protected void
runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
protected void completeCommit(HoodieWriteMetadata result) {
if (!this.txnManagerOption.isPresent()) {
- this.txnManagerOption = Option.of(new TransactionManager(config,
table.getStorage()));
+ this.txnManagerOption = table.getTxnManager();
Review Comment:
🤖 If `table.getTxnManager()` is empty, this reassignment acts as a no-op and
`this.txnManagerOption` will remain empty. We need to handle the absence of a
transaction manager to prevent a `NoSuchElementException` down the line.
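
A sketch of the fallback variant: keep the table's manager when it is present, otherwise create one locally as the removed line did. It assumes `java.util.Optional` in place of Hudi's `Option`, and `resolve` is a hypothetical helper:

```java
import java.util.Optional;
import java.util.function.Supplier;

class TxnManagerFallback {
  // Hypothetical helper: prefer the shared manager, otherwise build a local one.
  // The factory is only invoked when the table did not supply a manager.
  static <M> Optional<M> resolve(Optional<M> fromTable, Supplier<M> localFactory) {
    return fromTable.isPresent() ? fromTable : Optional.of(localFactory.get());
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.of("shared"), () -> "local").get());         // prints shared
    System.out.println(resolve(Optional.<String>empty(), () -> "local").get());      // prints local
  }
}
```

A locally created manager would also need an owner that closes it.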
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1747,22 +1745,25 @@ public void update(HoodieRestoreMetadata
restoreMetadata, String instantTime) {
// We cannot create a deltaCommit at instantTime now because a future
(rollback) block has already been written to the logFiles.
// We need to choose a timestamp which would be a validInstantTime for
MDT. This is either a commit timestamp completed on the dataset
// or a new timestamp which we use for MDT clean, compaction etc.
- String syncCommitTime = createRestoreInstantTime();
- processAndCommit(syncCommitTime, () -> {
- // For Files partition.
- Map<String, HoodieData<HoodieRecord>> partitionRecords = new
HashMap<>();
-
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
- partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete,
syncCommitTime));
- // For ColumnStats partition if enabled.
- if
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
{
- partitionRecords.putAll(convertToColumnStatsRecord(
- partitionFilesToAdd, partitionFilesToDelete, engineContext,
dataMetaClient,
- dataWriteConfig.getMetadataConfig(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
-
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
- }
- return partitionRecords;
+
writeClient.getTransactionManager().executeStateChangeWithInstant(metadataTableCommit
-> {
+ String syncCommitTime = createRestoreTimestamp(metadataTableCommit);
+ processAndCommit(syncCommitTime, () -> {
+ // For Files partition.
+ Map<String, HoodieData<HoodieRecord>> partitionRecords = new
HashMap<>();
+
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+ partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete,
syncCommitTime));
+ // For ColumnStats partition if enabled.
+ if
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
{
+ partitionRecords.putAll(convertToColumnStatsRecord(
+ partitionFilesToAdd, partitionFilesToDelete, engineContext,
dataMetaClient,
+ dataWriteConfig.getMetadataConfig(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
+
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
+ }
+ return partitionRecords;
+ });
+ // empty result
+ return Option.empty();
Review Comment:
🤖 Removing `closeInternal()` here causes a resource leak of the metadata
write client.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1044,18 +1040,17 @@ public String
startDeletePartitionCommit(HoodieTableMetaClient metaClient) {
@VisibleForTesting
String startCommit(Option<String> providedInstantTime, String actionType,
HoodieTableMetaClient metaClient) {
if (needsUpgrade(metaClient)) {
- // unclear what instant to use, since upgrade does have a given instant.
- executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
+ txnManager.executeStateChangeWithInstant((ignored) -> {
+ // unclear what instant to use, since upgrade does have a given
instant.
+ tryUpgrade(metaClient, Option.empty());
+ return null;
+ });
}
runPreWriteCleanerPolicy(metaClient);
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
- txnManager.beginStateChange(Option.empty(),
lastCompletedTxnAndMetadata.map(Pair::getLeft));
- String instantTime;
- HoodieInstant instant = null;
- try {
- instantTime = providedInstantTime.orElseGet(() ->
createNewInstantTime(false));
+ return txnManager.executeStateChangeWithInstant(providedInstantTime,
lastCompletedTxnAndMetadata.map(Pair::getLeft), instantTime -> {
Review Comment:
🤖 We should reload the active timeline (`metaClient.reloadActiveTimeline()`)
inside the lambda before checking for inflight restores. Otherwise, we might
check a stale timeline state from before the lock was acquired, potentially
missing a concurrent restore.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java:
##########
@@ -47,17 +46,12 @@ protected HoodieRollbackMetadata
rollbackInstant(HoodieInstant instantToRollback
&&
!ClusteringUtils.isClusteringOrReplaceCommitAction(instantToRollback.getAction()))
{
throw new HoodieRollbackException("Unsupported action in rollback
instant:" + instantToRollback);
}
- String newInstantTime;
- try (TransactionManager transactionManager = new
TransactionManager(config, table.getStorage())) {
+ TransactionManager transactionManager = table.getTxnManager().get();
Review Comment:
🤖 Another unsafe `.get()` unwrap here. Like the other executors, if
`table.getTxnManager()` is empty, this will throw a `NoSuchElementException`.
##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java:
##########
@@ -49,9 +49,8 @@ public void preCompact(
}
HoodieInstant inflightInstant =
table.getInstantGenerator().getCompactionInflightInstant(instantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- try (TransactionManager transactionManager = new
TransactionManager(table.getConfig(), table.getStorage())) {
- table.rollbackInflightCompaction(inflightInstant, transactionManager);
- }
+ TransactionManager transactionManager = (TransactionManager)
table.getTxnManager().get();
Review Comment:
🤖 Calling `.get()` on the `Option` here will throw a raw
`NoSuchElementException` if the table was created without a
`TransactionManager` (e.g., via the convenience `HoodieJavaTable.create(config,
context)`).
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant,
Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
LOG.info("State change ending for action instant {}",
changeActionInstant);
if (reset(changeActionInstant, Option.empty(), Option.empty())) {
- lockManager.unlock();
+ releaseLock();
LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
+ /**
+ * Caution: the {@code hasLock} flag can not be used to skip the `#lock`
eagerly if the thread switches,
+ * the corner case below can cause deadlock:
+ *
+ * <pre>
+ * threadA => acquireLock(), {@code hasLock} setup as true and got the
lock acquired;
+ * threadB => acquireLock(), check the {@code hasLock} as true, returns
early;
+ * threadB => releaseLock(), set up the {@code hasLock} as false;
+ * threadA => releaseLock(), detect the {@code hasLock} as false and
returns early.
+ *
+ * The lock held by threadA will never be released.
+ * </pre>
+ */
+ private void acquireLock() {
+ if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+ LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+ permits++;
+ return;
+ }
+ lockManager.lock();
+ permits++;
+ this.lockHolderId = Thread.currentThread().getId();
+ LOG.info("{}: Lock acquired for action instant {}", this,
changeActionInstant);
Review Comment:
🤖 `permits` is a plain (non-volatile) `int`, so `permits++` here is vulnerable
to memory visibility issues across threads if `lockManager` doesn't establish
a JVM happens-before ordering between an unlock and the next lock.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java:
##########
@@ -282,4 +284,17 @@ protected void rollbackFailedWrites(Map<String,
Option<HoodiePendingRollbackInfo
assertEquals(expectedRollbackInfo, instantsToRollback);
}
}
+
+ private TransactionManager createTransactionManager(List<String>
instantTimes) {
+ try {
+ TimeGenerator timeGenerator = mock(TimeGenerator.class);
+ for (String instantTime : instantTimes) {
Review Comment:
🤖 This loop replaces previous Mockito stubbings, causing `generateTime()` to
always return the parsed value of the last item in `instantTimes`.
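
Mockito's consecutive stubbing (`thenReturn(first, rest...)`) is one fix. Another is a small hand-written fake that hands out queued values in order. The sketch below shows the fake; `SequencedTimeGenerator` and its `generateTime()` signature are hypothetical stand-ins and do not mirror Hudi's `TimeGenerator` interface exactly:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SequencedTimeGenerator {
  private final Deque<Long> pending;

  SequencedTimeGenerator(List<Long> times) {
    this.pending = new ArrayDeque<>(times);
  }

  // Each call consumes the next queued value instead of repeating the last one.
  long generateTime() {
    Long next = pending.poll();
    if (next == null) {
      throw new IllegalStateException("No more instant times queued");
    }
    return next;
  }

  public static void main(String[] args) {
    SequencedTimeGenerator generator =
        new SequencedTimeGenerator(Arrays.asList(100L, 200L, 300L));
    System.out.println(generator.generateTime());
    System.out.println(generator.generateTime());
    System.out.println(generator.generateTime());
  }
}
```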
##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java:
##########
@@ -45,7 +47,8 @@ public static JavaUpgradeDowngradeHelper getInstance() {
@Override
public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext
context) {
- return HoodieJavaTable.create(config, context);
+ TransactionManager txnManager = new TransactionManager(config,
HoodieStorageUtils.getStorage(config.getBasePath(), context.getStorageConf()));
Review Comment:
🤖 Instantiating `TransactionManager` here without closing it creates a
resource leak (e.g., ZooKeeper connections, lock providers, metrics).
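
If the manager has to be created here, its lifetime needs an owner. The sketch below only illustrates that a scoped owner guarantees `close()` runs. It uses a hypothetical `TrackedResource` in place of `TransactionManager`, which the diff shows implements `AutoCloseable`. How ownership should pass to the returned table is a design decision for this PR:

```java
class TrackedResource implements AutoCloseable {
  private boolean closed = false;

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
  }

  // Returns true if the resource was closed once the try-with-resources scope ended.
  static boolean closedAfterScope() {
    TrackedResource observed;
    try (TrackedResource resource = new TrackedResource()) {
      observed = resource;
      // ... use the resource while it is open ...
    }
    return observed.isClosed();
  }

  public static void main(String[] args) {
    System.out.println("closed after scope: " + closedAfterScope());
  }
}
```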
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
Review Comment:
🤖 `changeActionInstant` is never updated in this method, so this logs an
empty or stale value.