kumarpritam863 commented on PR #11288:
URL: https://github.com/apache/iceberg/pull/11288#issuecomment-2418324422

   Hi Bryan,
   
   Thank you for the review and for your valuable input. I'd like to clarify 
the behavior in the different rebalancing modes:
   
   In Eager mode, during any rebalance, all tasks relinquish their assigned 
partitions and then rejoin, ensuring that each task will receive the Open() 
call. One task will always be elected as the leader. However, the scenario is 
different in Incremental Cooperative Rebalancing (ICR) mode. In ICR, the 
Connect framework only calculates the affected partitions during rebalancing. 
The Open() call is only made if a task is assigned new partitions; otherwise, 
the task continues processing its existing partitions.
   
   In the 
[ConsumerCoordinator](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L336):
   
   
[Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L378),
 the added partitions are calculated.
   
[Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L385),
 the revoked partitions are determined.
   
[Here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L402),
 the invokePartitionsRevoked(revokedPartitions) method is called, handling only 
the revoked partitions.
   Following this:
   
   The call proceeds to the 
"[HandleRebalance](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L710)"
 method in the WorkerSinkTask, which implements the rebalance listener.
   The onPartitionsRevoked() method calls 
"[onPartitionsRemoved](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L765),"
 and if the 
"[removedPartitions](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L780)"
 are empty, it returns without calling close() on the sink task.
   Similarly:
   
   In the invokePartitionsAssigned(addedPartitions) call from the 
ConsumerCoordinator, the process reaches the 
"[onPartitionsAssigned](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L712)"
 method within WorkerSinkTask.
   If the "added partitions" are empty in the invokePartitionsAssigned method, 
it returns without calling Open() on the sink task, as seen 
[here](https://github.com/apache/kafka/blob/1de4f27ec091fdac4f05add43bf38f8c18d90d8a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L743).
   I hope this clarifies the behavior. I'd appreciate any further insights or 
thoughts you might have.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to