BewareMyPower commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3400133632


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -2648,4 +2664,74 @@ private void triggerAndWaitCompaction(String topic) 
throws Exception {
         Awaitility.await().untilAsserted(() -> assertEquals(
                 admin.topics().compactionStatus(topic).status, 
LongRunningProcessStatus.Status.SUCCESS));
     }
+
+    @Test
+    public void testReaderReadOnDeletedLedger() throws Exception {
+        final var topic = 
"persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
+        try (final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+            for (int i = 0; i < 3; i++) {
+                producer.newMessage().key("key-" + i).value("value-" + 
i).send();
+            }
+        }
+        // Trigger the ledger rollover
+        var ml = (ManagedLedgerImpl) ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .orElseThrow()).getManagedLedger();
+        ml.getConfig().setMaxEntriesPerLedger(1);
+        ml.getConfig().setMaxSizePerLedgerMb(0);
+        ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ml.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(ml.getLedgersInfo().size(), 2));
+
+        final var subName = "sub-" + System.currentTimeMillis();
+        @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest).create();
+
+        // Slow down the pre-fetching
+        
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);
+
+        // Receive 1 message so that the startMessageId will be reset to 
ledger_id:0 after reconnection
+        assertTrue(reader.hasMessageAvailable());
+        final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
+        assertNotNull(firstMsg);
+
+        triggerAndWaitCompaction(topic);
+
+        // Trigger the reconnection and trim the first ledger.
+        admin.namespaces().unload("my-tenant/my-ns");
+        // Simulate the pending cumulative acknowledgment is flushed after the 
consumer is created
+        // We don't need such interception if we can support controlling the 
acknowledgment flush for reader.
+        final var firstTime = new AtomicBoolean(true);
+        consumerCreated = serverConsumer -> {

Review Comment:
   reader's initial reconnect delay is 100 ms, this rare should hardly happen.
   
   In addition, the correctness to reproduce this issue also depends on the 
trim operation later. if the read happened before that, the test could also 
succeed without the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to