[
https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sam Cantero updated KAFKA-15035:
--------------------------------
Description:
We've recently encountered a scenario where a consumer group had its
committed offsets deleted a few minutes (around 3) after the consumer
became inactive (the underlying node went away).
As per
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
committed offsets for an active (i.e. running) consumer group should not be
deleted. However, if a consumer becomes inactive, {+}the deletion of committed
offsets will not occur immediately{+}. Instead, the committed offsets will only
be removed if the consumer remains inactive for at least the duration specified
by
[offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
In our case {{offsets.retention.minutes}} is set to 7 days and the consumer was
only inactive for 5 minutes, so deletion should not have occurred.
Later,
[KIP-496|https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]
was introduced to address the following issue left open by KIP-211:
{quote}When a consumer subscription changes, we are still left with the
committed offsets of the previous subscription. These will never be cleaned up
as long as the group remains active. We were aware of this problem in KIP-211,
but the solution was not implemented because the coordinator is presently
agnostic to join group metadata and we were unclear about the compatibility
implications of changing that.
{quote}
However, this introduced a regression, as explained in
https://issues.apache.org/jira/browse/KAFKA-13636.
{quote}The group coordinator might delete invalid offsets during a group
rebalance. During a rebalance, the coordinator is relying on the last commit
timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state
modification {_}timestamp (currentStateTimestamp{_}) to detect expired offsets.
{quote}
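The regression can be illustrated with a minimal Python sketch. This is an illustrative simplification, not Kafka's actual code: the names ({{is_expired}}, {{OffsetAndMetadata}}) and the fallback logic are invented to show why choosing the commit timestamp instead of the group's last state-change timestamp makes old-but-valid offsets look expired.

```python
from dataclasses import dataclass
from typing import Optional

RETENTION_MS = 7 * 24 * 60 * 60 * 1000  # offsets.retention.minutes = 7 days

@dataclass
class OffsetAndMetadata:
    commit_timestamp_ms: int  # when the offset was last committed

def is_expired(offset: OffsetAndMetadata,
               now_ms: int,
               current_state_timestamp_ms: Optional[int]) -> bool:
    """Decide whether a committed offset should be expired.

    The KAFKA-13636 regression: during a rebalance the coordinator fell
    back to the *commit* timestamp instead of the group's last *state
    change* timestamp, so offsets committed long ago by a recently
    active group could look expired.
    """
    reference = (current_state_timestamp_ms
                 if current_state_timestamp_ms is not None
                 else offset.commit_timestamp_ms)  # the problematic fallback
    return now_ms - reference >= RETENTION_MS

now = 1_000_000_000_000
old_commit = OffsetAndMetadata(commit_timestamp_ms=now - 8 * 24 * 3600 * 1000)

# Using the state timestamp (group active 5 minutes ago): not expired.
print(is_expired(old_commit, now, now - 5 * 60 * 1000))  # False
# Falling back to the commit timestamp (e.g. mid-rebalance): wrongly expired.
print(is_expired(old_commit, now, None))  # True
```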
This implies that Kafka employs two mechanisms for offset expiration:
* The deletion timer starts when a consumer group enters the Empty state
(i.e., not running). Once the timer exceeds the {{offsets.retention.minutes}}
threshold, the committed offsets are deleted.
* If a consumer group is in a "running" state (i.e., not in the Empty state)
but is no longer consuming from topics whose committed offsets are older than
the {{offsets.retention.minutes}} duration, those committed offsets are deleted.
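The two mechanisms above can be sketched as follows. This is a hypothetical simplification (function and parameter names are invented), not the coordinator's actual implementation:

```python
RETENTION_MS = 7 * 24 * 60 * 60 * 1000  # offsets.retention.minutes = 7 days

def expired_offsets(group_state: str,
                    state_change_ts_ms: int,
                    subscribed_topics: set,
                    committed: dict,  # topic -> last commit timestamp (ms)
                    now_ms: int) -> set:
    """Return the topics whose committed offsets would be expired."""
    if group_state == "Empty":
        # Mechanism 1: the timer starts when the group becomes Empty;
        # everything expires once the retention window has elapsed.
        if now_ms - state_change_ts_ms >= RETENTION_MS:
            return set(committed)
        return set()
    # Mechanism 2: the group is running, but offsets for topics it no
    # longer subscribes to expire once older than the retention window.
    return {t for t, commit_ts in committed.items()
            if t not in subscribed_topics
            and now_ms - commit_ts >= RETENTION_MS}

now = 1_000_000_000_000
committed = {"orders": now - 8 * 24 * 3600 * 1000,  # older than retention
             "payments": now - 60 * 1000}           # committed just now

# Running group that dropped "orders" from its subscription:
print(expired_offsets("Stable", now, {"payments"}, committed, now))
# Group that became Empty only 5 minutes ago: nothing expires yet.
print(expired_offsets("Empty", now - 5 * 60 * 1000, set(), committed, now))
```

Under these (assumed) semantics, the 7-day retention should have protected the offsets of a group that was inactive for only 5 minutes, which is what makes the observed deletion surprising.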
However, KAFKA-13636 specifically states that this situation could occur
during a group rebalance. In my particular scenario, I observed that the
affected consumer group did not transition into the Empty state, and I'm
uncertain whether the rebalance had finished. Why Kafka did not detect this
consumer group as Empty remains unclear.
*Logs*
{noformat}
01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg
has failed, removing it from the group
01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state
PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason:
removing member consumer-mycg-1-uuid on heartbeat expiration)
1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg
has failed, removing it from the group
01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433
(__consumer_offsets-16)
01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group
mycg for generation 433{noformat}
This suggests that Kafka may have followed the second approach, which would
explain why it deleted the offsets 3 minutes later.
{noformat}
1:33:17 am -
[GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8
milliseconds.{noformat}
h4. Was the consumer group in a rebalance state when the offsets were removed?
By looking at the logs and the Kafka codebase, we can find the following:
* The [onExpireHeartbeat
callback|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1278]
on the GroupCoordinator fires, which in turn calls
[removeMemberAndUpdateGroup|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1291].
* The removeMemberAndUpdateGroup method will call [maybePrepareRebalance when
the group is
stable|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1158].
This was the case here, as from Kafka's perspective the group was not in the
Empty (not running), Dead (removed), or PreparingRebalance state.
* maybePrepareRebalance [just calls
prepareRebalance|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1118]
if the group can rebalance.
* prepareRebalance will create a
[DelayedJoin|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1136]
as the group is not empty. [This
object|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala#L25-L28]
takes care of the join-group operation (first phase of a rebalance).
* When the group is neither empty nor dead, it [will
log|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1222]
{{Stabilized group ${group.groupId} generation ${group.generationId}}}.
*This does not mean the rebalance is over.* Immediately afterwards it waits
on the JoinGroup responses for all members; this is still the first phase of
the rebalance.
* Later on we can find an "Assignment received from leader for group" log line.
This log line comes from the group coordinator when handling a sync group
request (2nd phase of the rebalance) in the [CompletingRebalance
state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L412].
* Following that code path, if no errors are logged, then [the assignment
will be propagated and the group transitions to the Stable
state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L432-L433].
This seems to indicate that the group was not in a rebalance state, so [the
fix|https://github.com/apache/kafka/pull/11742/files] for KAFKA-13636 would not
address the issue seen here. The main question remains why Kafka did not
transition the consumer group to the Empty state.
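A minimal model of the state transitions traced above (illustrative only, not Kafka's implementation) shows why the group may never have reached Empty: that transition happens only when the join phase completes with zero members, and the "Assignment received from leader" log suggests at least one member had rejoined by then.

```python
class Group:
    """Toy model of the coordinator's group-state transitions."""

    def __init__(self, members):
        self.members = set(members)
        self.state = "Stable"

    def on_heartbeat_expired(self, member):
        # onExpireHeartbeat -> removeMemberAndUpdateGroup
        self.members.discard(member)
        if self.state == "Stable":
            self.state = "PreparingRebalance"  # maybePrepareRebalance

    def on_join_phase_complete(self):
        # DelayedJoin fires; "Stabilized group ..." is logged here,
        # but the rebalance is NOT over yet.
        if not self.members:
            self.state = "Empty"
        else:
            self.state = "CompletingRebalance"

    def on_sync_group(self):
        # "Assignment received from leader ..." -> Stable
        if self.state == "CompletingRebalance":
            self.state = "Stable"

g = Group({"consumer-1", "consumer-2"})
g.on_heartbeat_expired("consumer-1")  # 01:30:47
g.on_heartbeat_expired("consumer-2")  # 01:30:50
# If a member rejoined before the join phase completed, the group is
# non-empty at that point and never reaches Empty:
g.members.add("consumer-1")
g.on_join_phase_complete()
g.on_sync_group()
print(g.state)  # prints "Stable" (never Empty)
```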
> Consumer offsets can be deleted if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------
>
> Key: KAFKA-15035
> URL: https://issues.apache.org/jira/browse/KAFKA-15035
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.7.2
> Reporter: Sam Cantero
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)