[
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521996#comment-17521996
]
Chris Egerton edited comment on KAFKA-12495 at 4/14/22 2:14 AM:
----------------------------------------------------------------
{quote}Allowing for consecutive revocations that happen immediately when an
imbalance is detected might mean that the workers overreact to external
circumstances that have caused an imbalance between the initial calculation of
task assignments of the revocation rebalance and the subsequent rebalance for
the assignment of revoked tasks. Such circumstances might have to do with
rolling upgrades, scaling a cluster up or down or simply might be caused by
temporary instability.
{quote}
Have you identified a plausible case where this may be an issue? Load-balancing
revocations are only necessary when the number of workers has increased (or
when the number of connectors/tasks has decreased, although this is not
addressed in the current rebalancing algorithm). At least with the case of new
workers, is it really an overreaction to find connectors/tasks to allocate to
them as soon as possible?
Going over the example cases provided:
* With a rolling upgrade, the existing delayed rebalancing logic should
already apply, preventing excessive revocations from taking place
* With a cluster scale-down, no load-balancing revocations should be
necessary, since the number of connectors/tasks per worker will increase, not
decrease
* With a cluster scale-up, immediate revocation will not only be harmless, it
will actually be advantageous, since it allows the new workers to begin work
immediately instead of waiting for the scheduled rebalance delay to elapse.
This could be crucial if there's a load burst across the cluster and an
external auto-scaling process spins up new workers to respond as quickly as
possible
* With temporary instability, if workers fall out of the cluster, the existing
delayed rebalancing logic should already apply, preventing excessive
revocations from taking place. There may be another interpretation of what this
scenario would look like in terms of workers leaving/joining the cluster; let
me know if you had something else in mind
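To make the distinction between these cases concrete, the condition the leader is reacting to can be sketched as a simple fair-share check: a worker is overloaded only if it holds more than ceil(total / workers) connectors/tasks. This is a minimal illustration, not Connect's actual IncrementalCooperativeAssignor logic; the class and method names are hypothetical.

```java
import java.util.List;

public class FairShareCheck {
    // A load-balancing revocation is only useful when some worker holds
    // more than its fair share, i.e. more than ceil(total / workers).
    static boolean needsRevocation(List<Integer> loadPerWorker) {
        int total = loadPerWorker.stream().mapToInt(Integer::intValue).sum();
        int workers = loadPerWorker.size();
        int fairShare = (total + workers - 1) / workers; // ceil division
        return loadPerWorker.stream().anyMatch(load -> load > fairShare);
    }

    public static void main(String[] args) {
        // Scale-up: an empty new worker joins a balanced 3-worker group
        System.out.println(needsRevocation(List.of(4, 4, 4, 0))); // true
        // Scale-down: the survivors absorb the load evenly; nothing to revoke
        System.out.println(needsRevocation(List.of(4, 4, 4)));    // false
    }
}
```

This matches the bullet points above: only the scale-up case (a worker below fair share while another is above it) triggers the check.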
The only case I can think of where unconditionally delaying between revocations
may be beneficial is if there's a rapid scale-up and then immediate scale-down
of a cluster. If we hold off on revoking too many connectors/tasks from the
pre-scale-up workers in the cluster, then we'll have to reassign fewer of them
once the scale-down takes place. But unless I'm missing something, this is an
unlikely edge case and should not be prioritized.
{quote}To shield ourselves from infinite such rebalances the leader should also
keep track of how many such attempts have been made and stop attempting to
balance out tasks after a certain number of tries. Of course every other normal
rebalance should reset both this counter and possibly the delay.
{quote}
This is a great suggestion, especially since it can (and should) be implemented
regardless of whether a delay is added between consecutive load-balancing
revocations.
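The suggested safeguard could be as simple as a counter held by the leader, capped at a maximum and reset by any normal rebalance. A rough sketch, under the assumption that the leader can distinguish load-balancing revocations from normal rebalances (all names here are illustrative, not Connect's actual API):

```java
public class RevocationBudget {
    private final int maxConsecutiveAttempts;
    private int attempts = 0;

    public RevocationBudget(int maxConsecutiveAttempts) {
        this.maxConsecutiveAttempts = maxConsecutiveAttempts;
    }

    // Called before the leader issues a load-balancing revocation; once the
    // budget is exhausted it stops trying, shielding the group from an
    // infinite revocation loop.
    public boolean mayRevoke() {
        if (attempts >= maxConsecutiveAttempts) {
            return false;
        }
        attempts++;
        return true;
    }

    // Any normal rebalance (worker joined/left, config change) resets the
    // counter, as suggested in the quoted comment.
    public void onNormalRebalance() {
        attempts = 0;
    }
}
```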
> Unbalanced connectors/tasks distribution will happen in Connect's incremental
> cooperative assignor
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Blocker
> Fix For: 3.2.0
>
> Attachments: image-2021-03-18-15-04-57-854.png,
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm
> based on KIP-415
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect).
> However, the implementation makes a bad assumption: after the revoking
> rebalance completes, the member (worker) count will be the same as in the
> previous round of rebalance.
>
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance
> completes and before the 2nd rebalance starts? Let's walk through this
> example (we'll use 10 tasks here):
>
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1,
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4,
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3,
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1,
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member,
> but we didn't revoke any more C/T in this round, which causes an unbalanced
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive rebalances
> (under the same leader), we end up with this uneven distribution in this
> situation. We should allow a consecutive rebalance to perform another round of
> revocation, reassigning the revoked C/T to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1,
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4,
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3,
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1,
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member,
> **and also revoked some more C/T**
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other
> members
> W1 rejoins with assignment: [AC0, AT1, AT2]
> Rebalance is triggered
> W2 joins with assignment: [AT4, AT5, BC0]
> W3 joins with assignment: [BT1, BT2, BT3]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> // (final) We assigned all the previously revoked Connectors/Tasks to the
> members
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
> {code}
> Note: The consumer's cooperative sticky assignor won't have this issue since
> we re-compute the assignment in each round.
>
> Note 2: this issue makes the KAFKA-12283 test flaky.
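The fix the description calls for boils down to recomputing each worker's fair share against the current member count on every rebalance, and revoking the excess even if the previous round already revoked. A minimal sketch of that calculation (illustrative only, not the actual assignor code; in the ticket's example W1 holds 4 of 12 resources across 4 workers, so exactly one resource, AT3 in the expected trace, gets revoked):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExcessCalculator {
    // For each worker, revoke whatever it holds beyond ceil(total / workers).
    static Map<String, Integer> toRevoke(Map<String, Integer> loadPerWorker) {
        int total = loadPerWorker.values().stream().mapToInt(Integer::intValue).sum();
        int workers = loadPerWorker.size();
        int fairShare = (total + workers - 1) / workers; // ceil division
        Map<String, Integer> result = new LinkedHashMap<>();
        loadPerWorker.forEach((worker, load) ->
                result.put(worker, Math.max(0, load - fairShare)));
        return result;
    }
}
```

Running this for the second rebalance above (W1=4, W2=3, W3=3, W4=2, fair share 3) yields one revocation from W1 and none elsewhere, matching the expected trace.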
--
This message was sent by Atlassian Jira
(v8.20.1#820001)