dajac commented on code in PR #21396:
URL: https://github.com/apache/kafka/pull/21396#discussion_r2779549865


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -773,7 +773,7 @@ private void freeCurrentBatch() {
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
             // Release the buffer only if it is not larger than the 
maxBatchSize.
-            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+            int maxBatchSize = currentBatch.maxBatchSize;

Review Comment:
   nit: Could you please add a comment explaining why we use 
`currentBatch.maxBatchSize` here?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -335,6 +339,52 @@ public void testRackAwareAssignment(ClusterInstance 
clusterInstance) throws Exec
         }
     }
 
+    @ClusterTest(
+        brokers = 2,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = 
"3000")
+        }
+    )
+    public void 
testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException, 
TimeoutException {
+        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+            producer.send(new ProducerRecord<>("topic", 
"value".getBytes(StandardCharsets.UTF_8)));
+        }
+
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new 
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+        }
+
+        try (var consumer = 
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+             var admin = clusterInstance.admin()) {
+            consumer.subscribe(List.of("topic"));
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+            // append records to coordinator

Review Comment:
   nit: Could you please align the format of your comments? They should start 
with a capital letter and end with a `.`.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -335,6 +339,52 @@ public void testRackAwareAssignment(ClusterInstance 
clusterInstance) throws Exec
         }
     }
 
+    @ClusterTest(
+        brokers = 2,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = 
"3000")
+        }
+    )
+    public void 
testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException, 
TimeoutException {
+        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+            producer.send(new ProducerRecord<>("topic", 
"value".getBytes(StandardCharsets.UTF_8)));
+        }
+
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new 
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+        }
+
+        try (var consumer = 
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+             var admin = clusterInstance.admin()) {
+            consumer.subscribe(List.of("topic"));
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+            // append records to coordinator
+            consumer.commitSync();
+
+            var broker0Metrics = clusterInstance.brokers().get(0).metrics();
+            var activeNumPartitions = broker0Metrics.metricName(
+                "num-partitions",
+                GroupCoordinatorRuntimeMetrics.METRICS_GROUP,
+                Map.of("state", "active")
+            );
+
+            assertEquals(1L, 
broker0Metrics.metric(activeNumPartitions).metricValue());
+
+            // unload the coordinator by changing leader (0 -> 1)
+            admin.alterPartitionReassignments(
+                Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), 
Optional.of(new NewPartitionReassignment(List.of(1))))
+            ).all().get();
+
+            // Wait for the coordinator metrics to update after leadership 
change
+            TestUtils.waitForCondition(() ->

Review Comment:
   While we are here, should we check that the metric of the second broker is 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to