lianetm commented on code in PR #19577:
URL: https://github.com/apache/kafka/pull/19577#discussion_r2093194584
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -730,7 +739,10 @@ public NetworkClientDelegate.UnsentRequest
toUnsentRequest() {
lastEpochSentOnCommit = Optional.empty();
}
- OffsetCommitRequest.Builder builder =
OffsetCommitRequest.Builder.forTopicNames(data);
+ boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
Review Comment:
is the `partitionsWithoutTopicIds` really needed? wonder if we can simplify,
remove it and this, and only keep the `canUseTopicIds` since the beginning
(setting it to false whenever we find a missing topic)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -809,6 +826,9 @@ public void onResponse(final ClientResponse response) {
if (!unauthorizedTopics.isEmpty()) {
log.error("OffsetCommit failed due to not authorized to commit
to topics {}", unauthorizedTopics);
future.completeExceptionally(new
TopicAuthorizationException(unauthorizedTopics));
+ } else if (!unknownTopicIds.isEmpty()) {
+ log.error("OffsetCommit failed due to unknown topic id to
commit to topic ids {}", unknownTopicIds);
Review Comment:
the message reads a bit off, maybe simply `OffsetCommit failed due to
unknown topic IDs {}` would do?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -695,15 +696,22 @@ class OffsetCommitRequestState extends
RetriableRequestState {
}
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+ Map<String, Uuid> topicIds = metadata.topicIds();
Review Comment:
yes, it is
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -809,6 +826,9 @@ public void onResponse(final ClientResponse response) {
if (!unauthorizedTopics.isEmpty()) {
log.error("OffsetCommit failed due to not authorized to commit
to topics {}", unauthorizedTopics);
future.completeExceptionally(new
TopicAuthorizationException(unauthorizedTopics));
+ } else if (!unknownTopicIds.isEmpty()) {
+ log.error("OffsetCommit failed due to unknown topic id to
commit to topic ids {}", unknownTopicIds);
+ future.completeExceptionally(new
UnknownTopicIdException(Errors.UNKNOWN_TOPIC_ID.message()));
Review Comment:
should we reuse `Errors.UNKNOWN_TOPIC_ID.exception()` instead of creating a
new exception here? (we don't want a custom message in this case)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -469,6 +469,35 @@ public void
testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() {
assertFutureThrows(CommitFailedException.class, commitResult);
}
+ @Test
+ public void testCommitSyncShouldSuccessWithTopicHasId() {
Review Comment:
```suggestion
public void testCommitSyncShouldSucceedWithTopicId() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]