[
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Luke Chen updated KAFKA-14024:
------------------------------
Description:
Hi
In https://issues.apache.org/jira/browse/KAFKA-13310 we tried to fix an issue
where consumer#poll(duration) returns later than the provided duration. The
cause is that, when a rebalance is needed, we first commit the current offsets
synchronously before rebalancing. If that offset commit takes too long,
consumer#poll spends more time than the provided duration. To fix that, we
replaced the sync commit with an async commit before the rebalance (i.e. in
onJoinPrepare).
However, in this ticket we found that the async commit keeps sending a new
commit request on each Consumer#poll, because the offset commit never
completes in time. The impact is that the existing consumer is kicked out
of the group after the rebalance timeout without ever rejoining. For example,
suppose consumer A is in group G and consumer B now joins the group; after
the rebalance, only consumer B is in the group.
The workaround for this issue is to change the assignor back to an eager
assignor, e.g. StickyAssignor or RoundRobinAssignor.
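As a rough sketch of the workaround above, the consumer configuration can name
an eager assignor explicitly. The snippet only builds a java.util.Properties
object using the standard partition.assignment.strategy key; the class name
AssignorWorkaround, the helper eagerConsumerProps, and the bootstrap address
and group id are illustrative placeholders, not taken from the report.

```java
import java.util.Properties;

final class AssignorWorkaround {
    // Hypothetical helper: builds consumer properties that select an eager assignor.
    static Properties eagerConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // StickyAssignor rebalances with the eager protocol, so the
        // cooperative-only commit loop described in this ticket is avoided.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with the real broker address and group.
        Properties props = eagerConsumerProps("localhost:9092", "xxx");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The resulting Properties would be passed to the KafkaConsumer constructor in
place of a configuration that selects a cooperative assignor.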
To fix the issue, we came up with 2 solutions:
# We can explicitly wait for the async commit to complete in onJoinPrepare, but
that would let the KAFKA-13310 issue happen again.
# We can keep the future of the async offset commit that is currently in
flight, so that each Consumer#poll waits for that same future to complete
rather than sending a new commit request.
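The second option can be illustrated with a small, self-contained model. This
is only a sketch of the idea, not the ConsumerCoordinator code: the class
InflightCommitSketch, the commitSender supplier (a stand-in for sending an
async offset commit), and the commitsSent counter are all assumptions made for
the example, and error handling for a failed commit is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class InflightCommitSketch {
    // Hypothetical stand-in for "send an async offset commit".
    private final Supplier<CompletableFuture<Void>> commitSender;
    // The commit currently in flight, or null when none is pending.
    private CompletableFuture<Void> inflightCommit;
    int commitsSent = 0;

    InflightCommitSketch(Supplier<CompletableFuture<Void>> commitSender) {
        this.commitSender = commitSender;
    }

    // Called once per poll; returns true when join preparation may proceed.
    boolean onJoinPrepare() {
        if (inflightCommit == null) {
            // Only send a commit when none is already pending.
            inflightCommit = commitSender.get();
            commitsSent++;
        }
        if (!inflightCommit.isDone()) {
            return false; // the next poll checks the same future again
        }
        inflightCommit = null; // finished; a later rebalance starts fresh
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        InflightCommitSketch coordinator = new InflightCommitSketch(() -> response);
        int polls = 0;
        boolean ready = false;
        while (!ready) {
            polls++;
            if (polls == 3) {
                response.complete(null); // simulate the commit response arriving
            }
            ready = coordinator.onJoinPrepare();
        }
        // prints: polls=3, commitsSent=1
        System.out.println("polls=" + polls + ", commitsSent=" + coordinator.commitsSent);
    }
}
```

In this model a single commit request is sent no matter how many polls it
takes for the response to arrive, and no individual poll blocks on the commit,
which is the property the KAFKA-13310 change was meant to preserve.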
Besides, another bug was found while fixing this one. Before KAFKA-13310, we
committed offsets synchronously with rebalanceTimeout, retrying on retriable
errors until the timeout. After KAFKA-13310, we thought we still had the
retry, but the retry happens after the partitions are revoked. That is, even
if the retried offset commit succeeds, some partitions' offsets are left
uncommitted, and after the rebalance other consumers will consume overlapping
records.
===
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
We don't wait for the client to receive the offset commit response here, so
onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance,
and the client will loop invoking onJoinPrepare.
I think EAGER mode doesn't have this problem because it revokes the
partitions even if onJoinPrepareAsyncCommitCompleted=false and does not try
to commit again in the next round.
Steps to reproduce:
* single-node Kafka version 3.2.0 && client version 3.2.0
* topic1 has 5 partitions
* start consumer1 (cooperative rebalance)
* start another consumer, consumer2 (same consumer group)
* consumer1 hangs for a long time before rejoining
* the server log shows consumer1 hitting the rebalance timeout before
JoinGroup and then rejoining with another memberId
consumer1's log keeps printing:
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54
and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938
(ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer
clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of
offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}}
(ConsumerCoordinator.java:1143)
and coordinator's log:
[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance
group xxx in state PreparingRebalance with old generation 56
(__consumer_offsets-30) (reason: Adding new member
consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id
None; client reason: rebalance failed due to 'The group member needs to have a
valid member id before actually entering a consumer group.'
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic
members who haven't joined:
Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a)
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx
generation 57 (__consumer_offsets-30) with 3 members
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with
unknown member id joins group xxx in CompletingRebalance state. Created a new
member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the
member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from
leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for
generation 57. The group has 3 members, 0 of which are static.
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance
group xxx in state PreparingRebalance with old generation 57
(__consumer_offsets-30) (reason: Adding new member
consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id
None; client reason: rebalance failed due to 'The group member needs to have a
valid member id before actually entering a consumer group.'
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> ------------------------------------------------------------------------------
>
> Key: KAFKA-14024
> URL: https://issues.apache.org/jira/browse/KAFKA-14024
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.2.0
> Reporter: Shawn Wang
> Priority: Blocker
> Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.1