dajac commented on code in PR #21396:
URL: https://github.com/apache/kafka/pull/21396#discussion_r2779549865
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -773,7 +773,7 @@ private void freeCurrentBatch() {
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
// Release the buffer only if it is not larger than the maxBatchSize.
- int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+ int maxBatchSize = currentBatch.maxBatchSize;
Review Comment:
nit: Could you please add a comment explaining why we use
`currentBatch.maxBatchSize` here?
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -335,6 +339,52 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
}
}
+ @ClusterTest(
+ brokers = 2,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
+ }
+ )
+ public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
+ try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+ producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
+ }
+
+ try (var admin = clusterInstance.admin()) {
+ admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+ }
+
+ try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+ var admin = clusterInstance.admin()) {
+ consumer.subscribe(List.of("topic"));
+ TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+ // append records to coordinator
Review Comment:
nit: Could you please align the format of your comments? They should start
with a capital letter and end with a `.`.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -335,6 +339,52 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
}
}
+ @ClusterTest(
+ brokers = 2,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
+ }
+ )
+ public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
+ try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+ producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
+ }
+
+ try (var admin = clusterInstance.admin()) {
+ admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+ }
+
+ try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+ var admin = clusterInstance.admin()) {
+ consumer.subscribe(List.of("topic"));
+ TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+ // append records to coordinator
+ consumer.commitSync();
+
+ var broker0Metrics = clusterInstance.brokers().get(0).metrics();
+ var activeNumPartitions = broker0Metrics.metricName(
+ "num-partitions",
+ GroupCoordinatorRuntimeMetrics.METRICS_GROUP,
+ Map.of("state", "active")
+ );
+
+ assertEquals(1L, broker0Metrics.metric(activeNumPartitions).metricValue());
+
+ // unload the coordinator by changing leader (0 -> 1)
+ admin.alterPartitionReassignments(
+ Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), Optional.of(new NewPartitionReassignment(List.of(1))))
+ ).all().get();
+
+ // Wait for the coordinator metrics to update after leadership change
+ TestUtils.waitForCondition(() ->
Review Comment:
While we are here, should we also check that the active `num-partitions` metric on the second broker is 1?
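
A rough, self-contained sketch of the shape such a check could take. It is not the test code from this PR: `CoordinatorHandoffCheck`, `waitFor` and `awaitHandoff` are hypothetical names, `waitFor` only stands in for `TestUtils.waitForCondition`, and the two `LongSupplier`s stand in for reading the active `num-partitions` metric on each broker. It also assumes the existing wait expects broker 0 to drop to 0, which the truncated hunk above does not show.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

public class CoordinatorHandoffCheck {

    // Hypothetical stand-in for TestUtils.waitForCondition: polls the
    // condition until it holds, or fails once the timeout has elapsed.
    static void waitFor(BooleanSupplier condition, Duration timeout, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Timed out waiting for: " + message);
            }
            Thread.sleep(10);
        }
    }

    // Checks both sides of the handoff in a single condition: the old leader
    // reports no active partitions and the new leader reports exactly one.
    // The suppliers are assumptions; in the real test they would read the
    // metric value from each broker.
    static void awaitHandoff(LongSupplier oldLeaderActive,
                             LongSupplier newLeaderActive,
                             Duration timeout) throws InterruptedException {
        waitFor(
            () -> oldLeaderActive.getAsLong() == 0L && newLeaderActive.getAsLong() == 1L,
            timeout,
            "old leader to report 0 and new leader to report 1 active partitions"
        );
    }
}
```

Checking both values inside one condition means the wait only passes once exactly one broker owns the coordinator, which seems to be what the test name is after.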