roryqi commented on code in PR #9914:
URL: https://github.com/apache/gravitino/pull/9914#discussion_r2964292368
##########
core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java:
##########
@@ -176,6 +192,80 @@ public <E extends Exception> void withLockAndThrow(
}
}
+ /**
+ * Acquires locks for multiple keys in a consistent order (sorted by stripe
index) to avoid
+ * deadlocks, then executes the action.
+ *
+ * @param keys The keys to lock
+ * @param action Action to run
+ * @param <E> Exception type
+ * @throws E Exception from the action
+ */
+ public <E extends Exception> void withMultipleKeyLockAndThrow(
+ List<? extends Object> keys, EntityCache.ThrowingRunnable<E> action)
throws E {
+ waitForGlobalComplete();
+ List<Lock> sortedLocks = getDistinctSortedLocks(keys);
+ acquireAllLocks(sortedLocks);
+ try {
+ action.run();
+ } finally {
+ releaseAllLocks(sortedLocks);
+ }
+ }
+
+ /**
+ * Acquires locks for multiple keys in a consistent order (sorted by stripe
index) to avoid
+ * deadlocks, then executes the action and returns the result.
+ *
+ * @param keys The keys to lock
+ * @param action Action to run
+ * @param <T> Result type
+ * @param <E> Exception type
+ * @return Action result
+ * @throws E Exception from the action
+ */
+ public <T, E extends Exception> T withMultipleKeyLockAndThrow(
+ List<? extends Object> keys, EntityCache.ThrowingSupplier<T, E> action)
throws E {
+ waitForGlobalComplete();
+ List<Lock> sortedLocks = getDistinctSortedLocks(keys);
+ acquireAllLocks(sortedLocks);
+ try {
+ return action.get();
+ } finally {
+ releaseAllLocks(sortedLocks);
+ }
+ }
+
+ private List<Lock> getDistinctSortedLocks(List<? extends Object> keys) {
+ return keys.stream()
+ .map(this::getSegmentLock)
+ .distinct()
+ .sorted(Comparator.comparingInt(lockIndices::get))
+ .collect(Collectors.toList());
+ }
+
+ private void acquireAllLocks(List<Lock> locks) {
+ int lockedCount = 0;
+ try {
+ for (Lock lock : locks) {
+ lock.lockInterruptibly();
+ lockedCount++;
+ }
+ } catch (InterruptedException e) {
+ for (int i = lockedCount - 1; i >= 0; i--) {
+ locks.get(i).unlock();
+ }
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Thread was interrupted while acquiring
batch lock", e);
+ }
+ }
+
+ private void releaseAllLocks(List<Lock> locks) {
Review Comment:
OK.Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]