chia7712 commented on code in PR #16942:
URL: https://github.com/apache/kafka/pull/16942#discussion_r1723824462
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -351,7 +351,7 @@ public CompletableFuture<List<AcquiredRecords>> acquire(
String memberId,
FetchPartitionData fetchPartitionData
) {
- log.trace("Received acquire request for share partition: {}-{}",
memberId, fetchPartitionData);
+ log.trace("Received acquire request for share partition: {}-{}",
memberId, topicIdPartition);
Review Comment:
The other logs use `groupId` and `topicIdPartition`, so I guess `memberId`
should be replaced by `groupId`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -266,11 +266,18 @@ private Optional<UnsentRequest>
maybeBuildRequest(AcknowledgeRequestState acknow
return Optional.empty();
} else if (!acknowledgeRequestState.maybeExpire()) {
if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
- acknowledgeRequestState.onSendAttempt(currentTimeMs);
- if (onCommitAsync) {
- isAsyncDone.set(true);
+ UnsentRequest request =
acknowledgeRequestState.buildRequest(currentTimeMs);
Review Comment:
This method has some nested if-else. Maybe we can flatten them? for example:
```java
private Optional<UnsentRequest>
maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState,
long currentTimeMs,
boolean onCommitAsync,
AtomicBoolean
isAsyncDone) {
boolean asyncDone = true;
try {
if (acknowledgeRequestState == null ||
(!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
return Optional.empty();
}
if (acknowledgeRequestState.maybeExpire()) {
// Fill in TimeoutException
for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
}
acknowledgeRequestState.incompleteAcknowledgements.clear();
return Optional.empty();
}
if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
// We wait for the backoff before we can send this request.
asyncDone = false;
return Optional.empty();
}
UnsentRequest request =
acknowledgeRequestState.buildRequest(currentTimeMs);
if (request == null) {
asyncDone = false;
return Optional.empty();
}
acknowledgeRequestState.onSendAttempt(currentTimeMs);
return Optional.of(request);
} finally {
if (onCommitAsync) {
isAsyncDone.set(asyncDone);
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]