chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1697502447
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4069,6 +4431,211 @@ class KafkaApis(val requestChannel: RequestChannel,
CompletableFuture.completedFuture[Unit](())
}
+ private def getAcknowledgeBatchesFromShareFetchRequest(
+ shareFetchRequest :
ShareFetchRequest,
+ topicIdNames :
util.Map[Uuid, String],
+ erroneous :
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+ ) :
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+ val acknowledgeBatchesMap = mutable.Map[TopicIdPartition,
util.List[ShareAcknowledgementBatch]]()
+ shareFetchRequest.data().topics().forEach ( topic => {
+
+ if(!topicIdNames.containsKey(topic.topicId)) {
+ topic.partitions.forEach((partition:
ShareFetchRequestData.FetchPartition) => {
+ val topicIdPartition = new TopicIdPartition(
+ topic.topicId,
+ new TopicPartition(null, partition.partitionIndex))
+ erroneous +=
+ topicIdPartition ->
ShareAcknowledgeResponse.partitionResponse(topicIdPartition,
Errors.UNKNOWN_TOPIC_ID)
+ })
+ }
+ else {
+ topic.partitions().forEach ( partition => {
+ val topicIdPartition = new TopicIdPartition(
+ topic.topicId(),
+ new TopicPartition(topicIdNames.get(topic.topicId()),
partition.partitionIndex())
+ )
+ val acknowledgeBatches = new
util.ArrayList[ShareAcknowledgementBatch]()
+ partition.acknowledgementBatches().forEach( batch => {
+ acknowledgeBatches.add(new ShareAcknowledgementBatch(
+ batch.firstOffset(),
+ batch.lastOffset(),
+ batch.acknowledgeTypes()
+ ))
+ })
+ acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
+ })
+ }
+ })
+ acknowledgeBatchesMap
+ }
+
+ def validateAcknowledgementBatches(
+ acknowledgementDataFromRequest:
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]],
+ erroneous: mutable.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]
+ ): mutable.Set[TopicIdPartition] = {
+ val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] =
mutable.Set.empty[TopicIdPartition]
+
+ acknowledgementDataFromRequest.foreach { case (tp: TopicIdPartition,
acknowledgeBatches: util.List[ShareAcknowledgementBatch]) =>
+ var prevEndOffset = -1L
+ var isErroneous = false
+ acknowledgeBatches.forEach { batch =>
+ if (!isErroneous) {
+ if (batch.firstOffset > batch.lastOffset) {
+ erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp,
Errors.INVALID_REQUEST)
+ erroneousTopicIdPartitions.add(tp)
+ isErroneous = true
+ } else if (batch.firstOffset < prevEndOffset) {
+ erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp,
Errors.INVALID_REQUEST)
+ erroneousTopicIdPartitions.add(tp)
+ isErroneous = true
+ } else if (batch.acknowledgeTypes == null ||
batch.acknowledgeTypes.isEmpty) {
+ erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp,
Errors.INVALID_REQUEST)
+ erroneousTopicIdPartitions.add(tp)
+ isErroneous = true
+ } else if (batch.acknowledgeTypes.size() > 1 && batch.lastOffset -
batch.firstOffset != batch.acknowledgeTypes.size() - 1) {
+ erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp,
Errors.INVALID_REQUEST)
+ erroneousTopicIdPartitions.add(tp)
+ isErroneous = true
+ } else if (batch.acknowledgeTypes.stream().anyMatch(ackType =>
ackType < 0 || ackType > 3)) {
+ erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp,
Errors.INVALID_REQUEST)
+ erroneousTopicIdPartitions.add(tp)
+ isErroneous = true
+ } else {
+ prevEndOffset = batch.lastOffset
+ }
+ }
+ }
+ }
+
+ erroneousTopicIdPartitions
+ }
+
+ // the callback for processing a share fetch response, invoked before
throttling.
+ def processShareFetchResponse(
+ responsePartitionData:
mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData],
+ request: RequestChannel.Request,
+ topicIdNames : util.Map[Uuid, String],
+ shareFetchContext : ShareFetchContext
+ ): ShareFetchResponse = {
+
+ val clientId = request.header.clientId
+ val versionId = request.header.apiVersion
+ val shareFetchRequest = request.body[ShareFetchRequest]
+ val groupId = shareFetchRequest.data.groupId
+ val memberId = shareFetchRequest.data.memberId
+
+ val partitions = new util.LinkedHashMap[TopicIdPartition,
ShareFetchResponseData.PartitionData]
+ val nodeEndpoints = new mutable.HashMap[Int, Node]
+ responsePartitionData.foreach { case(tp, partitionData) =>
+ partitionData.errorCode match {
+ case errCode if errCode == Errors.NOT_LEADER_OR_FOLLOWER.code |
errCode == Errors.FENCED_LEADER_EPOCH.code =>
+ val leaderNode = getCurrentLeader(tp.topicPartition,
request.context.listenerName)
+ leaderNode.node.foreach { node =>
+ nodeEndpoints.put(node.id, node)
+ }
+ partitionData.currentLeader
+ .setLeaderId(leaderNode.leaderId)
+ .setLeaderEpoch(leaderNode.leaderEpoch)
+ case _ =>
+ }
+
+ partitions.put(tp, partitionData)
Review Comment:
Thanks for the review. Yes, we could use the same, but the signatures of some
methods of shareFetchContext require a util.LinkedHashMap, so we would need a
new variable anyway to store the converted map, since it is passed as an
argument to multiple methods. As for why we need a util.LinkedHashMap at all,
maybe we could change those method signatures to accept a Scala map as well,
but I think that would be out of scope for this PR, as it would involve making
changes to other people's code as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]