Copilot commented on code in PR #2762:
URL: https://github.com/apache/pekko/pull/2762#discussion_r2962710814
##########
persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala:
##########
@@ -178,6 +179,71 @@ class
ReliableDeliveryWithEventSourcedProducerQueueSpec(config: Config)
testKit.stop(consumerController)
}
+ "resume correctly after restart when all messages were confirmed" in {
+ val producerId = "p-restart-clean"
+ val producerProbe =
createTestProbe[ProducerController.RequestNext[String]]()
+ val consumerProbe =
createTestProbe[ConsumerController.Delivery[String]]()
+
+ // Phase 1: send one message and confirm it fully, then stop both
controllers cleanly.
+ val (pc1, cc1) = startProducerAndConsumer(producerId, producerProbe,
consumerProbe)
+ producerProbe.receiveMessage().sendNextTo ! "msg-1"
+ val del1 = consumerProbe.receiveMessage()
+ del1.confirmTo ! ConsumerController.Confirmed
+ // Wait for the ProducerController to process the confirmation and issue
a new RequestNext.
+ // The seqNr is captured dynamically: for non-chunked messages it will
be 2; for chunked
+ // messages each byte is a separate seqNr, so it will be higher (e.g. 6
for a 5-byte string).
+ // Note: StoreMessageConfirmed is intentionally write-behind
(fire-and-forget, no reply), so
+ // the confirmation may not yet be persisted to the journal when pc1 is
stopped below.
+ // If that happens, pc2 will re-deliver the earlier message — Phase 2
handles this case.
+ val nextSeqNr = producerProbe.receiveMessage().currentSeqNr
+ testKit.stop(pc1)
+ producerProbe.expectTerminated(pc1)
+ testKit.stop(cc1)
+ consumerProbe.expectTerminated(cc1)
Review Comment:
This test stops `pc1` immediately after receiving the post-confirmation
`RequestNext`, but `StoreMessageConfirmed` is write-behind. If the confirmation
hasn’t been persisted yet when `pc1` is stopped, `pc2` can restart with a
non-empty `unconfirmed` buffer and then won’t emit a `RequestNext`
immediately—making `producerProbe.receiveMessage()` in Phase 2 potentially
block/time out. Consider making Phase 2 robust to both persisted and
not-yet-persisted confirmation (e.g., first try to drain/confirm a possible
redelivery with a short timeout before expecting `RequestNext`, or otherwise
wait until the confirmation is durably stored before stopping `pc1`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]