lucasbru commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1472523826
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1350,21 +1321,33 @@ public Map<Uuid, SortedSet<Integer>>
currentAssignment() {
return this.currentAssignment;
}
-
/**
* @return Set of topic IDs received in a target assignment that have not
been reconciled yet
- * because topic names are not in metadata. Visible for testing.
+ * because topic names are not in metadata or reconciliation hasn't
finished. Reconciliation
+ * hasn't finished for a topic if the currently active assignment has a
different set of partitions
+ * for the topic than the target assignment.
+ *
+ * Visible for testing.
*/
- Set<Uuid> topicsWaitingForMetadata() {
- return Collections.unmodifiableSet(assignmentUnresolved.keySet());
+ Set<Uuid> topicsAwaitingReconciliation() {
+ return topicPartitionsAwaitingReconciliation().keySet();
}
/**
- * @return Topic partitions received in a target assignment that have been
resolved in
- * metadata and are ready to be reconciled. Visible for testing.
+ * @return Map of topics partitions received in a target assignment that
have not been
+ * reconciled yet because topic names are not in metadata or
reconciliation hasn't finished.
+ * The value will always contain all partitions in the target assignment.
+ *
+ * Visible for testing.
*/
- Set<TopicIdPartition> assignmentReadyToReconcile() {
- return Collections.unmodifiableSet(assignmentReadyToReconcile);
+ Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
+ final Map<Uuid, SortedSet<Integer>> topicPartitionMap = new
HashMap<>();
+ currentTargetAssignment.forEach((x, y) -> {
+ if (!currentAssignment.containsKey(x) ||
!currentAssignment.get(x).equals(y)) {
Review Comment:
Yes, it's only used for testing. This method is a replacement for
`assignmentReadyToReconcile` which was also used only in testing. You are right
that semantics were a bit weird, I implemented so that it replicates the
behavior of the (old) internal state. I changed it to have more reasonable
semantics (the value contains only the partitions that are missing from the
currently reconciled assignment). For the existing tests, there is no
difference between the two semantics.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1032,17 +1011,24 @@ private void resolveMetadataForUnresolvedAssignment() {
Optional<String> nameFromMetadata =
findTopicNameInGlobalOrLocalCache(topicId);
nameFromMetadata.ifPresent(resolvedTopicName -> {
// Name resolved, so assignment is ready for reconciliation.
- addToAssignmentReadyToReconcile(topicId, resolvedTopicName,
topicPartitions);
+ topicPartitions.forEach(tp -> {
+ TopicIdPartition topicIdPartition = new TopicIdPartition(
+ topicId,
+ new TopicPartition(resolvedTopicName, tp));
Review Comment:
Done
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -788,23 +767,28 @@ public void transitionToStale() {
}
/**
- * Reconcile the assignment that has been received from the server and for
which topic names
- * are resolved, kept in the {@link #assignmentReadyToReconcile}. This
will commit if needed,
- * trigger the callbacks and update the subscription state. Note that only
one reconciliation
+ * Reconcile the assignment that has been received from the server. If for
some topics, the
+ * topic ID cannot be matched to a topic name, a metadata update will be
triggered and only
+ * the subset of topics that are resolvable will be reconciled.
Reconciliation will trigger the
+ * callbacks and update the subscription state. Note that only one
reconciliation
* can be in progress at a time. If there is already another one in
progress when this is
* triggered, it will be no-op, and the assignment will be reconciled on
the next
* reconciliation loop.
*/
- boolean reconcile() {
+ void reconcile() {
+ if (targetAssignmentReconciled()) {
+ log.debug("Ignoring reconciliation attempt. Target assignment is
equal to the " +
+ "current assignment.");
+ return;
+ }
if (reconciliationInProgress) {
log.debug("Ignoring reconciliation attempt. Another reconciliation
is already in progress. Assignment " +
- assignmentReadyToReconcile + " will be handled in the next
reconciliation loop.");
- return false;
+ currentTargetAssignment + " will be handled in the next
reconciliation loop.");
+ return;
}
- // Make copy of the assignment to reconcile as it could change as new
assignments or metadata updates are received
- SortedSet<TopicIdPartition> assignedTopicIdPartitions = new
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
- assignedTopicIdPartitions.addAll(assignmentReadyToReconcile);
+ // Resolve metadata for target assignment
+ SortedSet<TopicIdPartition> assignedTopicIdPartitions =
resolveMetadataForTargetAssignment();
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]