[
https://issues.apache.org/jira/browse/KAFKA-9478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048295#comment-17048295
]
Ivan Yurchenko commented on KAFKA-9478:
---------------------------------------
I did some further investigation; let me explain what I found out.
When a previously issued reassignment is in progress, in both pre- and
post-bdf2446ccce592f3c000290f11de88520327aa19 (let's call them _OLD_ and _NEW_)
the controller doesn't act upon the new reassignment (sent with a {{SET}}
operation).
However, in _OLD_ the {{/admin/reassign_partitions}} ZooKeeper node is
ultimately deleted (despite containing the second reassignment command) and the
watch on it is restored. The difference in behaviour is this:
1. In _OLD_: a partition that has just been reassigned is removed from the
in-memory set {{controllerContext.partitionsBeingReassigned}}, and the result is
written to ZooKeeper (see
[here|https://github.com/apache/kafka/blob/3fe6b5e951db8f7184a4098f8ad8a1afb2b2c1a0/core/src/main/scala/kafka/controller/KafkaController.scala#L1083]).
2. In _NEW_: a partition that has just been reassigned is filtered out of
{{/admin/reassign_partitions}}, which is re-read every time (see
[here|https://github.com/apache/kafka/blob/bdf2446ccce592f3c000290f11de88520327aa19/core/src/main/scala/kafka/controller/KafkaController.scala#L1001]).
Because of this difference, in _NEW_ the second reassignment command stays in
ZooKeeper forever (until a controller re-election), with the controller taking
no action upon it, the node never being deleted, and the watch never being
restored.
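To make the difference concrete, here is a minimal Python sketch of the two cleanup strategies (a simplified model with hypothetical names; {{old_cleanup}}, {{new_cleanup}} and the sample data are illustrative, not actual controller code):
{code:python}
# Simplified model of the two cleanup strategies (hypothetical names,
# not actual controller code). The znode holds the finished first
# reassignment and the second command that arrived via SET while the
# first was still in progress.
znode = {("foo", 0): [1], ("foo", 1): [1]}  # /admin/reassign_partitions content
in_memory = {("foo", 0)}  # partitionsBeingReassigned: only the first command was picked up
finished = {("foo", 0)}   # partitions whose reassignment has completed

def old_cleanup(znode, in_memory, finished):
    # OLD: drop finished partitions from the in-memory set; once it is
    # empty, delete the znode (even though it still holds other entries),
    # which also lets the watch be re-registered.
    remaining = in_memory - finished
    return None if not remaining else znode

def new_cleanup(znode, finished):
    # NEW: re-read the znode and filter out only the finished partitions;
    # entries the controller never acted upon survive, so the znode is
    # never deleted and the watch is never restored.
    remaining = {tp: r for tp, r in znode.items() if tp not in finished}
    return None if not remaining else remaining

print(old_cleanup(znode, in_memory, finished))  # None -> znode deleted
print(new_cleanup(znode, finished))             # {('foo', 1): [1]} -> stale entry stays
{code}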
This can be solved by changing the {{maybeRemoveFromZkReassignment}}
function in the following way:
{code:scala}
val reassigningPartitions = zkClient.getPartitionReassignment()
val (removingPartitions, updatedPartitionsBeingReassigned) =
  reassigningPartitions.partition { case (tp, replicas) =>
    shouldRemoveReassignment(tp, replicas) ||
      !controllerContext.partitionsBeingReassigned.contains(tp)
  }
{code}
i.e. by explicitly filtering out partitions that are not in
{{partitionsBeingReassigned}}. I made a
[PR|https://github.com/apache/kafka/pull/8196] with this change. How do you
feel about it?
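For illustration, the fixed predicate can be mirrored in Python like this (a sketch with hypothetical names; {{should_remove_reassignment}} is stubbed out here, the real check lives in the controller):
{code:python}
# Python mirror of the fixed partitioning logic (sketch, not actual
# controller code; should_remove_reassignment is stubbed out).

def should_remove_reassignment(tp, replicas):
    # Stub: pretend the "reassignment completed" check returns False
    # for the stale entry.
    return False

def split_reassignment(zk_reassignment, partitions_being_reassigned):
    removing, updated = {}, {}
    for tp, replicas in zk_reassignment.items():
        # Remove an entry if its reassignment finished OR if the
        # controller is not tracking it in memory (the stale case).
        if should_remove_reassignment(tp, replicas) or tp not in partitions_being_reassigned:
            removing[tp] = replicas
        else:
            updated[tp] = replicas
    return removing, updated

zk_data = {("foo", 1): [1]}  # stale second command left in ZooKeeper
removing, updated = split_reassignment(zk_data, partitions_being_reassigned=set())
print(removing)  # {('foo', 1): [1]} -- the stale entry is now cleaned up
print(updated)   # {}
{code}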
> Controller may stop reacting to partition reassignment commands in ZooKeeper
> ----------------------------------------------------------------------------
>
> Key: KAFKA-9478
> URL: https://issues.apache.org/jira/browse/KAFKA-9478
> Project: Kafka
> Issue Type: Bug
> Components: controller, core
> Affects Versions: 2.4.0, 2.4.1
> Reporter: Ivan Yurchenko
> Assignee: Ivan Yurchenko
> Priority: Major
>
> Seemingly after
> [bdf2446ccce592f3c000290f11de88520327aa19|https://github.com/apache/kafka/commit/bdf2446ccce592f3c000290f11de88520327aa19],
> the controller may stop watching the {{/admin/reassign_partitions}} node in
> ZooKeeper and consequently stop accepting partition reassignment commands via
> ZooKeeper.
> I'm not 100% sure that bdf2446ccce592f3c000290f11de88520327aa19 is the cause,
> but the issue doesn't reproduce on
> [3fe6b5e951db8f7184a4098f8ad8a1afb2b2c1a0|https://github.com/apache/kafka/commit/3fe6b5e951db8f7184a4098f8ad8a1afb2b2c1a0]
> - the commit right before it.
> Also, reproduces on the trunk HEAD
> [a87decb9e4df5bfa092c26ae4346f65c426f1321|https://github.com/apache/kafka/commit/a87decb9e4df5bfa092c26ae4346f65c426f1321].
> h1. How to reproduce
> 1. Run ZooKeeper and two Kafka brokers.
> 2. Create a topic with 100 partitions and place them on Broker 0:
> {code:bash}
> distro/bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093 \
>   --create \
>   --topic foo \
>   --replica-assignment $(for i in {0..99}; do echo -n "0,"; done | sed 's/.$//')
> {code}
> 3. Add some data:
> {code:bash}
> seq 1 1000000 | bin/kafka-console-producer.sh \
>   --broker-list localhost:9092,localhost:9093 --topic foo
> {code}
> 4. Create the partition reassignment node {{/admin/reassign_partitions}} in
> ZooKeeper and, shortly after that, update the data in the node (even the same
> value will do). I made a simple Python script for this:
> {code:python}
> import time
> import json
>
> from kazoo.client import KazooClient
>
> zk = KazooClient(hosts='127.0.0.1:2181')
> zk.start()
>
> reassign = {
>     "version": 1,
>     "partitions": []
> }
> for p in range(100):
>     reassign["partitions"].append({"topic": "foo", "partition": p, "replicas": [1]})
>
> zk.create("/admin/reassign_partitions", json.dumps(reassign).encode())
> time.sleep(0.05)
> zk.set("/admin/reassign_partitions", json.dumps(reassign).encode())
> {code}
> 5. Observe that the controller doesn't react to further updates to
> {{/admin/reassign_partitions}} and doesn't delete the node.
> Also, it can be confirmed with
> {code:bash}
> echo wchc | nc 127.0.0.1 2181
> {code}
> that there is no watch on the node in ZooKeeper (for this, you should run
> ZooKeeper with {{4lw.commands.whitelist=*}}).
> Since the issue is timing-dependent, it might not reproduce on the first
> attempt, so you might need to repeat step 4 a couple of times. However, the
> reproducibility rate is pretty high.
> The data in the topic and the large number of partitions are not needed per
> se; they only make the timing more favourable.
> Controller re-election will solve the issue, but a new controller can be put
> in this state the same way.
> h1. Proposed solution
> TBD, suggestions are welcome.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)