This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5f928c8237ae76b95c81f4ab131edce9cc2ff898 Author: fengyubiao <[email protected]> AuthorDate: Tue Mar 17 17:43:41 2026 +0800 [fix][broker]system topic was created with different partitions acrossing clusters after enabled namespace-level replication (#25312) --- .../pulsar/broker/service/BrokerService.java | 132 +++++++++++++++++--- .../pulsar/broker/admin/PersistentTopicsTest.java | 6 + .../service/OneWayReplicatorUsingGlobalZKTest.java | 137 +++++++++++++++++++++ 3 files changed, 258 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e61d8ab4efb..cf2e873fa47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -115,6 +115,7 @@ import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.namespace.TopicExistsInfo; import org.apache.pulsar.broker.resources.DynamicConfigurationResources; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; @@ -145,8 +146,10 @@ import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache; import org.apache.pulsar.broker.validator.BindAddressValidator; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -3337,20 +3340,32 @@ public class BrokerService implements Closeable { return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); } - // Allow auto create non-partitioned topic. - boolean autoCreatePartitionedTopic = pulsar.getBrokerService() - .isDefaultTopicTypePartitioned(topicName, policies); - if (!autoCreatePartitionedTopic || topicName.isPartitioned()) { - return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); - } + return getRemotePartitionedTopicMetadataForAutoCreation(topicName, policies) + .thenCompose(remoteTopicExistsInfo -> { + // If remote topic exists, prioritize topic shape from remote clusters. + if (remoteTopicExistsInfo.isExists()) { + if (remoteTopicExistsInfo.getTopicType() == TopicType.PARTITIONED) { + return createPartitionedTopicMetadataAsync(topicName, + remoteTopicExistsInfo.getPartitions()); + } + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + } - // Create partitioned metadata. - return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, policies) - .exceptionallyCompose(ex -> { + // Allow auto create non-partitioned topic. + boolean autoCreatePartitionedTopic = pulsar.getBrokerService() + .isDefaultTopicTypePartitioned(topicName, policies); + if (!autoCreatePartitionedTopic || topicName.isPartitioned()) { + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + } + + // Create partitioned metadata. + return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, + policies); + }).exceptionallyCompose(ex -> { // The partitioned topic might be created concurrently. if (ex.getCause() instanceof MetadataStoreException.AlreadyExistsException) { - log.info("[{}] The partitioned topic is already created, try to refresh the cache" - + " and read again.", topicName); + log.info("[{}] The partitioned topic is already created, try to refresh " + + "the cache and read again.", topicName); CompletableFuture<PartitionedTopicMetadata> recheckFuture = fetchPartitionedTopicMetadataAsync(topicName, true); recheckFuture.exceptionally(ex2 -> { @@ -3379,15 +3394,25 @@ public class BrokerService implements Closeable { private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName, Optional<Policies> policies) { final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies); + return createPartitionedTopicMetadataAsync(topicName, defaultNumPartitions); + } + + private CompletableFuture<PartitionedTopicMetadata> createPartitionedTopicMetadataAsync(TopicName topicName, + int numPartitions) { final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - checkArgument(defaultNumPartitions > 0, - "Default number of partitions should be more than 0"); - checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, - "Number of partitions should be less than or equal to " + maxPartitions); + if (numPartitions <= 0) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Default number of partitions should be more than 0")); + } + if (maxPartitions > 0 && numPartitions > maxPartitions) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Number of partitions should be less than or equal to " + + maxPartitions)); + } - PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); + PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(numPartitions); - return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, true) + return checkMaxTopicsPerNamespace(topicName, numPartitions, true) .thenCompose(__ -> { PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources() .getPartitionedTopicResources(); @@ -3399,6 +3424,79 @@ public class BrokerService implements Closeable { }); } + private CompletableFuture<TopicExistsInfo> getRemotePartitionedTopicMetadataForAutoCreation( + TopicName topicName, Optional<Policies> policies) { + if (!pulsar.getConfig().isCreateTopicToRemoteClusterForReplication()) { + return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists()); + } + if (topicName.isPartitioned() || !topicName.isPersistent() || policies.isEmpty()) { + return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists()); + } + Set<String> replicationClusters = policies.get().replication_clusters; + if (replicationClusters == null || replicationClusters.isEmpty()) { + return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists()); + } + String localCluster = pulsar.getConfiguration().getClusterName(); + if (!replicationClusters.contains(localCluster) || replicationClusters.size() <= 1) { + return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists()); + } + List<String> remoteClusters = replicationClusters.stream() + .filter(cluster -> !cluster.equals(localCluster)) + .sorted() + .toList(); + return findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, 0, null); + } + + private CompletableFuture<TopicExistsInfo> findRemoteTopicMetadataForAutoCreation( + TopicName topicName, List<String> remoteClusters, int index, Throwable errOccurred) { + if (index >= remoteClusters.size()) { + if (errOccurred != null) { + log.error("[{}] Failed to check remote topic partitioned metadata on cluster {}. Fallback to " + + "default auto topic creation policy.", + topicName, remoteClusters, errOccurred); + } + return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists()); + } + final String remoteCluster = remoteClusters.get(index); + return pulsar.getPulsarResources().getClusterResources().getClusterAsync(remoteCluster) + .thenCompose(clusterData -> { + if (clusterData.isEmpty()) { + log.warn("[{}] Skip checking remote cluster {} because cluster data is missing", + topicName, remoteCluster); + return findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, index + 1, null); + } + PulsarClient client = getReplicationClient(remoteCluster, clusterData); + CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>(); + client.getPartitionsForTopic(topicName.toString(), false).handle((topics, t) -> { + if (t != null) { + Throwable actEx = FutureUtil.unwrapCompletionException(t); + if (actEx instanceof PulsarClientException.NotFoundException + | actEx instanceof PulsarClientException.TopicDoesNotExistException + | actEx instanceof PulsarAdminException.NotFoundException) { + future.complete(TopicExistsInfo.newTopicNotExists()); + } else { + FutureUtil.completeAfter(future, + findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, index + 1, actEx)); + } + return null; + } + if (topics.isEmpty()) { + future.complete(TopicExistsInfo.newTopicNotExists()); + } else if (topics.size() == 1 && !TopicName.get(topics.get(0)).isPartitioned()) { + future.complete(TopicExistsInfo.newNonPartitionedTopicExists()); + } else { + int maxPartitionNum = 0; + for (String topic : topics) { + maxPartitionNum = Math.max(maxPartitionNum, TopicName.get(topic).getPartitionIndex()); + } + future.complete(TopicExistsInfo.newPartitionedTopicExists(maxPartitionNum + 1)); + } + return null; + }); + return future; + }); + } + public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) { return fetchPartitionedTopicMetadataAsync(topicName, false); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 3fe61afbcdb..46e28147732 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -137,6 +137,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { uriInfo = mock(UriInfo.class); } + @Override + protected void doInitConf() throws Exception { + configureInitialConfig(conf); + conf.setCreateTopicToRemoteClusterForReplication(false); + } + @Override @BeforeMethod protected void setup() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index db16963f208..4a3d8bbb417 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -37,10 +37,13 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.TopicExistsInfo; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; @@ -50,14 +53,18 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -661,6 +668,136 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest { } } + @DataProvider + public Object[][] localSystemTopicPartitions() { + return new Object[][] { + {0}, + {3} + }; + } + + @Test(dataProvider = "localSystemTopicPartitions") + public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSystemTopicPartitions) throws Exception { + String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns"); + Predicate<String> topicNameFilter = t -> TopicName.get(t).getNamespace().equals(ns); + String systemTopic = "persistent://" + ns + "/__change_events"; + admin1.namespaces().createNamespace(ns); + admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)), false); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin1.namespaces().getNamespaceReplicationClusters(ns).size(), 1); + assertEquals(admin2.namespaces().getNamespaceReplicationClusters(ns).size(), 1); + }); + + // Trigger system topic creation on cluster1, following {@param localSystemTopicPartitions}. + AutoTopicCreationOverride autoTopicCreation1 = null; + if (localSystemTopicPartitions == 0) { + autoTopicCreation1 = AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true) + .topicType("non-partitioned").build(); + } else { + autoTopicCreation1 = AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true) + .topicType("partitioned").defaultNumPartitions(localSystemTopicPartitions).build(); + } + admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation1); + Awaitility.await().untilAsserted(() -> { + AutoTopicCreationOverride autoTopicCreationOverride = + admin1.namespaces().getAutoTopicCreationAsync(ns).get(3, TimeUnit.SECONDS); + assertNotNull(autoTopicCreationOverride); + if (localSystemTopicPartitions == 0) { + assertTrue("non-partitioned".equalsIgnoreCase(autoTopicCreationOverride.getTopicType())); + } else { + assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(), localSystemTopicPartitions); + } + }); + // Use a topic loading to trigger system topic creation. + String topicUsedToTriggerSystemTopic = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); + admin1.topics().createNonPartitionedTopic(topicUsedToTriggerSystemTopic); + admin1.topics().delete(topicUsedToTriggerSystemTopic, false); + // Verify: the system topic was created as expected. + Awaitility.await().untilAsserted(() -> { + TopicExistsInfo existsInfo = pulsar1.getNamespaceService() + .checkTopicExistsAsync(TopicName.get(systemTopic)).get(3, TimeUnit.SECONDS); + assertTrue(existsInfo.isExists()); + if (localSystemTopicPartitions == 0) { + assertEquals(existsInfo.getTopicType(), TopicType.NON_PARTITIONED); + } else { + assertEquals(existsInfo.getTopicType(), TopicType.PARTITIONED); + assertEquals(existsInfo.getPartitions(), localSystemTopicPartitions); + } + }); + + // Enable replication. + // Set topic auto-creation rule to "partitions: 2". + final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); + final Set<String> clusters = new HashSet<>(Arrays.asList(cluster1, cluster2)); + admin1.namespaces().setNamespaceReplicationClusters(ns, clusters, true); + AutoTopicCreationOverride autoTopicCreation2 = + AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true) + .topicType("partitioned").defaultNumPartitions(2).build(); + admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation2); + admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation2); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin1.namespaces().getAutoTopicCreationAsync(ns).join() + .getDefaultNumPartitions(), 2); + assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join() + .getDefaultNumPartitions(), 2); + }); + + admin2.topics().createNonPartitionedTopic(tp); + Producer<String> p2 = client2.newProducer(Schema.STRING).topic(tp).create(); + p2.send("msg-1"); + p2.close(); + Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create(); + p1.send("msg-1"); + p1.close(); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(tp, false).join().get(); + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(tp, false).join().get(); + assertFalse(persistentTopic2.getReplicators().isEmpty()); + }); + + // Verify: the topics are the same between two clusters. + Awaitility.await().untilAsserted(() -> { + List<String> topics1 = pulsar1.getBrokerService().getTopics().keySet() + .stream().filter(topicNameFilter).collect(Collectors.toList()); + List<String> topics2 = pulsar2.getBrokerService().getTopics().keySet() + .stream().filter(topicNameFilter).collect(Collectors.toList()); + Collections.sort(topics1); + Collections.sort(topics2); + boolean systemTopicCreated1 = false; + for (String tp1 : topics1) { + if (tp1.contains("__change_events")) { + systemTopicCreated1 = true; + break; + } + } + boolean systemTopicCreated2 = false; + for (String tp2 : topics2) { + if (tp2.contains("__change_events")) { + systemTopicCreated2 = true; + break; + } + } + log.info("topics1: {}", topics1); + log.info("topics2: {}", topics2); + assertTrue(systemTopicCreated1); + assertTrue(systemTopicCreated2); + assertEquals(topics1, topics2); + }); + + // cleanup. + admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1)); + admin2.topics().setReplicationClusters(tp, Arrays.asList(cluster2)); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(tp, false).join().get(); + assertTrue(persistentTopic1.getReplicators().isEmpty()); + PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(tp, false).join().get(); + assertTrue(persistentTopic2.getReplicators().isEmpty()); + }); + admin1.topics().delete(tp, false); + admin2.topics().delete(tp, false); + } + @Test public void testUpdateNamespacePolicies() throws Exception { // Create a namespace and allow both clusters to access.
