This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new ef00073439 ddata: Add wildcard prefix subscriptions to Replicator 
(#2735)
ef00073439 is described below

commit ef00073439e2b48a569c4520fa03835fda9cfb48
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Mar 18 10:14:53 2026 +0100

    ddata: Add wildcard prefix subscriptions to Replicator (#2735)
    
    * Initial plan
    
    * ddata: Add wildcard prefix subscriptions to Replicator (port of 
akka/akka-core#31731)
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Port missing wildcard subscription changes from Akka PR #31731 (#11)
    
    * Initial plan
    
    * Add wildcard subscription improvements: override withId, serialization, 
docs, and tests
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix post-rebase issues: sendExpiredIfMissing, expiryWildcards, isExpired 
Duration.Zero bug
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../org/apache/pekko/cluster/ddata/GSet.scala      |   5 +-
 .../org/apache/pekko/cluster/ddata/Flag.scala      |   5 +-
 .../org/apache/pekko/cluster/ddata/GCounter.scala  |   5 +-
 .../scala/org/apache/pekko/cluster/ddata/Key.scala |   7 +
 .../org/apache/pekko/cluster/ddata/LWWMap.scala    |   5 +-
 .../apache/pekko/cluster/ddata/LWWRegister.scala   |   5 +-
 .../org/apache/pekko/cluster/ddata/ORMap.scala     |   5 +-
 .../apache/pekko/cluster/ddata/ORMultiMap.scala    |   5 +-
 .../org/apache/pekko/cluster/ddata/ORSet.scala     |   5 +-
 .../org/apache/pekko/cluster/ddata/PNCounter.scala |   5 +-
 .../apache/pekko/cluster/ddata/PNCounterMap.scala  |   5 +-
 .../apache/pekko/cluster/ddata/Replicator.scala    |  85 ++++++--
 .../ddata/protobuf/ReplicatedDataSerializer.scala  |   5 +-
 .../cluster/ddata/WildcardSubscribeSpec.scala      | 215 +++++++++++++++++++++
 .../ddata/ReplicatorWildcardSubscriptionSpec.scala | 126 ++++++++++++
 .../protobuf/ReplicatedDataSerializerSpec.scala    |  14 ++
 docs/src/main/paradox/typed/distributed-data.md    |   4 +
 17 files changed, 478 insertions(+), 28 deletions(-)

diff --git 
a/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala 
b/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
index d1804b5ca5..24540c08c9 100644
--- 
a/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
+++ 
b/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
@@ -104,4 +104,7 @@ object GSetKey {
 }
 
 @SerialVersionUID(1L)
-final case class GSetKey[A](_id: String) extends Key[GSet[A]](_id) with 
ReplicatedDataSerialization
+final case class GSetKey[A](_id: String) extends Key[GSet[A]](_id) with 
ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): GSetKey[A] =
+    GSetKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
index 0d55a14195..95e2c31b7c 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
@@ -65,4 +65,7 @@ object FlagKey {
 }
 
 @SerialVersionUID(1L)
-final case class FlagKey(_id: String) extends Key[Flag](_id) with 
ReplicatedDataSerialization
+final case class FlagKey(_id: String) extends Key[Flag](_id) with 
ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): FlagKey =
+    FlagKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
index 7080d99e17..eb7d229b1f 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
@@ -167,4 +167,7 @@ object GCounterKey {
 }
 
 @SerialVersionUID(1L)
-final case class GCounterKey(_id: String) extends Key[GCounter](_id) with 
ReplicatedDataSerialization
+final case class GCounterKey(_id: String) extends Key[GCounter](_id) with 
ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): GCounterKey =
+    GCounterKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
index 1a3dca332d..7891b7aeab 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
@@ -13,6 +13,8 @@
 
 package org.apache.pekko.cluster.ddata
 
+import org.apache.pekko.cluster.ddata.Key.UnspecificKey
+
 object Key {
 
   /**
@@ -24,6 +26,8 @@ object Key {
 
   type KeyId = String
 
+  final case class UnspecificKey(_id: KeyId) extends Key[ReplicatedData](_id) 
with ReplicatedDataSerialization
+
 }
 
 /**
@@ -36,6 +40,9 @@ object Key {
  */
 abstract class Key[+T <: ReplicatedData](val id: Key.KeyId) extends 
Serializable {
 
+  def withId(newId: Key.KeyId): Key[ReplicatedData] =
+    UnspecificKey(newId)
+
   override final def equals(o: Any): Boolean = o match {
     case k: Key[_] => id == k.id
     case _         => false
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
index 867977403e..8d029ec2f6 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
@@ -186,4 +186,7 @@ object LWWMapKey {
 }
 
 @SerialVersionUID(1L)
-final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id) 
with ReplicatedDataSerialization
+final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id) 
with ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): LWWMapKey[A, B] =
+    LWWMapKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
index 0f70c4620a..9224591d4f 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
@@ -202,4 +202,7 @@ object LWWRegisterKey {
 }
 
 @SerialVersionUID(1L)
-final case class LWWRegisterKey[A](_id: String) extends 
Key[LWWRegister[A]](_id) with ReplicatedDataSerialization
+final case class LWWRegisterKey[A](_id: String) extends 
Key[LWWRegister[A]](_id) with ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): LWWRegisterKey[A] =
+    LWWRegisterKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
index 3e3cffadd8..1070ee3e63 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
@@ -562,4 +562,7 @@ object ORMapKey {
 @SerialVersionUID(1L)
 final case class ORMapKey[A, B <: ReplicatedData](_id: String)
     extends Key[ORMap[A, B]](_id)
-    with ReplicatedDataSerialization
+    with ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): ORMapKey[A, B] =
+    ORMapKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
index 6a682181e8..a582d84d3e 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
@@ -328,4 +328,7 @@ object ORMultiMapKey {
 }
 
 @SerialVersionUID(1L)
-final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A, 
B]](_id) with ReplicatedDataSerialization
+final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A, 
B]](_id) with ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): ORMultiMapKey[A, B] =
+    ORMultiMapKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
index d618face23..59212fa6a0 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
@@ -574,4 +574,7 @@ object ORSetKey {
 }
 
 @SerialVersionUID(1L)
-final case class ORSetKey[A](_id: String) extends Key[ORSet[A]](_id) with 
ReplicatedDataSerialization
+final case class ORSetKey[A](_id: String) extends Key[ORSet[A]](_id) with 
ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): ORSetKey[A] =
+    ORSetKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
index 36ff6495fb..8873db8e62 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
@@ -217,4 +217,7 @@ object PNCounterKey {
 }
 
 @SerialVersionUID(1L)
-final case class PNCounterKey(_id: String) extends Key[PNCounter](_id) with 
ReplicatedDataSerialization
+final case class PNCounterKey(_id: String) extends Key[PNCounter](_id) with 
ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): PNCounterKey =
+    PNCounterKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
index f615081465..56d96bf0f9 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
@@ -188,4 +188,7 @@ object PNCounterMapKey {
 }
 
 @SerialVersionUID(1L)
-final case class PNCounterMapKey[A](_id: String) extends 
Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization
+final case class PNCounterMapKey[A](_id: String) extends 
Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization {
+  override def withId(newId: Key.KeyId): PNCounterMapKey[A] =
+    PNCounterMapKey(newId)
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
index bb94ca7e8b..2721108a36 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
@@ -567,6 +567,10 @@ object Replicator {
    * when the value of the given `key` is changed. Current value is also
    * sent as a [[Changed]] message to a new subscriber.
    *
+   * In addition to subscribing to individual keys it is possible to subscribe 
to all keys with a given prefix
+   * by using a `*` at the end of the key `id`. For example 
`GCounterKey("counter-*")`. Notifications will be
+   * sent for all matching keys, also new keys added later.
+   *
    * Subscribers will be notified periodically with the configured 
`notify-subscribers-interval`,
    * and it is also possible to send an explicit `FlushChanges` message to
    * the `Replicator` to notify the subscribers immediately.
@@ -1310,14 +1314,14 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   val serializer = 
SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
   val maxPruningDisseminationNanos = maxPruningDissemination.toNanos
 
-  val expiryWildcards = settings.expiryKeys.collect { case (k, v) if 
k.endsWith("*") => k.dropRight(1) -> v }
+  val expiryWildcards = settings.expiryKeys.collect { case (k, v) if 
isWildcard(k) => dropWildcard(k) -> v }
   val expiryEnabled: Boolean = settings.expiryKeys.nonEmpty
   // updated on the gossip tick to avoid too many calls to 
`currentTimeMillis()`
   private var currentUsedTimestamp = if (expiryEnabled) 
System.currentTimeMillis() else 0L
 
   val hasDurableKeys = settings.durableKeys.nonEmpty
-  val durable = settings.durableKeys.filterNot(_.endsWith("*"))
-  val durableWildcards = settings.durableKeys.collect { case k if 
k.endsWith("*") => k.dropRight(1) }
+  val durable = settings.durableKeys.filterNot(isWildcard)
+  val durableWildcards = settings.durableKeys.collect { case k if 
isWildcard(k) => dropWildcard(k) }
   val durableStore: ActorRef =
     if (hasDurableKeys) {
       val props = settings.durableStoreProps match {
@@ -1414,6 +1418,8 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   @nowarn("msg=deprecated")
   val newSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] with 
mutable.MultiMap[KeyId, ActorRef]
   var subscriptionKeys = Map.empty[KeyId, KeyR]
+  @nowarn("msg=deprecated")
+  val wildcardSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] 
with mutable.MultiMap[KeyId, ActorRef]
 
   // To be able to do efficient stashing we use this field instead of sender().
   // Using internal buffer instead of Stash to avoid the overhead of the Stash 
mailbox.
@@ -1883,7 +1889,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     }
 
     val dig =
-      if (subscribers.contains(key) && !changed.contains(key)) {
+      if (hasSubscriber(key) && !changed.contains(key)) {
         val oldDigest = getDigest(key)
         val (dig, payloadSize) = digest(newEnvelope)
         payloadSizeAggregator.updatePayloadSize(key, payloadSize)
@@ -1963,7 +1969,12 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   }
 
   def isExpired(key: KeyId, timestamp: Timestamp, now: Long): Boolean = {
-    expiryEnabled && timestamp != 0L && timestamp <= now - 
getExpiryDuration(key).toMillis
+    if (expiryEnabled && timestamp != 0L) {
+      val expiryDuration = getExpiryDuration(key)
+      expiryDuration != Duration.Zero && timestamp <= now - 
expiryDuration.toMillis
+    } else {
+      false
+    }
   }
 
   def updateUsedTimestamp(key: KeyId, timestamp: Timestamp): Unit = {
@@ -1993,8 +2004,15 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
 
   @nowarn("msg=deprecated")
   def receiveFlushChanges(): Unit = {
-    def notify(keyId: KeyId, subs: mutable.Set[ActorRef], 
sendExpiredIfMissing: Boolean): Unit = {
-      val key = subscriptionKeys(keyId)
+    def notify(keyId: KeyId, subs: Iterator[ActorRef], sendExpiredIfMissing: 
Boolean): Unit = {
+      val key = subscriptionKeys.get(keyId) match {
+        case Some(r) => r
+        case None    =>
+          subscriptionKeys
+            .collectFirst { case (k, r) if isWildcard(k) && 
keyId.startsWith(dropWildcard(k)) => r.withId(keyId) }
+            .getOrElse(throw new IllegalStateException(s"Subscription 
notification of [$keyId], but no matching " +
+              s"subscription key in 
[${subscriptionKeys.keysIterator.mkString(", ")}]"))
+      }
       getData(keyId) match {
         case Some(envelope) =>
           val msg = if (envelope.data == DeletedData) Deleted(key) else 
Changed(key)(envelope.data)
@@ -2009,17 +2027,25 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
       }
     }
 
-    if (subscribers.nonEmpty) {
-      for (key <- changed; if subscribers.contains(key); subs <- 
subscribers.get(key))
-        notify(key, subs, sendExpiredIfMissing = true)
+    if (subscribers.nonEmpty || wildcardSubscribers.nonEmpty) {
+      changed.foreach { key =>
+        if (hasSubscriber(key)) notify(key, getSubscribersIterator(key), 
sendExpiredIfMissing = true)
+      }
     }
 
     // Changed event is sent to new subscribers even though the key has not 
changed,
     // i.e. send current value. Expired is not sent to new subscribers as the 
first event.
     if (newSubscribers.nonEmpty) {
       for ((key, subs) <- newSubscribers) {
-        notify(key, subs, sendExpiredIfMissing = false)
-        subs.foreach { subscribers.addBinding(key, _) }
+        if (isWildcard(key)) {
+          
dataEntries.keysIterator.filter(_.startsWith(dropWildcard(key))).foreach { 
matchingKey =>
+            notify(matchingKey, subs.iterator, sendExpiredIfMissing = false)
+          }
+          subs.foreach { wildcardSubscribers.addBinding(dropWildcard(key), _) }
+        } else {
+          notify(key, subs.iterator, sendExpiredIfMissing = false)
+          subs.foreach { subscribers.addBinding(key, _) }
+        }
       }
       newSubscribers.clear()
     }
@@ -2331,18 +2357,39 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   }
 
   def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit = {
-    subscribers.removeBinding(key.id, subscriber)
+    if (isWildcard(key.id)) 
wildcardSubscribers.removeBinding(dropWildcard(key.id), subscriber)
+    else subscribers.removeBinding(key.id, subscriber)
     newSubscribers.removeBinding(key.id, subscriber)
     if (!hasSubscriber(subscriber))
       context.unwatch(subscriber)
-    if (!subscribers.contains(key.id) && !newSubscribers.contains(key.id))
+    if (!hasSubscriber(key.id) && !newSubscribers.contains(key.id))
       subscriptionKeys -= key.id
   }
 
   def hasSubscriber(subscriber: ActorRef): Boolean =
     subscribers.exists { case (_, s) => s.contains(subscriber) } ||
+    wildcardSubscribers.exists { case (_, s) => s.contains(subscriber) } ||
     newSubscribers.exists { case (_, s) => s.contains(subscriber) }
 
+  private def hasSubscriber(keyId: KeyId): Boolean =
+    subscribers.contains(keyId) ||
+    (wildcardSubscribers.nonEmpty && wildcardSubscribers.exists { case (k, _) 
=> keyId.startsWith(k) })
+
+  private def getSubscribersIterator(keyId: KeyId): Iterator[ActorRef] = {
+    val subscribersIter = 
subscribers.get(keyId).map(_.iterator).getOrElse(Iterator.empty)
+    if (wildcardSubscribers.isEmpty) subscribersIter
+    else
+      subscribersIter ++
+      wildcardSubscribers
+        .collectFirst { case (k, v) if keyId.startsWith(k) => v }
+        .map(_.iterator)
+        .getOrElse(Iterator.empty)
+  }
+
+  private def isWildcard(keyId: KeyId): Boolean = keyId.endsWith("*")
+
+  private def dropWildcard(keyId: KeyId): KeyId = keyId.dropRight(1)
+
   def receiveTerminated(ref: ActorRef): Unit = {
     if (ref == durableStore) {
       log.error("Stopping distributed-data Replicator because durable store 
terminated")
@@ -2352,13 +2399,17 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
       keys1.foreach { key =>
         subscribers.removeBinding(key, ref)
       }
-      val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) => k 
}
+      val keys2 = wildcardSubscribers.collect { case (k, s) if s.contains(ref) 
=> k }
       keys2.foreach { key =>
+        wildcardSubscribers.removeBinding(key, ref)
+      }
+      val keys3 = newSubscribers.collect { case (k, s) if s.contains(ref) => k 
}
+      keys3.foreach { key =>
         newSubscribers.removeBinding(key, ref)
       }
 
-      (keys1 ++ keys2).foreach { key =>
-        if (!subscribers.contains(key) && !newSubscribers.contains(key))
+      (keys1 ++ keys3).foreach { key =>
+        if (!hasSubscriber(key) && !newSubscribers.contains(key))
           subscriptionKeys -= key
       }
     }
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
index 37b33d6f31..6496dab09f 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
@@ -304,6 +304,7 @@ class ReplicatedDataSerializer(val system: 
ExtendedActorSystem)
   private val ORMultiMapManifest = "K"
   private val ORMultiMapKeyManifest = "k"
   private val VersionVectorManifest = "L"
+  private val UnspecificKeyManifest = "m"
 
   private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] 
=> AnyRef](
     GSetManifest -> gsetFromBinary,
@@ -336,7 +337,8 @@ class ReplicatedDataSerializer(val system: 
ExtendedActorSystem)
     ORMapKeyManifest -> (bytes => ORMapKey(keyIdFromBinary(bytes))),
     LWWMapKeyManifest -> (bytes => LWWMapKey(keyIdFromBinary(bytes))),
     PNCounterMapKeyManifest -> (bytes => 
PNCounterMapKey(keyIdFromBinary(bytes))),
-    ORMultiMapKeyManifest -> (bytes => ORMultiMapKey(keyIdFromBinary(bytes))))
+    ORMultiMapKeyManifest -> (bytes => ORMultiMapKey(keyIdFromBinary(bytes))),
+    UnspecificKeyManifest -> (bytes => 
Key.UnspecificKey(keyIdFromBinary(bytes))))
 
   override def manifest(obj: AnyRef): String = obj match {
     case _: ORSet[_]                     => ORSetManifest
@@ -368,6 +370,7 @@ class ReplicatedDataSerializer(val system: 
ExtendedActorSystem)
     case _: LWWMapKey[_, _]     => LWWMapKeyManifest
     case _: PNCounterMapKey[_]  => PNCounterMapKeyManifest
     case _: ORMultiMapKey[_, _] => ORMultiMapKeyManifest
+    case _: Key.UnspecificKey   => UnspecificKeyManifest
 
     case _: ORSet.DeltaGroup[_]       => ORSetDeltaGroupManifest
     case _: ORMap.DeltaGroup[_, _]    => ORMapDeltaGroupManifest
diff --git 
a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/WildcardSubscribeSpec.scala
 
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/WildcardSubscribeSpec.scala
new file mode 100644
index 0000000000..4bd29743d7
--- /dev/null
+++ 
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/WildcardSubscribeSpec.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.ddata
+
+import scala.concurrent.duration._
+
+import com.typesafe.config.ConfigFactory
+
+import org.apache.pekko.cluster.Cluster
+import org.apache.pekko.remote.testconductor.RoleName
+import org.apache.pekko.remote.testkit.MultiNodeConfig
+import org.apache.pekko.remote.testkit.MultiNodeSpec
+import org.apache.pekko.testkit._
+
+object WildcardSubscribeSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(ConfigFactory.parseString("""
+    pekko.loglevel = INFO
+    pekko.actor.provider = "cluster"
+    pekko.cluster.distributed-data {
+      gossip-interval = 500 millis
+      notify-subscribers-interval = 100 millis
+      expire-keys-after-inactivity {
+        "expiry-*" = 2 seconds
+      }
+    }
+    """))
+
+}
+
+class WildcardSubscribeSpecMultiJvmNode1 extends WildcardSubscribeSpec
+class WildcardSubscribeSpecMultiJvmNode2 extends WildcardSubscribeSpec
+
+class WildcardSubscribeSpec extends MultiNodeSpec(WildcardSubscribeSpec) with 
STMultiNodeSpec with ImplicitSender {
+  import WildcardSubscribeSpec._
+  import Replicator._
+
+  override def initialParticipants: Int = roles.size
+
+  private val cluster = Cluster(system)
+  private implicit val selfUniqueAddress: SelfUniqueAddress = 
DistributedData(system).selfUniqueAddress
+  private val replicator = 
system.actorOf(Replicator.props(ReplicatorSettings(system)), "replicator")
+
+  private val KeyA = GCounterKey("counter-A")
+  private val KeyB = GCounterKey("counter-B")
+  private val KeyOtherA = GCounterKey("other-A")
+  private val KeyExpiryA = GCounterKey("expiry-A")
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster.join(node(to).address)
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  "Replicator wildcard subscriptions" must {
+
+    "notify changed entry" in {
+      join(first, first)
+
+      runOn(first) {
+        val subscriberProbe = TestProbe()
+        replicator ! Subscribe(GCounterKey("counter-*"), subscriberProbe.ref)
+
+        replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 1)
+        expectMsgType[UpdateSuccess[_]]
+        val chg1 = subscriberProbe.expectMsgType[Changed[GCounter]]
+        chg1.key should ===(KeyA)
+        chg1.key.getClass should ===(KeyA.getClass)
+        chg1.get(KeyA).value should ===(1)
+
+        replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 1)
+        expectMsgType[UpdateSuccess[_]]
+        val chg2 = subscriberProbe.expectMsgType[Changed[GCounter]]
+        chg2.key should ===(KeyA)
+        chg2.get(KeyA).value should ===(2)
+
+        replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 1)
+        expectMsgType[UpdateSuccess[_]]
+        val chg3 = subscriberProbe.expectMsgType[Changed[GCounter]]
+        chg3.key should ===(KeyB)
+        chg3.get(KeyB).value should ===(1)
+
+        replicator ! Update(KeyOtherA, GCounter.empty, WriteLocal)(_ :+ 17)
+        expectMsgType[UpdateSuccess[_]]
+        subscriberProbe.expectNoMessage(200.millis)
+
+        // a few more subscribers
+        val subscriberProbe2 = TestProbe()
+        replicator ! Subscribe(GCounterKey("counter-*"), subscriberProbe2.ref)
+        val subscriberProbeA = TestProbe()
+        replicator ! Subscribe(KeyA, subscriberProbeA.ref)
+        val subscriberProbeOther = TestProbe()
+        replicator ! Subscribe(GCounterKey("other-*"), 
subscriberProbeOther.ref)
+        subscriberProbe.expectNoMessage(200.millis)
+        subscriberProbe2.receiveN(2).foreach {
+          case chg: Changed[GCounter] @unchecked =>
+            if (chg.key == KeyA)
+              chg.get(KeyA).value should ===(2)
+            else if (chg.key == KeyB)
+              chg.get(KeyB).value should ===(1)
+            else
+              fail(s"unexpected change ${chg.key}")
+          case other =>
+            fail(s"Unexpected $other")
+        }
+        subscriberProbe2.expectNoMessage()
+        subscriberProbeA.expectMsgType[Changed[GCounter]].get(KeyA).value 
should ===(2)
+        
subscriberProbeOther.expectMsgType[Changed[GCounter]].get(KeyOtherA).value 
should ===(17)
+
+        replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 10)
+        expectMsgType[UpdateSuccess[_]]
+        val chg4 = subscriberProbe.expectMsgType[Changed[GCounter]]
+        chg4.key should ===(KeyB)
+        chg4.get(KeyB).value should ===(11)
+        val chg5 = subscriberProbe2.expectMsgType[Changed[GCounter]]
+        chg5.key should ===(KeyB)
+        chg5.get(KeyB).value should ===(11)
+
+        // unsubscribe
+        replicator ! Unsubscribe(GCounterKey("counter-*"), subscriberProbe.ref)
+        replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 5)
+        expectMsgType[UpdateSuccess[_]]
+        subscriberProbe.expectNoMessage(200.millis)
+        val chg6 = subscriberProbe2.expectMsgType[Changed[GCounter]]
+        chg6.key should ===(KeyB)
+        chg6.get(KeyB).value should ===(16)
+      }
+
+      enterBarrier("done-1")
+    }
+
+    "notify expired entry" in {
+      runOn(first) {
+        val subscriberProbe = TestProbe()
+        replicator ! Subscribe(GCounterKey("expiry-*"), subscriberProbe.ref)
+
+        replicator ! Update(KeyExpiryA, GCounter.empty, WriteLocal)(_ :+ 1)
+        expectMsgType[UpdateSuccess[_]]
+        subscriberProbe.expectMsgType[Changed[GCounter]]
+
+        replicator ! Get(KeyExpiryA, ReadLocal)
+        expectMsgType[GetSuccess[GCounter]].get(KeyExpiryA).value should ===(1)
+
+        expectNoMessage(5.seconds)
+        replicator ! Get(KeyExpiryA, ReadLocal)
+        expectMsg(NotFound(KeyExpiryA, None))
+        subscriberProbe.expectMsg(Expired[GCounter](KeyExpiryA))
+
+        // same key can be used again
+        replicator ! Update(KeyExpiryA, GCounter.empty, WriteLocal)(_ :+ 2)
+        expectMsgType[UpdateSuccess[_]]
+        subscriberProbe.expectMsgType[Changed[GCounter]]
+        replicator ! Get(KeyExpiryA, ReadLocal)
+        expectMsgType[GetSuccess[GCounter]].get(KeyExpiryA).value should ===(2)
+      }
+
+      enterBarrier("done-2")
+    }
+
+    "notify when changed from another node" in {
+      runOn(first) {
+        replicator ! Update(KeyOtherA, GCounter.empty, WriteLocal)(_ :+ 1)
+        expectMsgType[UpdateSuccess[_]]
+        enterBarrier("updated-1")
+
+        enterBarrier("second-joined")
+
+        enterBarrier("received-1")
+
+        replicator ! Update(KeyOtherA, GCounter.empty, WriteLocal)(_ :+ 1)
+        expectMsgType[UpdateSuccess[_]]
+        enterBarrier("updated-2")
+
+        enterBarrier("received-2")
+      }
+      runOn(second) {
+        enterBarrier("updated-1")
+
+        // it's possible to subscribe before join
+        val subscriberProbe = TestProbe()
+        replicator ! Subscribe(GCounterKey("other-*"), subscriberProbe.ref)
+
+        join(second, first)
+
+        val chg1 = subscriberProbe.expectMsgType[Changed[GCounter]](10.seconds)
+        chg1.key should ===(KeyOtherA)
+        chg1.get(KeyOtherA).value should ===(18) // it was also incremented to 
17 in earlier test
+        enterBarrier("received-1")
+
+        enterBarrier("updated-2")
+
+        val chg2 = subscriberProbe.expectMsgType[Changed[GCounter]]
+        chg2.key should ===(KeyOtherA)
+        chg2.get(KeyOtherA).value should ===(19)
+        enterBarrier("received-2")
+      }
+    }
+
+    enterBarrier("done-3")
+  }
+}
diff --git 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorWildcardSubscriptionSpec.scala
 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorWildcardSubscriptionSpec.scala
new file mode 100644
index 0000000000..5cc8d10993
--- /dev/null
+++ 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorWildcardSubscriptionSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.cluster.ddata
+
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.cluster.ddata.Replicator._
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+import pekko.testkit.TestProbe
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import com.typesafe.config.ConfigFactory
+
+object ReplicatorWildcardSubscriptionSpec {
+  val config = ConfigFactory.parseString("""
+    pekko.actor.provider = "cluster"
+    pekko.remote.classic.netty.tcp.port = 0
+    pekko.remote.artery.canonical.port = 0
+    pekko.remote.artery.canonical.hostname = 127.0.0.1
+    pekko.cluster.distributed-data.notify-subscribers-interval = 100ms
+    """)
+}
+
+class ReplicatorWildcardSubscriptionSpec(_system: ActorSystem)
+    extends TestKit(_system)
+    with AnyWordSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with ImplicitSender {
+
+  def this() =
+    this(ActorSystem("ReplicatorWildcardSubscriptionSpec", 
ReplicatorWildcardSubscriptionSpec.config))
+
+  override def afterAll(): Unit = shutdown(system)
+
+  implicit val selfUniqueAddress: SelfUniqueAddress = 
DistributedData(system).selfUniqueAddress
+  val replicator = DistributedData(system).replicator
+
+  "Replicator wildcard subscriptions" must {
+
+    "notify subscriber for keys matching the wildcard prefix" in {
+      val KeyA1 = GCounterKey("notif-counter-a1")
+      val KeyA2 = GCounterKey("notif-counter-a2")
+      val KeyB = GCounterKey("notif-other-counter")
+      val WildcardKey = GCounterKey("notif-counter-*")
+
+      val probe = TestProbe()
+      replicator ! Subscribe(WildcardKey, probe.ref)
+
+      // Update a matching key
+      replicator ! Update(KeyA1, GCounter.empty, WriteLocal)(_ :+ 1)
+      expectMsgType[UpdateSuccess[_]]
+
+      val changed1 = probe.expectMsgType[Changed[GCounter]](5.seconds)
+      changed1.key.id should ===("notif-counter-a1")
+      changed1.get(changed1.key).value should ===(1)
+
+      // Update another matching key
+      replicator ! Update(KeyA2, GCounter.empty, WriteLocal)(_ :+ 2)
+      expectMsgType[UpdateSuccess[_]]
+
+      val changed2 = probe.expectMsgType[Changed[GCounter]](5.seconds)
+      changed2.key.id should ===("notif-counter-a2")
+      changed2.get(changed2.key).value should ===(2)
+
+      // Update a non-matching key - no notification expected
+      replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 1)
+      expectMsgType[UpdateSuccess[_]]
+
+      probe.expectNoMessage(500.millis)
+    }
+
+    "send current value to new wildcard subscriber for existing matching keys" 
in {
+      val KeyA = GCounterKey("current-counter-a")
+      val WildcardKey = GCounterKey("current-counter-*")
+
+      replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 10)
+      expectMsgType[UpdateSuccess[_]]
+
+      // Subscribe after the key already exists
+      val subscribeProbe = TestProbe()
+      replicator ! Subscribe(WildcardKey, subscribeProbe.ref)
+
+      // Should receive current value for existing matching key
+      val changed = subscribeProbe.expectMsgType[Changed[GCounter]](5.seconds)
+      changed.key.id should ===("current-counter-a")
+      changed.get(changed.key).value should ===(10)
+    }
+
+    "unsubscribe wildcard subscriber" in {
+      val KeyA = GCounterKey("unsub-counter-a")
+      val WildcardKey = GCounterKey("unsub-counter-*")
+
+      replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 1)
+      expectMsgType[UpdateSuccess[_]]
+
+      val probe = TestProbe()
+      replicator ! Subscribe(WildcardKey, probe.ref)
+
+      // Receive the initial value
+      probe.expectMsgType[Changed[GCounter]](5.seconds)
+
+      // Unsubscribe
+      replicator ! Unsubscribe(WildcardKey, probe.ref)
+
+      // Update a matching key - no notification expected after unsubscribe
+      replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 100)
+      expectMsgType[UpdateSuccess[_]]
+
+      probe.expectNoMessage(500.millis)
+    }
+  }
+}
diff --git 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index 481b199a0d..3c9043c8b0 100644
--- 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++ 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -96,6 +96,7 @@ class ReplicatedDataSerializerSpec
       checkSameContent(GSet() + "a" + "b", GSet() + "b" + "a")
       checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref2 + ref1 + 
ref3)
       checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref3 + ref2 + 
ref1)
+      checkSerialization(GSetKey[String]("id"))
     }
 
     "serialize ORSet" in {
@@ -125,6 +126,7 @@ class ReplicatedDataSerializerSpec
       val s5 = ORSet().add(address1, "a").add(address2, ref1)
       val s6 = ORSet().add(address2, ref1).add(address1, "a")
       checkSameContent(s5.merge(s6), s6.merge(s5))
+      checkSerialization(ORSetKey[String]("id"))
     }
 
     "serialize ORSet with ActorRef message sent between two systems" in {
@@ -187,6 +189,7 @@ class ReplicatedDataSerializerSpec
     "serialize Flag" in {
       checkSerialization(Flag())
       checkSerialization(Flag().switchOn)
+      checkSerialization(FlagKey("id"))
     }
 
     "serialize LWWRegister" in {
@@ -194,6 +197,7 @@ class ReplicatedDataSerializerSpec
       checkSerialization(
         LWWRegister(address1, "value2", LWWRegister.defaultClock[String])
           .withValue(address2, "value3", LWWRegister.defaultClock[String]))
+      checkSerialization(LWWRegisterKey[String]("id"))
     }
 
     "serialize GCounter" in {
@@ -207,6 +211,7 @@ class ReplicatedDataSerializerSpec
       checkSameContent(
         GCounter().increment(address1, 2).increment(address3, 5),
         GCounter().increment(address3, 5).increment(address1, 2))
+      checkSerialization(GCounterKey("id"))
     }
 
     "serialize PNCounter" in {
@@ -225,6 +230,7 @@ class ReplicatedDataSerializerSpec
       checkSameContent(
         PNCounter().increment(address1, 2).decrement(address1, 
1).increment(address3, 5),
         PNCounter().increment(address3, 5).increment(address1, 
2).decrement(address1, 1))
+      checkSerialization(PNCounterKey("id"))
     }
 
     "serialize ORMap" in {
@@ -233,6 +239,7 @@ class ReplicatedDataSerializerSpec
       checkSerialization(ORMap().put(address1, 1L, GSet() + "A"))
       // use Flag for this test as object key because it is serializable
       checkSerialization(ORMap().put(address1, Flag(), GSet() + "A"))
+      checkSerialization(ORMapKey[UniqueAddress, GSet[String]]("id"))
     }
 
     "serialize ORMap delta" in {
@@ -270,6 +277,7 @@ class ReplicatedDataSerializerSpec
         LWWMap()
           .put(address1, "a", "value1", LWWRegister.defaultClock[Any])
           .put(address2, "b", 17, LWWRegister.defaultClock[Any]))
+      checkSerialization(LWWMapKey[UniqueAddress, String]("id"))
     }
 
     "serialize PNCounterMap" in {
@@ -280,6 +288,7 @@ class ReplicatedDataSerializerSpec
       checkSerialization(PNCounterMap().increment(address1, Flag(), 3))
       checkSerialization(
         PNCounterMap().increment(address1, "a", 3).decrement(address2, "a", 
2).increment(address2, "b", 5))
+      checkSerialization(PNCounterMapKey[String]("id"))
     }
 
     "serialize ORMultiMap" in {
@@ -303,6 +312,7 @@ class ReplicatedDataSerializerSpec
       val m3 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1")
       val d3 = m3.resetDelta.addBinding(address1, "a", 
"A2").addBinding(address1, "a", "A3").delta.get
       checkSerialization(d3)
+      checkSerialization(ORMultiMapKey[String, String]("id"))
     }
 
     "serialize ORMultiMap withValueDeltas" in {
@@ -340,5 +350,9 @@ class ReplicatedDataSerializerSpec
       checkSameContent(v1.merge(v2), v2.merge(v1))
     }
 
+    "serialize UnspecificKey" in {
+      checkSerialization(Key.UnspecificKey("id"))
+    }
+
   }
 }
diff --git a/docs/src/main/paradox/typed/distributed-data.md 
b/docs/src/main/paradox/typed/distributed-data.md
index 0013e4e52c..84a4104bad 100644
--- a/docs/src/main/paradox/typed/distributed-data.md
+++ b/docs/src/main/paradox/typed/distributed-data.md
@@ -176,6 +176,10 @@ configurable 
`pekko.cluster.distributed-data.notify-subscribers-interval`.
 The subscriber is automatically unsubscribed if the subscriber is terminated. 
A subscriber can
 also be de-registered with the `replicatorAdapter.unsubscribe(key)` function.
 
+In addition to subscribing to individual keys it is possible to subscribe to 
all keys with a given prefix
+by using a `*` at the end of the key `id`. For example 
`GCounterKey("counter-*")`. Notifications will be
+sent for all matching keys, including new keys that are added later.
+
 ### Delete
 
 A data entry can be deleted by sending a `Replicator.Delete` message to the 
local


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to