Copilot commented on code in PR #2797:
URL: https://github.com/apache/pekko/pull/2797#discussion_r3004479612
##########
persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala:
##########
@@ -38,24 +38,12 @@ class RetentionCriteriaSpec extends TestSuite with Matchers
with AnyWordSpecLike
"have valid sequenceNr range based on keepNSnapshots" in {
    val criteria = RetentionCriteria.snapshotEvery(3, 2).asInstanceOf[SnapshotCountRetentionCriteriaImpl]
- val expected = List(
- 1 -> (0 -> 0),
- 3 -> (0 -> 0),
- 4 -> (0 -> 0),
- 6 -> (0 -> 0),
- 7 -> (0 -> 1),
- 9 -> (0 -> 3),
- 10 -> (0 -> 4),
- 12 -> (0 -> 6),
- 13 -> (1 -> 7),
- 15 -> (3 -> 9),
- 18 -> (6 -> 12),
- 20 -> (8 -> 14))
+    val expected =
+      List(1 -> 0, 3 -> 0, 4 -> 0, 6 -> 0, 7 -> 1, 9 -> 3, 10 -> 4, 12 -> 6, 13 -> 7, 15 -> 9, 18 -> 12, 20 -> 14)
Review Comment:
This `expected` list is now on a single very long line that likely exceeds the
repo's scalafmt `maxColumn = 120` (see `.scalafmt.conf`). Please reformat it
across multiple lines to avoid formatting/lint failures and keep the test readable.
```suggestion
List(
1 -> 0,
3 -> 0,
4 -> 0,
6 -> 0,
7 -> 1,
9 -> 3,
10 -> 4,
12 -> 6,
13 -> 7,
15 -> 9,
18 -> 12,
20 -> 14)
```
##########
persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala:
##########
@@ -798,13 +798,23 @@ private[pekko] object Running {
this
} else {
visibleState = state
-        if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) {
+        def skipRetention(): Boolean = {
+          // only one retention process at a time
+          val inProgress = shouldSnapshotAfterPersist == SnapshotWithRetention && setup.isRetentionInProgress()
+ if (inProgress)
+ setup.internalLogger.info(
Review Comment:
`Skipping retention ...` is logged at INFO every time a snapshot-with-retention
is attempted while a prior retention is still running. Under load this could
generate significant log volume and noise. Consider lowering it to DEBUG (or
otherwise rate-limiting it), since this is an internal, potentially
high-frequency condition.
```suggestion
setup.internalLogger.debug(
```
##########
persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala:
##########
@@ -224,10 +224,10 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
}
/** Deletes the snapshots up to and including the `sequenceNr`. */
-  protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = {
+  protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
     if (toSequenceNr > 0) {
-      val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr)
-      setup.internalLogger.debug2("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr)
+      val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 0L, maxSequenceNr = toSequenceNr)
+      setup.internalLogger.debug("Deleting snapshots to sequenceNr [{}]", toSequenceNr)
       setup.snapshotStore
         .tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfClassic)
     }
Review Comment:
`internalDeleteSnapshots` now always deletes with `minSequenceNr = 0L`
(i.e., from the beginning) instead of using a bounded lower sequenceNr window.
For snapshot stores where range deletion performance depends on the size of the
range, repeatedly deleting from 0..N can be significantly more expensive than
deleting in chunks. If the intent is purely to prevent overlapping retention
cycles, consider restoring a moving lower bound (or add a short rationale here
explaining why deleting from 0 is required/acceptable for Pekko’s supported
snapshot stores).
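For illustration, a bounded-window variant could retain the previous two-argument
shape that this PR removed (a hypothetical sketch only; it assumes the caller still
tracks the prior retention boundary and passes it as `fromSequenceNr`):

```scala
// Hypothetical sketch of a bounded deletion window: delete only the chunk
// of snapshots since the previous retention boundary, rather than 0..N.
protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = {
  if (toSequenceNr > 0) {
    val snapshotCriteria =
      SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr)
    setup.internalLogger.debug2(
      "Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr)
    setup.snapshotStore
      .tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfClassic)
  }
}
```

This keeps each delete request proportional to one retention window, which matters
for stores where range-deletion cost grows with the size of the range.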
##########
persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala:
##########
@@ -798,13 +798,23 @@ private[pekko] object Running {
this
} else {
visibleState = state
-        if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) {
+        def skipRetention(): Boolean = {
+          // only one retention process at a time
+          val inProgress = shouldSnapshotAfterPersist == SnapshotWithRetention && setup.isRetentionInProgress()
+          if (inProgress)
+            setup.internalLogger.info(
+              "Skipping retention at seqNr [{}] because previous retention has not completed yet. " +
+              "Next retention will cover skipped retention.",
+              state.seqNr)
+          inProgress
+        }
+        if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null || skipRetention()) {
val newState = applySideEffects(sideEffects, state)
-
onWriteDone(setup.context, p)
-
tryUnstashOne(newState)
} else {
Review Comment:
When a retention cycle is already in progress, this branch skips the entire
snapshot+retention step (because `skipRetention()` is part of the condition),
rather than actually deferring the retention request until completion as
described in the PR description. This can reduce snapshot frequency (since the
snapshot itself is skipped) and may increase recovery time if the actor stops
before the next snapshot cycle. Consider still saving the snapshot but
deferring only the delete-events/delete-snapshots part (or update the PR
description/logging to reflect that retention is dropped rather than deferred).
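One possible shape for "save the snapshot, defer only the deletes" is sketched
below. This is purely illustrative: `pendingRetentionSeqNr` is an assumed piece
of bookkeeping that does not exist in this PR, and `internalSaveSnapshot` is
used here on the assumption that the existing snapshot path can be invoked
independently of the retention deletes:

```scala
// Hypothetical sketch: keep snapshot frequency intact while a prior
// retention cycle is running, and defer only the retention deletes.
if (shouldSnapshotAfterPersist == SnapshotWithRetention && setup.isRetentionInProgress()) {
  // still snapshot, so recovery time is not affected by the skipped cycle
  internalSaveSnapshot(state)
  // remember the boundary; run the deferred deletes when the current cycle completes
  pendingRetentionSeqNr = Some(state.seqNr)
} else {
  // existing snapshot + retention path, unchanged
}
```

Whether this is worth the extra state depends on how expensive an extra
snapshot is relative to a delayed retention cycle for the supported stores.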
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]