dajac commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r569206619
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -185,9 +185,9 @@ class GroupCoordinator(val brokerId: Int,
group.remove(memberId)
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID,
Errors.GROUP_MAX_SIZE_REACHED))
} else if (isUnknownMember) {
- doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId,
clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType,
protocols, responseCallback)
+ doNewMemberJoinGroup(group, groupInstanceId,
requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols, responseCallback)
Review comment:
nit: Could we break this long line and the one below as well?
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
responseCallback:
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
group.inLock {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just
removed the group
- // from the coordinator metadata; it is likely that the group has
migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the
member retry
- // finding the correct coordinator and rejoin.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId,
"txn-commit-offsets")) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.FENCED_INSTANCE_ID })
- } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID &&
!group.has(memberId)) {
- // Enforce member id when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.UNKNOWN_MEMBER_ID })
- } else if (generationId >= 0 && generationId != group.generationId) {
- // Enforce generation check when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.ILLEGAL_GENERATION })
+ val validationErrorOpt = validateTxnOffsetCommit(
+ group,
+ generationId,
+ memberId,
+ groupInstanceId
+ )
+
+ if (validationErrorOpt.isDefined) {
+ responseCallback(offsetMetadata.map { case (k, _) => k ->
validationErrorOpt.get })
Review comment:
nit: I would use `match` here as well.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -220,67 +222,163 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID,
Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
val newMemberId = group.generateMemberId(clientId, groupInstanceId)
+ groupInstanceId match {
+ case Some(instanceId) =>
+ doStaticNewMemberJoinGroup(
+ group,
+ instanceId,
+ newMemberId,
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ protocolType,
+ protocols,
+ responseCallback
+ )
- if (group.hasStaticMember(groupInstanceId)) {
- updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId,
protocols, responseCallback)
- } else if (requireKnownMemberId) {
- // If member id required (dynamic membership), register the member
in the pending member list
- // and send back a response to call for another join group request
with allocated member id.
- debug(s"Dynamic member with unknown member id joins group
${group.groupId} in " +
- s"${group.currentState} state. Created a new member id
$newMemberId and request the member to rejoin with this id.")
- group.addPendingMember(newMemberId)
- addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
- responseCallback(JoinGroupResult(newMemberId,
Errors.MEMBER_ID_REQUIRED))
- } else {
- info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"}
Member with unknown member id joins group ${group.groupId} in " +
- s"${group.currentState} state. Created a new member id
$newMemberId for this member and add to the group.")
- addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs,
newMemberId, groupInstanceId,
- clientId, clientHost, protocolType, protocols, group,
responseCallback)
+ case None =>
+ doDynamicNewMemberJoinGroup(
+ group,
+ requireKnownMemberId,
+ newMemberId,
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ protocolType,
+ protocols,
+ responseCallback
+ )
}
}
}
}
- private def doJoinGroup(group: GroupMetadata,
- memberId: String,
- groupInstanceId: Option[String],
- clientId: String,
- clientHost: String,
- rebalanceTimeoutMs: Int,
- sessionTimeoutMs: Int,
- protocolType: String,
- protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback): Unit = {
+ private def doStaticNewMemberJoinGroup(
+ group: GroupMetadata,
+ groupInstanceId: String,
+ newMemberId: String,
+ clientId: String,
+ clientHost: String,
+ rebalanceTimeoutMs: Int,
+ sessionTimeoutMs: Int,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback
+ ): Unit = {
+ group.currentStaticMemberId(groupInstanceId) match {
+ case Some(oldMemberId) =>
+ info(s"Static member with groupInstanceId=$groupInstanceId and unknown
member id joins " +
+ s"group ${group.groupId} in ${group.currentState} state. Replacing
previously mapped " +
+ s"member $oldMemberId with this groupInstanceId.")
+ updateStaticMemberAndRebalance(group, newMemberId, oldMemberId,
groupInstanceId, protocols, responseCallback)
+
+ case None =>
+ info(s"Static member with groupInstanceId=$groupInstanceId and unknown
member id joins " +
+ s"group ${group.groupId} in ${group.currentState} state. Created a
new member id $newMemberId " +
+ s"for this member and add to the group.")
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs,
newMemberId, Some(groupInstanceId),
+ clientId, clientHost, protocolType, protocols, group,
responseCallback)
+ }
+ }
+
+ private def doDynamicNewMemberJoinGroup(
+ group: GroupMetadata,
+ requireKnownMemberId: Boolean,
+ newMemberId: String,
+ clientId: String,
+ clientHost: String,
+ rebalanceTimeoutMs: Int,
+ sessionTimeoutMs: Int,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback
+ ): Unit = {
+ if (requireKnownMemberId) {
+ // If member id required, register the member in the pending member list
and send
+ // back a response to call for another join group request with allocated
member id.
+ info(s"Dynamic member with unknown member id joins group
${group.groupId} in " +
+ s"${group.currentState} state. Created a new member id $newMemberId
and request the " +
+ s"member to rejoin with this id.")
+ group.addPendingMember(newMemberId)
+ addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+ responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
+ } else {
+ info(s"Dynamic Member with unknown member id joins group
${group.groupId} in " +
+ s"${group.currentState} state. Created a new member id $newMemberId
for this member " +
+ s"and add to the group.")
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId,
None,
+ clientId, clientHost, protocolType, protocols, group, responseCallback)
+ }
+ }
+
+ private def validateCurrentMember(
+ group: GroupMetadata,
+ memberId: String,
+ groupInstanceId: Option[String],
+ operation: String
+ ): Option[Errors] = {
+ // We are validating two things:
+ // 1. If `groupInstanceId` is present, then it exists and is mapped to
`memberId`
+ // 2. The `memberId` exists in the group
+
Review comment:
nit: This empty line could be removed.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -603,6 +731,25 @@ class GroupCoordinator(val brokerId: Int,
groupError -> partitionErrors
}
+ private def validateHeartbeat(
+ group: GroupMetadata,
+ generationId: Int,
+ memberId: String,
+ groupInstanceId: Option[String]
+ ): Option[Errors] = {
+ if (group.is(Dead)) {
+ Some(Errors.COORDINATOR_NOT_AVAILABLE)
+ } else {
+ validateCurrentMember(group, memberId, groupInstanceId,
"sync-group").orElse {
Review comment:
`heartbeat` instead of `sync-group` should be used here.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1017,26 +1198,17 @@ class GroupCoordinator(val brokerId: Int,
// for new members. If the new member is still there, we expect it to
retry.
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
- if (member.isStaticMember) {
- info(s"Adding new static member $groupInstanceId to group
${group.groupId} with member id $memberId.")
- group.addStaticMember(groupInstanceId, memberId)
- } else {
- group.removePendingMember(memberId)
- }
maybePrepareRebalance(group, s"Adding new member $memberId with group
instance id $groupInstanceId")
}
private def updateStaticMemberAndRebalance(group: GroupMetadata,
newMemberId: String,
- groupInstanceId: Option[String],
+ oldMemberId: String,
Review comment:
nit: I would put `oldMemberId` before `newMemberId`. It feels a bit more
natural when reading it.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1055,11 +1227,15 @@ class GroupCoordinator(val brokerId: Int,
val groupAssignment: Map[String, Array[Byte]] =
group.allMemberMetadata.map(member => member.memberId ->
member.assignment).toMap
groupManager.storeGroup(group, groupAssignment, error => {
if (error != Errors.NONE) {
+
+ // TODO: This logic seems questionable. The write was not
committed, but that doesn't
+ // mean it wasn't written to the log and cannot eventually
become committed.
Review comment:
I do agree. Should we turn this into a Jira and address it in a follow-up
PR?
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -247,11 +250,15 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
if (leaderId.isEmpty)
leaderId = Some(member.memberId)
+
members.put(member.memberId, member)
member.supportedProtocols.foreach{ case (protocol, _) =>
supportedProtocols(protocol) += 1 }
Review comment:
nit: Not related to your PR but a space is missing before `{`.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
responseCallback:
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
group.inLock {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just
removed the group
- // from the coordinator metadata; it is likely that the group has
migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the
member retry
- // finding the correct coordinator and rejoin.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId,
"txn-commit-offsets")) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.FENCED_INSTANCE_ID })
- } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID &&
!group.has(memberId)) {
- // Enforce member id when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.UNKNOWN_MEMBER_ID })
- } else if (generationId >= 0 && generationId != group.generationId) {
- // Enforce generation check when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.ILLEGAL_GENERATION })
+ val validationErrorOpt = validateTxnOffsetCommit(
+ group,
+ generationId,
+ memberId,
+ groupInstanceId
+ )
+
+ if (validationErrorOpt.isDefined) {
+ responseCallback(offsetMetadata.map { case (k, _) => k ->
validationErrorOpt.get })
} else {
groupManager.storeOffsets(group, memberId, offsetMetadata,
responseCallback, producerId, producerEpoch)
}
}
}
+
+ private def validateOffsetCommit(
+ group: GroupMetadata,
+ generationId: Int,
+ memberId: String,
+ groupInstanceId: Option[String]
+ ): Option[Errors] = {
+ if (group.is(Dead)) {
+ Some(Errors.COORDINATOR_NOT_AVAILABLE)
+ } else if (generationId >= 0 || memberId !=
JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
+ validateCurrentMember(group, memberId, groupInstanceId,
"offset-commit").orElse {
+ if (generationId != group.generationId) {
+ Some(Errors.ILLEGAL_GENERATION)
+ } else {
+ None
+ }
+ }
+ } else if (!group.is(Empty)) {
+ // When the group is non-empty, only members can commit offsets
+ Some(Errors.UNKNOWN_MEMBER_ID)
+ } else {
+ None
+ }
+ }
+
private def doCommitOffsets(group: GroupMetadata,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition,
Errors] => Unit): Unit = {
group.inLock {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just
removed the group
- // from the coordinator metadata; it is likely that the group has
migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the
member retry
- // finding the correct coordinator and rejoin.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId,
"commit-offsets")) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.FENCED_INSTANCE_ID })
- } else if (generationId < 0 && group.is(Empty)) {
- // The group is only using Kafka to store offsets.
- groupManager.storeOffsets(group, memberId, offsetMetadata,
responseCallback)
- } else if (!group.has(memberId)) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.UNKNOWN_MEMBER_ID })
- } else if (generationId != group.generationId) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.ILLEGAL_GENERATION })
+ val validationErrorOpt = validateOffsetCommit(
+ group,
+ generationId,
+ memberId,
+ groupInstanceId
+ )
+
+ if (validationErrorOpt.isDefined) {
+ responseCallback(offsetMetadata.map { case (k, _) => k ->
validationErrorOpt.get })
Review comment:
nit: I would use `match` here as well.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -470,33 +600,31 @@ class GroupCoordinator(val brokerId: Int,
val memberErrors = leavingMembers.map { leavingMember =>
val memberId = leavingMember.memberId
val groupInstanceId = Option(leavingMember.groupInstanceId)
- if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
- && group.isStaticMemberFenced(memberId, groupInstanceId,
"leave-group")) {
- memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
+
+ // The LeaveGroup API allows administrative removal of
members by GroupInstanceId
+ // in which case we expect the MemberId to be undefined.
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ groupInstanceId.flatMap(group.currentStaticMemberId) match
{
+ case Some(currentMemberId) =>
+ removeCurrentMemberFromGroup(group, currentMemberId)
+ memberLeaveError(leavingMember, Errors.NONE)
+ case None =>
+ memberLeaveError(leavingMember,
Errors.UNKNOWN_MEMBER_ID)
+ }
} else if (group.isPendingMember(memberId)) {
- if (groupInstanceId.isDefined) {
- throw new IllegalStateException(s"the static member
$groupInstanceId was not expected to be leaving " +
- s"from pending member bucket with member id $memberId")
+ removePendingMemberAndUpdateGroup(group, memberId)
+
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
+ info(s"Pending member with memberId=$memberId has left
group ${group.groupId} " +
+ s"through explicit `LeaveGroup` request")
+ memberLeaveError(leavingMember, Errors.NONE)
+ } else {
+ val memberErrorOpt = validateCurrentMember(group,
memberId, groupInstanceId, "leave-group")
+ if (memberErrorOpt.isDefined) {
+ memberLeaveError(leavingMember, memberErrorOpt.get)
Review comment:
nit: It might be better to use `match` here instead of using `isDefined`
and `get` on `memberErrorOpt`.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
responseCallback:
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
group.inLock {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just
removed the group
- // from the coordinator metadata; it is likely that the group has
migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the
member retry
- // finding the correct coordinator and rejoin.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId,
"txn-commit-offsets")) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.FENCED_INSTANCE_ID })
- } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID &&
!group.has(memberId)) {
- // Enforce member id when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.UNKNOWN_MEMBER_ID })
- } else if (generationId >= 0 && generationId != group.generationId) {
- // Enforce generation check when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.ILLEGAL_GENERATION })
+ val validationErrorOpt = validateTxnOffsetCommit(
+ group,
+ generationId,
+ memberId,
+ groupInstanceId
+ )
+
+ if (validationErrorOpt.isDefined) {
+ responseCallback(offsetMetadata.map { case (k, _) => k ->
validationErrorOpt.get })
} else {
groupManager.storeOffsets(group, memberId, offsetMetadata,
responseCallback, producerId, producerEpoch)
}
}
}
+
+ private def validateOffsetCommit(
+ group: GroupMetadata,
+ generationId: Int,
+ memberId: String,
+ groupInstanceId: Option[String]
+ ): Option[Errors] = {
+ if (group.is(Dead)) {
+ Some(Errors.COORDINATOR_NOT_AVAILABLE)
+ } else if (generationId >= 0 || memberId !=
JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
+ validateCurrentMember(group, memberId, groupInstanceId,
"offset-commit").orElse {
+ if (generationId != group.generationId) {
+ Some(Errors.ILLEGAL_GENERATION)
+ } else {
+ None
+ }
+ }
Review comment:
This block is similar to the one in `validateTxnOffsetCommit` modulo the
reason. I wonder if we could define a common helper method for both cases.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]