kumarpritam863 commented on code in PR #15955:
URL: https://github.com/apache/iceberg/pull/15955#discussion_r3076635253


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java:
##########
@@ -92,16 +93,49 @@ boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions)
   @VisibleForTesting
   boolean containsFirstPartition(
       Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
-    // there should only be one task assigned partition 0 of the first topic,
-    // so elect that one the leader
-    TopicPartition firstTopicPartition =
-        members.stream()
-            .flatMap(member -> member.assignment().topicPartitions().stream())
-            .min(new TopicPartitionComparator())
-            .orElseThrow(
-                () -> new ConnectException("No partitions assigned, cannot determine leader"));
-
-    return partitions.contains(firstTopicPartition);
+    // Determine the first partition across all members to elect the leader
+    TopicPartition firstTopicPartition = findFirstTopicPartition(members);
+
+    if (firstTopicPartition == null) {
+      LOG.warn(
+          "Committer {} found no partitions assigned across all members, cannot determine leader",
+          identifier);
+      return false;
+    }
+
+    boolean containsFirst = partitions.contains(firstTopicPartition);
+    if (containsFirst) {
+      LOG.info(
+          "Committer {} contains the first partition {}, this task is the leader",
+          identifier,
+          firstTopicPartition);
+    } else {
+      LOG.debug(
+          "Committer {} does not contain the first partition {}, not the leader",
+          identifier,
+          firstTopicPartition);
+    }
+
+    return containsFirst;
+  }
+
+  /**
+   * Finds the first (minimum) topic partition across all consumer group members.
+   *
+   * <p>The "first" partition is determined using {@link TopicPartitionComparator}, which orders
+   * {@link TopicPartition} instances lexicographically by topic name and, for equal topics, by
+   * ascending partition number.
+   *
+   * @param members the collection of consumer group members
+   * @return the first topic partition according to {@link TopicPartitionComparator}, or null if no
+   *     partitions are assigned
+   */

Review Comment:
   @danielcweeks I added these as a reference for future contributors, but I 
will revert them as well.
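
   For context, the leader election rule that the Javadoc describes can be 
sketched roughly as follows. This is a minimal, dependency-free illustration 
and not the code in the PR: the `TopicPartition` record, `ORDER`, `findFirst`, 
and `isLeader` are hypothetical stand-ins, and the ordering is assumed to 
match the description above (topic name first, then ascending partition 
number).

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class FirstPartitionSketch {
  // Hypothetical stand-in for Kafka's TopicPartition, so the sketch needs no dependencies.
  record TopicPartition(String topic, int partition) {}

  // Assumed ordering: lexicographic by topic name, then ascending partition number.
  static final Comparator<TopicPartition> ORDER =
      Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

  // Returns the minimum partition across every member's assignment, or empty if none exist.
  static Optional<TopicPartition> findFirst(Collection<List<TopicPartition>> assignments) {
    return assignments.stream().flatMap(List::stream).min(ORDER);
  }

  // A task is the leader only if its own partitions include the first partition.
  static boolean isLeader(
      Collection<List<TopicPartition>> assignments, Collection<TopicPartition> mine) {
    return findFirst(assignments).map(mine::contains).orElse(false);
  }

  public static void main(String[] args) {
    List<TopicPartition> taskA =
        List.of(new TopicPartition("orders", 1), new TopicPartition("events", 2));
    List<TopicPartition> taskB = List.of(new TopicPartition("events", 0));
    List<List<TopicPartition>> all = List.of(taskA, taskB);

    System.out.println(isLeader(all, taskB)); // taskB holds events-0, the minimum
    System.out.println(isLeader(all, taskA));
    System.out.println(isLeader(List.of(), taskA)); // no assignments: nobody is leader
  }
}
```

   Returning an empty `Optional` here plays the same role as the `null` 
return in the diff: when no partitions are assigned, no task is elected 
instead of an exception being thrown.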



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

