dajac commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r582631095
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -602,6 +765,30 @@ class GroupCoordinator(val brokerId: Int,
groupError -> partitionErrors
}
+ private def validateHeartbeat(
+ group: GroupMetadata,
+ generationId: Int,
+ memberId: String,
+ groupInstanceId: Option[String]
+ ): Option[Errors] = {
+ if (group.is(Dead)) {
+ Some(Errors.COORDINATOR_NOT_AVAILABLE)
+ } else {
+ validateCurrentMember(
+ group,
+ memberId,
+ groupInstanceId,
+ operation = "sync-group"
Review comment:
`sync-group` => `heartbeat`?
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -247,16 +250,20 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
if (leaderId.isEmpty)
leaderId = Some(member.memberId)
+
members.put(member.memberId, member)
- member.supportedProtocols.foreach{ case (protocol, _) =>
supportedProtocols(protocol) += 1 }
+ member.supportedProtocols.foreach { case (protocol, _) =>
supportedProtocols(protocol) += 1 }
member.awaitingJoinCallback = callback
+
if (member.isAwaitingJoin)
numMembersAwaitingJoin += 1
+
+ pendingMembers.remove(member.memberId)
}
def remove(memberId: String): Unit = {
members.remove(memberId).foreach { member =>
- member.supportedProtocols.foreach{ case (protocol, _) =>
supportedProtocols(protocol) -= 1 }
+ member.supportedProtocols.foreach { case (protocol, _) =>
supportedProtocols(protocol) -= 1 }
Review comment:
I have noticed that we update `supportedProtocols` in multiple places
with the exact same line (modulo +/-). I think that we should consolidate them
into one or two helper methods. We can address this separately though. I just
wanted to point that out.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -378,24 +537,19 @@ class GroupCoordinator(val brokerId: Int,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
group.inLock {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just
removed the group
- // from the coordinator metadata; this is likely that the group has
migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the
member retry
- // finding the correct coordinator and rejoin.
- responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId,
"sync-group")) {
- responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
- } else if (!group.has(memberId)) {
- responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
- } else if (generationId != group.generationId) {
- responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
- } else if (protocolType.isDefined &&
!group.protocolType.contains(protocolType.get)) {
- responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
- } else if (protocolName.isDefined &&
!group.protocolName.contains(protocolName.get)) {
- responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
- } else {
- group.currentState match {
+ val validationErrorOpt = validateSyncGroup(
+ group,
+ generationId,
+ memberId,
+ protocolType,
+ protocolName,
+ groupInstanceId
+ )
+
+ validationErrorOpt match {
+ case Some(error) => responseCallback(SyncGroupResult(error))
Review comment:
nit: There is an extra space before `responseCallback`.
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -430,7 +429,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
private def candidateProtocols: Set[String] = {
// get the set of protocols that are commonly supported by all members
val numMembers = members.size
- supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
+ supportedProtocols.filter(_._2 == numMembers).keys.toSet
Review comment:
nit: Could we directly use `keysSet` instead of `.keys.toSet`?
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
responseCallback:
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
group.inLock {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just
removed the group
- // from the coordinator metadata; it is likely that the group has
migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the
member retry
- // finding the correct coordinator and rejoin.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId,
"txn-commit-offsets")) {
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.FENCED_INSTANCE_ID })
- } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID &&
!group.has(memberId)) {
- // Enforce member id when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.UNKNOWN_MEMBER_ID })
- } else if (generationId >= 0 && generationId != group.generationId) {
- // Enforce generation check when it is set.
- responseCallback(offsetMetadata.map { case (k, _) => k ->
Errors.ILLEGAL_GENERATION })
+ val validationErrorOpt = validateTxnOffsetCommit(
+ group,
+ generationId,
+ memberId,
+ groupInstanceId
+ )
+
+ if (validationErrorOpt.isDefined) {
+ responseCallback(offsetMetadata.map { case (k, _) => k ->
validationErrorOpt.get })
Review comment:
Ah.. I see your point regarding the indentation. That makes sense to
avoid them. I do not see the reasonable way that you are referring to though.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]