dajac commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1471273124
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1350,21 +1321,33 @@ public Map<Uuid, SortedSet<Integer>>
currentAssignment() {
return this.currentAssignment;
}
-
/**
* @return Set of topic IDs received in a target assignment that have not
been reconciled yet
- * because topic names are not in metadata. Visible for testing.
+ * because topic names are not in metadata or reconciliation hasn't
finished. Reconciliation
+ * hasn't finished for a topic if the currently active assignment has a
different set of partitions
+ * for the topic than the target assignment.
+ *
+ * Visible for testing.
*/
- Set<Uuid> topicsWaitingForMetadata() {
- return Collections.unmodifiableSet(assignmentUnresolved.keySet());
+ Set<Uuid> topicsAwaitingReconciliation() {
+ return topicPartitionsAwaitingReconciliation().keySet();
}
/**
- * @return Topic partitions received in a target assignment that have been
resolved in
- * metadata and are ready to be reconciled. Visible for testing.
+ * @return Map of topics partitions received in a target assignment that
have not been
+ * reconciled yet because topic names are not in metadata or
reconciliation hasn't finished.
+ * The value will always contain all partitions in the target assignment.
+ *
+ * Visible for testing.
*/
- Set<TopicIdPartition> assignmentReadyToReconcile() {
- return Collections.unmodifiableSet(assignmentReadyToReconcile);
+ Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
+ final Map<Uuid, SortedSet<Integer>> topicPartitionMap = new
HashMap<>();
+ currentTargetAssignment.forEach((x, y) -> {
+ if (!currentAssignment.containsKey(x) ||
!currentAssignment.get(x).equals(y)) {
Review Comment:
The semantic is a bit weird here. The method says that it returns the
partitions awaiting reconciliation but the method could also return partitions
reconciled if at least one partition of the same topic is not reconciled yet. I
suppose that this is what you meant by `The value will always contain all
partitions in the target assignment.`.
By the way, it seems that this method is not used in the class. Do we only
need it for testing?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -788,23 +767,28 @@ public void transitionToStale() {
}
/**
- * Reconcile the assignment that has been received from the server and for
which topic names
- * are resolved, kept in the {@link #assignmentReadyToReconcile}. This
will commit if needed,
- * trigger the callbacks and update the subscription state. Note that only
one reconciliation
+ * Reconcile the assignment that has been received from the server. If for
some topics, the
+ * topic ID cannot be matched to a topic name, a metadata update will be
triggered and only
+ * the subset of topics that are resolvable will be reconciled.
Reconciliation will trigger the
+ * callbacks and update the subscription state. Note that only one
reconciliation
* can be in progress at a time. If there is already another one in
progress when this is
* triggered, it will be no-op, and the assignment will be reconciled on
the next
* reconciliation loop.
*/
- boolean reconcile() {
+ void reconcile() {
+ if (targetAssignmentReconciled()) {
+ log.debug("Ignoring reconciliation attempt. Target assignment is
equal to the " +
+ "current assignment.");
+ return;
+ }
if (reconciliationInProgress) {
log.debug("Ignoring reconciliation attempt. Another reconciliation
is already in progress. Assignment " +
- assignmentReadyToReconcile + " will be handled in the next
reconciliation loop.");
- return false;
+ currentTargetAssignment + " will be handled in the next
reconciliation loop.");
+ return;
}
- // Make copy of the assignment to reconcile as it could change as new
assignments or metadata updates are received
- SortedSet<TopicIdPartition> assignedTopicIdPartitions = new
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
- assignedTopicIdPartitions.addAll(assignmentReadyToReconcile);
+ // Resolve metadata for target assignment
+ SortedSet<TopicIdPartition> assignedTopicIdPartitions =
resolveMetadataForTargetAssignment();
Review Comment:
nit: The name of the method feels a bit weird. Ok, it indeed triggers the
metadata request but it returns the subset of the target assignment with
metadata. I wonder if we could find a better name.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1032,17 +1011,24 @@ private void resolveMetadataForUnresolvedAssignment() {
Optional<String> nameFromMetadata =
findTopicNameInGlobalOrLocalCache(topicId);
nameFromMetadata.ifPresent(resolvedTopicName -> {
// Name resolved, so assignment is ready for reconciliation.
- addToAssignmentReadyToReconcile(topicId, resolvedTopicName,
topicPartitions);
+ topicPartitions.forEach(tp -> {
+ TopicIdPartition topicIdPartition = new TopicIdPartition(
+ topicId,
+ new TopicPartition(resolvedTopicName, tp));
Review Comment:
nit: You could use the second constructor too: `TopicIdPartition(Uuid
topicId, int partition, String topic)`. It simplify a bit the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]