[
https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17934211#comment-17934211
]
zhangzhisheng commented on KAFKA-15035:
---------------------------------------
We have also encountered this problem in our production environment. The
specific symptom is that some consumer groups have had their offsets reset.
Kafka server versions: 2.7.2 and 2.4.2.
{code:java}
[2025-03-11 19:45:03,061] INFO [GroupCoordinator 2]: Preparing to rebalance
group nihao-newbee-bill in state PreparingRebalance with old generation 4549
(__consumer_offsets-16) (reason: removing member
nihao-newbee-bill_80ab55b3f31540da-86e2b6ec-8932-4869-b53c-dbd4862491bf on
heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2025-03-11 19:45:03,985] INFO [GroupCoordinator 2]: Member
nihao-newbee-bill_0eff5fbfbf1cf62f-e65cc081-e44e-41af-9fac-262f8a3a0f6a in
group nihao-newbee-bill has failed, removing it from the group
(kafka.coordinator.group.GroupCoordinator)
[2025-03-11 19:45:03,992] INFO [GroupCoordinator 2]: Member
nihao-newbee-bill_dbde5089929e8b07-b4967d22-c142-45b1-823f-0f09f296c625 in
group nihao-newbee-bill has failed, removing it from the group
(kafka.coordinator.group.GroupCoordinator)
{code}
> Consumer offsets can be deleted if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------
>
> Key: KAFKA-15035
> URL: https://issues.apache.org/jira/browse/KAFKA-15035
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.7.2
> Reporter: Sam Cantero
> Priority: Major
>
> We've recently encountered a scenario where a consumer group had its
> committed offsets deleted a few minutes (around 3 minutes) after the consumer
> became inactive (the underlying node went away).
> As per
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
> committed offsets for an active (i.e running) consumer group should not be
> deleted. However, if a consumer becomes inactive, +the deletion of
> committed offsets will not occur immediately+. Instead, the committed
> offsets will only be removed if the consumer remains inactive for at least
> the duration specified by
> [offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
> In our case {{offsets.retention.minutes}} is set to 7 days and the consumer
> was only inactive for 5 minutes, so deletion should not have occurred.
> Later on
> [KIP-496|https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]
> was introduced to fix the following issue in KIP-211:
> {quote}When a consumer subscription changes, we are still left with the
> committed offsets of the previous subscription. These will never be cleaned
> up as long as the group remains active. We were aware of this problem in
> KIP-211, but the solution was not implemented because the coordinator is
> presently agnostic to join group metadata and we were unclear about the
> compatibility implications of changing that.
> {quote}
> However this introduced a regression as explained in
> https://issues.apache.org/jira/browse/KAFKA-13636.
> {quote}The group coordinator might delete invalid offsets during a group
> rebalance. During a rebalance, the coordinator is relying on the last commit
> timestamp (_offsetAndMetadata.commitTimestamp_) instead of the last state
> modification timestamp (_currentStateTimestamp_) to detect expired
> offsets.
> {quote}
> This implies that Kafka employs two approaches for offset expiration:
> * The deletion timer is activated when a consumer group enters the Empty
> state (i.e., not running). Once the timer exceeds the
> {{offsets.retention.minutes}} threshold, the committed offsets are deleted.
> * If a consumer is in a "running" state (i.e., not in the Empty state) but
> is no longer consuming from topics with committed offsets older than the
> {{offsets.retention.minutes}} duration, the committed offsets are deleted.
> However, the Kafka issue KAFKA-13636 specifically states that this situation
> could occur during a group rebalance. In my particular scenario, I have
> observed that the affected consumer group did not transition into the Empty
> state. I'm uncertain whether the rebalance has finished. The reason why Kafka
> did not detect this consumer group as Empty remains unclear.
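To make the two expiration approaches described in the quoted report easier to reason about, here is a minimal sketch in Python. It is a simplified model built only from that description, not the broker's actual code: the `Group` class, the `expired_topics` function, the field names and the topic names are all hypothetical, and the 7-day retention is the value given in the report.

```python
from dataclasses import dataclass, field

# offsets.retention.minutes = 7 days, as stated in the report (in milliseconds).
RETENTION_MS = 7 * 24 * 60 * 60 * 1000


@dataclass
class Group:
    """Hypothetical, simplified view of a consumer group."""
    state: str                # "Empty", "Stable", "PreparingRebalance", ...
    state_timestamp_ms: int   # when the group entered its current state
    subscribed_topics: set    # topics the remaining members subscribe to
    commits: dict = field(default_factory=dict)  # topic -> last commit time (ms)


def expired_topics(group, now_ms, retention_ms=RETENTION_MS):
    """Return the topics whose committed offsets this model would expire."""
    if group.state == "Empty":
        # Approach 1: the clock starts when the group became Empty.
        if now_ms - group.state_timestamp_ms >= retention_ms:
            return set(group.commits)
        return set()
    # Approach 2: the group still has members, so only topics it no longer
    # subscribes to are candidates, judged by their last commit time.
    return {
        topic
        for topic, commit_ms in group.commits.items()
        if topic not in group.subscribed_topics
        and now_ms - commit_ms >= retention_ms
    }
```

In this model, a group that became Empty 5 minutes ago keeps all of its offsets, whereas a group that is not Empty can lose offsets for any topic outside its current subscription whose last commit is older than the retention period, no matter how recently the group itself was active.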
> *Logs*
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg
> has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in
> state PreparingRebalance with old generation 432 (__consumer_offsets-16)
> (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg
> has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433
> (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group
> mycg for generation 433{noformat}
> This suggests that Kafka might have followed the second approach, which
> would explain why it deleted the offsets about 3 minutes later.
> {noformat}
> 1:33:17 am -
> [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8
> milliseconds.{noformat}
> h4. Was the consumer group in a rebalance state when the offsets were removed?
> By looking at the logs and the kafka codebase, we can find the following:
> * The [onExpireHeartbeat
> callback|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1278]
> on the GroupCoordinator is called. The
> [removeMemberAndUpdateGroup|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1291]
> method is called.
> * The removeMemberAndUpdateGroup method will call [maybePrepareRebalance
> when the group is
> stable|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1158].
> This was the case because, from Kafka's perspective, the group was not in
> the Empty (not running), Dead (removed), or PreparingRebalance state.
> * maybePrepareRebalance [just calls
> prepareRebalance|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1118]
> if the group can rebalance.
> * prepareRebalance will call
> [DelayedJoin|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1136]
> as the group is not empty. [This
> object|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala#L25-L28]
> takes care of the join-group operation (first phase of a rebalance).
> * When the group is neither empty nor dead, it [will
> log|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1222]
> {{Stabilized group ${group.groupId} generation ${group.generationId}}}.
> *This does not mean the rebalance phase is over*. Immediately afterwards,
> it waits on the JoinGroupResponse requests from all members. This is still
> the first phase of the rebalance.
> * Later on we can find an "Assignment received from leader for group" log
> line. This log line comes from the group coordinator when handling a sync
> group request (2nd phase of the rebalance) in the [CompletingRebalance
> state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L412].
> * By following that codepath, if no errors are logged, then [the assignment
> will be propagated and the group will transition to the Stable
> state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L432-L433].
> This seems to indicate that we were not in a rebalance state and [the
> fix|https://github.com/apache/kafka/pull/11742/files] for KAFKA-13636 won't
> fix the issue seen here. The main question remains as to why Kafka did not
> transition the consumer group to an empty state.
>
>
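The state walk described in the quoted analysis can be replayed with a small sketch in Python. This is a simplified model of the transitions named in the report (Stable, PreparingRebalance, CompletingRebalance, Empty), not the coordinator's implementation. The `replay` function, the event names and the member ids, including a third member `m3`, are hypothetical and introduced only for illustration.

```python
def replay(events, members):
    """Replay coordinator events on a simplified group state machine.

    events: list of (kind, member_id) tuples.
    Returns the list of states visited, starting from Stable.
    """
    state = "Stable"
    members = set(members)
    visited = [state]
    for kind, member in events:
        if kind == "heartbeat_expired":
            # The member is removed; a stable group starts rebalancing.
            members.discard(member)
            if state in ("Stable", "CompletingRebalance"):
                state = "PreparingRebalance"
        elif kind == "join_completed":
            # End of the join phase: Empty only if no member is left.
            state = "CompletingRebalance" if members else "Empty"
        elif kind == "assignment_received" and state == "CompletingRebalance":
            # Sync phase finished: the assignment is propagated.
            state = "Stable"
        visited.append(state)
    return visited


# Event order taken from the log lines in the report.
events = [
    ("heartbeat_expired", "m1"),
    ("heartbeat_expired", "m2"),
    ("join_completed", None),
    ("assignment_received", None),
]
```

Under these assumptions, `replay(events, {"m1", "m2", "m3"})` ends in Stable, while `replay(events, {"m1", "m2"})` ends in Empty. In this model the group only reaches Empty when no member is left at the end of the join phase, so the logged sequence would be consistent with the coordinator still tracking at least one member.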