This is an automated email from the ASF dual-hosted git repository.
dao-jun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 01fedb84a45 [fix][broker] Clean up orphan ledger on concurrent initial
schema creation in BookkeeperSchemaStorage (#25514)
01fedb84a45 is described below
commit 01fedb84a454bdc7d755e70e68af1624ceeb1f12
Author: zhou zhuohan <[email protected]>
AuthorDate: Thu Apr 23 20:50:39 2026 +0800
[fix][broker] Clean up orphan ledger on concurrent initial schema creation
in BookkeeperSchemaStorage (#25514)
---
.../mledger/impl/LedgerMetadataUtils.java | 30 +++++-----
.../service/schema/BookkeeperSchemaStorage.java | 40 +++++++++++--
.../java/org/apache/pulsar/schema/SchemaTest.java | 65 ++++++++++++++++++++++
.../bookkeeper/client/PulsarMockBookKeeper.java | 2 +-
.../bookkeeper/client/PulsarMockLedgerHandle.java | 22 ++++++--
5 files changed, 132 insertions(+), 27 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 4107949de51..9e1529d0683 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -29,28 +29,28 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
*/
public final class LedgerMetadataUtils {
- private static final String METADATA_PROPERTY_APPLICATION = "application";
- private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR =
"pulsar".getBytes(StandardCharsets.UTF_8);
+ public static final String METADATA_PROPERTY_APPLICATION = "application";
+ public static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR =
"pulsar".getBytes(StandardCharsets.UTF_8);
- private static final String METADATA_PROPERTY_COMPONENT = "component";
- private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
+ public static final String METADATA_PROPERTY_COMPONENT = "component";
+ public static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
"managed-ledger".getBytes(StandardCharsets.UTF_8);
- private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
+ public static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
"compacted-ledger".getBytes(StandardCharsets.UTF_8);
- private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA =
"schema".getBytes(StandardCharsets.UTF_8);
+ public static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA =
"schema".getBytes(StandardCharsets.UTF_8);
- private static final byte[]
METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
+ public static final byte[]
METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
"delayed-index-bucket".getBytes(StandardCharsets.UTF_8);
- private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME =
"pulsar/managed-ledger";
- private static final String METADATA_PROPERTY_CURSOR_NAME =
"pulsar/cursor";
- private static final String METADATA_PROPERTY_COMPACTEDTOPIC =
"pulsar/compactedTopic";
- private static final String METADATA_PROPERTY_COMPACTEDTO =
"pulsar/compactedTo";
- private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
+ public static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME =
"pulsar/managed-ledger";
+ public static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
+ public static final String METADATA_PROPERTY_COMPACTEDTOPIC =
"pulsar/compactedTopic";
+ public static final String METADATA_PROPERTY_COMPACTEDTO =
"pulsar/compactedTo";
+ public static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
- private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY =
"pulsar/delayedIndexBucketKey";
- private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC =
"pulsar/delayedIndexTopic";
- private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR =
"pulsar/delayedIndexCursor";
+ public static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY =
"pulsar/delayedIndexBucketKey";
+ public static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC =
"pulsar/delayedIndexTopic";
+ public static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR =
"pulsar/delayedIndexCursor";
/**
* Build base metadata for every ManagedLedger.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 89c3289ebb5..f3a1baa5388 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -348,23 +348,51 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
}
private CompletableFuture<Long> createNewSchema(String schemaId, byte[]
data, byte[] hash) {
+ // Step 1: Store the schema data into a new BookKeeper ledger
IndexEntry emptyIndex = new IndexEntry();
emptyIndex.setVersion(0);
emptyIndex.setHash(hash);
emptyIndex.setPosition().setEntryId(-1L).setLedgerId(-1L);
+ CompletableFuture<PositionInfo> stored = addNewSchemaEntryToStore(
+ schemaId, Collections.singletonList(emptyIndex), data
+ );
- return addNewSchemaEntryToStore(schemaId,
Collections.singletonList(emptyIndex), data).thenCompose(position -> {
- // The schema was stored in the ledger, now update the z-node with
the pointer to it
+ return stored.thenCompose(position -> {
+ // Step 2: Create the schema locator z-node pointing to the ledger
IndexEntry info = new IndexEntry();
info.setVersion(0);
info.setPosition().copyFrom(position);
info.setHash(hash);
-
SchemaLocator locator = new SchemaLocator();
locator.setInfo().copyFrom(info);
locator.addIndex().copyFrom(info);
- return createSchemaLocator(getSchemaPath(schemaId), locator)
- .thenApply(ignore -> 0L);
+ CompletableFuture<Long> created = createSchemaLocator(
+ getSchemaPath(schemaId), locator).thenApply(ignore -> 0L);
+
+ // Step 3: Handle failure by cleaning up the orphan ledger
+ // if concurrent schema creation caused a CAS conflict
+ return created.whenComplete((__, ex) -> {
+ if (ex == null) {
+ return;
+ }
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.warn()
+ .attr("schemaId", schemaId)
+ .attr("ledgerId", position.getLedgerId())
+ .exception(cause)
+ .log("Failed to create schema locator with position");
+ if (cause instanceof AlreadyExistsException || cause
instanceof BadVersionException) {
+ bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc,
ctx) -> {
+ if (rc != BKException.Code.OK) {
+ log.warn()
+ .attr("schemaId", schemaId)
+ .attr("ledgerId", position.getLedgerId())
+ .attr("rc", rc)
+ .log("Failed to delete orphan ledger after
schema locator creation failed");
+ }
+ }, null);
+ }
+ });
});
}
@@ -481,7 +509,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn()
.attr("schemaId", schemaId)
- .attr("position", position)
+ .attr("ledgerId", position.getLedgerId())
.exception(cause)
.log("Failed to update schema locator with position");
if (cause instanceof AlreadyExistsException || cause
instanceof BadVersionException) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 2b761689d73..748818c6bd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -55,6 +55,7 @@ import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
@@ -1380,6 +1381,70 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
producers2.clear();
}
+ /**
+ * Test that when multiple producers concurrently create schema for a
brand-new topic,
+ * the orphan BookKeeper ledgers created by the losing requests are
properly cleaned up.
+ */
+ @Test
+ public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
+ final String namespace = "test-namespace-" + randomName(16);
+ String ns = PUBLIC_TENANT + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+ final String topic = getTopicName(ns,
"testConcurrentCreateSchemaNoOrphanLedger");
+ final String schemaName = TopicName.get(topic).getSchemaName();
+
+ org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
+ (org.apache.bookkeeper.client.PulsarMockBookKeeper)
pulsar.getBookKeeperClient();
+
+ // Concurrently create producers with the same schema on a brand-new
topic
+ int concurrency = 16;
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+ List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
+ Collections.synchronizedList(new ArrayList<>(concurrency));
+ CountDownLatch latch = new CountDownLatch(concurrency);
+ for (int i = 0; i < concurrency; i++) {
+ executor.execute(() -> {
+ try {
+
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic).createAsync());
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+ latch.await();
+ FutureUtil.waitForAll(producers).join();
+
+ // Verify only 1 schema version exists
+ assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+ // Count surviving BK ledgers whose customMetadata "pulsar/schemaId"
matches this topic's schemaName.
+ // If orphan ledgers were not cleaned up, there would be more than 1.
+ int schemaLedgerCount = 0;
+ for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh :
mockBk.getLedgerMap().values()) {
+ Map<String, byte[]> metadata =
lh.getLedgerMetadata().getCustomMetadata();
+ byte[] schemaIdBytes =
metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
+ if (schemaIdBytes != null
+ && schemaName.equals(new String(schemaIdBytes,
java.nio.charset.StandardCharsets.UTF_8))) {
+ schemaLedgerCount++;
+ }
+ }
+ assertEquals(schemaLedgerCount, 1,
+ "Expected exactly 1 schema ledger for the topic, but found "
+ + schemaLedgerCount + ". Orphan ledgers were not
cleaned up.");
+
+ // Cleanup
+ producers.forEach(p -> {
+ try {
+ p.join().close();
+ } catch (Exception ignore) {
+ }
+ });
+ producers.clear();
+ }
+
@EqualsAndHashCode
static class User implements Serializable {
private String name;
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 2c7ee580697..0e1e75248e4 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -137,7 +137,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
long id = sequence.getAndIncrement();
log.info().attr("ledgerId", id).log("Creating ledger");
PulsarMockLedgerHandle lh =
- new
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
+ new
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd,
properties);
ledgers.put(id, lh);
return FutureUtils.value(lh);
} catch (Throwable t) {
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index d26c8be506b..2b11b12b428 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -69,7 +70,14 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
DigestType digest, byte[] passwd) throws
GeneralSecurityException {
- super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id,
digest, passwd), new LongVersion(0L)),
+ this(bk, id, digest, passwd, Collections.emptyMap());
+ }
+
+ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+ DigestType digest, byte[] passwd,
+ Map<String, byte[]> customMetadata) throws
GeneralSecurityException {
+ super(bk.getClientCtx(), id,
+ new Versioned<>(createMetadata(id, digest, passwd,
customMetadata), new LongVersion(0L)),
digest, passwd, WriteFlag.NONE);
this.bk = bk;
this.id = id;
@@ -271,13 +279,17 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
return readHandle.readLastAddConfirmedAndEntryAsync(entryId,
timeOutInMillis, parallel);
}
- private static LedgerMetadata createMetadata(long id, DigestType digest,
byte[] passwd) {
+ private static LedgerMetadata createMetadata(long id, DigestType digest,
byte[] passwd,
+ Map<String, byte[]>
customMetadata) {
List<BookieId> ensemble = new
ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());
- return LedgerMetadataBuilder.create()
+ LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
.withDigestType(digest.toApiDigestType())
.withPassword(passwd)
.withId(id)
- .newEnsembleEntry(0L, ensemble)
- .build();
+ .newEnsembleEntry(0L, ensemble);
+ if (customMetadata != null && !customMetadata.isEmpty()) {
+ builder.withCustomMetadata(customMetadata);
+ }
+ return builder.build();
}
}