vinothchandar commented on code in PR #18350:
URL: https://github.com/apache/hudi/pull/18350#discussion_r3041295438
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
Review Comment:
please write unit tests for this. This is the core new API used by 13+
callers. `TestTransactionManager` has no tests for it.
Suggest adding at minimum:
1. Lock acquired and released around the action
2. Instant time generated when `providedInstantTime` is empty vs used when
present
3. Lock released when the action throws an exception
4. Nested/reentrant `executeStateChangeWithInstant` calls
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
Review Comment:
this should be a boolean? `boolean allowRetrancy` ? Idk wht nested
transaction means in this context. lets name this in standard terms?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant,
Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
LOG.info("State change ending for action instant {}",
changeActionInstant);
if (reset(changeActionInstant, Option.empty(), Option.empty())) {
- lockManager.unlock();
+ releaseLock();
LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
+ /**
+ * Caution: the {@code hasLock} flag can not be used to skip the `#lock`
eagerly if the thread switches,
+ * the corner case below can cause deadlock:
+ *
+ * <pre>
+ * threadA => acquireLock(), {@code hasLock} setup as true and got the
lock acquired;
+ * threadB => acquireLock(), check the {@code hasLock} as true, returns
early;
+ * threadB => releaseLock(), set up the {@code hasLock} as false;
+ * threadA => releaseLock(), detect the {@code hasLock} as false and
returns early.
+ *
+ * The lock held by threadA will never be released.
+ * </pre>
+ */
+ private void acquireLock() {
+ if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+ LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+ permits++;
+ return;
+ }
+ lockManager.lock();
+ permits++;
+ this.lockHolderId = Thread.currentThread().getId();
Review Comment:
do we know the ids won't be reused?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
@@ -78,6 +82,11 @@ public static HoodieFlinkTable<?> createTable(
*/
public static HoodieFlinkTable<?> createTable(Configuration conf) {
HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf, true, false);
- return createTableInternal(writeConfig, HoodieFlinkEngineContext.DEFAULT);
+ return createTableInternal(writeConfig, HoodieFlinkEngineContext.DEFAULT,
Option.empty());
+ }
+
+ public static HoodieFlinkTable<?> createTable(Configuration conf,
TransactionManager txnManager) {
+ HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf, true, false);
Review Comment:
💬 **SUGGESTION: Naming inconsistency across engines**
Spark has `HoodieSparkTable.createForReads()` for the read-only path, while
Flink/Java keep `create()` passing `Option.empty()`. Consider aligning so the
intent is clear from the method name.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java:
##########
@@ -104,7 +104,7 @@ public TimelineArchiverV1(HoodieWriteConfig config,
HoodieTable<T, I, K, O> tabl
this.table = table;
this.metaClient = table.getMetaClient();
this.archiveFilePath =
ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath());
- this.txnManager = new TransactionManager(config,
table.getMetaClient().getStorage());
+ this.txnManager = table.getTxnManager().get();
Review Comment:
do we expect this to be absent? My assumption is - this should always be
set?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
@@ -53,7 +56,8 @@ public static HoodieFlinkTable<?> createTable(Configuration
conf, RuntimeContext
HadoopFSUtils.getStorageConf(getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));
HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf, true);
Review Comment:
💬 **SUGGESTION: Track FIXME as follow-up**
`FIXME-vc: we need a txn manager here. for flink clustering/compaction.` —
Currently safe because callers that need a `TransactionManager` get it from the
`writeClient`, not the table. But any future code calling
`table.getTxnManager().get()` on a table created through this path will crash.
Worth tracking.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant,
Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
LOG.info("State change ending for action instant {}",
changeActionInstant);
if (reset(changeActionInstant, Option.empty(), Option.empty())) {
- lockManager.unlock();
+ releaseLock();
LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
+ /**
+ * Caution: the {@code hasLock} flag can not be used to skip the `#lock`
eagerly if the thread switches,
+ * the corner case below can cause deadlock:
+ *
+ * <pre>
+ * threadA => acquireLock(), {@code hasLock} setup as true and got the
lock acquired;
+ * threadB => acquireLock(), check the {@code hasLock} as true, returns
early;
+ * threadB => releaseLock(), set up the {@code hasLock} as false;
+ * threadA => releaseLock(), detect the {@code hasLock} as false and
returns early.
+ *
+ * The lock held by threadA will never be released.
+ * </pre>
+ */
+ private void acquireLock() {
+ if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+ LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+ permits++;
+ return;
+ }
+ lockManager.lock();
+ permits++;
+ this.lockHolderId = Thread.currentThread().getId();
+ LOG.info("{}: Lock acquired for action instant {}", this,
changeActionInstant);
Review Comment:
@yihua these seem very raw AI generated? lets please contextualize these
for the PR itself.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
Review Comment:
pull -1 into constant
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java:
##########
@@ -374,7 +364,8 @@ public HoodieInstant
transitionLogCompactionRequestedToInflight(HoodieInstant re
}
Review Comment:
💬 **SUGGESTION: Remove FIXME comment before merge**
`// FIXME-vc: check all these API changes..` — should be resolved or removed
before merging.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -62,7 +63,7 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
Review Comment:
⚠️ **IMPORTANT: Unchecked `.get()` on `Option<TransactionManager>`**
`table.getTxnManager().get()` will throw `NoSuchElementException` if a table
was created via the read path (`Option.empty()` for txnManager). Same pattern
in `RunIndexActionExecutor`, `BaseRollbackActionExecutor`,
`BaseRestoreActionExecutor`, `CopyOnWriteRestoreActionExecutor`,
`MergeOnReadRestoreActionExecutor`, and
`CompactionUtil.rollbackEarliestCompaction`.
The old code was self-contained (`new TransactionManager(config,
table.getStorage())`) and always safe.
This pattern is diff places. Consider a `getOrThrow()` with a clear error
message, or validation at table creation time. We need to ensure consistent
behavior -- either we need assume it ll be set at table creation time and throw
error there or any get() needs to explicitly throw using `getOrThrow()` instead
of an opaque error
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java:
##########
@@ -42,7 +42,7 @@ public class DirectMarkerTransactionManager extends
TransactionManager {
private final String filePath;
public DirectMarkerTransactionManager(HoodieWriteConfig config,
HoodieStorage storage, String partitionPath, String fileId) {
- super(new LockManager(config, storage, createUpdatedLockProps(config,
partitionPath, fileId)), config.isLockRequired());
Review Comment:
lock is always required now, right? its either in-process or external
distributed lock
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java:
##########
@@ -104,7 +104,7 @@ public TimelineArchiverV1(HoodieWriteConfig config,
HoodieTable<T, I, K, O> tabl
this.table = table;
this.metaClient = table.getMetaClient();
this.archiveFilePath =
ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath());
- this.txnManager = new TransactionManager(config,
table.getMetaClient().getStorage());
+ this.txnManager = table.getTxnManager().get();
Review Comment:
if so, those are bugs, and should fail and not be masked? @danny0405 please
comment
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant,
Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
LOG.info("State change ending for action instant {}",
changeActionInstant);
if (reset(changeActionInstant, Option.empty(), Option.empty())) {
- lockManager.unlock();
+ releaseLock();
LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
+ /**
+ * Caution: the {@code hasLock} flag can not be used to skip the `#lock`
eagerly if the thread switches,
+ * the corner case below can cause deadlock:
+ *
+ * <pre>
+ * threadA => acquireLock(), {@code hasLock} setup as true and got the
lock acquired;
+ * threadB => acquireLock(), check the {@code hasLock} as true, returns
early;
+ * threadB => releaseLock(), set up the {@code hasLock} as false;
+ * threadA => releaseLock(), detect the {@code hasLock} as false and
returns early.
+ *
+ * The lock held by threadA will never be released.
+ * </pre>
+ */
+ private void acquireLock() {
+ if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+ LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+ permits++;
+ return;
+ }
+ lockManager.lock();
+ permits++;
Review Comment:
What makes the `permits++` immune to races? - `beginStateChange()` is not
synchronized
are you trying to hand implement sth like a semaphore? This reentrancy part
is where I got blocked last time as well. You are attaching a lock
Can we think through this from both angles..
1) There could be table services running in same jvm
2) there could be writers in another jvm or process.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java:
##########
@@ -65,6 +66,7 @@ public UpgradeDowngrade.TableConfigChangeSet
upgrade(HoodieWriteConfig config,
throw new HoodieException(String.format("Old deprecated \"%s\"
partition found in hudi table. This needs a migration step before we can
upgrade ",
DEPRECATED_DEFAULT_PARTITION_PATH));
}
+ table.getTxnManager().ifPresent(obj -> ((TransactionManager)
obj).close());
Review Comment:
Close the txnManager in a finally block?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
Review Comment:
🚨 **BLOCKING: Lock leak if instant time generation throws**
What happens If `acquireLock()` succeeds at L112 but the `orElseGet` at L114
throws (e.g., `SkewAdjustingTimeGenerator.generateTime()` wraps an
`InterruptedException` as `HoodieException` during `Thread.sleep`), the lock is
never released?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that
instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T>
instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(),
instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time
and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the
caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last
completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated
instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String>
providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant,
Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() ->
HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action
instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant,
Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action
instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
LOG.info("State change ending for action instant {}",
changeActionInstant);
if (reset(changeActionInstant, Option.empty(), Option.empty())) {
- lockManager.unlock();
+ releaseLock();
LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
+ /**
+ * Caution: the {@code hasLock} flag can not be used to skip the `#lock`
eagerly if the thread switches,
+ * the corner case below can cause deadlock:
+ *
+ * <pre>
+ * threadA => acquireLock(), {@code hasLock} setup as true and got the
lock acquired;
+ * threadB => acquireLock(), check the {@code hasLock} as true, returns
early;
+ * threadB => releaseLock(), set up the {@code hasLock} as false;
+ * threadA => releaseLock(), detect the {@code hasLock} as false and
returns early.
+ *
+ * The lock held by threadA will never be released.
+ * </pre>
+ */
+ private void acquireLock() {
+ if (lockHolderId > 0 && isLockHeldByCurrentThread()) {
+ LOG.info("{}: Lock already acquired, skipping lock acquisition.", this);
+ permits++;
+ return;
+ }
+ lockManager.lock();
+ permits++;
+ this.lockHolderId = Thread.currentThread().getId();
+ LOG.info("{}: Lock acquired for action instant {}", this,
changeActionInstant);
+ }
+
+ /**
+ * Caution: the `hasLock` flag can not be used to skip the `#unlock` eagerly
if the thread switches.
+ * see the corner case of {@link #acquireLock()}.
+ */
+ private void releaseLock() {
+ if (isLockHeldByCurrentThread()) {
+ --permits;
+ if (permits == 0) {
+ lockManager.unlock();
+ this.lockHolderId = -1;
+ LOG.info("{}: Lock released for action instant {}", this,
changeActionInstant);
+ }
Review Comment:
+1
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable,
AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager
lockManager) {
+ this(lockManager, writeConfig.isLockRequired(),
TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired,
TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock
context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
+ }
Review Comment:
🚨 **BLOCKING: Is this check right?
`lockHolderId < 0` checks whether *any* thread holds the lock, not whether
the *calling* thread holds it. If thread A holds the lock (`lockHolderId = A's
id > 0`) and thread B calls `generateInstantTime()`, the check `lockHolderId <
0` is false, so B is allowed to generate an instant time despite not holding
the lock.
**Suggestion:**
```java
if (isLockRequired() && !isLockHeldByCurrentThread()) {
throw new HoodieLockException("Cannot create instant without acquiring a
lock first.");
}
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java:
##########
@@ -266,7 +266,7 @@ protected List<HoodieRollbackStat>
executeRollback(HoodieInstant instantToRollba
}
protected void finishRollback(HoodieInstant inflightInstant,
HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
- boolean enableLocking = (!skipLocking && !skipTimelinePublish);
+ boolean enableLocking = (!skipLocking || !skipTimelinePublish);
Review Comment:
⚠️ **IMPORTANT: What's the behavior change due to this? **
Changed from `(!skipLocking && !skipTimelinePublish)` to `(!skipLocking ||
!skipTimelinePublish)`. This means locking now happens when *either* flag is
false, not only when *both* are false. This is likely a correctness fix (needed
because `generateInstantTime()` at L290 requires the lock)
Please clarify how its still safe
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
@@ -53,7 +56,8 @@ public static HoodieFlinkTable<?> createTable(Configuration
conf, RuntimeContext
HadoopFSUtils.getStorageConf(getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));
HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf, true);
Review Comment:
Coment on this?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java:
##########
@@ -374,7 +364,8 @@ public HoodieInstant
transitionLogCompactionRequestedToInflight(HoodieInstant re
}
Review Comment:
anything I marked `FIXME-vc` were my notes to myself to resolve before
landing
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -226,11 +227,11 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K,
O> table, HoodieInstan
);
this.txnManager.beginStateChange(Option.of(inflightInstant),
Option.empty());
writeTableMetadata(metadata, inflightInstant.requestedTime());
- table.getActiveTimeline().transitionCleanInflightToComplete(
- false,
- inflightInstant,
- Option.of(metadata),
- completedInstant ->
table.getMetaClient().getTableFormat().clean(metadata, completedInstant,
table.getContext(), table.getMetaClient(), table.getViewManager()));
+ TableFormatCompletionAction formatCompletionAction = completedInstant ->
table.getMetaClient().getTableFormat()
+ .clean(metadata, completedInstant, table.getContext(),
table.getMetaClient(), table.getViewManager());
+ HoodieInstant completedInstant =
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
Option.of(metadata), txnManager.generateInstantTime());
+ // FIXME-vc: this is an one off..
+ formatCompletionAction.execute(completedInstant);
log.info("Marked clean started on {} as complete",
inflightInstant.requestedTime());
Review Comment:
lets think through this more? and find a better solution. This was supposed
to be fixed in the same PR, once I got tests to pass mostly
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1747,22 +1745,25 @@ public void update(HoodieRestoreMetadata
restoreMetadata, String instantTime) {
// We cannot create a deltaCommit at instantTime now because a future
(rollback) block has already been written to the logFiles.
// We need to choose a timestamp which would be a validInstantTime for
MDT. This is either a commit timestamp completed on the dataset
// or a new timestamp which we use for MDT clean, compaction etc.
- String syncCommitTime = createRestoreInstantTime();
- processAndCommit(syncCommitTime, () -> {
- // For Files partition.
- Map<String, HoodieData<HoodieRecord>> partitionRecords = new
HashMap<>();
-
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
- partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete,
syncCommitTime));
- // For ColumnStats partition if enabled.
- if
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
{
- partitionRecords.putAll(convertToColumnStatsRecord(
- partitionFilesToAdd, partitionFilesToDelete, engineContext,
dataMetaClient,
- dataWriteConfig.getMetadataConfig(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
-
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
- }
- return partitionRecords;
+
writeClient.getTransactionManager().executeStateChangeWithInstant(metadataTableCommit
-> {
+ String syncCommitTime = createRestoreTimestamp(metadataTableCommit);
+ processAndCommit(syncCommitTime, () -> {
+ // For Files partition.
+ Map<String, HoodieData<HoodieRecord>> partitionRecords = new
HashMap<>();
+
partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+ partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete,
syncCommitTime));
+ // For ColumnStats partition if enabled.
+ if
(dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()))
{
+ partitionRecords.putAll(convertToColumnStatsRecord(
+ partitionFilesToAdd, partitionFilesToDelete, engineContext,
dataMetaClient,
+ dataWriteConfig.getMetadataConfig(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
+
dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
+ }
+ return partitionRecords;
+ });
+ // empty result
+ return Option.empty();
});
- closeInternal();
} catch (IOException e) {
throw new HoodieMetadataException("IOException during MDT restore sync",
e);
}
Review Comment:
Does not calling `closeInternal()` not leak anything? Please clarify
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java:
##########
@@ -64,6 +66,7 @@ public UpgradeDowngrade.TableConfigChangeSet upgrade(
// for every pending commit, delete old markers and re-create markers in
new format
recreateMarkers(commit, table, context,
config.getMarkersDeleteParallelism());
}
+ table.getTxnManager().ifPresent(obj -> ((TransactionManager) obj).close());
Review Comment:
💅 **NIT:** `((TransactionManager) obj).close()` — the cast is redundant
since `Option<TransactionManager>` is already typed. Could be
`TransactionManager::close`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]