dajac commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1471273124


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1350,21 +1321,33 @@ public Map<Uuid, SortedSet<Integer>> 
currentAssignment() {
         return this.currentAssignment;
     }
 
-
     /**
      * @return Set of topic IDs received in a target assignment that have not 
been reconciled yet
-     * because topic names are not in metadata. Visible for testing.
+     * because topic names are not in metadata or reconciliation hasn't 
finished. Reconciliation
+     * hasn't finished for a topic if the currently active assignment has a 
different set of partitions
+     * for the topic than the target assignment.
+     *
+     * Visible for testing.
      */
-    Set<Uuid> topicsWaitingForMetadata() {
-        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
+    Set<Uuid> topicsAwaitingReconciliation() {
+        return topicPartitionsAwaitingReconciliation().keySet();
     }
 
     /**
-     * @return Topic partitions received in a target assignment that have been 
resolved in
-     * metadata and are ready to be reconciled. Visible for testing.
+     * @return Map of topics partitions received in a target assignment that 
have not been
+     * reconciled yet because topic names are not in metadata or 
reconciliation hasn't finished.
+     * The value will always contain all partitions in the target assignment.
+     *
+     * Visible for testing.
      */
-    Set<TopicIdPartition> assignmentReadyToReconcile() {
-        return Collections.unmodifiableSet(assignmentReadyToReconcile);
+    Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
+        final Map<Uuid, SortedSet<Integer>> topicPartitionMap = new 
HashMap<>();
+        currentTargetAssignment.forEach((x, y) -> {
+            if (!currentAssignment.containsKey(x) || 
!currentAssignment.get(x).equals(y)) {

Review Comment:
   The semantic is a bit weird here. The method says that it returns the 
partitions awaiting reconciliation but the method could also return partitions 
reconciled if at least one partition of the same topic is not reconciled yet. I 
suppose that this is what you meant by `The value will always contain all 
partitions in the target assignment.`.
   
   By the way, it seems that this method is not used in the class. Do we only 
need it for testing?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -788,23 +767,28 @@ public void transitionToStale() {
     }
 
     /**
-     * Reconcile the assignment that has been received from the server and for 
which topic names
-     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
-     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * Reconcile the assignment that has been received from the server. If for 
some topics, the
+     * topic ID cannot be matched to a topic name, a metadata update will be 
triggered and only
+     * the subset of topics that are resolvable will be reconciled. 
Reconciliation will trigger the
+     * callbacks and update the subscription state. Note that only one 
reconciliation
      * can be in progress at a time. If there is already another one in 
progress when this is
      * triggered, it will be no-op, and the assignment will be reconciled on 
the next
      * reconciliation loop.
      */
-    boolean reconcile() {
+    void reconcile() {
+        if (targetAssignmentReconciled()) {
+            log.debug("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
+                    "current assignment.");
+            return;
+        }
         if (reconciliationInProgress) {
             log.debug("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
-                    assignmentReadyToReconcile + " will be handled in the next 
reconciliation loop.");
-            return false;
+                currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
+            return;
         }
 
-        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
-        SortedSet<TopicIdPartition> assignedTopicIdPartitions = new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
-        assignedTopicIdPartitions.addAll(assignmentReadyToReconcile);
+        // Resolve metadata for target assignment
+        SortedSet<TopicIdPartition> assignedTopicIdPartitions = 
resolveMetadataForTargetAssignment();

Review Comment:
   nit: The name of the method feels a bit weird. Ok, it indeed triggers the 
metadata request but it returns the subset of the target assignment with 
metadata. I wonder if we could find a better name.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1032,17 +1011,24 @@ private void resolveMetadataForUnresolvedAssignment() {
             Optional<String> nameFromMetadata = 
findTopicNameInGlobalOrLocalCache(topicId);
             nameFromMetadata.ifPresent(resolvedTopicName -> {
                 // Name resolved, so assignment is ready for reconciliation.
-                addToAssignmentReadyToReconcile(topicId, resolvedTopicName, 
topicPartitions);
+                topicPartitions.forEach(tp -> {
+                    TopicIdPartition topicIdPartition = new TopicIdPartition(
+                        topicId,
+                        new TopicPartition(resolvedTopicName, tp));

Review Comment:
   nit: You could use the second constructor too: `TopicIdPartition(Uuid 
topicId, int partition, String topic)`. It simplify a bit the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to