kumarpritam863 opened a new pull request, #11288:
URL: https://github.com/apache/iceberg/pull/11288
**PROBLEMS:**

The **open** and **close** methods of a sink task receive only the **delta** of the **partition** assignment. In **open(Collection partitions)**, the **partitions** argument is not the complete set of partitions owned by the task. It is always just the **new set of partitions** that the task gets as part of the rebalance. Consider Task0: **Currently owned** = [TP0, TP1]; **after the rebalance**, **currently owned** = [TP0, TP1, TP2, TP3]. In this case **open** is called with **Collection<TopicPartition>(TP2, TP3)**.

With **"Eager mode"** rebalancing and **"Round Robin"** partition assignment, **partitions** does correspond to the total assignment, because eager mode is a **"Stop the World"** protocol: every task and its consumer first revoke all of their partitions, leave, and then rejoin. With **"Incremental Cooperative Rebalancing"** and the **"Cooperative Sticky Assignor"**, however, there are frequent scenarios in which **partitions** is **not** the total set of partitions.

**How this leads to a NO-Coordinator scenario:**

**Consider this situation:** initially there is one worker, "**W0**", with two tasks, "**T0**" and "**T1**", consuming from two topic partitions, "**TP0**" and "**TP1**". The initial configuration is:

**W0 -> [{T0, TP0}, {T1, TP1}]**

This elects "**T0**" as the **co-ordinator**, because it owns "**TP0**".

Now another worker, "**W1**", joins. This triggers a rebalance of both the tasks and the topic partitions within those tasks. First, "**T0**" is closed on "**W0**", and in the meantime "**TP0**" is assigned to "**T1**". In this step:

a. As close() is called on "**T0**", the **co-ordinator** on "**T0**" is closed.

b. As "**TP0**" is now assigned to "**T1**", "**T1**" becomes the co-ordinator because of "**TP0**".

Then, since task "**T0**" was moved to "**W1**", a **partition rebalance** occurs, and either "**TP0**" or "**TP1**" can be moved from "**T1**" to "**T0**".
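The difference between the eager and cooperative cases can be sketched with a small, self-contained Java model. It does not use Kafka Connect: partitions are plain strings, and `RebalanceDelta`, `openArgument` and `closeArgument` are hypothetical names. It assumes, as described above, that eager rebalancing hands a task its whole old and new assignment, while cooperative rebalancing hands it only the revoked and newly added partitions.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified model (not Kafka Connect code): which partitions a sink task's
// open()/close() receive when its assignment changes from `before` to `after`.
class RebalanceDelta {

    // Eager ("stop the world"): the full new assignment is handed back.
    // Cooperative: only the newly assigned partitions are passed.
    static SortedSet<String> openArgument(Set<String> before, Set<String> after, boolean eager) {
        SortedSet<String> result = new TreeSet<>(after);
        if (!eager) {
            result.removeAll(before); // newly assigned partitions only
        }
        return result;
    }

    // Eager: everything previously owned is revoked.
    // Cooperative: only the partitions actually taken away are passed.
    static SortedSet<String> closeArgument(Set<String> before, Set<String> after, boolean eager) {
        SortedSet<String> result = new TreeSet<>(before);
        if (!eager) {
            result.removeAll(after); // revoked partitions only
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> before = Set.of("TP0", "TP1");
        Set<String> after = Set.of("TP0", "TP1", "TP2", "TP3");
        for (boolean eager : new boolean[] {true, false}) {
            System.out.println((eager ? "eager:       " : "cooperative: ")
                    + "close(" + closeArgument(before, after, eager) + "), "
                    + "open(" + openArgument(before, after, eager) + ")");
        }
    }
}
```

For the Task0 example, the cooperative line prints `close([]), open([TP2, TP3])`, while the eager line passes the full sets. For "T1" in the first rebalance above, `openArgument(Set.of("TP1"), Set.of("TP0", "TP1"), false)` returns `[TP0]`, which is why "T1" can become co-ordinator at that point.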
If "**TP1**" is moved from "**T1**" to "**T0**", which is almost always the case, the following happens:

a. close() is called on "**T1**" with "**TP1**", but since we are blindly **closing the co-ordinator**, the **co-ordinator** on "**T1**" is **closed**, even though "**T1**" still owns "**TP0**".

b. Meanwhile "**T0**" receives "**TP1**", but it is not elected co-ordinator because it does not get "**TP0**".

Hence, we end up in a **no-co-ordinator scenario**: the workers wait forever for a **START_COMMIT** request, leading to a **hung state**.

The other problem with the current handling of **open** and **close** is that, even with **"Incremental Cooperative Rebalancing"**, we do not use the benefit Kafka is trying to provide, a **NO "STOP-THE-WORLD"** rebalance, because we blindly **throw away** all the data read from the **TOPIC_PARTITIONS** that are still **owned** by the current task.

**SOLUTIONS:**

There are two solutions. The first is a short-term fix that avoids the no-co-ordinator scenario but does not solve the second problem.

1. Call a **dummy Open(Empty list)** from **close** if the task still has any partitions left. This re-runs the co-ordinator election for the partitions the task still owns, ensuring the co-ordinator is started and we do not end up in a hung state. It still does not solve the second problem, because we blindly close everything on the close call.

2. A more evolved and efficient approach [for which we have the **Design and Implementation ready**] that solves both problems, as well as others. I will share the design and the implementation PR very soon.

**The current PR implements the first solution.**
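The second rebalance, and the effect of the first solution, can be replayed with a similar toy model. `CoordinatorScenario`, `ModelTask` and `runScenario` are hypothetical names, and this is a sketch of the idea rather than the connector's actual code. It assumes, per the description, that the co-ordinator runs on whichever task owns "TP0", that the election considers every partition the task still owns rather than only the argument passed to open(), and that the current close() stops the co-ordinator unconditionally.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy replay of the two rebalances described above; not the Iceberg connector's code.
class CoordinatorScenario {

    // Hypothetical sink task: hosts the co-ordinator while it owns "TP0".
    static final class ModelTask {
        private final boolean applyFix;                    // true = solution 1
        private final Set<String> owned = new TreeSet<>(); // partitions the task still owns
        boolean coordinatorRunning = false;

        ModelTask(boolean applyFix) {
            this.applyFix = applyFix;
        }

        // Receives only newly assigned partitions, but elects over everything owned.
        void open(Collection<String> newlyAssigned) {
            owned.addAll(newlyAssigned);
            if (!coordinatorRunning && owned.contains("TP0")) {
                coordinatorRunning = true;
            }
        }

        // Receives only revoked partitions.
        void close(Collection<String> revoked) {
            owned.removeAll(revoked);
            coordinatorRunning = false; // current behaviour: co-ordinator closed blindly
            if (applyFix && !owned.isEmpty()) {
                open(Collections.emptyList()); // solution 1: dummy open re-runs the election
            }
        }
    }

    // Returns how many co-ordinators are running after both rebalances.
    static int runScenario(boolean applyFix) {
        ModelTask t0 = new ModelTask(applyFix);
        ModelTask t1 = new ModelTask(applyFix);
        t0.open(List.of("TP0"));     // W0 -> [{T0, TP0}, {T1, TP1}]: T0 is co-ordinator
        t1.open(List.of("TP1"));

        t0.close(List.of("TP0"));    // W1 joins: T0 is closed on W0 ...
        t1.open(List.of("TP0"));     // ... TP0 goes to T1, which becomes co-ordinator

        ModelTask t0OnW1 = new ModelTask(applyFix); // T0 restarted on W1
        t1.close(List.of("TP1"));    // TP1 is revoked from T1 ...
        t0OnW1.open(List.of("TP1")); // ... and assigned to T0, which lacks TP0

        return (t1.coordinatorRunning ? 1 : 0) + (t0OnW1.coordinatorRunning ? 1 : 0);
    }

    public static void main(String[] args) {
        System.out.println("current close: " + runScenario(false) + " co-ordinator(s)");
        System.out.println("solution 1:    " + runScenario(true) + " co-ordinator(s)");
    }
}
```

In this model `runScenario(false)` returns 0, the hung state, while `runScenario(true)` returns 1, because the dummy open on "T1" re-elects it while it still owns "TP0".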