[ https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gaurav Narula updated KAFKA-9401:
---------------------------------
Fix Version/s: 3.8.0, 3.7.1
> High lock contention for kafka.server.FetchManager.newContext
> -------------------------------------------------------------
>
> Key: KAFKA-9401
> URL: https://issues.apache.org/jira/browse/KAFKA-9401
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Lucas Bradstreet
> Assignee: Gaurav Narula
> Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> kafka.server.FetchManager.newContext takes out what is essentially a global
> fetch lock on kafka.server.FetchSessionCache, not only for updates to the
> FetchSessionCache itself but also for updates to the fetch sessions stored
> within it. This causes high lock contention for fetches, as every fetch
> request must go through this lock.
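To make the pattern concrete, here is a minimal Scala model of it. `GlobalLockModel`, `Session`, `SessionCache` and `handleFetch` are hypothetical names, not Kafka classes, and incrementing the epoch stands in for `session.update` plus `JFetchMetadata.nextEpoch`.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the locking pattern described above.
// None of these types are Kafka's own classes.
object GlobalLockModel {
  final class Session(val id: Int) {
    var epoch: Int = 0
  }

  final class SessionCache {
    private val sessions = mutable.HashMap.empty[Int, Session]

    def add(session: Session): Unit = synchronized { sessions.put(session.id, session) }

    // Every request takes the cache-wide monitor first and keeps holding it
    // while it also takes the per-session monitor and mutates the session.
    def handleFetch(sessionId: Int, expectedEpoch: Int): Either[String, Int] =
      synchronized {
        sessions.get(sessionId) match {
          case None => Left("FETCH_SESSION_ID_NOT_FOUND")
          case Some(session) =>
            session.synchronized {
              if (session.epoch != expectedEpoch) Left("INVALID_FETCH_SESSION_EPOCH")
              else {
                session.epoch += 1 // stands in for session.update + nextEpoch
                Right(session.epoch)
              }
            }
        }
      }
  }
}
```

Because `handleFetch` holds the cache monitor for its whole body, two requests for different sessions still run strictly one after the other.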
> I have taken an async-profiler lock profile on a high-throughput cluster, and
> I see around 25 s of waiting on this lock during a sixty-second profile.
> {noformat}
> --- 25818577497 ns (20.84%), 5805 samples
> [ 0] kafka.server.FetchSessionCache
> [ 1] kafka.server.FetchManager.newContext
> [ 2] kafka.server.KafkaApis.handleFetchRequest
> [ 3] kafka.server.KafkaApis.handle
> [ 4] kafka.server.KafkaRequestHandler.run
> [ 5] java.lang.Thread.run
> {noformat}
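As a rough reading of these numbers, the sketch below assumes the percentage is relative to all lock-wait time recorded in the profile and that the profile spanned 60 s of wall-clock time. `LockProfileMath` is a hypothetical helper, not part of async-profiler.

```scala
// Back-of-the-envelope reading of the async-profiler lock sample above.
// Assumptions: the 20.84% share is relative to the profile's total lock-wait
// time, and the profile covered 60 s of wall-clock time.
object LockProfileMath {
  val waitNs: Long = 25818577497L
  val share: Double = 0.2084
  val profileSeconds: Double = 60.0

  def waitSeconds: Double = waitNs / 1e9

  // Total lock wait across all monitors implied by the 20.84% share.
  def totalWaitSeconds: Double = waitSeconds / share

  // Wait time is summed over threads, so dividing by wall-clock time gives the
  // average number of threads blocked on this monitor at any instant.
  def avgBlockedThreads: Double = waitSeconds / profileSeconds
}
```

With these inputs the monitor accounts for about 25.8 s of waiting, implying roughly 124 s of total lock wait in the profile and, on average, about 0.43 request handler threads blocked on `FetchSessionCache` at any moment.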
> FetchSession.scala:
> {code:scala}
> // The cache-wide monitor is held for the entire block, so every fetch
> // request serializes here, even requests for unrelated sessions.
> cache.synchronized {
>   cache.get(reqMetadata.sessionId) match {
>     case None => {
>       debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
>       new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
>     }
>     // The per-session monitor is taken while the cache monitor is still held.
>     case Some(session) => session.synchronized {
>       if (session.epoch != reqMetadata.epoch) {
>         debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
>           s"${session.epoch}, but got ${reqMetadata.epoch} instead.")
>         new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
>       } else {
>         val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
>         if (session.isEmpty) {
>           debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
>             s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
>             s"there are no more partitions left.")
>           cache.remove(session)
>           new SessionlessFetchContext(fetchData)
>         } else {
>           // Cache bookkeeping (touch) also runs under the global monitor.
>           cache.touch(session, time.milliseconds())
>           session.epoch = JFetchMetadata.nextEpoch(session.epoch)
>           debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
>             s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
>             s"updated ${partitionsToLogString(updated)}, " +
>             s"removed ${partitionsToLogString(removed)}")
>           new IncrementalFetchContext(time, reqMetadata, session)
>         }
>       }
>     }
>   }
> }
> {code}
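One way to see what the global lock is buying is to sketch a narrower critical section. The model below is hypothetical (it is not the change merged for this issue, and `NarrowLockModel`, `removed`, `lastUsedMs` and `handleFetch` are illustrative names): the cache monitor is held only for the lookup and for bookkeeping, while the epoch check and update run under the session's own monitor.

```scala
import scala.collection.mutable

// Hypothetical sketch, not Kafka code: keep the cache-wide monitor only for the
// lookup and for cache bookkeeping; do per-session work under the session monitor.
object NarrowLockModel {
  final class Session(val id: Int) {
    var epoch: Int = 0
    var removed: Boolean = false // set (under both monitors) when the cache drops it
    var lastUsedMs: Long = 0L    // only accessed under the cache monitor
  }

  final class SessionCache {
    private val sessions = mutable.HashMap.empty[Int, Session]

    def add(session: Session): Unit = synchronized { sessions.put(session.id, session) }

    // Lock order is always cache -> session.
    def remove(session: Session): Unit = synchronized {
      sessions.remove(session.id)
      session.synchronized { session.removed = true }
    }

    def lastUsed(id: Int): Option[Long] = synchronized { sessions.get(id).map(_.lastUsedMs) }

    def handleFetch(sessionId: Int, expectedEpoch: Int, nowMs: Long): Either[String, Int] = {
      // 1. Short cache-wide critical section: just the lookup.
      val found = synchronized { sessions.get(sessionId) }
      found match {
        case None => Left("FETCH_SESSION_ID_NOT_FOUND")
        case Some(session) =>
          // 2. Per-session work under the session monitor only.
          val result: Either[String, Int] = session.synchronized {
            if (session.removed) Left("FETCH_SESSION_ID_NOT_FOUND") // evicted after lookup
            else if (session.epoch != expectedEpoch) Left("INVALID_FETCH_SESSION_EPOCH")
            else { session.epoch += 1; Right(session.epoch) }
          }
          // 3. Brief bookkeeping, taken after releasing the session monitor so we
          //    never acquire cache -> session in the reverse order.
          if (result.isRight) synchronized {
            if (sessions.get(sessionId).contains(session)) session.lastUsedMs = nowMs
          }
          result
      }
    }
  }
}
```

The cost of narrowing the lock shows up in the `removed` flag: once the cache monitor is released between lookup and update, a session may be evicted concurrently, so the code re-checks under the session monitor and never takes the cache monitor while holding a session monitor.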
> Contention has been made worse by the fix for "KAFKA-9137: Fix incorrect
> FetchSessionCache eviction logic"
> ([https://github.com/apache/kafka/pull/7640]), as the cache is now correctly
> touched, whereas previously the touch was being skipped.
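Why touching adds contention can be sketched with a simplified LRU index, assuming the cache keeps sessions ordered by last-use time for eviction. `TouchModel` and `LruSessionIndex` are hypothetical names, and the `TreeSet` keyed by `(lastUsedMs, sessionId)` is an illustrative stand-in, not necessarily the structure `FetchSessionCache` uses.

```scala
import scala.collection.mutable

// Hypothetical LRU bookkeeping, not Kafka's FetchSessionCache: each touch
// re-keys the session in an ordered set while the cache-wide monitor is held.
object TouchModel {
  final class LruSessionIndex {
    // Ordered by (lastUsedMs, sessionId), so the head is the least recently used.
    private val byLastUsed = mutable.TreeSet.empty[(Long, Int)]
    private val lastUsedOf = mutable.HashMap.empty[Int, Long]

    def touch(sessionId: Int, nowMs: Long): Unit = synchronized {
      // An O(log n) remove plus insert, all inside the critical section.
      lastUsedOf.get(sessionId).foreach(prev => byLastUsed.remove((prev, sessionId)))
      lastUsedOf.put(sessionId, nowMs)
      byLastUsed.add((nowMs, sessionId))
    }

    def leastRecentlyUsed: Option[Int] = synchronized { byLastUsed.headOption.map(_._2) }
  }
}
```

When the touch is skipped, incremental fetches avoid this work entirely; once it runs on every incremental fetch inside the global lock, each request spends longer holding the monitor.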
>