codelipenghui commented on code in PR #25514:
URL: https://github.com/apache/pulsar/pull/25514#discussion_r3121583301


##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java:
##########
@@ -1376,6 +1376,68 @@ public void testCreateSchemaInParallel() throws 
Exception {
         producers2.clear();
     }
 
+    /**
+     * Test that when multiple producers concurrently create schema for a 
brand-new topic,
+     * the orphan BookKeeper ledgers created by the losing requests are 
properly cleaned up.
+     */
+    @Test
+    public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+
+        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
+                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        // Concurrently create producers with the same schema on a brand-new 
topic
+        int concurrency = 16;
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new 
ArrayList<>(concurrency);

Review Comment:
    Use Collections.synchronizedList(...)



##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java:
##########
@@ -1376,6 +1376,68 @@ public void testCreateSchemaInParallel() throws 
Exception {
         producers2.clear();
     }
 
+    /**
+     * Test that when multiple producers concurrently create schema for a 
brand-new topic,
+     * the orphan BookKeeper ledgers created by the losing requests are 
properly cleaned up.
+     */
+    @Test
+    public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+
+        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
+                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        // Concurrently create producers with the same schema on a brand-new 
topic
+        int concurrency = 16;
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new 
ArrayList<>(concurrency);
+        CountDownLatch latch = new CountDownLatch(concurrency);
+        for (int i = 0; i < concurrency; i++) {
+            executor.execute(() -> {
+                
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                        .topic(topic).createAsync());
+                latch.countDown();
+            });
+        }
+        latch.await();
+        FutureUtil.waitForAll(producers).join();
+
+        // Verify only 1 schema version exists
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+        // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" 
matches this topic's schemaName.
+        // If orphan ledgers were not cleaned up, there would be more than 1.
+        int schemaLedgerCount = 0;
+        for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh
+                : mockBk.getLedgerMap().values()) {
+            Map<String, byte[]> metadata = 
lh.getLedgerMetadata().getCustomMetadata();
+            byte[] schemaIdBytes = metadata.get("pulsar/schemaId");

Review Comment:
   Replace with LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID



##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java:
##########
@@ -1376,6 +1376,68 @@ public void testCreateSchemaInParallel() throws 
Exception {
         producers2.clear();
     }
 
+    /**
+     * Test that when multiple producers concurrently create schema for a 
brand-new topic,
+     * the orphan BookKeeper ledgers created by the losing requests are 
properly cleaned up.
+     */
+    @Test
+    public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+
+        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
+                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        // Concurrently create producers with the same schema on a brand-new 
topic
+        int concurrency = 16;
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new 
ArrayList<>(concurrency);
+        CountDownLatch latch = new CountDownLatch(concurrency);
+        for (int i = 0; i < concurrency; i++) {
+            executor.execute(() -> {
+                
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                        .topic(topic).createAsync());
+                latch.countDown();

Review Comment:
   Need to execute `latch.countDown();` in finally block to avoid any 
exceptions cause the test stuck.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to