apoorvmittal10 commented on code in PR #16792:
URL: https://github.com/apache/kafka/pull/16792#discussion_r1707341091
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4367,7 +4443,46 @@ class KafkaApis(val requestChannel: RequestChannel,
CompletableFuture.completedFuture[Unit](())
}
- private def getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest:
ShareFetchRequest,
+ // Visible for Testing
+ def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest
: ShareAcknowledgeRequest,
+ topicIdNames :
util.Map[Uuid, String],
+ erroneous :
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+ ) :
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+ val acknowledgeBatchesMap = mutable.Map[TopicIdPartition,
util.List[ShareAcknowledgementBatch]]()
+ shareAcknowledgeRequest.data().topics().forEach{ topic =>
+ if(!topicIdNames.containsKey(topic.topicId)) {
+ topic.partitions.forEach{ case partition:
ShareAcknowledgeRequestData.AcknowledgePartition =>
+ val topicIdPartition = new TopicIdPartition(
+ topic.topicId,
+ new TopicPartition(null, partition.partitionIndex))
+ erroneous +=
+ topicIdPartition ->
ShareAcknowledgeResponse.partitionResponse(topicIdPartition,
Errors.UNKNOWN_TOPIC_ID)
+ }
+ } else {
+ topic.partitions().forEach{partition =>
+ val topicIdPartition = new TopicIdPartition(
+ topic.topicId(),
+ new TopicPartition(topicIdNames.get(topic.topicId()),
partition.partitionIndex())
+ )
+ val acknowledgeBatches = new
util.ArrayList[ShareAcknowledgementBatch]()
+ partition.acknowledgementBatches().forEach{ batch =>
+ acknowledgeBatches.add(new ShareAcknowledgementBatch(
+ batch.firstOffset(),
+ batch.lastOffset(),
+ batch.acknowledgeTypes()
+ ))
+ }
+ if (acknowledgeBatches.size() > 0) {
Review Comment:
if `acknowledgeBatches.size()` could be 0 then shouldn't that check exist
first prior creating objects for `TopicIdPartition`, `TopicPartition`, `new
util.ArrayList[ShareAcknowledgementBatch]()`? Also if acknowledgeBatches.size()
is 0 for topic partition then where we are filling response for same?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4327,9 +4327,85 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = {
val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest]
- // TODO: Implement the ShareAcknowledgeRequest handling
- requestHelper.sendMaybeThrottle(request,
shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ if (!isShareGroupProtocolEnabled) {
+ requestHelper.sendMaybeThrottle(request,
+
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.UNSUPPORTED_VERSION.exception))
+ return
+ }
+
+ val sharePartitionManagerInstance : SharePartitionManager =
sharePartitionManager match {
+ case Some(manager) => manager
+ case None =>
+ // The API is not supported when the SharePartitionManager is not
defined on the broker
+ info("Received share acknowledge request for zookeeper based cluster")
+ requestHelper.sendMaybeThrottle(request,
+
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.UNSUPPORTED_VERSION.exception))
+ return
+ }
+ val groupId = shareAcknowledgeRequest.data.groupId
+
+ // Share Acknowledge needs permission to perform READ action on the named
group resource (groupId)
+ if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+ requestHelper.sendMaybeThrottle(request,
+
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ return
+ }
+
+ val memberId = shareAcknowledgeRequest.data.memberId
+ val shareSessionEpoch = shareAcknowledgeRequest.data.shareSessionEpoch
+ val newReqMetadata : ShareFetchMetadata = new
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
Review Comment:
`ShareFetchMetadata` names seems to be a bit inappropriate as we are
handling ShareAcknowledge request. I understand that we need session and epoch
related information from the class. Do you think `ShareRequestMetadata` would
be a better name for `ShareFetchMetadata`?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4396,14 +4511,70 @@ class KafkaApis(val requestChannel: RequestChannel,
batch.acknowledgeTypes()
))
}
- acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
+ if (acknowledgeBatches.size() > 0) {
Review Comment:
Same as above.
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4430,16 +4430,16 @@ class KafkaApisTest extends Logging {
when(sharePartitionManager.fetchMessages(any(), any(), any(),
any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
- new ShareFetchResponseData.PartitionData()
- .setErrorCode(Errors.NONE.code)
- .setAcknowledgeErrorCode(Errors.NONE.code)
- .setRecords(records)
- .setAcquiredRecords(new util.ArrayList(List(
- new ShareFetchResponseData.AcquiredRecords()
- .setFirstOffset(0)
- .setLastOffset(9)
- .setDeliveryCount(1)
- ).asJava))
+ new ShareFetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code)
+ .setAcknowledgeErrorCode(Errors.NONE.code)
+ .setRecords(records)
+ .setAcquiredRecords(new util.ArrayList(List(
+ new ShareFetchResponseData.AcquiredRecords()
+ .setFirstOffset(0)
+ .setLastOffset(9)
+ .setDeliveryCount(1)
+ ).asJava))
Review Comment:
Is it an inteded change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]