[ https://issues.apache.org/jira/browse/KAFKA-9478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048295#comment-17048295 ]

Ivan Yurchenko commented on KAFKA-9478:
---------------------------------------

I did some further investigation; let me explain what I found.

In both the pre- and post-bdf2446ccce592f3c000290f11de88520327aa19 versions (let's call them _OLD_ and _NEW_), the controller doesn't act upon a new reassignment (written with a {{SET}} operation) while a previously issued reassignment is still in progress.

However, in _OLD_ the {{/admin/reassign_partitions}} ZooKeeper node is ultimately deleted (despite containing the second reassignment command) and the watch on it is restored. The difference in behaviour is this:
1. In _OLD_, a partition that has just been reassigned is removed from the in-memory set {{controllerContext.partitionsBeingReassigned}}, and the result is written to ZooKeeper (see [here|https://github.com/apache/kafka/blob/3fe6b5e951db8f7184a4098f8ad8a1afb2b2c1a0/core/src/main/scala/kafka/controller/KafkaController.scala#L1083]).
2. In _NEW_, a partition that has just been reassigned is filtered out of {{/admin/reassign_partitions}}, which is re-read every time (see [here|https://github.com/apache/kafka/blob/bdf2446ccce592f3c000290f11de88520327aa19/core/src/main/scala/kafka/controller/KafkaController.scala#L1001]).

Because of this difference, in _NEW_ the second reassignment command stays in ZooKeeper forever (until controller re-election): the controller takes no action upon it, the node is never deleted, and the watch is never restored.
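To make the consequence concrete, here's a toy model of the _NEW_ cleanup path in plain Python (the names are hypothetical stand-ins, not actual Kafka code): the entries of the ignored second command never qualify for removal, so the znode content never becomes empty.

{code:python}
# Toy model of the NEW cleanup: the znode still holds the second (ignored)
# reassignment command, while the controller's in-memory set is already empty
# because the first reassignment has completed.
znode = {("foo", p): [1] for p in range(3)}   # second command, still in ZooKeeper
partitions_being_reassigned = set()           # first reassignment already done

def should_remove_reassignment(tp, replicas):
    # True only for a partition whose reassignment just completed; the
    # partitions of the ignored second command never qualify.
    return False

remaining = {tp: r for tp, r in znode.items()
             if not should_remove_reassignment(tp, r)}

# Nothing is removed: the znode never becomes empty, so it is never deleted
# and the watch is never re-registered.
assert remaining == znode
{code}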

This can be solved by changing the {{maybeRemoveFromZkReassignment}} function in the following way:

{code:scala}
    val reassigningPartitions = zkClient.getPartitionReassignment()
    val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
      shouldRemoveReassignment(tp, replicas) || !controllerContext.partitionsBeingReassigned.contains(tp)
    }
{code}

i.e. by explicitly filtering out partitions that are not in 
{{partitionsBeingReassigned}}. I made a 
[PR|https://github.com/apache/kafka/pull/8196] with this change. How do you 
feel about it?
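For illustration, the effect of the proposed predicate can be sketched in plain Python (hypothetical stand-ins for the Scala types): an entry is removed either because its reassignment just completed, or because the controller is not tracking it at all.

{code:python}
# Sketch of the fixed partition step (hypothetical names, mirroring the Scala
# snippet above): stale znode entries from a command the controller never acted
# upon are cleaned up alongside just-completed reassignments.
znode = {("foo", 0): [1], ("foo", 1): [1], ("bar", 0): [2]}
partitions_being_reassigned = {("bar", 0)}    # the only reassignment in flight

def should_remove_reassignment(tp, replicas):
    return False                              # nothing has just completed

removing, updated = {}, {}
for tp, replicas in znode.items():
    if should_remove_reassignment(tp, replicas) or tp not in partitions_being_reassigned:
        removing[tp] = replicas               # stale entries are cleaned up
    else:
        updated[tp] = replicas                # in-flight reassignments remain

assert set(removing) == {("foo", 0), ("foo", 1)}
assert set(updated) == {("bar", 0)}
{code}

With stale entries gone, the znode can eventually be emptied, deleted, and the watch re-registered, which is the behaviour _OLD_ had.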


> Controller may stop reacting to partition reassignment commands in ZooKeeper
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-9478
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9478
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, core
>    Affects Versions: 2.4.0, 2.4.1
>            Reporter: Ivan Yurchenko
>            Assignee: Ivan Yurchenko
>            Priority: Major
>
> Seemingly after [bdf2446ccce592f3c000290f11de88520327aa19|https://github.com/apache/kafka/commit/bdf2446ccce592f3c000290f11de88520327aa19], the controller may stop watching the {{/admin/reassign_partitions}} node in ZooKeeper and consequently stop accepting partition reassignment commands via ZooKeeper.
> I'm not 100% sure that bdf2446ccce592f3c000290f11de88520327aa19 causes this, but it doesn't reproduce on [3fe6b5e951db8f7184a4098f8ad8a1afb2b2c1a0|https://github.com/apache/kafka/commit/3fe6b5e951db8f7184a4098f8ad8a1afb2b2c1a0], the commit right before it. It also reproduces on the trunk HEAD, [a87decb9e4df5bfa092c26ae4346f65c426f1321|https://github.com/apache/kafka/commit/a87decb9e4df5bfa092c26ae4346f65c426f1321].
> h1. How to reproduce
> 1. Run ZooKeeper and two Kafka brokers.
> 2. Create a topic with 100 partitions and place them on Broker 0:
> {code:bash}
> distro/bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093 --create \
>     --topic foo \
>     --replica-assignment $(for i in {0..99}; do echo -n "0,"; done | sed 's/.$//')
> {code}
> 3. Add some data:
> {code:bash}
> seq 1 1000000 | bin/kafka-console-producer.sh --broker-list 
> localhost:9092,localhost:9093 --topic foo
> {code}
> 4. Create the partition reassignment node {{/admin/reassign_partitions}} in ZooKeeper and shortly after that update the data in the node (even the same value will do). I made a simple Python script for this:
> {code:python}
> import time
> import json
> from kazoo.client import KazooClient
> 
> zk = KazooClient(hosts='127.0.0.1:2181')
> zk.start()
> 
> reassign = {"version": 1, "partitions": []}
> for p in range(100):
>     reassign["partitions"].append({"topic": "foo", "partition": p, "replicas": [1]})
> 
> zk.create("/admin/reassign_partitions", json.dumps(reassign).encode())
> time.sleep(0.05)
> zk.set("/admin/reassign_partitions", json.dumps(reassign).encode())
> {code}
> 5. Observe that the controller doesn't react to further updates to {{/admin/reassign_partitions}} and doesn't delete the node.
> Also, it can be confirmed with
> {code:bash}
> echo wchc | nc 127.0.0.1 2181
> {code}
> that there is no watch on the node in ZooKeeper (for this, you should run 
> ZooKeeper with {{4lw.commands.whitelist=*}}).
> Since it's about timing, it might not work on the first attempt, so you might need to repeat step 4 a couple of times. However, the reproducibility rate is pretty high.
> The data in the topic and the large number of partitions are not needed per se; they only make the timing more favourable.
> Controller re-election will solve the issue, but a new controller can be put 
> in this state the same way.
> h1. Proposed solution
> TBD, suggestions are welcome.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
