dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2415,6 +2415,20 @@ private CoordinatorResult<Void, Record>
classicGroupJoinExistingMember(
return EMPTY_RESULT;
}
+ /**
+ * An overload of {@link
GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+ * timeout operation. It additionally looks up the group by the id and
checks the group type.
+ * completeClassicGroupJoin will only be called if the group is CLASSIC.
+ */
+ private CoordinatorResult<Void, Record> completeClassicGroupJoin(String
groupId) {
+ if (containsClassicGroup(groupId)) {
+ return
completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));
Review Comment:
I am not a fan of this pattern because you effectively have to look up the
group twice: once in containsClassicGroup and again in
getOrMaybeCreateClassicGroup. One option would be to use a try/catch to catch
the exception thrown by getOrMaybeCreateClassicGroup. Another option would be
to 1) do the lookup once, 2) verify that the result is non-null and is a
classic group, and return early if either check fails.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2415,6 +2415,20 @@ private CoordinatorResult<Void, Record>
classicGroupJoinExistingMember(
return EMPTY_RESULT;
}
+ /**
+ * An overload of {@link
GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+ * timeout operation. It additionally looks up the group by the id and
checks the group type.
+ * completeClassicGroupJoin will only be called if the group is CLASSIC.
+ */
+ private CoordinatorResult<Void, Record> completeClassicGroupJoin(String
groupId) {
+ if (containsClassicGroup(groupId)) {
+ return
completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));
+ } else {
+ log.info("Group {} is null or not a classic group, skipping
rebalance stage.", groupId);
Review Comment:
I wonder if we could use `debug` here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2805,31 +2826,36 @@ private CoordinatorResult<Void, Record>
maybeCompleteJoinElseSchedule(
* Try to complete the join phase of the initial rebalance.
* Otherwise, extend the rebalance.
*
- * @param group The group under initial rebalance.
+ * @param groupId The group under initial rebalance.
*
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, Record>
tryCompleteInitialRebalanceElseSchedule(
- ClassicGroup group,
+ String groupId,
int delayMs,
int remainingMs
) {
- if (group.newMemberAdded() && remainingMs != 0) {
- // A new member was added. Extend the delay.
- group.setNewMemberAdded(false);
- int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs,
remainingMs);
- int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
- timer.schedule(
- classicGroupJoinKey(group.groupId()),
- newDelayMs,
- TimeUnit.MILLISECONDS,
- false,
- () -> tryCompleteInitialRebalanceElseSchedule(group,
newDelayMs, newRemainingMs)
- );
+ if (containsClassicGroup(groupId)) {
Review Comment:
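ditto: this also looks up the group twice (containsClassicGroup followed by
getOrMaybeCreateClassicGroup); a single lookup would be preferable.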
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
group.rebalanceTimeoutMs(),
TimeUnit.MILLISECONDS,
false,
- () -> expirePendingSync(group, group.generationId()));
+ () -> expirePendingSync(group.groupId(), group.generationId()));
}
/**
* Invoked when the heartbeat operation is expired from the timer.
Possibly remove the member and
* try complete the join phase.
*
- * @param group The group.
+ * @param groupId The group id.
* @param memberId The member id.
*
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
- ClassicGroup group,
+ String groupId,
String memberId
) {
- if (group.isInState(DEAD)) {
- log.info("Received notification of heartbeat expiration for member
{} after group {} " +
- "had already been unloaded or deleted.",
- memberId, group.groupId());
- } else if (group.isPendingMember(memberId)) {
- log.info("Pending member {} in group {} has been removed after
session timeout expiration.",
- memberId, group.groupId());
-
- return removePendingMemberAndUpdateClassicGroup(group, memberId);
- } else if (!group.hasMemberId(memberId)) {
- log.debug("Member {} has already been removed from the group.",
memberId);
- } else {
- ClassicGroupMember member = group.member(memberId);
- if (!member.hasSatisfiedHeartbeat()) {
- log.info("Member {} in group {} has failed, removing it from
the group.",
- member.memberId(), group.groupId());
+ if (containsClassicGroup(groupId)) {
+ ClassicGroup group = getOrMaybeCreateClassicGroup(groupId, false);
+ if (group.isInState(DEAD)) {
+ log.info("Received notification of heartbeat expiration for
member {} after group {} " +
+ "had already been unloaded or deleted.",
+ memberId, group.groupId());
+ } else if (group.isPendingMember(memberId)) {
+ log.info("Pending member {} in group {} has been removed after
session timeout expiration.",
+ memberId, group.groupId());
+
+ return removePendingMemberAndUpdateClassicGroup(group,
memberId);
+ } else if (!group.hasMemberId(memberId)) {
+ log.debug("Member {} has already been removed from the
group.", memberId);
+ } else {
+ ClassicGroupMember member = group.member(memberId);
+ if (!member.hasSatisfiedHeartbeat()) {
+ log.info("Member {} in group {} has failed, removing it
from the group.",
+ member.memberId(), group.groupId());
- return removeMemberAndUpdateClassicGroup(
- group,
- member,
- "removing member " + member.memberId() + " on heartbeat
expiration."
- );
+ return removeMemberAndUpdateClassicGroup(
+ group,
+ member,
+ "removing member " + member.memberId() + " on
heartbeat expiration."
+ );
+ }
}
+ } else {
+ log.info("Received notification of heartbeat expiration for member
{} after group {} " +
Review Comment:
nit: Could we log this at `debug` level instead of `info`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
group.rebalanceTimeoutMs(),
TimeUnit.MILLISECONDS,
false,
- () -> expirePendingSync(group, group.generationId()));
+ () -> expirePendingSync(group.groupId(), group.generationId()));
}
/**
* Invoked when the heartbeat operation is expired from the timer.
Possibly remove the member and
* try complete the join phase.
*
- * @param group The group.
+ * @param groupId The group id.
* @param memberId The member id.
*
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
- ClassicGroup group,
+ String groupId,
String memberId
) {
- if (group.isInState(DEAD)) {
- log.info("Received notification of heartbeat expiration for member
{} after group {} " +
- "had already been unloaded or deleted.",
- memberId, group.groupId());
- } else if (group.isPendingMember(memberId)) {
- log.info("Pending member {} in group {} has been removed after
session timeout expiration.",
- memberId, group.groupId());
-
- return removePendingMemberAndUpdateClassicGroup(group, memberId);
- } else if (!group.hasMemberId(memberId)) {
- log.debug("Member {} has already been removed from the group.",
memberId);
- } else {
- ClassicGroupMember member = group.member(memberId);
- if (!member.hasSatisfiedHeartbeat()) {
- log.info("Member {} in group {} has failed, removing it from
the group.",
- member.memberId(), group.groupId());
+ if (containsClassicGroup(groupId)) {
Review Comment:
nit: Same comment here: the group is looked up twice (containsClassicGroup
followed by getOrMaybeCreateClassicGroup).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2954,38 +2980,42 @@ private void removeSyncExpiration(ClassicGroup group) {
/**
* Expire pending sync.
*
- * @param group The group.
+ * @param groupId The group id.
* @param generationId The generation when the pending sync was
originally scheduled.
*
* @return The coordinator result that will be appended to the log.
* */
private CoordinatorResult<Void, Record> expirePendingSync(
- ClassicGroup group,
+ String groupId,
int generationId
) {
- if (generationId != group.generationId()) {
- log.error("Received unexpected notification of sync expiration for
{} with an old " +
- "generation {} while the group has {}.", group.groupId(),
generationId, group.generationId());
- } else {
- if (group.isInState(DEAD) || group.isInState(EMPTY) ||
group.isInState(PREPARING_REBALANCE)) {
- log.error("Received unexpected notification of sync expiration
after group {} already " +
- "transitioned to {} state.", group.groupId(),
group.stateAsString());
- } else if (group.isInState(COMPLETING_REBALANCE) ||
group.isInState(STABLE)) {
- if (!group.hasReceivedSyncFromAllMembers()) {
- Set<String> pendingSyncMembers = new
HashSet<>(group.allPendingSyncMembers());
- pendingSyncMembers.forEach(memberId -> {
- group.remove(memberId);
- timer.cancel(classicGroupHeartbeatKey(group.groupId(),
memberId));
- });
-
- log.debug("Group {} removed members who haven't sent their
sync requests: {}",
- group.groupId(), pendingSyncMembers);
-
- return prepareRebalance(group, "Removing " +
pendingSyncMembers + " on pending sync request expiration");
+ if (containsClassicGroup(groupId)) {
Review Comment:
ditto: the group is looked up twice here as well (containsClassicGroup
followed by a second lookup).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2805,31 +2826,36 @@ private CoordinatorResult<Void, Record>
maybeCompleteJoinElseSchedule(
* Try to complete the join phase of the initial rebalance.
* Otherwise, extend the rebalance.
*
- * @param group The group under initial rebalance.
+ * @param groupId The group under initial rebalance.
*
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, Record>
tryCompleteInitialRebalanceElseSchedule(
- ClassicGroup group,
+ String groupId,
int delayMs,
int remainingMs
) {
- if (group.newMemberAdded() && remainingMs != 0) {
- // A new member was added. Extend the delay.
- group.setNewMemberAdded(false);
- int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs,
remainingMs);
- int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
- timer.schedule(
- classicGroupJoinKey(group.groupId()),
- newDelayMs,
- TimeUnit.MILLISECONDS,
- false,
- () -> tryCompleteInitialRebalanceElseSchedule(group,
newDelayMs, newRemainingMs)
- );
+ if (containsClassicGroup(groupId)) {
+ ClassicGroup group = getOrMaybeCreateClassicGroup(groupId, false);
+ if (group.newMemberAdded() && remainingMs != 0) {
+ // A new member was added. Extend the delay.
+ group.setNewMemberAdded(false);
+ int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs,
remainingMs);
+ int newRemainingMs = Math.max(remainingMs - delayMs, 0);
+
+ timer.schedule(
+ classicGroupJoinKey(group.groupId()),
+ newDelayMs,
+ TimeUnit.MILLISECONDS,
+ false,
+ () ->
tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs,
newRemainingMs)
+ );
+ } else {
+ // No more time remaining. Complete the join phase.
+ return completeClassicGroupJoin(group);
+ }
} else {
- // No more time remaining. Complete the join phase.
- return completeClassicGroupJoin(group);
+ log.info("Group {} is null or not a classic group, skipping the
initial rebalance stage.", groupId);
Review Comment:
ditto: could we log this at `debug` level instead of `info`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]