Copilot commented on code in PR #2755:
URL: https://github.com/apache/pekko/pull/2755#discussion_r2971318005
##########
cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala:
##########
@@ -1092,6 +1106,15 @@ abstract class ShardCoordinator(
case DelayedShardRegionTerminated(ref) =>
regionTerminated(ref)
+
+ case StopShards(shards) =>
+ val requestId = java.util.UUID.randomUUID()
+ shards.foreach { shard =>
+ waitingForShardsToStop = waitingForShardsToStop.updated(
+ shard,
+ waitingForShardsToStop.getOrElse(shard, Set.empty) + ((sender(),
requestId)))
+ }
+ shutdownShards(self, shards.filter(state.shards.contains))
Review Comment:
`StopShards` currently calls `shutdownShards(self, ...)`, which makes the
rebalance worker send `HandOff` to the coordinator itself instead of to the
region that actually hosts the shard. This will prevent shards from stopping
and the caller will never receive the expected `ShardStopped` acks. Consider
looking up each shard’s current region from `state.shards` and using that as
the `from`/`shuttingDownRegion` when initiating the handoff.
```suggestion
// Initiate handoff from the actual hosting regions instead of from
the coordinator itself
val shardsWithRegion: Iterable[(ActorRef, ShardId)] =
shards.flatMap { shardId =>
state.shards.get(shardId).map(regionRef => (regionRef, shardId))
}
shardsWithRegion
.groupBy(_._1)
.foreach {
case (regionRef, regionShards) =>
shutdownShards(regionRef, regionShards.map(_._2))
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]