This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8c761fe214 [fix][test] Fix flaky 
ShadowTopicTest.testConsumeShadowMessageWithoutCache (#25354)
d8c761fe214 is described below

commit d8c761fe214dc2347d3c95af9d0e2acfb00dc53c
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 02:08:11 2026 -0700

    [fix][test] Fix flaky ShadowTopicTest.testConsumeShadowMessageWithoutCache 
(#25354)
---
 .../broker/service/persistent/ShadowTopicTest.java       | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
index 0ef638b4b21..e0e0b723326 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
@@ -237,20 +237,28 @@ public class ShadowTopicTest extends SharedPulsarBaseTest 
{
             producer.send(content + i);
         }
 
+        // Unload the source topic to trigger a ledger rollover. The 
ShadowManagedLedgerImpl
+        // reads entries from the source's BookKeeper ledgers via metadata 
watch. Without the
+        // shadow replicator enabled, it can only discover entries in closed 
ledgers (the
+        // metadata for open ledgers shows entries=0). Unloading forces the 
current ledger
+        // to close so the shadow topic can see all entries.
+        admin.topics().unload(sourceTopic);
+
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
-        // disable shadow replicator
-        // admin.topics().setShadowTopics(sourceTopic, 
Lists.newArrayList(shadowTopic));
         @Cleanup Consumer<String> consumer =
                 
pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("sub")
                         
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                         .subscribe();
 
-        Message<String> msg = consumer.receive();
+        Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg, "Should have received a message from shadow 
topic");
         Assert.assertEquals(msg.getMessageId(), id);
         Assert.assertEquals(msg.getValue(), content);
 
         for (int i = 0; i < 10; i++) {
-            Assert.assertEquals(consumer.receive().getValue(), content + i);
+            msg = consumer.receive(10, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "Should have received message " + i + " 
from shadow topic");
+            Assert.assertEquals(msg.getValue(), content + i);
         }
     }
 }

Reply via email to