[
https://issues.apache.org/jira/browse/KAFKA-18020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898616#comment-17898616
]
Edoardo Comar commented on KAFKA-18020:
---------------------------------------
Client snippet to reproduce the issue.
Start a local Kafka broker and create a topic named 'mytopic' before running it.
{code:java}
package mysamples.jira.kafka18020;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.server.config.QuotaConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MyAdminKafka18020 {
public static void main(String[] args) throws Exception {
String topicName = "mytopic";
try (AdminClient adminClient =
AdminClient.create(Map.of("bootstrap.servers", "localhost:9092"))) {
for (int i=0; i<10; i++) {
StringBuilder sb = new StringBuilder();
for(int j=1000*i; j<1000*(i+1); j++) {
sb.append(j).append(":").append(j).append(",");
}
sb.setLength(sb.length()-1);
String throttles = sb.toString(); //"0:0,...,999:999" for i=0;
then "1000:1000,...,1999:1999" and so on
Map<ConfigResource, Collection<AlterConfigOp>> configs = new
HashMap<>();
List<AlterConfigOp> ops = new ArrayList<>();
ops.add(new AlterConfigOp(new
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
throttles), AlterConfigOp.OpType.APPEND));
ops.add(new AlterConfigOp(new
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
throttles), AlterConfigOp.OpType.APPEND));
configs.put(new ConfigResource(ConfigResource.Type.TOPIC,
topicName), ops);
System.out.println(new
StringBuilder().append("i=").append(i).append(" throttles=").
append(throttles.substring(0,
10)).append("...").append(throttles.substring(throttles.length()-10,
throttles.length())));
adminClient.incrementalAlterConfigs(configs).all().get();
Thread.sleep(1000);
}
}
}
}
{code}
output :
{code:java}
i=0 throttles=0:0,1:1,2:...98,999:999
i=1 throttles=1000:1000,...,1999:1999
i=2 throttles=2000:2000,...,2999:2999
i=3 throttles=3000:3000,...,3999:3999
Exception in thread "main" java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.UnknownServerException: The server experienced
an unexpected error when processing the request.
at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:155)
at
mysamples.jira.kafka18020.MyAdminKafka18020.main(MyAdminKafka18020.java:36)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server
experienced an unexpected error when processing the request
{code}
server logs:
{code:java}
[2024-11-15 13:05:44,001] ERROR Encountered quorum controller fault:
incrementalAlterConfigs: event failed with RuntimeException (treated as
UnknownServerException) at epoch 5 in 40005 microseconds. Renouncing leadership
and reverting to the last committed offset 7788.
(org.apache.kafka.server.fault.LoggingFaultHandler)
java.lang.RuntimeException: 'value' field is too long to be serialized
at
org.apache.kafka.common.metadata.ConfigRecord.addSize(ConfigRecord.java:192)
at org.apache.kafka.common.protocol.Message.size(Message.java:51)
at
org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:66)
at
org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:43)
at
org.apache.kafka.raft.internals.BatchBuilder.bytesNeededForRecords(BatchBuilder.java:340)
at
org.apache.kafka.raft.internals.BatchBuilder.bytesNeeded(BatchBuilder.java:136)
at
org.apache.kafka.raft.internals.BatchAccumulator.maybeAllocateBatch(BatchAccumulator.java:186)
at
org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:146)
at org.apache.kafka.raft.KafkaRaftClient.append(KafkaRaftClient.java:3334)
at
org.apache.kafka.raft.KafkaRaftClient.prepareAppend(KafkaRaftClient.java:3320)
at
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:806)
at
org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:891)
at
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:800)
at
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132)
at
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215)
at
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-11-15 13:05:44,003] INFO [RaftManager id=1] Received user request to
resign from the current epoch 5 (org.apache.kafka.raft.KafkaRaftClient)
[2024-11-15 13:05:44,004] INFO [RaftManager id=1] Completed transition to
ResignedState(localId=1, epoch=5, voters=[1], electionTimeoutMs=1906,
unackedVoters=[], preferredSuccessors=[]) from
Leader(localReplicaKey=ReplicaKey(id=1,
directoryId=Optional[rT810KS9zfczOsLnqapVBQ]), epoch=5, epochStartOffset=6568,
highWatermark=Optional[LogOffsetMetadata(offset=7789,
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=777628)])],
voterStates={1=ReplicaState(replicaKey=ReplicaKey(id=1,
directoryId=Optional.empty), endOffset=Optional[LogOffsetMetadata(offset=7789,
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=777628)])],
lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)})
(org.apache.kafka.raft.QuorumState)
[2024-11-15 13:05:45,564] ERROR [ControllerApis nodeId=1] Unexpected error
handling request RequestHeader(apiKey=BROKER_HEARTBEAT, apiVersion=1,
clientId=1, correlationId=1915, headerVersion=2) --
BrokerHeartbeatRequestData(brokerId=1, brokerEpoch=7,
currentMetadataOffset=7788, wantFence=false, wantShutDown=false,
offlineLogDirs=[]) with context
RequestContext(header=RequestHeader(apiKey=BROKER_HEARTBEAT, apiVersion=1,
clientId=1, correlationId=1915, headerVersion=2),
connectionId='127.0.0.1:9093-127.0.0.1:54455-0-25', clientAddress=/127.0.0.1,
principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER),
securityProtocol=PLAINTEXT,
clientInformation=ClientInformation(softwareName=apache-kafka-java,
softwareVersion=4.0.0-SNAPSHOT), fromPrivilegedListener=false,
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@16a3a3da])
(kafka.server.ControllerApis)
org.apache.kafka.common.errors.NotControllerException: The active controller
appears to be node 1.
[2024-11-15 13:05:45,565] INFO [NodeToControllerChannelManager id=1
name=heartbeat] Client requested disconnect from node 1
(org.apache.kafka.clients.NetworkClient)
[2024-11-15 13:05:45,565] INFO
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new KRaft
controller, from now on will use node localhost:9093 (id: 1 rack: null)
(kafka.server.NodeToControllerRequestThread)
{code}
> Encountered quorum controller fault: incrementalAlterConfigs ..
> RuntimeException: 'value' field is too long to be serialized
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-18020
> URL: https://issues.apache.org/jira/browse/KAFKA-18020
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 4.0.0
> Reporter: Edoardo Comar
> Priority: Major
>
> On a topic with a large number of partitions (5000), repeatedly invoking
> {{ReassignPartitionsCommand.modifyTopicThrottles}},
> each time with a new set of replica throttles,
> causes the quorum controller to attempt to write a ConfigRecord that is too
> large:
>
> {{[2024-11-14 15:34:41,612] ERROR Encountered quorum controller fault:
> incrementalAlterConfigs: event failed with RuntimeException (treated as
> UnknownServerException) at epoch 24 in 75784 microseconds. Renouncing
> leadership and reverting to the last committed offset 214588.
> (org.apache.kafka.server.fault.LoggingFaultHandler)}}
> {{java.lang.RuntimeException: 'value' field is too long to be serialized}}
> {{ at
> org.apache.kafka.common.metadata.ConfigRecord.addSize(ConfigRecord.java:192)}}
> {{ at org.apache.kafka.common.protocol.Message.size(Message.java:51)}}
> {{ at
> org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:66)}}
> {{ at
> org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:43)}}
> {{ at
> org.apache.kafka.raft.internals.BatchBuilder.bytesNeededForRecords(BatchBuilder.java:340)}}
> {{ at
> org.apache.kafka.raft.internals.BatchBuilder.bytesNeeded(BatchBuilder.java:136)}}
> {{ at
> org.apache.kafka.raft.internals.BatchAccumulator.maybeAllocateBatch(BatchAccumulator.java:186)}}
> {{ at
> org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:146)}}
> {{ at
> org.apache.kafka.raft.KafkaRaftClient.append(KafkaRaftClient.java:3334)}}
> {{ at
> org.apache.kafka.raft.KafkaRaftClient.prepareAppend(KafkaRaftClient.java:3320)}}
> {{ at
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:806)}}
> {{ at
> org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:891)}}
> {{ at
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:800)}}
> {{ at
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132)}}
> {{ at
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215)}}
> {{ at
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)}}
> {{ at java.base/java.lang.Thread.run(Thread.java:840)}}
>
> the adminClient receives an UnknownServerException:
>
> {{Error: org.apache.kafka.common.errors.UnknownServerException: The server
> experienced an unexpected error when processing the request.}}
> {{java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.UnknownServerException: The server experienced
> an unexpected error when processing the request.}}
> {{ at
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)}}
> {{ at
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)}}
> {{ at
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:155)}}
> {{ at
> org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles(ReassignPartitionsCommand.java:1112)}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)