kumarpritam863 commented on PR #11288:
URL: https://github.com/apache/iceberg/pull/11288#issuecomment-2418300052

   Thanks @bryanck for the review. In the **Eager** mode, since on any 
rebalance all the tasks first leave all the partitions that they have and then 
they rejoin hence in this case it will always be the case that all the task 
will necessarily get the "Open()" call always and one of them will be elected 
as leader. But this is not the case in the **Incremental Cooperative Mode 
(ICR)**. In ICR, on rebalance Connect Framework calculates only the affected 
partitions and calls open only if the task is assigned any partition other wise 
that task continues with performing its job with the partitions it already has.
   
   In the [Consumer 
Coordinator](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L336):
   - 
[Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L378)
 first the addedPartitions are calculated.
   - 
[Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L385)
 the revoked partitions are caluculated
   - 
[Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L402)
 the consumer co-ordinator calls "invokePartitionsRevoked(revokedPartitions)" 
with only the revoked partitions.
   - Once "invokePartitionsRevoked(revokedPartitions)" is called, the call 
comes to the 
"[HandleRebalance](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L710)"
 of WorkerSinkTask which has implemented the Rebalance Listener to listen to 
all the changes.
   - "OnPartitionsRevoked" calls 
"[onPartitionsRemoved](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L765)"
 which checks if the 
"[removedPartitions](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L780)"
 are empty then it simply returns without calling "close()" on the sink task.
   - Similarly in The ConsumerCoordinator calls 
"[invokePartitionsAssigned(addedPartitions)"](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L425),
 on which the call lands 
"[onPartitionsAssigned()](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L712)"
 of Handle Rebalance of Worker SinkTask.
   - The Worker Sink Task "invokePartitionsAssigned" checks 
[here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L743)
 if the "added partitions" are empty then this also simply returns without 
calling "open" on the sink task.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to