ahuang98 commented on code in PR #19664:
URL: https://github.com/apache/kafka/pull/19664#discussion_r2112336920


##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -163,7 +163,7 @@ class RequestQuotaTest extends BaseRequestTest {
   @Test
   def testExemptRequestTime(): Unit = {
     // Exclude `DESCRIBE_QUORUM`, maybe it shouldn't be a cluster action
-    val actions = clusterActions -- clusterActionsWithThrottleForBroker -- 
RequestQuotaTest.Envelope -- RequestQuotaTest.ShareGroupState - 
ApiKeys.DESCRIBE_QUORUM
+    val actions = clusterActions -- clusterActionsWithThrottleForBroker -- 
RequestQuotaTest.Envelope -- RequestQuotaTest.ShareGroupState - 
ApiKeys.DESCRIBE_QUORUM - ApiKeys.GET_REPLICA_LOG_INFO

Review Comment:
   hm, what determines whether an action/request should be exempt from / tracked by 
the exempt-request-time metric?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  def verifyGetReplicaLogInfoRequest(builder: 
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse => 
Unit)): Unit = {
+    val request = buildRequest(builder.build())
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.ALLOWED)
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleGetReplicaLogInfo(request)
+    withResponse(verifyNoThrottling[GetReplicaLogInfoResponse](request))
+  }
+
+  @Test
+  def testUnauthorizedGetReplicaLogInfo(): Unit = {
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData())
+    val request = buildRequest(builder.build(0))
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.DENIED)
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    assertThrows(classOf[ClusterAuthorizationException],
+      () => kafkaApis.handleGetReplicaLogInfo(request))
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrievePartition(): Unit = {

Review Comment:
   nit: this test seems to test the happy case too (getReplicaLogInfo succeeds 
in retrieving partition) - should we split them out or is the happy case 
already redundant with other tests?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  def verifyGetReplicaLogInfoRequest(builder: 
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse => 
Unit)): Unit = {
+    val request = buildRequest(builder.build())
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.ALLOWED)
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleGetReplicaLogInfo(request)
+    withResponse(verifyNoThrottling[GetReplicaLogInfoResponse](request))
+  }
+
+  @Test
+  def testUnauthorizedGetReplicaLogInfo(): Unit = {
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData())
+    val request = buildRequest(builder.build(0))
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.DENIED)
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    assertThrows(classOf[ClusterAuthorizationException],
+      () => kafkaApis.handleGetReplicaLogInfo(request))
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrievePartition(): Unit = {
+    val uuids = (1 to 4).map(_ => Uuid.randomUuid()).toList
+    val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(_)
+      .setPartitions(Collections.singletonList(1)))
+
+    def mockTopicName(uuid: Uuid, idx: Int): String = 
s"topic-idx-$idx-with-uuid-$uuid"
+
+    metadataCache = mock(classOf[MetadataCache])
+    uuids.zipWithIndex.foreach { case (uuid, idx) =>
+      val name = mockTopicName(uuid, idx)
+      when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(name))
+    }
+
+    val log = mock(classOf[UnifiedLog])
+    when(log.logEndOffset).thenReturn(100L)
+    when(log.latestEpoch).thenReturn(Optional.of(10))
+    val partition = mock(classOf[Partition])
+    when(partition.log).thenReturn(Some(log))
+    when(partition.getLeaderEpoch).thenReturn(1)
+    when(partition.partitionId).thenReturn(1)
+    val valid = new TopicPartition(mockTopicName(uuids.head, 0), 1)
+    
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
+
+    val expected = new 
GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
+    expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+      .setTopicId(uuids.head)
+      .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+        .setPartition(1)
+        .setLogEndOffset(100L)
+        .setLastWrittenLeaderEpoch(10)
+        .setCurrentLeaderEpoch(1))))
+
+    var idx = 1
+    List(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.UNKNOWN_TOPIC_OR_PARTITION).foreach { err =>
+      val uuid = uuids(idx)
+      val name = mockTopicName(uuid, idx)
+      val invalid = new TopicPartition(name, 1)
+      when(replicaManager.getPartitionOrError(invalid)).thenReturn(Left(err))
+      expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+        .setTopicId(uuid)
+        .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+          .setErrorCode(err.code())
+          .setPartition(1))))
+      idx += 1
+    }
+
+
+    val builder = new GetReplicaLogInfoRequest.Builder(
+      new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava)
+    )
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrieveLog(): Unit = {
+    val topic1 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(1))
+    val topic2 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(2))
+    val builder = new GetReplicaLogInfoRequest.Builder(List(topic1, topic2) 
asJava)
+
+    metadataCache = mock(classOf[MetadataCache])
+    
when(metadataCache.getTopicName(topic1.topicId())).thenReturn(Optional.of("topic1"))
+    
when(metadataCache.getTopicName(topic2.topicId())).thenReturn(Optional.of("topic2"))
+
+    val log1 = mock(classOf[UnifiedLog])
+    when(log1.logEndOffset).thenReturn(100L)
+    when(log1.latestEpoch).thenReturn(Optional.of(10))
+    val partition1 = mock(classOf[Partition])
+    when(partition1.log).thenReturn(Some(log1))
+    when(partition1.getLeaderEpoch).thenReturn(1)
+    when(partition1.partitionId).thenReturn(1)
+
+    val partition2 = mock(classOf[Partition])
+    when(partition2.log).thenReturn(None)
+    when(partition2.getLeaderEpoch).thenReturn(2)
+    when(partition2.partitionId).thenReturn(2)
+
+    val tp1 = new TopicPartition("topic1", 1)
+    when(replicaManager.getPartitionOrError(tp1)).thenReturn(Right(partition1))
+    val tp2 = new TopicPartition("topic2", 2)
+    when(replicaManager.getPartitionOrError(tp2)).thenReturn(Right(partition2))
+
+    val expected = new GetReplicaLogInfoResponseData()
+      .setTopicPartitionLogInfoList(List(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic1.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(1)
+              .setLogEndOffset(100L)
+              .setLastWrittenLeaderEpoch(10)
+              .setCurrentLeaderEpoch(1))),
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic2.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+          ))) asJava)
+      .setBrokerEpoch(brokerEpoch)
+
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoUnknownTopic(): Unit = {

Review Comment:
   may be worth having one of these error-case tests confirm that a request with 
a mix of "good" and "bad" topics/partitions still returns the information for 
the "good" partitions correctly



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  def verifyGetReplicaLogInfoRequest(builder: 
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse => 
Unit)): Unit = {

Review Comment:
   could we simplify this? it seems like we could replace `withResponse` with 
the expected `GetReplicaLogInfoResponseData`



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  def verifyGetReplicaLogInfoRequest(builder: 
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse => 
Unit)): Unit = {
+    val request = buildRequest(builder.build())
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.ALLOWED)
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleGetReplicaLogInfo(request)
+    withResponse(verifyNoThrottling[GetReplicaLogInfoResponse](request))
+  }
+
+  @Test
+  def testUnauthorizedGetReplicaLogInfo(): Unit = {
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData())
+    val request = buildRequest(builder.build(0))
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.DENIED)
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    assertThrows(classOf[ClusterAuthorizationException],
+      () => kafkaApis.handleGetReplicaLogInfo(request))
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrievePartition(): Unit = {
+    val uuids = (1 to 4).map(_ => Uuid.randomUuid()).toList
+    val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(_)
+      .setPartitions(Collections.singletonList(1)))
+
+    def mockTopicName(uuid: Uuid, idx: Int): String = 
s"topic-idx-$idx-with-uuid-$uuid"
+
+    metadataCache = mock(classOf[MetadataCache])
+    uuids.zipWithIndex.foreach { case (uuid, idx) =>
+      val name = mockTopicName(uuid, idx)
+      when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(name))
+    }
+
+    val log = mock(classOf[UnifiedLog])
+    when(log.logEndOffset).thenReturn(100L)
+    when(log.latestEpoch).thenReturn(Optional.of(10))
+    val partition = mock(classOf[Partition])
+    when(partition.log).thenReturn(Some(log))
+    when(partition.getLeaderEpoch).thenReturn(1)
+    when(partition.partitionId).thenReturn(1)
+    val valid = new TopicPartition(mockTopicName(uuids.head, 0), 1)
+    
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
+
+    val expected = new 
GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
+    expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+      .setTopicId(uuids.head)
+      .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+        .setPartition(1)
+        .setLogEndOffset(100L)
+        .setLastWrittenLeaderEpoch(10)
+        .setCurrentLeaderEpoch(1))))
+
+    var idx = 1
+    List(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.UNKNOWN_TOPIC_OR_PARTITION).foreach { err =>
+      val uuid = uuids(idx)
+      val name = mockTopicName(uuid, idx)
+      val invalid = new TopicPartition(name, 1)
+      when(replicaManager.getPartitionOrError(invalid)).thenReturn(Left(err))
+      expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+        .setTopicId(uuid)
+        .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+          .setErrorCode(err.code())
+          .setPartition(1))))
+      idx += 1
+    }
+
+
+    val builder = new GetReplicaLogInfoRequest.Builder(
+      new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava)
+    )
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrieveLog(): Unit = {
+    val topic1 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(1))
+    val topic2 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(2))
+    val builder = new GetReplicaLogInfoRequest.Builder(List(topic1, topic2) 
asJava)
+
+    metadataCache = mock(classOf[MetadataCache])
+    
when(metadataCache.getTopicName(topic1.topicId())).thenReturn(Optional.of("topic1"))
+    
when(metadataCache.getTopicName(topic2.topicId())).thenReturn(Optional.of("topic2"))
+
+    val log1 = mock(classOf[UnifiedLog])
+    when(log1.logEndOffset).thenReturn(100L)
+    when(log1.latestEpoch).thenReturn(Optional.of(10))
+    val partition1 = mock(classOf[Partition])
+    when(partition1.log).thenReturn(Some(log1))
+    when(partition1.getLeaderEpoch).thenReturn(1)
+    when(partition1.partitionId).thenReturn(1)
+
+    val partition2 = mock(classOf[Partition])
+    when(partition2.log).thenReturn(None)
+    when(partition2.getLeaderEpoch).thenReturn(2)
+    when(partition2.partitionId).thenReturn(2)
+
+    val tp1 = new TopicPartition("topic1", 1)
+    when(replicaManager.getPartitionOrError(tp1)).thenReturn(Right(partition1))
+    val tp2 = new TopicPartition("topic2", 2)
+    when(replicaManager.getPartitionOrError(tp2)).thenReturn(Right(partition2))
+
+    val expected = new GetReplicaLogInfoResponseData()
+      .setTopicPartitionLogInfoList(List(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic1.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(1)
+              .setLogEndOffset(100L)
+              .setLastWrittenLeaderEpoch(10)
+              .setCurrentLeaderEpoch(1))),
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic2.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+          ))) asJava)
+      .setBrokerEpoch(brokerEpoch)
+
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoUnknownTopic(): Unit = {
+    val expectedPartition = 1
+    val expectedUuid = Uuid.randomUuid()
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData()
+      .setTopicPartitions(
+        Collections.singletonList(new 
GetReplicaLogInfoRequestData.TopicPartitions()
+          .setTopicId(expectedUuid)
+          .setPartitions(Collections.singletonList(expectedPartition)))))
+    metadataCache = mock(classOf[MetadataCache])
+    when(metadataCache.getTopicName(expectedUuid)).thenReturn(Optional.empty())
+
+    val expectedResponseData = new GetReplicaLogInfoResponseData()
+      .setBrokerEpoch(brokerEpoch)
+      .setTopicPartitionLogInfoList(Collections.singletonList(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(expectedUuid)
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(expectedPartition)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())
+          ))
+      ))
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expectedResponseData, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoRequestTooManyTopics(): Unit = {
+    // 100 topics, 20 partitions per topic = 2k topic-partitions
+    // only first 1000 should be sent back and truncated = true
+    val numberUuids = 100
+    val numberPartitions = 20
+    val uuids: List[Uuid] = (1 to numberUuids).map(_ => 
Uuid.randomUuid()).toList
+    val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(_)
+      .setPartitions((1 to numberPartitions).map(new Integer(_)).asJava))
+    val builder = new GetReplicaLogInfoRequest.Builder(
+      new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava))
+    val expectedLogEndOffset = 10L
+    val expectedLeaderEpoch = 2
+    val expectedLatestEpoch = 3
+
+    def mockTopicName(uuid: Uuid, idx: Int): String = 
s"topic-idx-$idx-with-uuid-$uuid"
+
+    metadataCache = mock(classOf[MetadataCache])
+    uuids.zipWithIndex.foreach { case (uuid, idx) =>
+      
when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(mockTopicName(uuid,
 idx)))
+    }
+
+    val expectedResponseData =
+      new 
GetReplicaLogInfoResponseData().setHasMoreData(true).setBrokerEpoch(brokerEpoch)
+    uuids.take(50).zipWithIndex.foreach { case (uuid, idx) =>
+      val tpli = new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+        .setTopicId(uuid)
+      val topicName = mockTopicName(uuid, idx)
+      val log = mock(classOf[UnifiedLog])
+      when(log.logEndOffset).thenReturn(expectedLogEndOffset)
+      when(log.latestEpoch).thenReturn(Optional.of(expectedLatestEpoch))
+      (1 to numberPartitions).foreach { pid =>
+        val partition = mock(classOf[Partition])
+        when(partition.log).thenReturn(Some(log))
+        when(partition.getLeaderEpoch).thenReturn(expectedLeaderEpoch)
+        when(partition.partitionId).thenReturn(pid)
+        when(replicaManager.getPartitionOrError(new TopicPartition(topicName, 
pid))).thenReturn(Right(partition))
+        tpli.partitionLogInfo().add(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+          .setPartition(pid)
+          .setLogEndOffset(expectedLogEndOffset)
+          .setLastWrittenLeaderEpoch(expectedLatestEpoch)
+          .setCurrentLeaderEpoch(expectedLeaderEpoch))
+      }
+      expectedResponseData.topicPartitionLogInfoList().add(tpli)
+    }
+
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expectedResponseData, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaInfoRequestHappyTrail(): Unit = {

Review Comment:
   hehe happy trail, never heard that one before 🥾 



##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -191,7 +191,7 @@ class RequestQuotaTest extends BaseRequestTest {
     ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
   }
 
-  private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = {
+  private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = 
{RequestQuotaTest

Review Comment:
   typo?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  def verifyGetReplicaLogInfoRequest(builder: 
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse => 
Unit)): Unit = {
+    val request = buildRequest(builder.build())
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.ALLOWED)
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleGetReplicaLogInfo(request)
+    withResponse(verifyNoThrottling[GetReplicaLogInfoResponse](request))
+  }
+
+  @Test
+  def testUnauthorizedGetReplicaLogInfo(): Unit = {
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData())
+    val request = buildRequest(builder.build(0))
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.DENIED)
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    assertThrows(classOf[ClusterAuthorizationException],
+      () => kafkaApis.handleGetReplicaLogInfo(request))
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrievePartition(): Unit = {
+    val uuids = (1 to 4).map(_ => Uuid.randomUuid()).toList
+    val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(_)
+      .setPartitions(Collections.singletonList(1)))
+
+    def mockTopicName(uuid: Uuid, idx: Int): String = 
s"topic-idx-$idx-with-uuid-$uuid"
+
+    metadataCache = mock(classOf[MetadataCache])
+    uuids.zipWithIndex.foreach { case (uuid, idx) =>
+      val name = mockTopicName(uuid, idx)
+      when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(name))
+    }
+
+    val log = mock(classOf[UnifiedLog])
+    when(log.logEndOffset).thenReturn(100L)
+    when(log.latestEpoch).thenReturn(Optional.of(10))
+    val partition = mock(classOf[Partition])
+    when(partition.log).thenReturn(Some(log))
+    when(partition.getLeaderEpoch).thenReturn(1)
+    when(partition.partitionId).thenReturn(1)
+    val valid = new TopicPartition(mockTopicName(uuids.head, 0), 1)
+    
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
+
+    val expected = new 
GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
+    expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+      .setTopicId(uuids.head)
+      .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+        .setPartition(1)
+        .setLogEndOffset(100L)
+        .setLastWrittenLeaderEpoch(10)
+        .setCurrentLeaderEpoch(1))))
+
+    var idx = 1
+    List(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.UNKNOWN_TOPIC_OR_PARTITION).foreach { err =>
+      val uuid = uuids(idx)
+      val name = mockTopicName(uuid, idx)
+      val invalid = new TopicPartition(name, 1)
+      when(replicaManager.getPartitionOrError(invalid)).thenReturn(Left(err))
+      expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+        .setTopicId(uuid)
+        .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+          .setErrorCode(err.code())
+          .setPartition(1))))
+      idx += 1
+    }
+
+
+    val builder = new GetReplicaLogInfoRequest.Builder(
+      new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava)
+    )
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrieveLog(): Unit = {
+    val topic1 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(1))
+    val topic2 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(2))
+    val builder = new GetReplicaLogInfoRequest.Builder(List(topic1, topic2) 
asJava)
+
+    metadataCache = mock(classOf[MetadataCache])
+    
when(metadataCache.getTopicName(topic1.topicId())).thenReturn(Optional.of("topic1"))
+    
when(metadataCache.getTopicName(topic2.topicId())).thenReturn(Optional.of("topic2"))
+
+    val log1 = mock(classOf[UnifiedLog])
+    when(log1.logEndOffset).thenReturn(100L)
+    when(log1.latestEpoch).thenReturn(Optional.of(10))
+    val partition1 = mock(classOf[Partition])
+    when(partition1.log).thenReturn(Some(log1))
+    when(partition1.getLeaderEpoch).thenReturn(1)
+    when(partition1.partitionId).thenReturn(1)
+
+    val partition2 = mock(classOf[Partition])
+    when(partition2.log).thenReturn(None)
+    when(partition2.getLeaderEpoch).thenReturn(2)
+    when(partition2.partitionId).thenReturn(2)
+
+    val tp1 = new TopicPartition("topic1", 1)
+    when(replicaManager.getPartitionOrError(tp1)).thenReturn(Right(partition1))
+    val tp2 = new TopicPartition("topic2", 2)
+    when(replicaManager.getPartitionOrError(tp2)).thenReturn(Right(partition2))
+
+    val expected = new GetReplicaLogInfoResponseData()
+      .setTopicPartitionLogInfoList(List(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic1.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(1)
+              .setLogEndOffset(100L)
+              .setLastWrittenLeaderEpoch(10)
+              .setCurrentLeaderEpoch(1))),
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic2.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+          ))) asJava)
+      .setBrokerEpoch(brokerEpoch)
+
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoUnknownTopic(): Unit = {
+    val expectedPartition = 1
+    val expectedUuid = Uuid.randomUuid()
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData()
+      .setTopicPartitions(
+        Collections.singletonList(new 
GetReplicaLogInfoRequestData.TopicPartitions()
+          .setTopicId(expectedUuid)
+          .setPartitions(Collections.singletonList(expectedPartition)))))
+    metadataCache = mock(classOf[MetadataCache])
+    when(metadataCache.getTopicName(expectedUuid)).thenReturn(Optional.empty())
+
+    val expectedResponseData = new GetReplicaLogInfoResponseData()
+      .setBrokerEpoch(brokerEpoch)
+      .setTopicPartitionLogInfoList(Collections.singletonList(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(expectedUuid)
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(expectedPartition)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())
+          ))
+      ))
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expectedResponseData, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoRequestTooManyTopics(): Unit = {
+    // 100 topics, 20 partitions per topic = 2k topic-partitions
+    // only first 1000 should be sent back and truncated = true

Review Comment:
   don't forget to change the comment to reflect the new field name



##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -163,7 +163,7 @@ class RequestQuotaTest extends BaseRequestTest {
   @Test
   def testExemptRequestTime(): Unit = {
     // Exclude `DESCRIBE_QUORUM`, maybe it shouldn't be a cluster action
-    val actions = clusterActions -- clusterActionsWithThrottleForBroker -- 
RequestQuotaTest.Envelope -- RequestQuotaTest.ShareGroupState - 
ApiKeys.DESCRIBE_QUORUM
+    val actions = clusterActions -- clusterActionsWithThrottleForBroker -- 
RequestQuotaTest.Envelope -- RequestQuotaTest.ShareGroupState - 
ApiKeys.DESCRIBE_QUORUM - ApiKeys.GET_REPLICA_LOG_INFO

Review Comment:
   oh interesting, is it related to which apis are throttle-able (in which case 
we want all new apis to not be exempt from throttling, even cluster actions?)



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  def verifyGetReplicaLogInfoRequest(builder: 
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse => 
Unit)): Unit = {
+    val request = buildRequest(builder.build())
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.ALLOWED)
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleGetReplicaLogInfo(request)
+    withResponse(verifyNoThrottling[GetReplicaLogInfoResponse](request))
+  }
+
+  @Test
+  def testUnauthorizedGetReplicaLogInfo(): Unit = {
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData())
+    val request = buildRequest(builder.build(0))
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val clusterActions = Collections.singletonList(new 
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+    val allowList = Collections.singletonList(AuthorizationResult.DENIED)
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(request.context, 
clusterActions)).thenReturn(allowList)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    assertThrows(classOf[ClusterAuthorizationException],
+      () => kafkaApis.handleGetReplicaLogInfo(request))
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrievePartition(): Unit = {
+    val uuids = (1 to 4).map(_ => Uuid.randomUuid()).toList
+    val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(_)
+      .setPartitions(Collections.singletonList(1)))
+
+    def mockTopicName(uuid: Uuid, idx: Int): String = 
s"topic-idx-$idx-with-uuid-$uuid"
+
+    metadataCache = mock(classOf[MetadataCache])
+    uuids.zipWithIndex.foreach { case (uuid, idx) =>
+      val name = mockTopicName(uuid, idx)
+      when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(name))
+    }
+
+    val log = mock(classOf[UnifiedLog])
+    when(log.logEndOffset).thenReturn(100L)
+    when(log.latestEpoch).thenReturn(Optional.of(10))
+    val partition = mock(classOf[Partition])
+    when(partition.log).thenReturn(Some(log))
+    when(partition.getLeaderEpoch).thenReturn(1)
+    when(partition.partitionId).thenReturn(1)
+    val valid = new TopicPartition(mockTopicName(uuids.head, 0), 1)
+    
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
+
+    val expected = new 
GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
+    expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+      .setTopicId(uuids.head)
+      .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+        .setPartition(1)
+        .setLogEndOffset(100L)
+        .setLastWrittenLeaderEpoch(10)
+        .setCurrentLeaderEpoch(1))))
+
+    var idx = 1
+    List(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.UNKNOWN_TOPIC_OR_PARTITION).foreach { err =>
+      val uuid = uuids(idx)
+      val name = mockTopicName(uuid, idx)
+      val invalid = new TopicPartition(name, 1)
+      when(replicaManager.getPartitionOrError(invalid)).thenReturn(Left(err))
+      expected.topicPartitionLogInfoList().add(new 
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+        .setTopicId(uuid)
+        .setPartitionLogInfo(Collections.singletonList(new 
GetReplicaLogInfoResponseData.PartitionLogInfo()
+          .setErrorCode(err.code())
+          .setPartition(1))))
+      idx += 1
+    }
+
+
+    val builder = new GetReplicaLogInfoRequest.Builder(
+      new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava)
+    )
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoFailedToRetrieveLog(): Unit = {
+    val topic1 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(1))
+    val topic2 = new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(Uuid.randomUuid())
+      .setPartitions(Collections.singletonList(2))
+    val builder = new GetReplicaLogInfoRequest.Builder(List(topic1, topic2) 
asJava)
+
+    metadataCache = mock(classOf[MetadataCache])
+    
when(metadataCache.getTopicName(topic1.topicId())).thenReturn(Optional.of("topic1"))
+    
when(metadataCache.getTopicName(topic2.topicId())).thenReturn(Optional.of("topic2"))
+
+    val log1 = mock(classOf[UnifiedLog])
+    when(log1.logEndOffset).thenReturn(100L)
+    when(log1.latestEpoch).thenReturn(Optional.of(10))
+    val partition1 = mock(classOf[Partition])
+    when(partition1.log).thenReturn(Some(log1))
+    when(partition1.getLeaderEpoch).thenReturn(1)
+    when(partition1.partitionId).thenReturn(1)
+
+    val partition2 = mock(classOf[Partition])
+    when(partition2.log).thenReturn(None)
+    when(partition2.getLeaderEpoch).thenReturn(2)
+    when(partition2.partitionId).thenReturn(2)
+
+    val tp1 = new TopicPartition("topic1", 1)
+    when(replicaManager.getPartitionOrError(tp1)).thenReturn(Right(partition1))
+    val tp2 = new TopicPartition("topic2", 2)
+    when(replicaManager.getPartitionOrError(tp2)).thenReturn(Right(partition2))
+
+    val expected = new GetReplicaLogInfoResponseData()
+      .setTopicPartitionLogInfoList(List(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic1.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(1)
+              .setLogEndOffset(100L)
+              .setLastWrittenLeaderEpoch(10)
+              .setCurrentLeaderEpoch(1))),
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(topic2.topicId())
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+          ))) asJava)
+      .setBrokerEpoch(brokerEpoch)
+
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expected, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoUnknownTopic(): Unit = {
+    val expectedPartition = 1
+    val expectedUuid = Uuid.randomUuid()
+    val builder = new GetReplicaLogInfoRequest.Builder(new 
GetReplicaLogInfoRequestData()
+      .setTopicPartitions(
+        Collections.singletonList(new 
GetReplicaLogInfoRequestData.TopicPartitions()
+          .setTopicId(expectedUuid)
+          .setPartitions(Collections.singletonList(expectedPartition)))))
+    metadataCache = mock(classOf[MetadataCache])
+    when(metadataCache.getTopicName(expectedUuid)).thenReturn(Optional.empty())
+
+    val expectedResponseData = new GetReplicaLogInfoResponseData()
+      .setBrokerEpoch(brokerEpoch)
+      .setTopicPartitionLogInfoList(Collections.singletonList(
+        new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+          .setTopicId(expectedUuid)
+          .setPartitionLogInfo(Collections.singletonList(
+            new GetReplicaLogInfoResponseData.PartitionLogInfo()
+              .setPartition(expectedPartition)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())
+          ))
+      ))
+    verifyGetReplicaLogInfoRequest(builder, { response =>
+      assertEquals(expectedResponseData, response.data())
+    })
+  }
+
+  @Test
+  def testGetReplicaLogInfoRequestTooManyTopics(): Unit = {
+    // 100 topics, 20 partitions per topic = 2k topic-partitions
+    // only first 1000 should be sent back and truncated = true
+    val numberUuids = 100
+    val numberPartitions = 20
+    val uuids: List[Uuid] = (1 to numberUuids).map(_ => 
Uuid.randomUuid()).toList
+    val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+      .setTopicId(_)
+      .setPartitions((1 to numberPartitions).map(new Integer(_)).asJava))
+    val builder = new GetReplicaLogInfoRequest.Builder(
+      new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava))
+    val expectedLogEndOffset = 10L
+    val expectedLeaderEpoch = 2
+    val expectedLatestEpoch = 3
+
+    def mockTopicName(uuid: Uuid, idx: Int): String = 
s"topic-idx-$idx-with-uuid-$uuid"
+
+    metadataCache = mock(classOf[MetadataCache])
+    uuids.zipWithIndex.foreach { case (uuid, idx) =>
+      
when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(mockTopicName(uuid,
 idx)))
+    }
+
+    val expectedResponseData =
+      new 
GetReplicaLogInfoResponseData().setHasMoreData(true).setBrokerEpoch(brokerEpoch)
+    uuids.take(50).zipWithIndex.foreach { case (uuid, idx) =>

Review Comment:
   nit: could you add a comment here highlighting that we only include the first 
50 topics / 1000 partitions? I know you already have a comment at the top of the 
test that says something similar, but this would help folks find the logic faster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to