kumarpritam863 commented on PR #11288: URL: https://github.com/apache/iceberg/pull/11288#issuecomment-2418324422
Hi Bryan,

Thank you for the review and for your valuable input. I'd like to clarify the behavior in the different rebalancing modes.

In Eager mode, during any rebalance, all tasks relinquish their assigned partitions and then rejoin, so each task receives the open() call. One task will always be elected as the leader.

The scenario is different in Incremental Cooperative Rebalancing (ICR) mode. In ICR, the Connect framework calculates only the affected partitions during a rebalance. The open() call is made only if a task is assigned new partitions; otherwise, the task continues processing its existing partitions.

In the [ConsumerCoordinator](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L336):

1. [Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L378), the added partitions are calculated.
2. [Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L385), the revoked partitions are determined.
3. [Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L402), the invokePartitionsRevoked(revokedPartitions) method is called, handling only the revoked partitions.

Following this:

1. The call proceeds to the "[HandleRebalance](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L710)" class in the WorkerSinkTask, which implements the rebalance listener.
2. The onPartitionsRevoked() method calls "[onPartitionsRemoved](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L765)," and if the "[removedPartitions](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L780)" are empty, it returns without calling close() on the sink task.

Similarly:

1. In the invokePartitionsAssigned(addedPartitions) call from the ConsumerCoordinator, the process reaches the "[onPartitionsAssigned](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L712)" method within WorkerSinkTask.
2. If the added partitions passed by invokePartitionsAssigned are empty, it returns without calling open() on the sink task, as seen [here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L743).

I hope this clarifies the behavior. I'd appreciate any further insights or thoughts you might have.
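
For reference, the difference between the two modes described above can be modeled roughly as follows. This is a minimal, hypothetical sketch and not Kafka code: `RebalanceSketch`, `Mode`, and `callbacks` are made-up names, partitions are plain strings, and the only behavior it assumes is what is stated above (eager revokes everything and reassigns everything; cooperative passes only the difference, and empty sets trigger no callback).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model only; none of these names exist in Kafka or Iceberg.
final class RebalanceSketch {

    enum Mode { EAGER, COOPERATIVE }

    /** Lists the sink-task callbacks this model would fire, in order, for one rebalance. */
    static List<String> callbacks(Mode mode, Set<String> owned, Set<String> assigned) {
        Set<String> revoked = new LinkedHashSet<>(owned);
        Set<String> added = new LinkedHashSet<>(assigned);
        if (mode == Mode.COOPERATIVE) {
            revoked.removeAll(assigned); // keep only partitions the task loses
            added.removeAll(owned);      // keep only partitions the task gains
        }
        // In EAGER mode the sets stay untouched: everything owned is revoked
        // and everything assigned is treated as newly added.

        List<String> calls = new ArrayList<>();
        if (!revoked.isEmpty()) {
            calls.add("close" + revoked); // assumed: no close() when nothing was removed
        }
        if (!added.isEmpty()) {
            calls.add("open" + added);    // assumed: no open() when nothing was added
        }
        return calls;
    }

    public static void main(String[] args) {
        Set<String> owned = new LinkedHashSet<>(List.of("t-0", "t-1"));
        Set<String> grown = new LinkedHashSet<>(List.of("t-0", "t-1", "t-2"));

        // Assignment unchanged after the rebalance.
        System.out.println(callbacks(Mode.EAGER, owned, owned));       // [close[t-0, t-1], open[t-0, t-1]]
        System.out.println(callbacks(Mode.COOPERATIVE, owned, owned)); // []

        // Task gains one partition in cooperative mode.
        System.out.println(callbacks(Mode.COOPERATIVE, owned, grown)); // [open[t-2]]
    }
}
```

In this model, a task whose assignment does not change sees no callback at all in cooperative mode, which is the case the comment above is concerned with.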