This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8c761fe214 [fix][test] Fix flaky ShadowTopicTest.testConsumeShadowMessageWithoutCache (#25354)
d8c761fe214 is described below
commit d8c761fe214dc2347d3c95af9d0e2acfb00dc53c
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 02:08:11 2026 -0700
[fix][test] Fix flaky ShadowTopicTest.testConsumeShadowMessageWithoutCache (#25354)
---
.../broker/service/persistent/ShadowTopicTest.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
index 0ef638b4b21..e0e0b723326 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
@@ -237,20 +237,28 @@ public class ShadowTopicTest extends SharedPulsarBaseTest {
producer.send(content + i);
}
+ // Unload the source topic to trigger a ledger rollover. The ShadowManagedLedgerImpl
+ // reads entries from the source's BookKeeper ledgers via metadata watch. Without the
+ // shadow replicator enabled, it can only discover entries in closed ledgers (the
+ // metadata for open ledgers shows entries=0). Unloading forces the current ledger
+ // to close so the shadow topic can see all entries.
+ admin.topics().unload(sourceTopic);
+
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
- // disable shadow replicator
- // admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));
@Cleanup Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
- Message<String> msg = consumer.receive();
+ Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg, "Should have received a message from shadow topic");
Assert.assertEquals(msg.getMessageId(), id);
Assert.assertEquals(msg.getValue(), content);
for (int i = 0; i < 10; i++) {
- Assert.assertEquals(consumer.receive().getValue(), content + i);
+ msg = consumer.receive(10, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg, "Should have received message " + i + " from shadow topic");
+ Assert.assertEquals(msg.getValue(), content + i);
}
}
}