This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 15aeee6df9 Port CRDT key expiry from akka-core #31721 (#2733)
15aeee6df9 is described below
commit 15aeee6df9dfb6836aac3bd54876300c7b91ffad
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Mar 16 20:44:56 2026 +0100
Port CRDT key expiry from akka-core #31721 (#2733)
* Initial plan
* Port CRDT expiry feature from akka-core PR #31721
Co-authored-by: pjfanning <[email protected]>
* Fix code review issues: dbDelete error message, 0L consistency, isExpired
now param
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Create crdt-expiry.excludes
* Update crdt-expiry.excludes
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../ddata/typed/internal/ReplicatorBehavior.scala | 1 +
.../cluster/ddata/typed/javadsl/Replicator.scala | 5 +
.../cluster/ddata/typed/scaladsl/Replicator.scala | 9 +
.../ddata/protobuf/msg/ReplicatorMessages.java | 198 +++++++++++++
.../2.0.x.backwards.excludes/crdt-expiry.excludes | 34 +++
.../src/main/protobuf/ReplicatorMessages.proto | 2 +
distributed-data/src/main/resources/reference.conf | 10 +
.../apache/pekko/cluster/ddata/DurableStore.scala | 60 +++-
.../apache/pekko/cluster/ddata/Replicator.scala | 311 +++++++++++++++++----
.../protobuf/ReplicatorMessageSerializer.scala | 23 +-
.../cluster/ddata/ReplicatorSettingsSpec.scala | 16 ++
.../protobuf/ReplicatorMessageSerializerSpec.scala | 21 +-
12 files changed, 619 insertions(+), 71 deletions(-)
diff --git
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
index 9988fb3076..d245699ca8 100644
---
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
+++
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
@@ -159,6 +159,7 @@ import pekko.util.Timeout
rsp match {
case chg: dd.Replicator.Changed[_] => subscriber !
JReplicator.Changed(chg.key)(chg.dataValue)
case del: dd.Replicator.Deleted[_] => subscriber !
JReplicator.Deleted(del.key)
+ case exp: dd.Replicator.Expired[_] => subscriber !
JReplicator.Expired(exp.key)
}
Behaviors.same
diff --git
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
index bf391d57ac..c6b536a5d1 100644
---
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
+++
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
@@ -313,6 +313,11 @@ object Replicator {
*/
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends
SubscribeResponse[A]
+ /**
+ * @see [[Replicator.Subscribe]]
+ */
+ final case class Expired[A <: ReplicatedData](key: Key[A]) extends
SubscribeResponse[A]
+
/**
* Send this message to the local `Replicator` to delete a data value for the
* given `key`. The `Replicator` will reply with one of the
[[DeleteResponse]] messages.
diff --git
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
index 36cf2f13cf..190c44c37f 100644
---
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
+++
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
@@ -301,6 +301,15 @@ object Replicator {
*/
type Deleted[A <: ReplicatedData] = dd.Replicator.Deleted[A]
+ object Expired {
+ def unapply[A <: ReplicatedData](expired: Expired[A]): Option[Key[A]] =
Some(expired.key)
+ }
+
+ /**
+ * @see [[Subscribe]]
+ */
+ type Expired[A <: ReplicatedData] = dd.Replicator.Expired[A]
+
object Delete {
/**
diff --git
a/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
b/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
index c39780857b..8fc406ea5c 100644
---
a/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
+++
b/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
@@ -11578,6 +11578,17 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
* @return The digest.
*/
org.apache.pekko.protobufv3.internal.ByteString getDigest();
+
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return Whether the usedTimestamp field is set.
+ */
+ boolean hasUsedTimestamp();
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return The usedTimestamp.
+ */
+ long getUsedTimestamp();
}
/**
* Protobuf type {@code org.apache.pekko.cluster.ddata.Status.Entry}
@@ -11687,6 +11698,25 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
return digest_;
}
+ public static final int USEDTIMESTAMP_FIELD_NUMBER = 3;
+ private long usedTimestamp_;
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return Whether the usedTimestamp field is set.
+ */
+ @java.lang.Override
+ public boolean hasUsedTimestamp() {
+ return ((bitField0_ & 0x00000004) != 0);
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return The usedTimestamp.
+ */
+ @java.lang.Override
+ public long getUsedTimestamp() {
+ return usedTimestamp_;
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -11715,6 +11745,9 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
if (((bitField0_ & 0x00000002) != 0)) {
output.writeBytes(2, digest_);
}
+ if (((bitField0_ & 0x00000004) != 0)) {
+ output.writeSInt64(3, usedTimestamp_);
+ }
getUnknownFields().writeTo(output);
}
@@ -11731,6 +11764,10 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
size += org.apache.pekko.protobufv3.internal.CodedOutputStream
.computeBytesSize(2, digest_);
}
+ if (((bitField0_ & 0x00000004) != 0)) {
+ size += org.apache.pekko.protobufv3.internal.CodedOutputStream
+ .computeSInt64Size(3, usedTimestamp_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
return size;
@@ -11756,6 +11793,11 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
if (!getDigest()
.equals(other.getDigest())) return false;
}
+ if (hasUsedTimestamp() != other.hasUsedTimestamp()) return false;
+ if (hasUsedTimestamp()) {
+ if (getUsedTimestamp()
+ != other.getUsedTimestamp()) return false;
+ }
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -11775,6 +11817,11 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
hash = (37 * hash) + DIGEST_FIELD_NUMBER;
hash = (53 * hash) + getDigest().hashCode();
}
+ if (hasUsedTimestamp()) {
+ hash = (37 * hash) + USEDTIMESTAMP_FIELD_NUMBER;
+ hash = (53 * hash) +
org.apache.pekko.protobufv3.internal.Internal.hashLong(
+ getUsedTimestamp());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -11908,6 +11955,7 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
bitField0_ = 0;
key_ = "";
digest_ = org.apache.pekko.protobufv3.internal.ByteString.EMPTY;
+ usedTimestamp_ = 0L;
return this;
}
@@ -11950,6 +11998,10 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
result.digest_ = digest_;
to_bitField0_ |= 0x00000002;
}
+ if (((from_bitField0_ & 0x00000004) != 0)) {
+ result.usedTimestamp_ = usedTimestamp_;
+ to_bitField0_ |= 0x00000004;
+ }
result.bitField0_ |= to_bitField0_;
}
@@ -11973,6 +12025,9 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
if (other.hasDigest()) {
setDigest(other.getDigest());
}
+ if (other.hasUsedTimestamp()) {
+ setUsedTimestamp(other.getUsedTimestamp());
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -12015,6 +12070,11 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
bitField0_ |= 0x00000002;
break;
} // case 18
+ case 24: {
+ usedTimestamp_ = input.readSInt64();
+ bitField0_ |= 0x00000004;
+ break;
+ } // case 24
default: {
if (!super.parseUnknownField(input, extensionRegistry, tag))
{
done = true; // was an endgroup tag
@@ -12152,6 +12212,45 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
return this;
}
+ private long usedTimestamp_;
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return Whether the usedTimestamp field is set.
+ */
+ @java.lang.Override
+ public boolean hasUsedTimestamp() {
+ return ((bitField0_ & 0x00000004) != 0);
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return The usedTimestamp.
+ */
+ @java.lang.Override
+ public long getUsedTimestamp() {
+ return usedTimestamp_;
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @param value The usedTimestamp to set.
+ * @return This builder for chaining.
+ */
+ public Builder setUsedTimestamp(long value) {
+ usedTimestamp_ = value;
+ bitField0_ |= 0x00000004;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearUsedTimestamp() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ usedTimestamp_ = 0L;
+ onChanged();
+ return this;
+ }
+
//
@@protoc_insertion_point(builder_scope:org.apache.pekko.cluster.ddata.Status.Entry)
}
@@ -13393,6 +13492,17 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
* <code>required .org.apache.pekko.cluster.ddata.DataEnvelope envelope
= 2;</code>
*/
org.apache.pekko.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder
getEnvelopeOrBuilder();
+
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return Whether the usedTimestamp field is set.
+ */
+ boolean hasUsedTimestamp();
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return The usedTimestamp.
+ */
+ long getUsedTimestamp();
}
/**
* Protobuf type {@code org.apache.pekko.cluster.ddata.Gossip.Entry}
@@ -13508,6 +13618,25 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
return envelope_ == null ?
org.apache.pekko.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance()
: envelope_;
}
+ public static final int USEDTIMESTAMP_FIELD_NUMBER = 3;
+ private long usedTimestamp_;
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return Whether the usedTimestamp field is set.
+ */
+ @java.lang.Override
+ public boolean hasUsedTimestamp() {
+ return ((bitField0_ & 0x00000004) != 0);
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return The usedTimestamp.
+ */
+ @java.lang.Override
+ public long getUsedTimestamp() {
+ return usedTimestamp_;
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -13540,6 +13669,9 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
if (((bitField0_ & 0x00000002) != 0)) {
output.writeMessage(2, getEnvelope());
}
+ if (((bitField0_ & 0x00000004) != 0)) {
+ output.writeSInt64(3, usedTimestamp_);
+ }
getUnknownFields().writeTo(output);
}
@@ -13556,6 +13688,10 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
size += org.apache.pekko.protobufv3.internal.CodedOutputStream
.computeMessageSize(2, getEnvelope());
}
+ if (((bitField0_ & 0x00000004) != 0)) {
+ size += org.apache.pekko.protobufv3.internal.CodedOutputStream
+ .computeSInt64Size(3, usedTimestamp_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
return size;
@@ -13581,6 +13717,11 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
if (!getEnvelope()
.equals(other.getEnvelope())) return false;
}
+ if (hasUsedTimestamp() != other.hasUsedTimestamp()) return false;
+ if (hasUsedTimestamp()) {
+ if (getUsedTimestamp()
+ != other.getUsedTimestamp()) return false;
+ }
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -13600,6 +13741,11 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
hash = (37 * hash) + ENVELOPE_FIELD_NUMBER;
hash = (53 * hash) + getEnvelope().hashCode();
}
+ if (hasUsedTimestamp()) {
+ hash = (37 * hash) + USEDTIMESTAMP_FIELD_NUMBER;
+ hash = (53 * hash) +
org.apache.pekko.protobufv3.internal.Internal.hashLong(
+ getUsedTimestamp());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -13743,6 +13889,7 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
envelopeBuilder_.dispose();
envelopeBuilder_ = null;
}
+ usedTimestamp_ = 0L;
return this;
}
@@ -13787,6 +13934,10 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
: envelopeBuilder_.build();
to_bitField0_ |= 0x00000002;
}
+ if (((from_bitField0_ & 0x00000004) != 0)) {
+ result.usedTimestamp_ = usedTimestamp_;
+ to_bitField0_ |= 0x00000004;
+ }
result.bitField0_ |= to_bitField0_;
}
@@ -13810,6 +13961,9 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
if (other.hasEnvelope()) {
mergeEnvelope(other.getEnvelope());
}
+ if (other.hasUsedTimestamp()) {
+ setUsedTimestamp(other.getUsedTimestamp());
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -13857,6 +14011,11 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
bitField0_ |= 0x00000002;
break;
} // case 18
+ case 24: {
+ usedTimestamp_ = input.readSInt64();
+ bitField0_ |= 0x00000004;
+ break;
+ } // case 24
default: {
if (!super.parseUnknownField(input, extensionRegistry, tag))
{
done = true; // was an endgroup tag
@@ -14076,6 +14235,45 @@ public final class ReplicatorMessages extends
org.apache.pekko.protobufv3.intern
}
//
@@protoc_insertion_point(builder_scope:org.apache.pekko.cluster.ddata.Gossip.Entry)
+
+ private long usedTimestamp_;
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return Whether the usedTimestamp field is set.
+ */
+ @java.lang.Override
+ public boolean hasUsedTimestamp() {
+ return ((bitField0_ & 0x00000004) != 0);
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return The usedTimestamp.
+ */
+ @java.lang.Override
+ public long getUsedTimestamp() {
+ return usedTimestamp_;
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @param value The usedTimestamp to set.
+ * @return This builder for chaining.
+ */
+ public Builder setUsedTimestamp(long value) {
+ usedTimestamp_ = value;
+ bitField0_ |= 0x00000004;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional sint64 usedTimestamp = 3;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearUsedTimestamp() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ usedTimestamp_ = 0L;
+ onChanged();
+ return this;
+ }
}
//
@@protoc_insertion_point(class_scope:org.apache.pekko.cluster.ddata.Gossip.Entry)
diff --git
a/distributed-data/src/main/mima-filters/2.0.x.backwards.excludes/crdt-expiry.excludes
b/distributed-data/src/main/mima-filters/2.0.x.backwards.excludes/crdt-expiry.excludes
new file mode 100644
index 0000000000..b42e372aea
--- /dev/null
+++
b/distributed-data/src/main/mima-filters/2.0.x.backwards.excludes/crdt-expiry.excludes
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Support CRDT Key Expiry
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.dataEntries")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.dataEntries_=")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.receiveStatus")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.receiveGossip")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.updatedData")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.copy*")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.this")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.unapply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip._1")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.digests")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.copy*")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.this")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.unapply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status._1")
diff --git a/distributed-data/src/main/protobuf/ReplicatorMessages.proto
b/distributed-data/src/main/protobuf/ReplicatorMessages.proto
index 2471285636..240e490d93 100644
--- a/distributed-data/src/main/protobuf/ReplicatorMessages.proto
+++ b/distributed-data/src/main/protobuf/ReplicatorMessages.proto
@@ -96,6 +96,7 @@ message Status {
message Entry {
required string key = 1;
required bytes digest = 2;
+ optional sint64 usedTimestamp = 3;
}
required uint32 chunk = 1;
@@ -109,6 +110,7 @@ message Gossip {
message Entry {
required string key = 1;
required DataEnvelope envelope = 2;
+ optional sint64 usedTimestamp = 3;
}
required bool sendBack = 1;
diff --git a/distributed-data/src/main/resources/reference.conf
b/distributed-data/src/main/resources/reference.conf
index f7a5c5b5ad..49457a6fe1 100644
--- a/distributed-data/src/main/resources/reference.conf
+++ b/distributed-data/src/main/resources/reference.conf
@@ -81,6 +81,16 @@ pekko.cluster.distributed-data {
# This is number of elements or similar size hint, not size in bytes.
max-delta-size = 50
}
+
+ # Map of keys and inactivity duration for entries that will automatically be
removed
+ # without tombstones when they have been inactive for the given duration.
+ # Prefix matching is supported by using * at the end of a key.
+ # Matching tombstones will also be removed after the expiry duration.
+ expire-keys-after-inactivity {
+ # Example syntax:
+ # "key-1" = 10 minutes
+ # "cache-*" = 2 minutes
+ }
durable {
# List of keys that are durable. Prefix matching is supported by using *
at the
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
index b7db7e1935..b894976894 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
@@ -60,6 +60,9 @@ import com.typesafe.config.Config
* When the `Replicator` needs to store a value it sends a `Store` message
* to the durable store actor, which must then reply with the `successMsg` or
* `failureMsg` to the `replyTo`.
+ *
+ * When entries have expired the `Replicator` sends an `Expire` message to the
durable
+ * store actor, which can delete the entries from the backend store.
*/
object DurableStore {
@@ -87,6 +90,11 @@ object DurableStore {
def this(message: String) = this(message, null)
}
+ /**
+ * Request to expire (remove) entries.
+ */
+ final case class Expire(keys: Set[KeyId])
+
/**
* Wrapper class for serialization of a data value.
* The `ReplicatorMessageSerializer` will serialize/deserialize
@@ -149,7 +157,8 @@ final class LmdbDurableStore(config: Config) extends Actor
with ActorLogging {
private def lmdb(): Lmdb = _lmdb match {
case OptionVal.Some(l) => l
case _ =>
- val t0 = System.nanoTime()
+ val debugEnabled = log.isDebugEnabled
+ val t0 = if (debugEnabled) System.nanoTime() else 0L
log.info("Using durable data in LMDB directory [{}]",
dir.getCanonicalPath)
val env = {
val mapSize = config.getBytes("lmdb.map-size")
@@ -162,11 +171,11 @@ final class LmdbDurableStore(config: Config) extends
Actor with ActorLogging {
val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize)
val valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow
when needed
- if (log.isDebugEnabled)
+ if (debugEnabled)
log.debug(
"Init of LMDB in directory [{}] took [{} ms]",
dir.getCanonicalPath,
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
val l = Lmdb(env, db, keyBuffer, valueBuffer)
_lmdb = OptionVal.Some(l)
l
@@ -208,8 +217,9 @@ final class LmdbDurableStore(config: Config) extends Actor
with ActorLogging {
def init: Receive = {
case LoadAll =>
if (dir.exists && dir.list().length > 0) {
+ val debugEnabled = log.isDebugEnabled
+ val t0 = if (debugEnabled) System.nanoTime() else 0L
val l = lmdb()
- val t0 = System.nanoTime()
val tx = l.env.txnRead()
try {
val iter = l.db.iterate(tx)
@@ -228,8 +238,11 @@ final class LmdbDurableStore(config: Config) extends Actor
with ActorLogging {
if (loadData.data.nonEmpty)
sender() ! loadData
sender() ! LoadAllCompleted
- if (log.isDebugEnabled)
- log.debug("load all of [{}] entries took [{} ms]", n,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+ if (debugEnabled)
+ log.debug(
+ "load all of [{}] entries took [{} ms]",
+ n,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
context.become(active)
} finally {
Try(iter.close())
@@ -275,6 +288,9 @@ final class LmdbDurableStore(config: Config) extends Actor
with ActorLogging {
case WriteBehind =>
writeBehind()
+
+ case Expire(keys) =>
+ dbDelete(keys)
}
def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: KeyId, data:
DurableDataEnvelope): Unit = {
@@ -297,7 +313,8 @@ final class LmdbDurableStore(config: Config) extends Actor
with ActorLogging {
def writeBehind(): Unit = {
if (!pending.isEmpty()) {
- val t0 = System.nanoTime()
+ val debugEnabled = log.isDebugEnabled
+ val t0 = if (debugEnabled) System.nanoTime() else 0L
val tx = lmdb().env.txnWrite()
try {
val iter = pending.entrySet.iterator
@@ -306,11 +323,11 @@ final class LmdbDurableStore(config: Config) extends
Actor with ActorLogging {
dbPut(OptionVal.Some(tx), entry.getKey, entry.getValue)
}
tx.commit()
- if (log.isDebugEnabled)
+ if (debugEnabled)
log.debug(
"store and commit of [{}] entries took [{} ms]",
pending.size,
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
} catch {
case NonFatal(e) =>
import scala.jdk.CollectionConverters._
@@ -322,4 +339,29 @@ final class LmdbDurableStore(config: Config) extends Actor
with ActorLogging {
}
}
+ def dbDelete(keys: Set[KeyId]): Unit = {
+ val debugEnabled = log.isDebugEnabled
+ val t0 = if (debugEnabled) System.nanoTime() else 0L
+ val l = lmdb()
+ val tx = lmdb().env.txnWrite()
+ try {
+ keys.foreach { key =>
+ l.keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
+ l.db.delete(tx, l.keyBuffer)
+ }
+ tx.commit()
+ if (debugEnabled)
+ log.debug(
+ "delete and commit of [{}] entries took [{} ms]",
+ keys.size,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
+ } catch {
+ case NonFatal(e) =>
+ log.error(e, "failed to delete [{}]", keys.mkString(","))
+ tx.abort()
+ } finally {
+ l.keyBuffer.clear()
+ }
+ }
+
}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
index b4b69cab6b..bb94ca7e8b 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
@@ -112,7 +112,25 @@ object ReplicatorSettings {
deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"),
maxDeltaSize = config.getInt("delta-crdt.max-delta-size"),
preferOldest = config.getBoolean("prefer-oldest"),
- logDataSizeExceeding = logDataSizeExceeding)
+ logDataSizeExceeding = logDataSizeExceeding,
+ expiryKeys = parseExpiry(config))
+ }
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] def parseExpiry(config: Config): Map[KeyId,
FiniteDuration] = {
+ import scala.jdk.CollectionConverters._
+ val expiryConfig = config.getConfig("expire-keys-after-inactivity")
+ expiryConfig.root
+ .keySet()
+ .iterator()
+ .asScala
+ .map { key =>
+ val quotedKey = s""""$key"""" // must use quotes because of the
wildcard *
+ key -> expiryConfig.getDuration(quotedKey, MILLISECONDS).millis
+ }
+ .toMap
}
/**
@@ -158,6 +176,10 @@ object ReplicatorSettings {
* in the `Set`.
* @param preferOldest Update and Get operations are sent to oldest nodes
first.
* @param logDataSizeExceeding Log data size.
+ * @param expiryKeys Map of keys and inactivity duration for entries that will
automatically be removed
+ * without tombstones when they have been inactive for the given
duration.
+ * Prefix matching is supported by using * at the end of a key.
+ * Matching tombstones will also be removed after the expiry duration.
*/
final class ReplicatorSettings(
val roles: Set[String],
@@ -174,7 +196,8 @@ final class ReplicatorSettings(
val deltaCrdtEnabled: Boolean,
val maxDeltaSize: Int,
val preferOldest: Boolean,
- val logDataSizeExceeding: Option[Int]) {
+ val logDataSizeExceeding: Option[Int],
+ val expiryKeys: Map[KeyId, FiniteDuration]) {
def withRole(role: String): ReplicatorSettings = copy(roles =
ReplicatorSettings.roleOption(role).toSet)
@@ -247,6 +270,55 @@ final class ReplicatorSettings(
def withLogDataSizeExceeding(logDataSizeExceeding: Int): ReplicatorSettings =
copy(logDataSizeExceeding = Some(logDataSizeExceeding))
+ // for backwards compatibility
+ def this(
+ roles: Set[String],
+ gossipInterval: FiniteDuration,
+ notifySubscribersInterval: FiniteDuration,
+ maxDeltaElements: Int,
+ dispatcher: String,
+ pruningInterval: FiniteDuration,
+ maxPruningDissemination: FiniteDuration,
+ durableStoreProps: Either[(String, Config), Props],
+ durableKeys: Set[KeyId],
+ pruningMarkerTimeToLive: FiniteDuration,
+ durablePruningMarkerTimeToLive: FiniteDuration,
+ deltaCrdtEnabled: Boolean,
+ maxDeltaSize: Int,
+ preferOldest: Boolean,
+ logDataSizeExceeding: Option[Int]) =
+ this(
+ roles,
+ gossipInterval,
+ notifySubscribersInterval,
+ maxDeltaElements,
+ dispatcher,
+ pruningInterval,
+ maxPruningDissemination,
+ durableStoreProps,
+ durableKeys,
+ pruningMarkerTimeToLive,
+ durablePruningMarkerTimeToLive,
+ deltaCrdtEnabled,
+ maxDeltaSize,
+ preferOldest,
+ logDataSizeExceeding,
+ expiryKeys = Map.empty)
+
+ /**
+ * Scala API
+ */
+ def withExpiryKeys(expiryKeys: Map[KeyId, FiniteDuration]):
ReplicatorSettings =
+ copy(expiryKeys = expiryKeys)
+
+ /**
+ * Java API
+ */
+ def withExpiryKeys(expiryKeys: java.util.Map[String, java.time.Duration]):
ReplicatorSettings = {
+ import scala.jdk.CollectionConverters._
+ withExpiryKeys(expiryKeys.asScala.iterator.map { case (key, value) => key
-> value.toScala }.toMap)
+ }
+
private def copy(
roles: Set[String] = roles,
gossipInterval: FiniteDuration = gossipInterval,
@@ -262,7 +334,8 @@ final class ReplicatorSettings(
deltaCrdtEnabled: Boolean = deltaCrdtEnabled,
maxDeltaSize: Int = maxDeltaSize,
preferOldest: Boolean = preferOldest,
- logDataSizeExceeding: Option[Int] = logDataSizeExceeding):
ReplicatorSettings =
+ logDataSizeExceeding: Option[Int] = logDataSizeExceeding,
+ expiryKeys: Map[KeyId, FiniteDuration] = expiryKeys): ReplicatorSettings
=
new ReplicatorSettings(
roles,
gossipInterval,
@@ -278,10 +351,12 @@ final class ReplicatorSettings(
deltaCrdtEnabled,
maxDeltaSize,
preferOldest,
- logDataSizeExceeding)
+ logDataSizeExceeding,
+ expiryKeys)
}
object Replicator {
+ private type Timestamp = Long
/**
* Factory method for the [[pekko.actor.Props]] of the [[Replicator]] actor.
@@ -500,6 +575,9 @@ object Replicator {
*
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
+ *
+ * If the key is expired the subscriber is notified with an [[Expired]]
+ * message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber:
ActorRef) extends ReplicatorMessage
@@ -546,6 +624,11 @@ object Replicator {
*/
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends
SubscribeResponse[A]
+ /**
+ * @see [[Replicator.Subscribe]]
+ */
+ final case class Expired[A <: ReplicatedData](key: Key[A]) extends
SubscribeResponse[A]
+
object Update {
/**
@@ -932,7 +1015,7 @@ object Replicator {
}
final case class Status(
- digests: Map[KeyId, Digest],
+ digests: Map[KeyId, (Digest, Timestamp)],
chunk: Int,
totChunks: Int,
toSystemUid: Option[Long],
@@ -942,12 +1025,12 @@ object Replicator {
override def toString: String =
digests
.map {
- case (key, bytes) => key + " -> " + bytes.map(byte =>
f"$byte%02x").mkString("")
+ case (key, (bytes, _)) => key + " -> " + bytes.map(byte =>
f"$byte%02x").mkString("")
}
.mkString("Status(", ", ", ")")
}
final case class Gossip(
- updatedData: Map[KeyId, DataEnvelope],
+ updatedData: Map[KeyId, (DataEnvelope, Timestamp)],
sendBack: Boolean,
toSystemUid: Option[Long],
fromSystemUid: Option[Long])
@@ -1227,6 +1310,11 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val serializer =
SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
val maxPruningDisseminationNanos = maxPruningDissemination.toNanos
+ val expiryWildcards = settings.expiryKeys.collect { case (k, v) if
k.endsWith("*") => k.dropRight(1) -> v }
+ val expiryEnabled: Boolean = settings.expiryKeys.nonEmpty
+ // updated on the gossip tick to avoid too many calls to
`currentTimeMillis()`
+ private var currentUsedTimestamp = if (expiryEnabled)
System.currentTimeMillis() else 0L
+
val hasDurableKeys = settings.durableKeys.nonEmpty
val durable = settings.durableKeys.filterNot(_.endsWith("*"))
val durableWildcards = settings.durableKeys.collect { case k if
k.endsWith("*") => k.dropRight(1) }
@@ -1311,8 +1399,8 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
var unreachable = Set.empty[UniqueAddress]
// the actual data
- var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)]
- // keys that have changed, Changed event published to subscribers on
FlushChanges
+ var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest, Timestamp)]
+ // keys that have changed; Changed, Deleted, and Expired events published to
subscribers on FlushChanges
var changed = Set.empty[KeyId]
// for splitting up gossip in chunks
@@ -1514,6 +1602,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]):
Unit = {
val localValue = getData(key.id)
log.debug("Received Get for key [{}].", key)
+ updateUsedTimestamp(key.id, currentUsedTimestamp)
if (isLocalGet(consistency)) {
val reply = localValue match {
case Some(DataEnvelope(DeletedData, _, _)) => GetDataDeleted(key, req)
@@ -1604,6 +1693,8 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
// so that the latest delta version is used
val newEnvelope = setData(key.id, envelope)
+ updateUsedTimestamp(key.id, currentUsedTimestamp)
+
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
@@ -1724,7 +1815,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
def receiveGetKeyIds(): Unit = {
val keys: Set[KeyId] = dataEntries.iterator
.collect {
- case (key, (DataEnvelope(data, _, _), _)) if data != DeletedData => key
+ case (key, (DataEnvelope(data, _, _), _, _)) if data != DeletedData =>
key
}
.to(immutable.Set)
replyTo ! GetKeyIdsResult(keys)
@@ -1737,6 +1828,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
replyTo ! DataDeleted(key, req)
case _ =>
setData(key.id, DeletedEnvelope)
+ updateUsedTimestamp(key.id, currentUsedTimestamp)
payloadSizeAggregator.remove(key.id)
val durable = isDurable(key.id)
if (isLocalUpdate(consistency)) {
@@ -1801,7 +1893,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
} else if (newEnvelope.data == DeletedData) DeletedDigest
else LazyDigest
- dataEntries = dataEntries.updated(key, (newEnvelope, dig))
+ dataEntries = dataEntries.updated(key, (newEnvelope, dig,
getUsedTimestamp(key)))
if (newEnvelope.data == DeletedData)
deltaPropagationSelector.delete(key)
newEnvelope
@@ -1809,13 +1901,13 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
def getDigest(key: KeyId): Digest = {
dataEntries.get(key) match {
- case Some((envelope, LazyDigest)) =>
+ case Some((envelope, LazyDigest, usedTimestamp)) =>
val (d, size) = digest(envelope)
payloadSizeAggregator.updatePayloadSize(key, size)
- dataEntries = dataEntries.updated(key, (envelope, d))
+ dataEntries = dataEntries.updated(key, (envelope, d, usedTimestamp))
d
- case Some((_, digest)) => digest
- case None => NotFoundDigest
+ case Some((_, digest, _)) => digest
+ case None => NotFoundDigest
}
}
@@ -1830,44 +1922,103 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
(dig, bytes.length)
}
- def getData(key: KeyId): Option[DataEnvelope] = dataEntries.get(key).map {
case (envelope, _) => envelope }
+ def getData(key: KeyId): Option[DataEnvelope] =
+ dataEntries.get(key).map { case (envelope, _, _) => envelope }
+
+ def getExpiryDuration(key: KeyId): FiniteDuration = {
+ if (expiryEnabled) {
+ settings.expiryKeys.get(key) match {
+ case Some(d) => d
+ case None if expiryWildcards.isEmpty => Duration.Zero
+ case None =>
+ expiryWildcards
+ .collectFirst {
+ case (k, v) if key.startsWith(k) => v
+ }
+ .getOrElse(Duration.Zero)
+ }
+ } else {
+ Duration.Zero
+ }
+ }
+
+ def getUsedTimestamp(key: KeyId): Timestamp = {
+ if (expiryEnabled) {
+ dataEntries.get(key) match {
+ case Some((_, _, usedTimestamp)) => usedTimestamp
+ case None => 0L
+ }
+ } else {
+ 0L
+ }
+ }
+
+ def isExpired(key: KeyId): Boolean = {
+ val now = System.currentTimeMillis()
+ isExpired(key, getUsedTimestamp(key), now)
+ }
+
+ def isExpired(key: KeyId, timestamp: Timestamp): Boolean = {
+ isExpired(key, timestamp, System.currentTimeMillis())
+ }
+
+ def isExpired(key: KeyId, timestamp: Timestamp, now: Long): Boolean = {
+ expiryEnabled && timestamp != 0L && timestamp <= now -
getExpiryDuration(key).toMillis
+ }
+
+ def updateUsedTimestamp(key: KeyId, timestamp: Timestamp): Unit = {
+ if (expiryEnabled && timestamp != 0L) {
+ dataEntries.get(key).foreach {
+ case (existingEnvelope, existingDigest, existingTimestamp) =>
+ if (timestamp > existingTimestamp)
+ dataEntries = dataEntries.updated(key, (existingEnvelope,
existingDigest, timestamp))
+ }
+ }
+ }
def getDeltaSeqNr(key: KeyId, fromNode: UniqueAddress): Long =
dataEntries.get(key) match {
- case Some((DataEnvelope(_, _, deltaVersions), _)) =>
deltaVersions.versionAt(fromNode)
- case None => 0L
+ case Some((DataEnvelope(_, _, deltaVersions), _, _)) =>
deltaVersions.versionAt(fromNode)
+ case None => 0L
}
def isNodeRemoved(node: UniqueAddress, keys: Iterable[KeyId]): Boolean = {
removedNodes.contains(node) ||
(keys.exists(key =>
dataEntries.get(key) match {
- case Some((DataEnvelope(_, pruning, _), _)) => pruning.contains(node)
- case None => false
+ case Some((DataEnvelope(_, pruning, _), _, _)) =>
pruning.contains(node)
+ case None => false
}))
}
+ @nowarn("msg=deprecated")
def receiveFlushChanges(): Unit = {
- def notify(keyId: KeyId, subs: mutable.Set[ActorRef]): Unit = {
+ def notify(keyId: KeyId, subs: mutable.Set[ActorRef],
sendExpiredIfMissing: Boolean): Unit = {
val key = subscriptionKeys(keyId)
getData(keyId) match {
case Some(envelope) =>
val msg = if (envelope.data == DeletedData) Deleted(key) else
Changed(key)(envelope.data)
subs.foreach { _ ! msg }
case None =>
+ if (sendExpiredIfMissing) {
+ val msg = Expired(key)
+ subs.foreach {
+ _ ! msg
+ }
+ }
}
}
if (subscribers.nonEmpty) {
for (key <- changed; if subscribers.contains(key); subs <-
subscribers.get(key))
- notify(key, subs)
+ notify(key, subs, sendExpiredIfMissing = true)
}
// Changed event is sent to new subscribers even though the key has not
changed,
- // i.e. send current value
+ // i.e. send current value. Expired is not sent to new subscribers as the
first event.
if (newSubscribers.nonEmpty) {
for ((key, subs) <- newSubscribers) {
- notify(key, subs)
+ notify(key, subs, sendExpiredIfMissing = false)
subs.foreach { subscribers.addBinding(key, _) }
}
newSubscribers.clear()
@@ -1953,6 +2104,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
}
def receiveGossipTick(): Unit = {
+ cleanupExpired()
if (fullStateGossipEnabled)
selectRandomNode(allNodes.toVector).foreach(gossipTo)
}
@@ -1961,12 +2113,9 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val to = replica(address)
val toSystemUid = Some(address.longUid)
if (dataEntries.size <= maxDeltaElements) {
- val status = Status(
- dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) },
- chunk = 0,
- totChunks = 1,
- toSystemUid,
- selfFromSystemUid)
+ val status = Status(dataEntries.map {
+ case (key, (_, _, usedTimestamp)) => (key, (getDigest(key),
usedTimestamp))
+ }, chunk = 0, totChunks = 1, toSystemUid, selfFromSystemUid)
to ! status
} else {
val totChunks = dataEntries.size / maxDeltaElements
@@ -1981,7 +2130,8 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
}
val chunk = (statusCount % totChunks).toInt
val status = Status(dataEntries.collect {
- case (key, (_, _)) if math.abs(key.hashCode % totChunks) == chunk
=> (key, getDigest(key))
+ case (key, (_, _, usedTimestamp)) if math.abs(key.hashCode %
totChunks) == chunk =>
+ (key, (getDigest(key), usedTimestamp))
}, chunk, totChunks, toSystemUid, selfFromSystemUid)
to ! status
i += 1
@@ -1995,7 +2145,11 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
def replica(node: UniqueAddress): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(node.address))
- def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks:
Int, fromSystemUid: Option[Long]): Unit = {
+ def receiveStatus(
+ otherDigests: Map[KeyId, (Digest, Timestamp)],
+ chunk: Int,
+ totChunks: Int,
+ fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug(
"Received gossip status from [{}], chunk [{}] of [{}] containing
[{}].",
@@ -2004,18 +2158,32 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
totChunks,
otherDigests.keys.mkString(", "))
+ // update the usedTimestamp when needed
+ if (expiryEnabled) {
+ otherDigests.foreach {
+ case (key, (_, usedTimestamp)) =>
+ updateUsedTimestamp(key, usedTimestamp)
+ // if we don't have the key it will be updated with the full Gossip
+ }
+ }
+
def isOtherDifferent(key: KeyId, otherDigest: Digest): Boolean = {
val d = getDigest(key)
d != NotFoundDigest && d != otherDigest
}
val otherDifferentKeys = otherDigests.collect {
- case (key, otherDigest) if isOtherDifferent(key, otherDigest) => key
+ case (key, (otherDigest, _)) if isOtherDifferent(key, otherDigest) => key
}
val otherKeys = otherDigests.keySet
val myKeys =
if (totChunks == 1) dataEntries.keySet
else dataEntries.keysIterator.filter(key => math.abs(key.hashCode %
totChunks) == chunk).toSet
- val otherMissingKeys = myKeys.diff(otherKeys)
+ val otherMissingKeys =
+ if (expiryEnabled) {
+ val now = System.currentTimeMillis()
+ myKeys.diff(otherKeys).filterNot(key => isExpired(key,
getUsedTimestamp(key), now))
+ } else
+ myKeys.diff(otherKeys)
val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
if (keys.nonEmpty) {
if (log.isDebugEnabled)
@@ -2033,7 +2201,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
replyTo.path.address,
myMissingKeys.mkString(", "))
val status = Status(
- myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap,
+ myMissingKeys.iterator.map(k => k -> (NotFoundDigest -> 0L)).toMap,
chunk,
totChunks,
fromSystemUid,
@@ -2051,7 +2219,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val maxMessageSize = payloadSizeAggregator.maxFrameSize - 128
var messages = Vector.empty[Gossip]
- val collectedEntries = Vector.newBuilder[(KeyId, DataEnvelope)]
+ val collectedEntries = Vector.newBuilder[(KeyId, (DataEnvelope,
Timestamp))]
var sum = 0
def addGossip(): Unit = {
@@ -2074,12 +2242,12 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val entrySize = keySize + dataSize + envelopeSize
if (sum + entrySize <= maxMessageSize) {
- collectedEntries += (key -> dataEnvelope)
+ collectedEntries += (key -> (dataEnvelope -> getUsedTimestamp(key)))
sum += entrySize
} else {
addGossip()
collectedEntries.clear()
- collectedEntries += (key -> dataEnvelope)
+ collectedEntries += (key -> (dataEnvelope -> getUsedTimestamp(key)))
sum = entrySize
}
}
@@ -2092,19 +2260,27 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
messages
}
- def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean,
fromSystemUid: Option[Long]): Unit = {
+ def receiveGossip(
+ updatedData: Map[KeyId, (DataEnvelope, Timestamp)],
+ sendBack: Boolean,
+ fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}].",
replyTo.path.address, updatedData.keys.mkString(", "))
var replyKeys = Set.empty[KeyId]
+ val now = if (expiryEnabled) System.currentTimeMillis() else 0L
updatedData.foreach {
- case (key, envelope) =>
- val hadData = dataEntries.contains(key)
- writeAndStore(key, envelope, reply = false)
- if (sendBack) getData(key) match {
- case Some(d) =>
- if (hadData || d.pruning.nonEmpty)
- replyKeys += key
- case None =>
+ case (key, (envelope, usedTimestamp)) =>
+ if (!isExpired(key, usedTimestamp, now)) {
+ val hadData = dataEntries.contains(key)
+ writeAndStore(key, envelope, reply = false)
+ updateUsedTimestamp(key, usedTimestamp)
+
+ if (sendBack) getData(key) match {
+ case Some(d) =>
+ if (hadData || d.pruning.nonEmpty)
+ replyKeys += key
+ case None =>
+ }
}
}
if (sendBack && replyKeys.nonEmpty) {
@@ -2121,6 +2297,39 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
context.watch(subscriber)
}
+ private def cleanupExpired(): Unit = {
+ if (expiryEnabled) {
+ val now = System.currentTimeMillis()
+ // no need to be more accurate than the gossip tick interval
+ currentUsedTimestamp = now
+ val expiredKeys = dataEntries.collect {
+ // it can be 0L when it was first set via a DeltaPropagation or Write;
don't expire such entries immediately
+ case (key, (_, _, usedTimestamp))
+ if usedTimestamp != 0L &&
+ getExpiryDuration(key) != Duration.Zero &&
+ usedTimestamp <= now - getExpiryDuration(key).toMillis =>
+ key
+ }
+
+ if (expiredKeys.nonEmpty) {
+ if (log.isDebugEnabled)
+ log.debug("Removing expired keys [{}]", expiredKeys.mkString(", "))
+
+ val durableExpiredKeys = expiredKeys.filter(isDurable).toSet
+ if (durableExpiredKeys.nonEmpty)
+ durableStore ! DurableStore.Expire(durableExpiredKeys)
+
+ expiredKeys.foreach { key =>
+ deltaPropagationSelector.delete(key)
+ payloadSizeAggregator.remove(key)
+ changed += key // notify subscribers, later
+ }
+
+ dataEntries = dataEntries -- expiredKeys
+ }
+ }
+ }
+
def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit = {
subscribers.removeBinding(key.id, subscriber)
newSubscribers.removeBinding(key.id, subscriber)
@@ -2236,7 +2445,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val knownNodes = allNodes.union(removedNodes.keySet)
val newRemovedNodes =
dataEntries.foldLeft(Set.empty[UniqueAddress]) {
- case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _))) =>
+ case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _, _)))
=>
acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress
|| knownNodes(n)))
case (acc, _) =>
acc
@@ -2257,7 +2466,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
.to(immutable.Set)
if (removedSet.nonEmpty) {
- for ((key, (envelope, _)) <- dataEntries; removed <- removedSet) {
+ for ((key, (envelope, _, _)) <- dataEntries; removed <- removedSet) {
def init(): Unit = {
val newEnvelope = envelope.initRemovedNodePruning(removed,
selfUniqueAddress)
@@ -2286,7 +2495,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val pruningPerformed = PruningPerformed(System.currentTimeMillis() +
pruningMarkerTimeToLive.toMillis)
val durablePruningPerformed = PruningPerformed(System.currentTimeMillis()
+ durablePruningMarkerTimeToLive.toMillis)
dataEntries.foreach {
- case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning,
_), _)) =>
+ case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning,
_), _, _)) =>
pruning.foreach {
case (removed, PruningInitialized(owner, seen))
if owner == selfUniqueAddress
@@ -2305,7 +2514,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
def deleteObsoletePruningPerformed(): Unit = {
val currentTime = System.currentTimeMillis()
dataEntries.foreach {
- case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning, _),
_)) =>
+ case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning, _),
_, _)) =>
val newEnvelope = pruning.foldLeft(envelope) {
case (acc, (removed, p: PruningPerformed)) if
p.isObsolete(currentTime) =>
log.debug("Removing obsolete pruning marker for [{}] in [{}]",
removed, key)
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index 9af89a47f8..bb45747aca 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -274,12 +274,15 @@ class ReplicatorMessageSerializer(val system:
ExtendedActorSystem)
val b = dm.Status.newBuilder()
b.setChunk(status.chunk).setTotChunks(status.totChunks)
status.digests.foreach {
- case (key, digest) =>
- b.addEntries(
+ case (key, (digest, usedTimestamp)) =>
+ val entryBuilder =
dm.Status.Entry
.newBuilder()
.setKey(key)
-
.setDigest(ByteStringUtils.toProtoByteStringUnsafe(digest.toArrayUnsafe())))
+
.setDigest(ByteStringUtils.toProtoByteStringUnsafe(digest.toArrayUnsafe()))
+ if (usedTimestamp != 0L)
+ entryBuilder.setUsedTimestamp(usedTimestamp)
+ b.addEntries(entryBuilder)
}
status.toSystemUid.foreach(b.setToSystemUid) // can be None when sending
back to a node of version 2.5.21
b.setFromSystemUid(status.fromSystemUid.get)
@@ -292,7 +295,7 @@ class ReplicatorMessageSerializer(val system:
ExtendedActorSystem)
val fromSystemUid = if (status.hasFromSystemUid)
Some(status.getFromSystemUid) else None
Status(
status.getEntriesList.asScala.iterator
- .map(e => e.getKey ->
PekkoByteString.fromArrayUnsafe(e.getDigest.toByteArray()))
+ .map(e => e.getKey ->
(PekkoByteString.fromArrayUnsafe(e.getDigest.toByteArray()) ->
e.getUsedTimestamp))
.toMap,
status.getChunk,
status.getTotChunks,
@@ -303,8 +306,12 @@ class ReplicatorMessageSerializer(val system:
ExtendedActorSystem)
private def gossipToProto(gossip: Gossip): dm.Gossip = {
val b = dm.Gossip.newBuilder().setSendBack(gossip.sendBack)
gossip.updatedData.foreach {
- case (key, data) =>
-
b.addEntries(dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data)))
+ case (key, (data, usedTimestamp)) =>
+ val entryBuilder =
+
dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data))
+ if (usedTimestamp != 0L)
+ entryBuilder.setUsedTimestamp(usedTimestamp)
+ b.addEntries(entryBuilder)
}
gossip.toSystemUid.foreach(b.setToSystemUid) // can be None when sending
back to a node of version 2.5.21
b.setFromSystemUid(gossip.fromSystemUid.get)
@@ -316,7 +323,9 @@ class ReplicatorMessageSerializer(val system:
ExtendedActorSystem)
val toSystemUid = if (gossip.hasToSystemUid) Some(gossip.getToSystemUid)
else None
val fromSystemUid = if (gossip.hasFromSystemUid)
Some(gossip.getFromSystemUid) else None
Gossip(
- gossip.getEntriesList.asScala.iterator.map(e => e.getKey ->
dataEnvelopeFromProto(e.getEnvelope)).toMap,
+ gossip.getEntriesList.asScala.iterator
+ .map(e => e.getKey -> (dataEnvelopeFromProto(e.getEnvelope) ->
e.getUsedTimestamp))
+ .toMap,
sendBack = gossip.getSendBack,
toSystemUid,
fromSystemUid)
diff --git
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
index 17b02d371f..7ce2daa36a 100644
---
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
+++
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
@@ -41,5 +41,21 @@ class ReplicatorSettingsSpec
"have the prefixed replicator name" in {
ReplicatorSettings.name(system, Some("other")) should
===("otherDdataReplicator")
}
+ "parse expire-keys-after-inactivity config" in {
+ val config = ConfigFactory.parseString("""
+ pekko.cluster.distributed-data.expire-keys-after-inactivity {
+ "key-1" = 10 minutes
+ "cache-*" = 2 minutes
+ }
+
""").withFallback(ReplicatorSettingsSpec.config).withFallback(system.settings.config)
+ val settings =
ReplicatorSettings(config.getConfig("pekko.cluster.distributed-data"))
+ settings.expiryKeys should have size 2
+ settings.expiryKeys("key-1").toMinutes should ===(10L)
+ settings.expiryKeys("cache-*").toMinutes should ===(2L)
+ }
+ "have empty expiry keys by default" in {
+ val settings = ReplicatorSettings(system)
+ settings.expiryKeys should be(empty)
+ }
}
}
diff --git
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index a54adb163d..9a5648a414 100644
---
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -123,30 +123,43 @@ class ReplicatorMessageSerializerSpec
checkSerialization(ReadResult(None))
checkSerialization(
Status(
- Map("A" -> ByteString.fromString("a"), "B" ->
ByteString.fromString("b")),
+ Map("A" -> (ByteString.fromString("a") -> 0L), "B" ->
(ByteString.fromString("b") -> 0L)),
chunk = 3,
totChunks = 10,
Some(17),
Some(19)))
checkSerialization(
Status(
- Map("A" -> ByteString.fromString("a"), "B" ->
ByteString.fromString("b")),
+ Map("A" -> (ByteString.fromString("a") -> 0L), "B" ->
(ByteString.fromString("b") -> 0L)),
chunk = 3,
totChunks = 10,
None, // can be None when sending back to a node of version 2.5.21
Some(19)))
+ checkSerialization(
+ Status(
+ Map("A" -> (ByteString.fromString("a") -> 12345L), "B" ->
(ByteString.fromString("b") -> 67890L)),
+ chunk = 3,
+ totChunks = 10,
+ Some(17),
+ Some(19)))
checkSerialization(
Gossip(
- Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" +
"c")),
+ Map("A" -> (DataEnvelope(data1) -> 0L), "B" -> (DataEnvelope(GSet()
+ "b" + "c") -> 0L)),
sendBack = true,
Some(17),
Some(19)))
checkSerialization(
Gossip(
- Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" +
"c")),
+ Map("A" -> (DataEnvelope(data1) -> 0L), "B" -> (DataEnvelope(GSet()
+ "b" + "c") -> 0L)),
sendBack = true,
None, // can be None when sending back to a node of version 2.5.21
Some(19)))
+ checkSerialization(
+ Gossip(
+ Map("A" -> (DataEnvelope(data1) -> 12345L), "B" ->
(DataEnvelope(GSet() + "b" + "c") -> 67890L)),
+ sendBack = true,
+ Some(17),
+ Some(19)))
checkSerialization(
DeltaPropagation(
address1,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]