apoorvmittal10 commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2075278984
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -96,6 +97,10 @@ public class DelayedShareFetch extends DelayedOperation {
* Metric for the rate of expired delayed fetch requests.
*/
private final Meter expiredRequestMeter;
+ /**
+ * fetchId serves as a token while acquiring/releasing share partition's
fetch lock from a DelayedShareFetch instance.
Review Comment:
```suggestion
* fetchId serves as a token while acquiring/releasing share partition's
fetch lock.
```
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -376,20 +384,22 @@ LinkedHashMap<TopicIdPartition, Long>
acquirablePartitions(
sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition)
-> {
// Add the share partition to the list of partitions to be fetched
only if we can
// acquire the fetch lock on it.
- if (sharePartition.maybeAcquireFetchLock()) {
+ if (sharePartition.maybeAcquireFetchLock(fetchId)) {
try {
+ log.trace("Fetch lock for share partition {}-{} has been
acquired by {}", shareFetch.groupId(), topicIdPartition, fetchId);
// If the share partition is already at capacity, we
should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(topicIdPartition,
sharePartition.nextFetchOffset());
} else {
- sharePartition.releaseFetchLock();
- log.trace("Record lock partition limit exceeded for
SharePartition {}, " +
- "cannot acquire more records", sharePartition);
+ sharePartition.releaseFetchLock(fetchId);
+ log.trace("Record lock partition limit exceeded for
SharePartition {}-{}, " +
+ "cannot acquire more records. Releasing the fetch
lock by {}", shareFetch.groupId(), topicIdPartition, fetchId);
}
} catch (Exception e) {
log.error("Error checking condition for SharePartition:
{}", sharePartition, e);
Review Comment:
While you are in this file, can you fix the log lines that log
`sharePartition` directly? Those log lines are of no help.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1338,13 +1340,14 @@ boolean canAcquireRecords() {
* share partition is not fetched concurrently by multiple clients. The
fetch lock is released once
* the records are fetched and acquired.
*
+ * @param fetchId - the DelayedShareFetch instance uuid that is trying to
acquire the fetch lock.
Review Comment:
Why do we need to mention `DelayedShareFetch`? It's not relevant in
SharePartition. It's just a uuid that represents the caller's id.
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7134,6 +7137,23 @@ public void
testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransa
assertEquals(1, actual.get(3).producerId());
}
+ @Test
+ public void testFetchLockReleasedByDifferentId() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+ Uuid fetchId1 = mock(Uuid.class);
+ Uuid fetchId2 = mock(Uuid.class);
+
+ // Initially, fetch lock is not acquired.
+ assertNull(sharePartition.fetchLock());
+ // fetchId1 acquires the fetch lock.
+ assertTrue(sharePartition.maybeAcquireFetchLock(fetchId1));
+ // If we release fetch lock by fetchId2, it will work.
+ sharePartition.releaseFetchLock(fetchId2);
+ assertNull(sharePartition.fetchLock()); // Fetch lock has been
released.
Review Comment:
Please add a comment here explaining why this currently releases the lock.
Once we make the lock handling strict, this test case will need to be updated.
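For illustration, a minimal sketch of the lenient versus strict release
behaviour this comment refers to. It assumes a hypothetical `FetchLockSketch`
class and uses `java.util.UUID` in place of Kafka's `Uuid`; it is not the
SharePartition implementation, and the method names are made up for the example.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a fetch lock keyed by a caller id (not Kafka code). */
final class FetchLockSketch {
    // Id of the current lock owner, or null when the lock is free.
    private UUID owner;

    /** Acquires the lock for fetchId only if nobody holds it. */
    synchronized boolean maybeAcquire(UUID fetchId) {
        Objects.requireNonNull(fetchId, "fetchId");
        if (owner != null) {
            return false;
        }
        owner = fetchId;
        return true;
    }

    /** Lenient release: clears the lock even when fetchId is not the owner. */
    synchronized void releaseLenient(UUID fetchId) {
        owner = null;
    }

    /** Strict release: clears the lock only when fetchId matches the owner. */
    synchronized boolean releaseStrict(UUID fetchId) {
        if (!Objects.equals(owner, fetchId)) {
            return false;
        }
        owner = null;
        return true;
    }

    /** Returns the current owner id, or null when the lock is free. */
    synchronized UUID owner() {
        return owner;
    }
}
```

With the lenient variant, a release by a second id frees the lock, which is
what the test above asserts today. With the strict variant, the same call
would leave the lock held, so the assertion would have to flip.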
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1647,14 +1649,15 @@ public void testRecordFetchLockRatioMetric() {
.thenReturn(80L) // for time when lock is released
.thenReturn(160L); // to update lock idle duration while acquiring
lock again.
- assertTrue(sharePartition.maybeAcquireFetchLock());
- sharePartition.releaseFetchLock();
+ Uuid fetchId = mock(Uuid.class);
Review Comment:
Is mocking really helpful with Uuid in any way?
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7134,6 +7137,23 @@ public void
testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransa
assertEquals(1, actual.get(3).producerId());
}
+ @Test
+ public void testFetchLockReleasedByDifferentId() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+ Uuid fetchId1 = mock(Uuid.class);
+ Uuid fetchId2 = mock(Uuid.class);
Review Comment:
nit: Can't this just be `Uuid.randomUuid()`? Why mock?