dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651903223



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there 
is not committed offset for a partition. The following getPartitionOffset 
function seeks to avoid NullPointerException by filtering out those null values.

Review comment:
       nit: I think that we can shorten the comment a bit. `The admin client 
returns `null` as a value to indicate that there is not committed offset for a 
partition.` seems sufficient to me.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -110,6 +132,12 @@ class ConsumerGroupServiceTest {
     AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
   }
 
+  private def listGroupNegativeOffsetsResult: ListConsumerGroupOffsetsResult = 
{
+    // Half of the partitions of the testing topics are set to have a negative 
integer offset (null value [KAFKA-9507 for reference])
+    val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( 
if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
       Could we revert this change and use a `ListConsumerGroupOffsetsResult` 
tailored only for `testAdminRequestsForDescribeNegativeOffsets`?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +582,7 @@ object ConsumerGroupCommand extends Logging {
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> 
Some(offset.offset) },
+            unassignedPartitions.map { case (tp, offset) => tp -> 
getPartitionOffset(tp) },

Review comment:
       Same for this one.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupNegativeOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
       Could we also assert that the `offset` is `None` for a partition that 
does not have it? Does the test also cover the `unassignedPartitions` case? I 
suppose that it does not, right?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there 
is not committed offset for a partition. The following getPartitionOffset 
function seeks to avoid NullPointerException by filtering out those null values.
+        def getPartitionOffset(tp: TopicPartition): Option[Long] = 
committedOffsets.get(tp).filter(_ != null).map(_.offset)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
         val rowsWithConsumer = 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
           .sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
           val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
           assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
           val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
             .map { topicPartition =>
-              topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
+              topicPartition -> getPartitionOffset(topicPartition)

Review comment:
       I think that we could avoid computing `partitionOffsets` and directly 
pass `getPartitionOffset` to `collectConsumerAssignment`. Does it work?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to