dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651903223
##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
val groupOffsets = TreeMap[String, (Option[String],
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <-
consumerGroups) yield {
val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId)
+ // The admin client returns `null` as a value to indicate that there
is not committed offset for a partition. The following getPartitionOffset
function seeks to avoid NullPointerException by filtering out those null values.
Review comment:
nit: I think that we can shorten the comment a bit. `The admin client
returns `null` as a value to indicate that there is not committed offset for a
partition.` seems sufficient to me.
##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -110,6 +132,12 @@ class ConsumerGroupServiceTest {
AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
}
+ private def listGroupNegativeOffsetsResult: ListConsumerGroupOffsetsResult =
{
+ // Half of the partitions of the testing topics are set to have a negative
integer offset (null value [KAFKA-9507 for reference])
+ val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> (
if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava
Review comment:
Could we revert this change and use a `ListConsumerGroupOffsetsResult`
tailored only for `testAdminRequestsForDescribeNegativeOffsets`?
##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +582,7 @@ object ConsumerGroupCommand extends Logging {
groupId,
Option(consumerGroup.coordinator),
unassignedPartitions.keySet.toSeq,
- unassignedPartitions.map { case (tp, offset) => tp ->
Some(offset.offset) },
+ unassignedPartitions.map { case (tp, offset) => tp ->
getPartitionOffset(tp) },
Review comment:
Same for this one.
##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
}
+ @Test
+ def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+ val args = Array("--bootstrap-server", "localhost:9092", "--group", group,
"--describe", "--offsets")
+ val groupService = consumerGroupService(args)
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+ .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+ .thenReturn(listGroupNegativeOffsetsResult)
+ when(admin.listOffsets(offsetsArgMatcher, any()))
+ .thenReturn(listOffsetsResult)
+
+ val (state, assignments) = groupService.collectGroupOffsets(group)
+ assertEquals(Some("Stable"), state)
+ assertTrue(assignments.nonEmpty)
+ assertEquals(topicPartitions.size, assignments.get.size)
Review comment:
Could we also assert that the `offset` is `None` for a partition that
does not have it? Does the test also cover the `unassignedPartitions` case? I
suppose that it does not, right?
##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
val groupOffsets = TreeMap[String, (Option[String],
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <-
consumerGroups) yield {
val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId)
+ // The admin client returns `null` as a value to indicate that there
is not committed offset for a partition. The following getPartitionOffset
function seeks to avoid NullPointerException by filtering out those null values.
+ def getPartitionOffset(tp: TopicPartition): Option[Long] =
committedOffsets.get(tp).filter(_ != null).map(_.offset)
var assignedTopicPartitions = ListBuffer[TopicPartition]()
val rowsWithConsumer =
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size >
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
val topicPartitions =
consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
val partitionOffsets =
consumerSummary.assignment.topicPartitions.asScala
.map { topicPartition =>
- topicPartition ->
committedOffsets.get(topicPartition).map(_.offset)
+ topicPartition -> getPartitionOffset(topicPartition)
Review comment:
I think that we could avoid computing `partitionOffsets` and directly
pass `getPartitionOffset` to `collectConsumerAssignment`. Does it work?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]