[
https://issues.apache.org/jira/browse/KAFKA-13331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868860#comment-17868860
]
GEORGE LI commented on KAFKA-13331:
-----------------------------------
>From another environment, it looks like the slowness might be amplified by
>some extra customization. namely the Controller's {{process(event:
>ControllerEvent)}} :
{code:java}
override def process(event: ControllerEvent): Unit = {
try {
event match {
case event: MockEvent =>
// Used only in test cases
event.process()
case ShutdownEventThread =>
error("Received a ShutdownEventThread event. This type of event is
supposed to be handle by ControllerEventThread")
case AutoPreferredReplicaLeaderElection =>
...
case UpdateMetadataResponseReceived(response, brokerId) =>
processUpdateMetadataResponseReceived(response, brokerId)
...
} catch {
case e: ControllerMovedException =>
...
} finally {
updateMetrics()
}
}
{code}
Some instrumentation/debug on why the event processing is taking longer is
needed. e.g. The finally {{updateMetrics()}} , some event can be skipped for
some added custom metrics. e.g. {{ListPartitionReassignments(partitions,
callback)}}, {{UpdateMetadataResponseReceived(response, brokerId)}}, etc. that
does not require metrics re-calculation/update.
Cc: [~satish.duggana]
> Slow reassignments in 2.8 because of large number of
> UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)
> Events
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13331
> URL: https://issues.apache.org/jira/browse/KAFKA-13331
> Project: Kafka
> Issue Type: Bug
> Components: admin, controller, core
> Affects Versions: 2.8.1, 3.0.0
> Reporter: GEORGE LI
> Priority: Critical
> Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
>
>
> Slowness is observed when doing reassignments on clusters with more brokers
> (e.g. 80 brokers).
> After investigation, it looks like the slowness is because for
> reassignments, it sends the UpdateMetadataRequest to all the broker for every
> topic partition affected by the reassignment (maybe some optimization can be
> done). e.g.
> for a reassignment with batch size of 50 partitions. it will generate about
> 10k - 20k ControllerEventManager.EventQueueSize and the p99 EventQueueTimeMs
> will be 1M. if the batch size is 100 partitions, about 40K
> EventQeuueSize and 3M p99 EventQueueTimeMs. See below screen shot on the
> metrics.
> !Screen Shot 2021-09-28 at 12.57.34 PM.png!
> it takes about 10-30minutes to process 100 reassignments per batch. and
> 20-30 seconds for 1 reassignment per batch even the topic partition is almost
> empty. Version 1.1, the reassignment is almost instant.
> Looking at what is in the ControllerEventManager.EventQueue, the majority
> (depends on the how many brokers in the cluster, it can be 90%+) is
> {{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)}}
> events. which is introduced in this commit:
> {code}
> commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
> Author: Jason Gustafson <[email protected]>
> Date: Thu Nov 21 07:41:29 2019 -0800
> MINOR: Controller should log UpdateMetadata response errors (#7717)
>
> Create a controller event for handling UpdateMetadata responses and log a
> message when a response contains an error.
>
> Reviewers: Stanislav Kozlovski <[email protected]>, Ismael
> Juma <[email protected]>
> {code}
> Checking how the events in the ControllerEventManager are processed for
> {{UpdateMetadata response}}, it seems like it's only checking whether there
> is an error, and simply log the error. but it takes about 3ms - 60ms to
> dequeue each event. Because it's a FIFO queue, other events were waiting in
> the queue.
> {code}
> private def processUpdateMetadataResponseReceived(updateMetadataResponse:
> UpdateMetadataResponse, brokerId: Int): Unit = {
> if (!isActive) return
> if (updateMetadataResponse.error != Errors.NONE) {
> stateChangeLogger.error(s"Received error
> ${updateMetadataResponse.error} in UpdateMetadata " +
> s"response $updateMetadataResponse from broker $brokerId")
> }
> }
> {code}
> There might be more sophisticated logic for handling the UpdateMetadata
> response error in the future. For current version, would it be better to
> check whether the response error code is Errors.NONE before putting into
> the Event Queue? e.g. I put this additional check and see the Reassignment
> Performance increase dramatically on the 80 brokers cluster.
> {code}
> val updateMetadataRequestBuilder = new
> UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
> controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava,
> liveBrokers.asJava, topicIds.asJava)
> sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse)
> => {
> val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
> if (updateMetadataResponse.error != Errors.NONE) { //<============
> Add additional check whether the response code, if no error, which is
> almost 99.99% the case, skip adding updateMetadataResponse to the Event
> Queue.
> sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse,
> broker))
> }
> })
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)