[
https://issues.apache.org/jira/browse/KAFKA-16371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Jacot resolved KAFKA-16371.
---------------------------------
Fix Version/s: 3.8.0
3.7.1
Assignee: David Jacot
Resolution: Fixed
> Unstable committed offsets after triggering commits where metadata for some
> partitions are over the limit
> ---------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-16371
> URL: https://issues.apache.org/jira/browse/KAFKA-16371
> Project: Kafka
> Issue Type: Bug
> Components: offset manager
> Affects Versions: 3.7.0
> Reporter: mlowicki
> Assignee: David Jacot
> Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> The issue is reproducible with a simple CLI tool -
> [https://gist.github.com/mlowicki/c3b942f5545faced93dc414e01a2da70]
> {code:bash}
> #!/usr/bin/env bash
> for i in {1..100}
> do
>     kafka-committer --bootstrap "ADDR:9092" --topic "TOPIC" --group foo \
>         --metadata-min 6000 --metadata-max 10000 --partitions 72 --fetch
> done
> {code}
> What it does is that it initially fetches committed offsets and then tries to
> commit offsets for multiple partitions. If some of the commits have metadata
> over the allowed limit, then:
> 1. I see errors about too-large commits (expected).
> 2. The next run of the tool fails at the stage of fetching committed offsets
> with (this is the problem):
> {noformat}
> config: ClientConfig { conf_map: { "group.id": "bar", "bootstrap.servers": "ADDR:9092", }, log_level: Error, }
> fetching committed offsets..
> Error: Meta data fetch error: OperationTimedOut (Local: Timed out)
> Caused by: OperationTimedOut (Local: Timed out)
> {noformat}
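> The gist appears to use the Rust rdkafka client, but the same stuck fetch
> should be reproducible with the plain Java consumer, since under
> _isolation.level=read_committed_ the consumer sets the require_stable flag
> on the OffsetFetch request (KIP-447). A minimal sketch under that
> assumption; the object name and ADDR/TOPIC/group values are placeholders:
> {code:scala}
> // Hypothetical repro with the plain Java consumer (not the reporter's tool).
> import java.util.Properties
> import scala.jdk.CollectionConverters._
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
> import org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> object FetchCommitted extends App {
>   val props = new Properties()
>   props.put("bootstrap.servers", "ADDR:9092")
>   props.put("group.id", "foo")
>   // read_committed makes the OffsetFetch request require stable offsets.
>   props.put("isolation.level", "read_committed")
>
>   val consumer =
>     new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
>   val partitions = (0 until 72).map(new TopicPartition("TOPIC", _)).toSet.asJava
>   // Expected to time out while the group still has pending offset commits.
>   println(consumer.committed(partitions))
>   consumer.close()
> }
> {code}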
> On the Kafka side I see _unstable_offset_commits_ errors reported by our
> internal metric, which is derived from:
> {noformat}
> kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=X,error=Y{noformat}
> Increasing the timeout doesn't help; the only workarounds I've found are to
> trigger commits for all partitions with metadata below the limit, or to use:
> {noformat}
> isolation.level=read_uncommitted{noformat}
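> (In the sketch above, that corresponds to flipping _isolation.level_ to
> read_uncommitted, which drops the require_stable flag from the OffsetFetch
> request, so the fetch no longer waits on the stale pending commits.)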
>
> I don't know that code very well but
> [https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L492-L496]
> seems fishy:
> {code:scala}
> if (isTxnOffsetCommit) {
>   addProducerGroup(producerId, group.groupId)
>   group.prepareTxnOffsetCommit(producerId, offsetMetadata)
> } else {
>   group.prepareOffsetCommit(offsetMetadata)
> }
> {code}
> as it's using _offsetMetadata_ and not _filteredOffsetMetadata_, while the
> code that removes those pending commits walks the filtered offset metadata,
> around
> [https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L397-L422]:
>
> {code:scala}
> val responseError = group.inLock {
>   if (status.error == Errors.NONE) {
>     if (!group.is(Dead)) {
>       filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
>         if (isTxnOffsetCommit)
>           group.onTxnOffsetCommitAppend(producerId, topicIdPartition,
>             CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
>         else
>           group.onOffsetCommitAppend(topicIdPartition,
>             CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
>       }
>     }
>
>     // Record the number of offsets committed to the log
>     offsetCommitsSensor.record(records.size)
>
>     Errors.NONE
>   } else {
>     if (!group.is(Dead)) {
>       if (!group.hasPendingOffsetCommitsFromProducer(producerId))
>         removeProducerGroup(producerId, group.groupId)
>
>       filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
>         if (isTxnOffsetCommit)
>           group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
>         else
>           group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
>       }
>     }
> {code}
> so the problem might be that the data structure holding pending commits is
> never cleaned up for the partitions whose metadata is over the limit.
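> If that reading is right, the asymmetry can be modelled in a few lines (a toy
> sketch with made-up names, not the actual GroupMetadata code; partition
> numbers stand in for TopicIdPartition keys and the filter stands in for the
> metadata size check):
> {code:scala}
> // Toy model of the suspected leak (hypothetical names).
> import scala.collection.mutable
>
> object PendingCommitLeak extends App {
>   val pending = mutable.Set.empty[Int]          // in-flight offset commits
>
>   val offsetMetadata         = (0 until 72).toSet
>   val filteredOffsetMetadata = offsetMetadata.filter(_ % 2 == 0) // under-limit only
>
>   pending ++= offsetMetadata         // prepareOffsetCommit(offsetMetadata)
>   pending --= filteredOffsetMetadata // append/fail callbacks walk the filtered map only
>
>   // The over-limit partitions stay pending forever, so an OffsetFetch with
>   // require_stable keeps seeing unstable offsets and times out.
>   println(s"stale pending commits: ${pending.size}")
> }
> {code}
> If that's the case, passing _filteredOffsetMetadata_ to _prepareOffsetCommit_ /
> _prepareTxnOffsetCommit_ would keep the prepare and cleanup sides symmetric.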
--
This message was sent by Atlassian Jira
(v8.20.10#820010)