junrao commented on code in PR #17583:
URL: https://github.com/apache/kafka/pull/17583#discussion_r1813494019
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -149,15 +149,27 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
- shareFetchData.groupId(), shareFetchData.memberId(),
- shareFetchData.partitionMaxBytes().keySet());
+ // There can be multiple threads which might invoke tryComplete for same share fetch request
+ // hence check if delay share fetch request is already completed. If yes, return true.
+ // However, this check alone cannot guarantee that request is really completed. It is possible that
+ // tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check
+ // the forceComplete response as well.
+ if (isCompleted()) {
Review Comment:
This is unnecessary since the caller
`DelayedOperationPurgatory.Watchers.tryCompleteWatched` already does this.
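A simplified model of why the early return is redundant: in this sketch the watcher loop drops operations that are already completed before it ever calls `tryComplete()`, and `forceComplete()` uses a compare-and-set so only one racing caller runs `onComplete()`. The classes `SketchDelayedOperation`, `SketchWatchers`, and `CountingOperation` are hypothetical, the loop is single-threaded, and this is not the actual purgatory code; it only assumes the caller behavior described in the comment above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified delayed operation (not Kafka's DelayedOperation).
abstract class SketchDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    boolean isCompleted() {
        return completed.get();
    }

    // Only the first caller to flip the flag runs onComplete(), so concurrent
    // callers cannot complete the same operation twice.
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    abstract boolean tryComplete();

    abstract void onComplete();
}

// Hypothetical watcher list: already-completed operations are removed
// before tryComplete() is invoked on them.
class SketchWatchers {
    private final List<SketchDelayedOperation> operations = new ArrayList<>();

    void watch(SketchDelayedOperation op) {
        operations.add(op);
    }

    int tryCompleteWatched() {
        int completedNow = 0;
        Iterator<SketchDelayedOperation> iter = operations.iterator();
        while (iter.hasNext()) {
            SketchDelayedOperation op = iter.next();
            if (op.isCompleted()) {
                iter.remove(); // completed elsewhere; just stop watching it
            } else if (op.tryComplete()) {
                iter.remove();
                completedNow++;
            }
        }
        return completedNow;
    }
}

// Example operation that can always complete; counts how often each hook runs.
class CountingOperation extends SketchDelayedOperation {
    final AtomicInteger tryCompleteCalls = new AtomicInteger();
    final AtomicInteger onCompleteCalls = new AtomicInteger();

    @Override
    boolean tryComplete() {
        tryCompleteCalls.incrementAndGet();
        // Propagate forceComplete()'s result, so a lost race reports false.
        return forceComplete();
    }

    @Override
    void onComplete() {
        onCompleteCalls.incrementAndGet();
    }
}
```

In this model the caller's `isCompleted()` check is only a fast path; protection against double completion comes from the compare-and-set, so repeating the flag check inside `tryComplete()` adds nothing.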
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1807,16 +1807,15 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las
// Even if write share group state RPC call fails, we will still go ahead with the state transition.
// Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
maybeUpdateCachedStateAndOffsets();
-
- // If we have an acquisition lock timeout for a share-partition, then we should check if
- // there is a pending share fetch request for the share-partition and complete it.
- DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
});
}
} finally {
lock.writeLock().unlock();
}
+ // If we have an acquisition lock timeout for a share-partition, then we should check if
+ // there is a pending share fetch request for the share-partition and complete it.
+ DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
Review Comment:
Should we call this under `if (!stateBatches.isEmpty())`?
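A minimal sketch of the placement the question points at: release the write lock first, then request delayed share fetch completion only when the timeout actually produced state batches. `TimeoutHandlerSketch`, `onAcquisitionLockTimeout`, the `Long` offsets standing in for state batches, and the `Consumer<String>` standing in for `replicaManager.completeDelayedShareFetchRequest` are all hypothetical; this is not `SharePartition`'s code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical, simplified timeout handler (not Kafka's SharePartition).
class TimeoutHandlerSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final String delayedShareFetchKey;
    // Stands in for replicaManager.completeDelayedShareFetchRequest(key).
    private final Consumer<String> completeDelayedShareFetch;

    TimeoutHandlerSketch(String delayedShareFetchKey, Consumer<String> completeDelayedShareFetch) {
        this.delayedShareFetchKey = delayedShareFetchKey;
        this.completeDelayedShareFetch = completeDelayedShareFetch;
    }

    boolean isWriteLockedByCurrentThread() {
        return lock.isWriteLockedByCurrentThread();
    }

    void onAcquisitionLockTimeout(List<Long> timedOutOffsets) {
        List<Long> stateBatches = new ArrayList<>();
        lock.writeLock().lock();
        try {
            // Record the state transitions caused by the timeout (simplified to offsets).
            stateBatches.addAll(timedOutOffsets);
        } finally {
            lock.writeLock().unlock();
        }
        // Outside the write lock, and only if something was released, try to
        // complete a pending delayed share fetch for this share-partition.
        if (!stateBatches.isEmpty()) {
            completeDelayedShareFetch.accept(delayedShareFetchKey);
        }
    }
}
```

Keeping the callback after the `finally` means whatever the completion path does never runs while this thread still holds the partition's write lock, and the guard skips the purgatory lookup entirely when nothing timed out.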
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -182,22 +194,28 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
- // If the share partition is already at capacity, we should not attempt to fetch.
- if (sharePartition.canAcquireRecords()) {
- topicPartitionData.put(
+ try {
+ // If the share partition is already at capacity, we should not attempt to fetch.
+ if (sharePartition.canAcquireRecords()) {
+ topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
- topicIdPartition.topicId(),
- sharePartition.nextFetchOffset(),
- 0,
- partitionMaxBytes,
- Optional.empty()
+ topicIdPartition.topicId(),
+ sharePartition.nextFetchOffset(),
+ 0,
+ partitionMaxBytes,
+ Optional.empty()
)
- );
- } else {
- sharePartition.releaseFetchLock();
- log.trace("Record lock partition limit exceeded for SharePartition {}, " +
+ );
+ } else {
+ sharePartition.releaseFetchLock();
+ log.trace("Record lock partition limit exceeded for SharePartition {}, " +
"cannot acquire more records", sharePartition);
+ }
+ } catch (Exception e) {
Review Comment:
Where is the exception coming from?
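The hunk is cut off at the `catch` line, so its body is not shown here. Assuming the intent is to give the fetch lock back if anything throws after `maybeAcquireFetchLock()` succeeds, the pattern looks roughly like the sketch below. `FetchLockSketch`, `AcquirablePartitionsSketch`, and the `LongSupplier` standing in for `nextFetchOffset()` are hypothetical, and the sketch does not answer which call can actually throw.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a share partition's fetch lock.
class FetchLockSketch {
    private final AtomicBoolean held = new AtomicBoolean(false);

    boolean maybeAcquireFetchLock() {
        return held.compareAndSet(false, true);
    }

    void releaseFetchLock() {
        held.set(false);
    }

    boolean isHeld() {
        return held.get();
    }
}

class AcquirablePartitionsSketch {
    // Returns partition -> fetch offset for partitions that should be fetched.
    // On success the fetch lock stays held for the later fetch; on any other
    // outcome it is released before returning.
    static Map<String, Long> acquirablePartitions(String partition,
                                                  FetchLockSketch fetchLock,
                                                  boolean canAcquireRecords,
                                                  LongSupplier nextFetchOffset) {
        Map<String, Long> toFetch = new HashMap<>();
        if (fetchLock.maybeAcquireFetchLock()) {
            try {
                if (canAcquireRecords) {
                    toFetch.put(partition, nextFetchOffset.getAsLong());
                } else {
                    // Partition is at capacity: do not fetch, give the lock back.
                    fetchLock.releaseFetchLock();
                }
            } catch (Exception e) {
                // Assumed behavior: something threw after the lock was acquired,
                // so release it and skip this partition instead of leaking the lock.
                fetchLock.releaseFetchLock();
            }
        }
        return toFetch;
    }
}
```

The `catch (Exception e)` here is broad only because the sketch does not know which call fails; narrowing it depends on answering the question above.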