dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521552751
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData
buildRequestData() {
}
});
- // RebalanceTimeoutMs - only sent if has changed since the last
heartbeat
- if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+ // RebalanceTimeoutMs - only sent when joining or if has changed
since the last heartbeat
+ if (membershipManager.memberEpoch() == 0 ||
sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
Review Comment:
If not mistaken, the `rebalanceTimeoutMs` timeout is a static config so we
could actually set it only in when joining the group (when epoch == 0).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -508,9 +508,30 @@ private void
processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
*/
private void replaceTargetAssignmentWithNewAssignment(
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
- currentTargetAssignment.clear();
+
Review Comment:
I have to high level thoughts but I am not sure whether they are worth it:
1) Have you considered moving all the update logic into `LocalAssignment`?
We could have a method such a `updateWith(Assignment)` which returns an
`Optional` containing the new assignment if it was updated.
2) On a similar line, I wonder if we could have an `EMPTY` constant for the
default `LocalAssignment(-1, null)` instead of relying on `null`. The reasoning
of this one is that it avoids having to deal with `null` in a few places in
this file.
What do you think?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData
buildRequestData() {
// ClientAssignors - not supported yet
- // TopicPartitions - only sent if it has changed since the last
heartbeat. Note that
- // the string consists of just the topic ID and the partitions.
When an assignment is
- // received, we might not yet know the topic name, and then it is
learnt subsequently
- // by a metadata update.
- TreeSet<String> assignedPartitions =
membershipManager.currentAssignment().entrySet().stream()
- .map(entry -> entry.getKey() + "-" + entry.getValue())
- .collect(Collectors.toCollection(TreeSet::new));
- if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+ // TopicPartitions - sent with the first heartbeat after a new
assignment from the server was
+ // reconciled. This is ensured by resending the topic partitions
whenever the local assignment,
+ // including its local epoch is changed (although the local epoch
is not sent in the heartbeat).
+ LocalAssignment local = membershipManager.currentAssignment();
+ if (local == null) {
+ data.setTopicPartitions(Collections.emptyList());
+ sentFields.topicPartitions = null;
+ } else if (!local.equals(sentFields.topicPartitions)) {
Review Comment:
Don't we need to also take the assignment epoch into consideration here? In
other words, should we store the LocalAssignment in `sentFields` and use it to
do the comparison?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]