This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a663ce3f457 KAFKA-18265: Move acquisition lock classes from share
partition (1/N) (#20227)
a663ce3f457 is described below
commit a663ce3f4572056bb6c89c7640dbf1dd77b65d55
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Jul 23 20:21:42 2025 +0100
KAFKA-18265: Move acquisition lock classes from share partition (1/N)
(#20227)
While working on KAFKA-19476, I realized that we need to refactor
SharePartition for read/write lock handling. I have started some work in
this area. For the initial PR, I have moved the AcquisitionLockTimerTask
class out of SharePartition and added the AcquisitionLockTimeoutHandler
interface that the task calls when the acquisition lock times out.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 117 ++++++++-------------
.../kafka/server/share/SharePartitionTest.java | 7 +-
.../share/fetch/AcquisitionLockTimeoutHandler.java | 34 ++++++
.../share/fetch/AcquisitionLockTimerTask.java | 66 ++++++++++++
4 files changed, 150 insertions(+), 74 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 8a8e62b5d8d..353d66e1fd8 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -41,6 +41,8 @@ import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -2391,59 +2393,61 @@ public class SharePartition {
long lastOffset,
long delayMs
) {
- return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset,
lastOffset);
+ return new AcquisitionLockTimerTask(time, delayMs, memberId,
firstOffset, lastOffset, releaseAcquisitionLockOnTimeout(),
sharePartitionMetrics);
}
- private void releaseAcquisitionLockOnTimeout(String memberId, long
firstOffset, long lastOffset) {
- List<PersisterStateBatch> stateBatches;
- lock.writeLock().lock();
- try {
- Map.Entry<Long, InFlightBatch> floorOffset =
cachedState.floorEntry(firstOffset);
- if (floorOffset == null) {
- log.error("Base offset {} not found for share partition:
{}-{}", firstOffset, groupId, topicIdPartition);
- return;
- }
- stateBatches = new ArrayList<>();
- NavigableMap<Long, InFlightBatch> subMap =
cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
- for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
- InFlightBatch inFlightBatch = entry.getValue();
+ private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
+ return (memberId, firstOffset, lastOffset) -> {
+ List<PersisterStateBatch> stateBatches;
+ lock.writeLock().lock();
+ try {
+ Map.Entry<Long, InFlightBatch> floorOffset =
cachedState.floorEntry(firstOffset);
+ if (floorOffset == null) {
+ log.error("Base offset {} not found for share partition:
{}-{}", firstOffset, groupId, topicIdPartition);
+ return;
+ }
+ stateBatches = new ArrayList<>();
+ NavigableMap<Long, InFlightBatch> subMap =
cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
+ for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet())
{
+ InFlightBatch inFlightBatch = entry.getValue();
- if (inFlightBatch.offsetState() == null
+ if (inFlightBatch.offsetState() == null
&& inFlightBatch.batchState() == RecordState.ACQUIRED
&&
checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(),
inFlightBatch.lastOffset())) {
- // For the case when batch.firstOffset < start offset <=
batch.lastOffset, we will be having some
- // acquired records that need to move to archived state
despite their delivery count.
- inFlightBatch.maybeInitializeOffsetStateUpdate();
- }
+ // For the case when batch.firstOffset < start offset
<= batch.lastOffset, we will be having some
+ // acquired records that need to move to archived
state despite their delivery count.
+ inFlightBatch.maybeInitializeOffsetStateUpdate();
+ }
- // Case when the state of complete batch is valid
- if (inFlightBatch.offsetState() == null) {
-
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches,
memberId);
- } else { // Case when batch has a valid offset state map.
-
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches,
memberId, firstOffset, lastOffset);
+ // Case when the state of complete batch is valid
+ if (inFlightBatch.offsetState() == null) {
+
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches,
memberId);
+ } else { // Case when batch has a valid offset state map.
+
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches,
memberId, firstOffset, lastOffset);
+ }
}
- }
- if (!stateBatches.isEmpty()) {
- writeShareGroupState(stateBatches).whenComplete((result,
exception) -> {
- if (exception != null) {
- log.debug("Failed to write the share group state on
acquisition lock timeout for share partition: {}-{} memberId: {}",
- groupId, topicIdPartition, memberId, exception);
- }
- // Even if write share group state RPC call fails, we will
still go ahead with the state transition.
- // Update the cached state and start and end offsets after
releasing the acquisition lock on timeout.
- maybeUpdateCachedStateAndOffsets();
- });
+ if (!stateBatches.isEmpty()) {
+ writeShareGroupState(stateBatches).whenComplete((result,
exception) -> {
+ if (exception != null) {
+ log.debug("Failed to write the share group state
on acquisition lock timeout for share partition: {}-{} memberId: {}",
+ groupId, topicIdPartition, memberId,
exception);
+ }
+ // Even if write share group state RPC call fails, we
will still go ahead with the state transition.
+ // Update the cached state and start and end offsets
after releasing the acquisition lock on timeout.
+ maybeUpdateCachedStateAndOffsets();
+ });
+ }
+ } finally {
+ lock.writeLock().unlock();
}
- } finally {
- lock.writeLock().unlock();
- }
- // If we have an acquisition lock timeout for a share-partition, then
we should check if
- // there is a pending share fetch request for the share-partition and
complete it.
- // Skip null check for stateBatches, it should always be initialized
if reached here.
- maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
+ // If we have an acquisition lock timeout for a share-partition,
then we should check if
+ // there is a pending share fetch request for the share-partition
and complete it.
+ // Skip null check for stateBatches, it should always be
initialized if reached here.
+ maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
+ };
}
private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch
inFlightBatch,
@@ -2834,35 +2838,6 @@ public class SharePartition {
}
}
- // Visible for testing
- final class AcquisitionLockTimerTask extends TimerTask {
- private final long expirationMs;
- private final String memberId;
- private final long firstOffset;
- private final long lastOffset;
-
- AcquisitionLockTimerTask(long delayMs, String memberId, long
firstOffset, long lastOffset) {
- super(delayMs);
- this.expirationMs = time.hiResClockMs() + delayMs;
- this.memberId = memberId;
- this.firstOffset = firstOffset;
- this.lastOffset = lastOffset;
- }
-
- long expirationMs() {
- return expirationMs;
- }
-
- /**
- * The task is executed when the acquisition lock timeout is reached.
The task releases the acquired records.
- */
- @Override
- public void run() {
-
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset -
firstOffset + 1);
- releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset);
- }
- }
-
/**
* The InFlightBatch maintains the in-memory state of the fetched records
i.e. in-flight records.
*/
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c52b4e257d9..5059b4c892e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
@@ -5153,7 +5154,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withGroupConfigManager(groupConfigManager).build();
- SharePartition.AcquisitionLockTimerTask timerTask =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
+ AcquisitionLockTimerTask timerTask =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
Mockito.verify(groupConfigManager,
Mockito.times(2)).groupConfig(GROUP_ID);
Mockito.verify(groupConfig).shareRecordLockDurationMs();
@@ -5175,13 +5176,13 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withGroupConfigManager(groupConfigManager).build();
- SharePartition.AcquisitionLockTimerTask timerTask1 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
+ AcquisitionLockTimerTask timerTask1 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
Mockito.verify(groupConfigManager,
Mockito.times(2)).groupConfig(GROUP_ID);
Mockito.verify(groupConfig).shareRecordLockDurationMs();
assertEquals(expectedDurationMs1, timerTask1.delayMs);
- SharePartition.AcquisitionLockTimerTask timerTask2 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
+ AcquisitionLockTimerTask timerTask2 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
Mockito.verify(groupConfigManager,
Mockito.times(4)).groupConfig(GROUP_ID);
Mockito.verify(groupConfig,
Mockito.times(2)).shareRecordLockDurationMs();
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
new file mode 100644
index 00000000000..c83d7e537da
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+/**
+ * AcquisitionLockTimeoutHandler is an interface that defines a handler for
acquisition lock timeouts.
+ * It is used to handle cases where the acquisition lock for a share partition
times out.
+ */
+public interface AcquisitionLockTimeoutHandler {
+
+ /**
+ * Handles the acquisition lock timeout for a share partition.
+ *
+ * @param memberId the id of the member that requested the lock
+ * @param firstOffset the first offset
+ * @param lastOffset the last offset
+ */
+ void handle(String memberId, long firstOffset, long lastOffset);
+
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
new file mode 100644
index 00000000000..6796d24d374
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * AcquisitionLockTimerTask is a timer task that is executed when the
acquisition lock timeout is reached.
+ * It releases the acquired records.
+ */
+public class AcquisitionLockTimerTask extends TimerTask {
+
+ private final long expirationMs;
+ private final String memberId;
+ private final long firstOffset;
+ private final long lastOffset;
+ private final AcquisitionLockTimeoutHandler timeoutHandler;
+ private final SharePartitionMetrics sharePartitionMetrics;
+
+ public AcquisitionLockTimerTask(
+ Time time,
+ long delayMs,
+ String memberId,
+ long firstOffset,
+ long lastOffset,
+ AcquisitionLockTimeoutHandler timeoutHandler,
+ SharePartitionMetrics sharePartitionMetrics
+ ) {
+ super(delayMs);
+ this.expirationMs = time.hiResClockMs() + delayMs;
+ this.memberId = memberId;
+ this.firstOffset = firstOffset;
+ this.lastOffset = lastOffset;
+ this.timeoutHandler = timeoutHandler;
+ this.sharePartitionMetrics = sharePartitionMetrics;
+ }
+
+ public long expirationMs() {
+ return expirationMs;
+ }
+
+ /**
+ * The task is executed when the acquisition lock timeout is reached. The
task releases the acquired records.
+ */
+ @Override
+ public void run() {
+ sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset -
firstOffset + 1);
+ timeoutHandler.handle(memberId, firstOffset, lastOffset);
+ }
+}