[ 
https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16171:
---------------------------------
    Description: 
h2. Description

During the ZK migration, after KRaft becomes the active controller, we enter a 
state called hybrid mode. This means we have a mixture of ZK and KRaft brokers. 
The KRaft controller updates the ZK brokers using the deprecated controller 
RPCs (LeaderAndIsr, UpdateMetadata, etc.). 

 

A race condition exists where the KRaft controller gets stuck in a retry 
loop while initializing itself after a failover, which prevents it from sending 
these RPCs to the ZK brokers.
h2. Impact

Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK 
brokers will not receive any metadata updates. The ZK brokers will be able to 
send requests to the controller (such as AlterPartitions), but the metadata 
updates which result from those requests will never be seen. From the ZK 
brokers' perspective, the controller essentially appears unavailable.
h2. Detection and Mitigation

This bug can be detected by observing failed ZK writes from a recently elected 
controller.

The tell-tale error message is:
{code:java}
Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This 
indicates that another KRaft controller is making writes to ZooKeeper. {code}
with a stacktrace like:
{noformat}
java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. Expected 
zkVersion = 507823. This indicates that another KRaft controller is making 
writes to ZooKeeper.
        at 
kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
        at 
kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
        at 
kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
        at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
        at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
        at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
        at 
kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
        at 
kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
        at 
kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
        at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
        at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
        at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
        at java.base/java.lang.Thread.run(Thread.java:1583)
        at 
org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat}
To mitigate this problem, a new KRaft controller should be elected. This can be 
done by restarting the problematic active controller. To verify that the new 
controller does not encounter the race condition, look for:
{code:java}
[KRaftMigrationDriver id=9991] 9991 transitioning from SYNC_KRAFT_TO_ZK to 
KRAFT_CONTROLLER_TO_BROKER_COMM state {code}
 
h2. Details

Controller A loses leadership via a Raft event (e.g., a timeout in the Raft 
layer). A KRaftLeaderEvent is added to KRaftMigrationDriver's event queue behind 
any pending MetadataChangeEvents. 

 

Controller B is elected and a KRaftLeaderEvent is added to 
KRaftMigrationDriver's queue. Since this controller is inactive, it processes 
the event immediately. This event simply loads the migration state from ZK 
(/migration) to check if the migration has been completed. This information is 
used to determine the downstream transitions in the state machine. Controller B 
passes through WAIT_FOR_ACTIVE_CONTROLLER and transitions to BECOME_CONTROLLER 
since the migration is done. While handling the BecomeZkControllerEvent, the 
controller forcibly takes ZK controller leadership by writing its ID into 
/controller and its epoch into /controller_epoch.

 

The change to /controller_epoch causes all of the pending writes on Controller 
A to fail, since those writes include a check op on /controller_epoch as part 
of their multi-op writes to ZK. 
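This fencing behaves like a compare-and-set: each multi-op bundles a check on /controller_epoch, so once a newer epoch is written, every pending multi-op from the deposed controller is rejected. A minimal in-memory sketch of that check (a toy model with invented names, not the real ZooKeeper multi API):

```java
// Toy model of epoch fencing: a multi-op "commits" only if the writer's
// cached controller epoch still matches the shared /controller_epoch value.
public class EpochFenceSketch {
    static int controllerEpoch = 1; // stands in for the /controller_epoch znode

    // Analogous to the check op bundled into each controller multi-op.
    static boolean multiOpWithEpochCheck(int writerEpoch) {
        return writerEpoch == controllerEpoch; // check passes -> write applies
    }

    public static void main(String[] args) {
        int controllerAEpoch = controllerEpoch;                      // A's cached view
        System.out.println(multiOpWithEpochCheck(controllerAEpoch)); // true: A can still write
        controllerEpoch++;                                           // B takes over, bumps epoch
        System.out.println(multiOpWithEpochCheck(controllerAEpoch)); // false: A is fenced
    }
}
```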

 

However, there is a race between Controller B loading the state from /migration 
and updating /controller_epoch. It is possible for Controller A to 
successfully write to ZK with its older epoch during this window. That write 
increments the znode version of /migration, which causes Controller B to get stuck.

 

It is safe for the old controller to be making these writes, since we only 
dual-write committed state from KRaft (i.e., "write-behind"), but this race 
leaves the new controller with a stale version of /migration. 
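The interleaving above can be sketched as a short timeline (again an in-memory toy with invented names; real writes go through ZooKeeper multi-ops guarded by check ops):

```java
// Toy timeline of the race. /migration carries a znode version; Controller B
// caches the version it read and uses it in check ops on its later writes.
public class MigrationRaceSketch {
    static int controllerEpoch = 1;       // stands in for /controller_epoch
    static int migrationVersion = 507822; // stands in for /migration's znode version

    // A write to /migration guarded by both an epoch check and a version check.
    static boolean guardedMigrationWrite(int writerEpoch, int expectedVersion) {
        if (writerEpoch != controllerEpoch || expectedVersion != migrationVersion) {
            return false; // check op fails; the caller retries with the same stale version
        }
        migrationVersion++; // a successful write bumps the znode version
        return true;
    }

    public static void main(String[] args) {
        // 1. Newly elected Controller B reads /migration and caches its version.
        int bCachedVersion = migrationVersion;

        // 2. Race window: Controller A, not yet fenced, completes one more write.
        guardedMigrationWrite(1, migrationVersion); // succeeds; B's cached version is now stale

        // 3. Controller B bumps /controller_epoch, fencing A going forward...
        controllerEpoch = 2;

        // 4. ...but B's own writes now fail against its stale cached version: the retry loop.
        System.out.println(guardedMigrationWrite(2, bCachedVersion)); // false
    }
}
```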

> Controller failover during ZK migration can prevent metadata updates to ZK 
> brokers
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16171
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16171
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, kraft
>    Affects Versions: 3.6.0, 3.7.0, 3.6.1
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Blocker
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
