Copilot commented on code in PR #2762:
URL: https://github.com/apache/pekko/pull/2762#discussion_r2962710814


##########
persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala:
##########
@@ -178,6 +179,71 @@ class ReliableDeliveryWithEventSourcedProducerQueueSpec(config: Config)
       testKit.stop(consumerController)
     }
 
+    "resume correctly after restart when all messages were confirmed" in {
+      val producerId = "p-restart-clean"
+      val producerProbe = createTestProbe[ProducerController.RequestNext[String]]()
+      val consumerProbe = createTestProbe[ConsumerController.Delivery[String]]()
+
+      // Phase 1: send one message and confirm it fully, then stop both controllers cleanly.
+      val (pc1, cc1) = startProducerAndConsumer(producerId, producerProbe, consumerProbe)
+      producerProbe.receiveMessage().sendNextTo ! "msg-1"
+      val del1 = consumerProbe.receiveMessage()
+      del1.confirmTo ! ConsumerController.Confirmed
+      // Wait for the ProducerController to process the confirmation and issue a new RequestNext.
+      // The seqNr is captured dynamically: for non-chunked messages it will be 2; for chunked
+      // messages each byte is a separate seqNr, so it will be higher (e.g. 6 for a 5-byte string).
+      // Note: StoreMessageConfirmed is intentionally write-behind (fire-and-forget, no reply), so
+      // the confirmation may not yet be persisted to the journal when pc1 is stopped below.
+      // If that happens, pc2 will re-deliver the earlier message; Phase 2 handles this case.
+      val nextSeqNr = producerProbe.receiveMessage().currentSeqNr
+      testKit.stop(pc1)
+      producerProbe.expectTerminated(pc1)
+      testKit.stop(cc1)
+      consumerProbe.expectTerminated(cc1)

Review Comment:
   This test stops `pc1` as soon as it receives the post-confirmation
`RequestNext`, but `StoreMessageConfirmed` is write-behind. If the confirmation
hasn’t been persisted when `pc1` is stopped, `pc2` can restart with a
non-empty `unconfirmed` buffer and then won’t emit a `RequestNext`
immediately, so `producerProbe.receiveMessage()` in Phase 2 can block and
time out. Consider making Phase 2 robust to both the persisted and the
not-yet-persisted case: either first try to drain and confirm a possible
redelivery with a short timeout before expecting `RequestNext`, or wait until
the confirmation is durably stored before stopping `pc1`.
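
   A minimal sketch of the first option, for illustration only. The object
`RedeliveryDrain`, the method `confirmPossibleRedelivery`, and the 500 ms
default are hypothetical names and values, not part of the PR or of Pekko.
It relies on `TestProbe.receiveMessage(max)` failing with an `AssertionError`
on timeout, which `scala.util.Try` treats as non-fatal:

```scala
import scala.concurrent.duration._
import scala.util.Try

import org.apache.pekko.actor.testkit.typed.scaladsl.TestProbe
import org.apache.pekko.actor.typed.delivery.ConsumerController

// Hypothetical helper, not part of the PR or of Pekko itself.
object RedeliveryDrain {

  /**
   * Waits up to `within` for a delivery and confirms it if one arrives.
   * Returns true if a delivery was confirmed, false if none arrived in time.
   */
  def confirmPossibleRedelivery(
      consumerProbe: TestProbe[ConsumerController.Delivery[String]],
      within: FiniteDuration = 500.millis): Boolean =
    // A timeout surfaces as an AssertionError, which Try captures as a Failure.
    Try(consumerProbe.receiveMessage(within)).map { delivery =>
      delivery.confirmTo ! ConsumerController.Confirmed
    }.isSuccess
}
```

   In Phase 2 this could be called as
`RedeliveryDrain.confirmPossibleRedelivery(consumerProbe)` after the second
producer and consumer are started and before `producerProbe.receiveMessage()`.
The short timeout is a heuristic: a redelivery that arrives later than
`within` would still be missed, so waiting for durable storage before
stopping `pc1` may be the more deterministic choice.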



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

