AndrewJSchofield commented on code in PR #19815:
URL: https://github.com/apache/kafka/pull/19815#discussion_r2113705265
##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -451,15 +451,15 @@ public void testWriteStateFencedLeaderEpochError() {
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
- .setDeliveryState((byte) 0)))))));
+ .setDeliveryState((byte) 0)))).iterator()))).iterator()));
Review Comment:
nit: indentation
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -714,9 +714,11 @@ public CoordinatorResult<DeleteShareGroupOffsetsResultHolder, CoordinatorRecord>
);
}
+ DeleteShareGroupStateRequestData.DeleteStateDataCollection topicCollection = new DeleteShareGroupStateRequestData.DeleteStateDataCollection();
+ deleteShareGroupStateRequestTopicsData.forEach(d -> topicCollection.add(d.duplicate()));
Review Comment:
Is this duplication really necessary?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -807,9 +815,10 @@ public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC
DeleteShareGroupStateRequestData requestForCurrentPartition =
new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopics(new DeleteShareGroupStateRequestData.DeleteStateDataCollection(
+ List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
- .setPartitions(List.of(partitionData))));
+ .setPartitions(new DeleteShareGroupStateRequestData.PartitionDataCollection(List.of(partitionData.duplicate()).iterator()))).iterator()));
Review Comment:
I'm trying to decide whether it is really worth making so many changes to
`mapKey`. It adds a lot of complexity in situations like this. In test
cases, that's not really important. In the main code paths, it's more of a
concern. What do you think?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3812,7 +3814,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
)
} else {
- authorizedTopics.add(topic)
+ authorizedTopics.add(topic.duplicate())
Review Comment:
And here.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3696,14 +3696,16 @@ class KafkaApis(val requestChannel: RequestChannel,
requestContext,
DESCRIBE,
TOPIC,
- groupDescribeOffsetsRequest.topics.asScala
+ groupDescribeOffsetsRequest.topics.valuesList.asScala
)(_.topicName)
+ val topicCollection = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopicCollection()
+ authorizedTopics.foreach(t => topicCollection.add(t.duplicate))
Review Comment:
I don't see why we now need duplication where previously we did not. Could
you explain? Thanks.