kumarpritam863 commented on PR #11288: URL: https://github.com/apache/iceberg/pull/11288#issuecomment-2418300052
Thanks @bryanck for the review. In **Eager** mode, on any rebalance every task first gives up all of its partitions and then rejoins, so every task always receives the `open()` call and one of them is elected leader. This is not the case in **Incremental Cooperative Rebalancing (ICR)** mode. In ICR, on a rebalance the Connect framework computes only the affected partitions and calls `open()` on a task only if that task is assigned a new partition; otherwise the task keeps working on the partitions it already has.

In the [ConsumerCoordinator](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L336):
- [Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L378) the added partitions (`addedPartitions`) are computed first.
- [Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L385) the revoked partitions are computed.
- [Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L402) the consumer coordinator calls `invokePartitionsRevoked(revokedPartitions)` with only the revoked partitions.
- Once `invokePartitionsRevoked(revokedPartitions)` is called, the call reaches [`HandleRebalance`](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L710) in `WorkerSinkTask`, which implements the rebalance listener to receive these changes.
- `onPartitionsRevoked` calls [`onPartitionsRemoved`](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L765), which checks whether [`removedPartitions`](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L780) is empty and, if so, simply returns without calling `close()` on the sink task.
- Similarly, the `ConsumerCoordinator` calls [`invokePartitionsAssigned(addedPartitions)`](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L425), and the call lands in [`onPartitionsAssigned()`](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L712) of `HandleRebalance` in `WorkerSinkTask`.
- There, `WorkerSinkTask` checks [here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L743) whether the added partitions are empty and, if so, also simply returns without calling `open()` on the sink task.
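
To make the difference concrete, here is a minimal, self-contained sketch of the behaviour described above. It is not the Kafka code: `RebalanceSketch`, `Task`, `eagerRebalance`, `cooperativeRebalance` and `fireCallbacks` are hypothetical names, partitions are modelled as plain integers, and the multiple rounds of a real cooperative rebalance are collapsed into a single step.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative model only; none of these names exist in Kafka or Iceberg. */
public class RebalanceSketch {

    /** Hypothetical stand-in for a sink task that counts open()/close() calls. */
    public static final class Task {
        private Set<Integer> owned;
        private int openCalls;
        private int closeCalls;

        public Task(Set<Integer> owned) {
            this.owned = new HashSet<>(owned);
        }

        void open(Set<Integer> partitions) { openCalls++; }
        void close(Set<Integer> partitions) { closeCalls++; }

        public int openCalls() { return openCalls; }
        public int closeCalls() { return closeCalls; }
    }

    /** Eager: the task gives up everything first, so the whole new assignment counts as added. */
    public static void eagerRebalance(Task task, Set<Integer> newAssignment) {
        fireCallbacks(task, new HashSet<>(task.owned), new HashSet<>(newAssignment));
        task.owned = new HashSet<>(newAssignment);
    }

    /** Cooperative: only the difference between old and new assignment is revoked or added. */
    public static void cooperativeRebalance(Task task, Set<Integer> newAssignment) {
        Set<Integer> added = new HashSet<>(newAssignment);
        added.removeAll(task.owned);
        Set<Integer> revoked = new HashSet<>(task.owned);
        revoked.removeAll(newAssignment);
        fireCallbacks(task, revoked, added);
        task.owned = new HashSet<>(newAssignment);
    }

    /** Mirrors the empty checks described above: no callback is made for an empty set. */
    private static void fireCallbacks(Task task, Set<Integer> revoked, Set<Integer> added) {
        if (!revoked.isEmpty()) {
            task.close(revoked);
        }
        if (!added.isEmpty()) {
            task.open(added);
        }
    }

    public static void main(String[] args) {
        // A task whose assignment is unchanged by the rebalance.
        Task eagerTask = new Task(Set.of(0, 1));
        eagerRebalance(eagerTask, Set.of(0, 1));

        Task cooperativeTask = new Task(Set.of(0, 1));
        cooperativeRebalance(cooperativeTask, Set.of(0, 1));

        System.out.println("eager open() calls: " + eagerTask.openCalls());
        System.out.println("cooperative open() calls: " + cooperativeTask.openCalls());
    }
}
```

In this model a task whose assignment is unchanged sees one `open()` call under the eager path and none under the cooperative path, which is why logic that relies on `open()` being invoked on every task after each rebalance does not carry over to ICR.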