junrao commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1752319003
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -995,6 +1004,9 @@ boolean canAcquireRecords() {
* @return A boolean which indicates whether the fetch lock is acquired.
*/
boolean maybeAcquireFetchLock() {
+ if (partitionState() != SharePartitionState.ACTIVE) {
Review Comment:
We update partitionState without holding the lock, so there is no guarantee
that a different thread will read the latest value of partitionState.
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.ReplicaManager;
+import kafka.server.ReplicaQuota;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.storage.internals.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.FetchParams;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+
+public class DelayedShareFetchTest {
+ private static final int MAX_WAIT_MS = 5000;
+
+ @Test
+ public void testDelayedShareFetchTryCompleteReturnsFalse() {
+ String groupId = "grp";
+ Uuid topicId = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(topicId, new
TopicPartition("foo", 1));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), partitionMaxBytes);
+
+ when(sp0.canAcquireRecords()).thenReturn(false);
+ when(sp1.canAcquireRecords()).thenReturn(false);
+ DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
+ .withShareFetchPartitionData(shareFetchPartitionData)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+
+ // Since there is no partition that can be acquired, tryComplete
should return false.
+ assertFalse(delayedShareFetch.tryComplete());
+ assertFalse(delayedShareFetch.isCompleted());
+ }
+
+ @Test
+ public void testDelayedShareFetchTryCompleteReturnsTrue() {
+ String groupId = "grp";
+ Uuid topicId = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(topicId, new
TopicPartition("foo", 1));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), partitionMaxBytes);
+
+ when(sp0.canAcquireRecords()).thenReturn(true);
+ when(sp1.canAcquireRecords()).thenReturn(false);
+ DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
+ .withShareFetchPartitionData(shareFetchPartitionData)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+ assertFalse(delayedShareFetch.isCompleted());
+
+ // Since sp1 can be acquired, tryComplete should return true.
+ assertTrue(delayedShareFetch.tryComplete());
+ assertTrue(delayedShareFetch.isCompleted());
+ }
+
+ @Test
+ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() {
+ String groupId = "grp";
+ Uuid topicId = Uuid.randomUuid();
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(topicId, new
TopicPartition("foo", 1));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), partitionMaxBytes);
+
+ when(sp0.canAcquireRecords()).thenReturn(false);
+ when(sp1.canAcquireRecords()).thenReturn(false);
+ DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
+ .withShareFetchPartitionData(shareFetchPartitionData)
+ .withReplicaManager(replicaManager)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+ assertFalse(delayedShareFetch.isCompleted());
+ delayedShareFetch.forceComplete();
+
+ // Since no partition could be acquired, the future should be empty
and replicaManager.readFromLog should not be called.
+ assertEquals(0, shareFetchPartitionData.future().join().size());
+ Mockito.verify(replicaManager, times(0)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ assertTrue(delayedShareFetch.isCompleted());
+ }
+
+ @Test
+ public void testReplicaManagerFetchShouldHappenOnComplete() {
+ String groupId = "grp";
+ Uuid topicId = Uuid.randomUuid();
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(topicId, new
TopicPartition("foo", 1));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), partitionMaxBytes);
+
+ when(sp0.canAcquireRecords()).thenReturn(true);
+ when(sp1.canAcquireRecords()).thenReturn(false);
+ DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
+ .withShareFetchPartitionData(shareFetchPartitionData)
+ .withReplicaManager(replicaManager)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+ assertFalse(delayedShareFetch.isCompleted());
+ delayedShareFetch.forceComplete();
+
+ // Since we can acquire records from sp0, replicaManager.readFromLog
should be called once and only for sp0.
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ Mockito.verify(sp0, times(1)).nextFetchOffset();
+ Mockito.verify(sp1, times(0)).nextFetchOffset();
+ assertTrue(delayedShareFetch.isCompleted());
+ }
+
+ @Test
+ public void testToCompleteAnAlreadyCompletedFuture() {
+ String groupId = "grp";
+ Uuid topicId = Uuid.randomUuid();
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+ future, partitionMaxBytes);
+
+ DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
+ .withShareFetchPartitionData(shareFetchPartitionData)
+ .withReplicaManager(replicaManager)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build());
+ assertFalse(delayedShareFetch.isCompleted());
+
+ // Completing the future before calling forceComplete which can happen
in a real world scenario where the future
+ // might be completed by another thread which has the same share fetch
request entry.
+ future.complete(Collections.emptyMap());
Review Comment:
In a real-world scenario, the future can only be completed through
delayedShareFetch.forceComplete(), not directly. So perhaps we could just call
delayedShareFetch.forceComplete() twice instead of completing the future by hand?
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.ReplicaManager;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.storage.internals.log.FetchPartitionData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import scala.Option;
+
+/**
+ * Utility class for post-processing of share fetch operations.
+ */
+public class ShareFetchUtils {
+ private static final Logger log =
LoggerFactory.getLogger(ShareFetchUtils.class);
+
+ // Process the replica manager fetch response to update share partitions
and futures.
+ static CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> processFetchResponse(
+ SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData,
+ Map<TopicIdPartition, FetchPartitionData> responseData,
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap,
+ ReplicaManager replicaManager
+ ) {
+ Map<TopicIdPartition,
CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new
HashMap<>();
+ responseData.forEach((topicIdPartition, fetchPartitionData) -> {
+
+ SharePartition sharePartition = partitionCacheMap.get(new
SharePartitionManager.SharePartitionKey(
+ shareFetchPartitionData.groupId(), topicIdPartition));
+ futures.put(topicIdPartition,
sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData)
Review Comment:
I had an earlier comment
https://github.com/apache/kafka/pull/16274#discussion_r1700968453 on why
sharePartition.acquire() needs to return a future. This adds a bit more
complexity to the code and it doesn't seem needed. @AndrewJSchofield : Does
sharePartition.acquire ever need to return a future given that we don't persist
acquisition in the design?
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.ReplicaManager;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.storage.internals.log.FetchPartitionData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import scala.Option;
+
+/**
+ * Utility class for post-processing of share fetch operations.
+ */
+public class ShareFetchUtils {
+ private static final Logger log =
LoggerFactory.getLogger(ShareFetchUtils.class);
+
+ // Process the replica manager fetch response to update share partitions
and futures.
Review Comment:
Could we add that this acquires the fetched data from share partitions?
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -1750,159 +1647,439 @@ public void testAcknowledgeEmptyPartitionCacheMap() {
}
@Test
- public void testProcessFetchResponseWithLsoMovementForTopicPartition() {
+ public void testFetchQueueProcessingWhenFrontItemIsEmpty() {
String groupId = "grp";
- Uuid fooId = Uuid.randomUuid();
- TopicIdPartition tp0 = new TopicIdPartition(fooId, new
TopicPartition("foo", 0));
- TopicIdPartition tp1 = new TopicIdPartition(fooId, new
TopicPartition("foo", 1));
-
+ String memberId = Uuid.randomUuid().toString();
+ FetchParams fetchParams = new
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+
+ final Time time = new MockTime();
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1
= new SharePartitionManager.ShareFetchPartitionData(
+ fetchParams, groupId, memberId, new CompletableFuture<>(),
Collections.emptyMap());
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2
= new SharePartitionManager.ShareFetchPartitionData(
+ fetchParams, groupId, memberId, new CompletableFuture<>(),
partitionMaxBytes);
+
+ ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData>
fetchQueue = new ConcurrentLinkedQueue<>();
+ // First request added to fetch queue is empty i.e. no topic
partitions to fetch.
+ fetchQueue.add(shareFetchPartitionData1);
+ // Second request added to fetch queue has a topic partition to fetch.
+ fetchQueue.add(shareFetchPartitionData2);
+
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+ "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(replicaManager)
+ .withTime(time)
+ .withTimer(mockTimer)
+ .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
+ .withFetchQueue(fetchQueue).build();
+
+ doAnswer(invocation -> {
+ sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId,
partitionMaxBytes.keySet());
+ return buildLogReadResult(partitionMaxBytes.keySet());
+ }).when(replicaManager).readFromLog(any(), any(),
any(ReplicaQuota.class), anyBoolean());
+
+ sharePartitionManager.maybeProcessFetchQueue();
+
+ // Verifying that the second item in the fetchQueue is processed, even
though the first item is empty.
+ verify(replicaManager, times(1)).readFromLog(any(), any(),
any(ReplicaQuota.class), anyBoolean());
+ }
+
+ @Test
+ public void testAcknowledgeCompletesDelayedShareFetchRequest() {
+ String groupId = "grp";
+ String memberId = Uuid.randomUuid().toString();
+
+ TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo1", 0));
+ TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo2", 0));
+
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES);
+
+ SharePartition sp1 = mock(SharePartition.class);
+ SharePartition sp2 = mock(SharePartition.class);
- ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
- SharePartition sp0 = Mockito.mock(SharePartition.class);
- SharePartition sp1 = Mockito.mock(SharePartition.class);
+ // mocked share partitions sp1 and sp2 can be acquired once there is
an acknowledgement for it.
+ doAnswer(invocation -> {
+ when(sp1.canAcquireRecords()).thenReturn(true);
+ return CompletableFuture.completedFuture(Optional.empty());
+ }).when(sp1).acknowledge(ArgumentMatchers.eq(memberId), any());
+ doAnswer(invocation -> {
+ when(sp2.canAcquireRecords()).thenReturn(true);
+ return CompletableFuture.completedFuture(Optional.empty());
+ }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any());
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+
+ SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()),
+ groupId,
+ Uuid.randomUuid().toString(),
+ new CompletableFuture<>(),
+ partitionMaxBytes);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+ "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+
+ // Initially you cannot acquire records for both sp1 and sp2.
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp1.canAcquireRecords()).thenReturn(false);
+ when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp2.canAcquireRecords()).thenReturn(false);
+
+ Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+ partitionMaxBytes.keySet().forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId,
topicIdPartition)));
+
+ DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
+ .withShareFetchPartitionData(shareFetchPartitionData)
+ .withReplicaManager(replicaManager)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+
+ delayedShareFetchPurgatory.tryCompleteElseWatch(
+ delayedShareFetch,
CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
+
+ // Since acquisition lock for sp1 and sp2 cannot be acquired, we
should have 2 watched keys.
+ assertEquals(2, delayedShareFetchPurgatory.watched());
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
-
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build();
+ .withPartitionCacheMap(partitionCacheMap)
+ .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
+ .withReplicaManager(replicaManager)
+ .withTimer(mockTimer)
+ .build();
+
+ doAnswer(invocation -> {
+ sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId,
partitionMaxBytes.keySet());
Review Comment:
Why do we need this since we mock sp1 and sp2? Ditto in other tests below.
##########
core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class DelayedShareFetchKeyTest {
+
+ @Test
+ public void testDelayedShareFetchEqualsAndHashcode() {
+ Uuid topicUuid = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(topicUuid, new
TopicPartition("topic", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(topicUuid, new
TopicPartition("topic", 1));
+ TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("topic2", 0));
+
+ Map<String, DelayedShareFetchKey> keyMap = new HashMap<>();
+ keyMap.put("key0", new DelayedShareFetchKey("grp", tp0));
+ keyMap.put("key1", new DelayedShareFetchKey("grp", tp1));
+ keyMap.put("key2", new DelayedShareFetchKey("grp", tp2));
+ keyMap.put("key3", new DelayedShareFetchKey("grp2", tp0));
+ keyMap.put("key4", new DelayedShareFetchKey("grp2", tp1));
+
+ keyMap.forEach((key1, value1) -> keyMap.forEach((key2, value2) -> {
+ if (key1.equals(key2)) {
+ assertEquals(value1, value2);
+ assertEquals(value1.hashCode(), value2.hashCode());
+ } else {
+ assertNotEquals(value1, value2);
+ assertNotEquals(value1.hashCode(), value2.hashCode());
Review Comment:
The hashCode contract only guarantees that equal values have the same hash
code. Two unequal values may still collide on the same hash code, so this
assertNotEquals on hash codes could fail spuriously, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]