[
https://issues.apache.org/jira/browse/KAFKA-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992214#comment-16992214
]
Pradeep Bansal commented on KAFKA-9048:
---------------------------------------
Which Kafka release is this change planned for?
> Improve scalability in number of partitions in replica fetcher
> --------------------------------------------------------------
>
> Key: KAFKA-9048
> URL: https://issues.apache.org/jira/browse/KAFKA-9048
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Lucas Bradstreet
> Assignee: Guozhang Wang
> Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-9039
> ([https://github.com/apache/kafka/pull/7443]) improves the performance of the
> replica fetcher (at both small and large numbers of partitions), but it does
> not improve its complexity or scalability in the number of partitions.
> I took a profile using async-profiler for the 1000 partition JMH replica
> fetcher benchmark. The big remaining culprits are:
> * ~18% looking up logStartOffset
> * ~45% FetchSessionHandler$Builder.add
> * ~19% FetchSessionHandler$Builder.build
> *Suggestions*
> # The logStartOffset is looked up for every partition on each doWork pass.
> This requires a hashmap lookup even though the logStartOffset rarely changes.
> If the replica fetcher were notified of updates to the logStartOffset, the
> overhead would become a function of the number of logStartOffset updates
> rather than O(n) on every pass (see the second sketch below).
> # The use of FetchSessionHandler means that we maintain a partitionStates
> hashmap in the replica fetcher and a sessionPartitions hashmap in the
> FetchSessionHandler. On each incremental fetch session pass, we need to
> reconcile these two hashmaps to determine which partitions were added or
> updated and which were removed. This reconciliation is especially expensive,
> requiring multiple passes over the fetching partitions and hashmap removes
> and puts for most partitions. The replica fetcher could be smarter by
> maintaining the fetch session *updated* hashmap of FetchRequest.PartitionData
> directly, as well as a *removed* partitions list, so that neither needs to be
> regenerated by reconciliation on each fetch pass (see the first sketch below).
> # maybeTruncate requires an O(n) pass over the elements in partitionStates
> even if no partitions are in truncating state. If we maintained additional
> state recording whether any truncating partitions exist, or kept truncating
> partitions in a separate data structure, we would not need to iterate across
> all partitions on every doWork pass (see the second sketch below). I’ve seen
> clusters where this work takes about 0.5%-1% of CPU, which is minor but will
> become more substantial as the number of partitions increases.
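> A minimal sketch of suggestion 2, assuming simplified stand-in types (the
> IncrementalFetchState, TopicPartition and PartitionData names below are
> hypothetical, not the real Kafka classes): the fetcher records *updated* and
> *removed* deltas at the moment a partition's fetch state changes, so building
> the next incremental request only touches the partitions that changed instead
> of reconciling two full hashmaps.
> {code:java}
> // Hypothetical sketch only: simplified stand-ins, not the actual
> // FetchSessionHandler / FetchRequest.PartitionData classes.
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.Map;
> import java.util.Set;
>
> final class IncrementalFetchState {
>     record TopicPartition(String topic, int partition) {}
>     // Stand-in for FetchRequest.PartitionData.
>     record PartitionData(long fetchOffset, long logStartOffset, int maxBytes) {}
>
>     // Everything the current fetch session knows about.
>     private final Map<TopicPartition, PartitionData> sessionPartitions = new HashMap<>();
>     // Deltas accumulated since the last fetch request was built.
>     private final Map<TopicPartition, PartitionData> updated = new HashMap<>();
>     private final Set<TopicPartition> removed = new HashSet<>();
>
>     // Called whenever the fetcher changes a partition's fetch state, e.g.
>     // after processing a fetch response or a log start offset update.
>     void onPartitionUpdated(TopicPartition tp, PartitionData data) {
>         PartitionData previous = sessionPartitions.put(tp, data);
>         if (!data.equals(previous)) {
>             updated.put(tp, data);
>             removed.remove(tp);
>         }
>     }
>
>     // Called when a partition is removed from the fetcher, e.g. on reassignment.
>     void onPartitionRemoved(TopicPartition tp) {
>         if (sessionPartitions.remove(tp) != null) {
>             updated.remove(tp);
>             removed.add(tp);
>         }
>     }
>
>     record IncrementalRequest(Map<TopicPartition, PartitionData> toSend,
>                               Set<TopicPartition> toForget) {}
>
>     // O(changed partitions) per fetch pass rather than O(all partitions).
>     IncrementalRequest nextRequest() {
>         IncrementalRequest request =
>             new IncrementalRequest(Map.copyOf(updated), Set.copyOf(removed));
>         updated.clear();
>         removed.clear();
>         return request;
>     }
> }
> {code}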
> If we can achieve 1 and 2, the complexity improves from a function of the
> total number of partitions to a function of the number of partitions whose
> fetch offsets or log start offsets changed between fetches. In general, only
> a minority of partitions change between fetches, so this should improve the
> average-case complexity greatly.
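> A minimal sketch combining suggestions 1 and 3, again with hypothetical names
> rather than the actual ReplicaFetcherThread / PartitionStates classes: the
> logStartOffset is cached per partition and refreshed only when the log layer
> notifies the fetcher of a change, and truncating partitions are tracked in a
> separate set so maybeTruncate can short-circuit when that set is empty.
> {code:java}
> // Hypothetical sketch only: not the real replica fetcher data structures.
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.Map;
> import java.util.Set;
>
> final class FetcherPartitionStates {
>     record TopicPartition(String topic, int partition) {}
>
>     enum FetchState { FETCHING, TRUNCATING, DELAYED }
>
>     static final class PartitionFetchState {
>         long fetchOffset;
>         long logStartOffset;   // cached copy, refreshed only on notification
>         FetchState state;
>
>         PartitionFetchState(long fetchOffset, long logStartOffset, FetchState state) {
>             this.fetchOffset = fetchOffset;
>             this.logStartOffset = logStartOffset;
>             this.state = state;
>         }
>     }
>
>     private final Map<TopicPartition, PartitionFetchState> states = new HashMap<>();
>     // Suggestion 3: truncating partitions kept separately so maybeTruncate can
>     // short-circuit instead of scanning every partition on each doWork pass.
>     private final Set<TopicPartition> truncating = new HashSet<>();
>
>     void add(TopicPartition tp, PartitionFetchState state) {
>         states.put(tp, state);
>         if (state.state == FetchState.TRUNCATING)
>             truncating.add(tp);
>     }
>
>     // Suggestion 1: the log layer notifies the fetcher when a partition's
>     // logStartOffset actually changes (e.g. after retention deletes a segment),
>     // so the fetcher no longer looks it up for every partition on every pass.
>     void onLogStartOffsetChanged(TopicPartition tp, long newLogStartOffset) {
>         PartitionFetchState state = states.get(tp);
>         if (state != null)
>             state.logStartOffset = newLogStartOffset;
>     }
>
>     void transitionToFetching(TopicPartition tp) {
>         PartitionFetchState state = states.get(tp);
>         if (state != null) {
>             state.state = FetchState.FETCHING;
>             truncating.remove(tp);
>         }
>     }
>
>     // O(truncating partitions) rather than O(all partitions); usually empty.
>     Set<TopicPartition> partitionsToTruncate() {
>         return Set.copyOf(truncating);
>     }
> }
> {code}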