lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521750747
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
*/
private void replaceTargetAssignmentWithNewAssignment(
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
- currentTargetAssignment.clear();
+
Review Comment:
1. I did not do it because `LocalAssignment` leaks out of this file through
the `currentAssignment` method, and I did not want to put that much logic into
the public interface `MembershipManager`. However, I could define the return
type of `currentAssignment` as a lightweight interface and keep the full class,
with all the updating logic, in this file. I'll give it a try.
2. I would not call it `EMPTY`, to avoid confusion with an empty assignment,
but rather `NONE` or something similar. Other than that, it sounds like a good
idea.
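A minimal sketch of how the two points above could fit together. Everything
here is hypothetical and not the code in this PR: the names `AssignmentView`,
`LocalAssignmentSketch`, `NONE_EPOCH` and `updateWith` are made up for
illustration, and topic IDs are plain strings so the example stays
self-contained.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeMap;

// Hypothetical read-only view that a public interface could expose.
interface AssignmentView {
    long localEpoch();
    Map<String, SortedSet<Integer>> partitions();
    boolean isNone();
}

// Hypothetical implementation that keeps the update logic in one place.
final class LocalAssignmentSketch implements AssignmentView {
    static final long NONE_EPOCH = -1;

    // "No assignment yet": deliberately distinct from an assignment with zero partitions.
    static final LocalAssignmentSketch NONE =
        new LocalAssignmentSketch(NONE_EPOCH, Collections.emptyMap());

    private final long localEpoch;
    private final Map<String, SortedSet<Integer>> partitions;

    private LocalAssignmentSketch(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = Collections.unmodifiableMap(partitions);
    }

    @Override public long localEpoch() { return localEpoch; }
    @Override public Map<String, SortedSet<Integer>> partitions() { return partitions; }
    @Override public boolean isNone() { return localEpoch == NONE_EPOCH; }

    // Every update returns a new instance with the local epoch bumped by one.
    LocalAssignmentSketch updateWith(Map<String, SortedSet<Integer>> newPartitions) {
        return new LocalAssignmentSketch(localEpoch + 1, new TreeMap<>(newPartitions));
    }

    // Equality covers the local epoch as well as the partitions.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LocalAssignmentSketch)) return false;
        LocalAssignmentSketch other = (LocalAssignmentSketch) o;
        return localEpoch == other.localEpoch && partitions.equals(other.partitions);
    }

    @Override public int hashCode() {
        return Objects.hash(localEpoch, partitions);
    }
}
```

In this sketch, `NONE.updateWith(...)` with an empty map yields an empty
assignment at local epoch 0, which is not equal to `NONE`, so "no assignment
yet" and "assigned nothing" stay distinguishable.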
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
}
});
- // RebalanceTimeoutMs - only sent if has changed since the last heartbeat
- if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+ // RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+ if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
Review Comment:
True, this can be simplified.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
// ClientAssignors - not supported yet
- // TopicPartitions - only sent if it has changed since the last heartbeat. Note that
- // the string consists of just the topic ID and the partitions. When an assignment is
- // received, we might not yet know the topic name, and then it is learnt subsequently
- // by a metadata update.
- TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
- .map(entry -> entry.getKey() + "-" + entry.getValue())
- .collect(Collectors.toCollection(TreeSet::new));
- if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+ // TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+ // reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+ // including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+ LocalAssignment local = membershipManager.currentAssignment();
+ if (local == null) {
+ data.setTopicPartitions(Collections.emptyList());
+ sentFields.topicPartitions = null;
+ } else if (!local.equals(sentFields.topicPartitions)) {
Review Comment:
That is what is being done. I should perhaps have renamed the field
`sentFields.topicPartitions` to `sentFields.localAssignment`, but its type is
now `LocalAssignment`, and its `equals` compares the local epoch as well.
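To illustrate why comparing the local epoch matters, here is a small
self-contained sketch. It is not the PR's code: `ResendSketch`, `Assignment`
and `topicPartitionsToSend` are hypothetical stand-ins, partitions are plain
strings, and the body of the `else if` branch (remembering what was sent) is
an assumption, since the hunk above is cut off at that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

final class ResendSketch {
    // Hypothetical stand-in for LocalAssignment: partitions plus a local epoch.
    static final class Assignment {
        final long localEpoch;
        final List<String> partitions;

        Assignment(long localEpoch, List<String> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = partitions;
        }

        // The local epoch takes part in equality, so a re-reconciled assignment
        // with identical partitions still counts as changed.
        @Override public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Assignment)) return false;
            Assignment other = (Assignment) o;
            return localEpoch == other.localEpoch && partitions.equals(other.partitions);
        }

        @Override public int hashCode() {
            return Objects.hash(localEpoch, partitions);
        }
    }

    // Stand-in for sentFields.topicPartitions: the last assignment that was sent.
    private Assignment lastSent;

    // Returns the partitions to put into the next heartbeat,
    // or Optional.empty() if the field can be omitted.
    Optional<List<String>> topicPartitionsToSend(Assignment local) {
        if (local == null) {
            // Mirrors the diff: send an empty list and forget what was sent before.
            lastSent = null;
            return Optional.of(new ArrayList<>());
        }
        if (!local.equals(lastSent)) {
            // Assumed behavior: remember the assignment so it is sent only once.
            lastSent = local;
            return Optional.of(local.partitions);
        }
        return Optional.empty();
    }
}
```

With this shape, sending the same `Assignment` twice omits the field the
second time, while an `Assignment` with the same partitions but a higher local
epoch is sent again.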