This is an automated email from the ASF dual-hosted git repository. BewareMyPower pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c333504534ee1127e991abb6ce1bc2e853da57d7 Author: Yunze Xu <[email protected]> AuthorDate: Mon May 18 19:16:47 2026 +0800 [feat][broker] PIP-469: Legacy-aware topic policies backend routing and metadata-store topic policies (#25707) (cherry picked from commit 8652efa4d60b791a5f1ee4e52f7ffda6ebbbb256) (cherry picked from commit 712372f9ee18ae5d5bd8e35ca6ef45473c0a3bb5) --- pip/pip-469.md | 37 ++- .../apache/pulsar/broker/ServiceConfiguration.java | 12 +- .../org/apache/pulsar/broker/PulsarService.java | 10 +- .../pulsar/broker/service/AbstractTopic.java | 2 +- .../service/LegacyAwareTopicPoliciesService.java | 142 +++++++++++ .../service/MetadataStoreTopicPoliciesService.java | 277 +++++++++++++++++++++ .../SystemTopicBasedTopicPoliciesService.java | 2 +- .../broker/service/TopicPoliciesService.java | 15 +- .../broker/service/persistent/PersistentTopic.java | 8 +- .../admin/MetadataStoreTopicPoliciesTest.java | 72 ++++++ .../pulsar/broker/admin/TopicPoliciesTest.java | 112 +++++++-- .../LegacyAwareTopicPoliciesServiceTest.java | 190 ++++++++++++++ .../SystemTopicBasedTopicPoliciesServiceTest.java | 4 +- .../broker/service/TopicPolicyTestUtils.java | 7 + 14 files changed, 846 insertions(+), 44 deletions(-) diff --git a/pip/pip-469.md b/pip/pip-469.md index 69036381f0f..4734adc127d 100644 --- a/pip/pip-469.md +++ b/pip/pip-469.md @@ -105,8 +105,15 @@ Broker startup validates both backends: - `SystemTopicBasedTopicPoliciesService` must be instantiable. - The configured `topicPoliciesServiceClassName` must be instantiable. -If either backend cannot be instantiated or started, broker startup fails. There is no per-request fallback from one -backend to another. +`LegacyAwareTopicPoliciesService#start` starts only the configured backend. It intentionally does not call +`SystemTopicBasedTopicPoliciesService#start`, because that start path registers a namespace-bundle ownership listener +whose only purpose is to eagerly create a reader on `<namespace>/__change_events` when a namespace bundle is loaded. +Under legacy-aware routing, that eager optimization would be counterproductive because it can create readers for +namespaces that do not have topic policies in `__change_events`. For legacy namespaces, the system-topic reader and +policy cache are initialized lazily by the routed system-topic backend operations. + +If either backend cannot be instantiated, or if the configured backend cannot be started, broker startup fails. There is +no per-request fallback from one backend to another. ### Namespace-scoped service routing @@ -118,6 +125,10 @@ backend to another. the system-topic backend when the system topic exists. - Routing the same operations to the configured backend when the system topic does not exist. +Listener registration is routed through `TopicPoliciesService#registerListenerAsync`. This lets the wrapper resolve the +namespace backend before registering the listener, and the listener is registered only on the selected backend instead +of being registered on both backends. + The system-topic existence check can be cached per namespace in memory, but the routing rule is defined by actual topic existence rather than by new namespace metadata. @@ -137,9 +148,13 @@ meaning the system-topic-backed topic-policies state is gone. - Topic names are normalized to the partitioned topic name, so all partitions share the same topic-policies record. - Global policies are stored in the configuration metadata store path: - `/admin/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`. + `/admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic}`. - Local policies are stored in the local metadata store path: - `/admin/local-policies/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`. + `/admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic}`. + +To avoid possible conflicts like the listener registered on the `/admin/local-policies` path from +`BrokerService#handleMetadataChanges`, these two paths share the same root path `/admin/topic-policies`, which is not +used by any other component. Each node stores a serialized `TopicPolicies` document. The backend writes and reads the two scopes independently: @@ -159,6 +174,11 @@ managed-ledger metadata updates. ### Listener behavior +`TopicPoliciesService` adds `registerListenerAsync(TopicName, TopicPolicyListener)` for listener registration. The +existing synchronous `registerListener(TopicName, TopicPolicyListener)` method is retained as a deprecated compatibility +hook for existing custom implementations, and the default async method delegates to it. Implementations that need async +routing or initialization, such as `LegacyAwareTopicPoliciesService`, override `registerListenerAsync` directly. + The backend registers watchers on both metadata stores: - A change on the local path re-reads the local node and notifies listeners with the latest local `TopicPolicies` or @@ -173,6 +193,11 @@ append-only replay log; it relies on metadata-store notifications and read-after ### Public API +The `TopicPoliciesService` extension point gains a default +`CompletableFuture<Boolean> registerListenerAsync(TopicName, TopicPolicyListener)` method. Existing implementations +remain compatible because `registerListener(TopicName, TopicPolicyListener)` is retained and used by the default async +implementation. + No new namespace policy field is introduced. No new namespace admin REST endpoint or Java admin client method is introduced. @@ -221,6 +246,10 @@ This upgrade rule is intentionally conservative: This means some namespaces with an empty but already-created `__change_events` topic may continue using the system-topic backend. That is acceptable because it avoids missing legacy state. +Existing custom `TopicPoliciesService` implementations that only implement the synchronous `registerListener` method +continue to work through the default `registerListenerAsync` bridge. Implementations can override +`registerListenerAsync` when registration itself needs asynchronous backend resolution or initialization. + ## Downgrade / Rollback Rolling back to a broker version that does not understand legacy-aware routing returns topic-policies backend diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 820c6b515e5..192f1b3ad4c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1704,8 +1704,16 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "The class name of the topic policies service. The default config only takes affect when the " - + "systemTopicEnable config is true" + doc = """ + The class name of the topic policies service. There are 2 built-in implementations: + 1. "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService" (default) + It stores a topic's policies in the `__change_events` topic. If `systemTopicEnabled` is false, + the topic policies will just be disabled + 2. "org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService" + It stores a topic's policies in the metadata store. If `systemTopicEnabled` is true and the + topic's namespace has a `__change_events` topic, the policies will still be stored in the + `__change_events` topic for backward compatibility. + """ ) private String topicPoliciesServiceClassName = "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a77e09d79ca..e74bc51a532 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -104,6 +104,7 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.rest.Topics; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.HealthChecker; +import org.apache.pulsar.broker.service.LegacyAwareTopicPoliciesService; import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer; import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; import org.apache.pulsar.broker.service.Topic; @@ -2232,8 +2233,15 @@ public class PulsarService implements AutoCloseable, ShutdownService { return TopicPoliciesService.DISABLED; } } - return (TopicPoliciesService) Reflections.createInstance(className, + final var configuredService = (TopicPoliciesService) Reflections.createInstance(className, Thread.currentThread().getContextClassLoader()); + if (!config.isSystemTopicEnabled()) { + LOG.info("[{}] System topic is disabled, using configured topic policies service without legacy routing", + className); + return configuredService; + } + return new LegacyAwareTopicPoliciesService(this, new SystemTopicBasedTopicPoliciesService(this), + configuredService); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 8af4a3264d9..0190c739a9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -543,7 +543,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected void registerTopicPolicyListener() { brokerService.getPulsar().getTopicPoliciesService() - .registerListener(TopicName.getPartitionedTopicName(topic), this); + .registerListenerAsync(TopicName.getPartitionedTopicName(topic), this); } protected void unregisterTopicPolicyListener() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java new file mode 100644 index 00000000000..6e1bfd7bacc --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.common.events.EventType; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.jspecify.annotations.NonNull; + +/** + * Routes topic policy operations to the legacy system-topic backend when a namespace already has + * a topic-policy {@code __change_events} system topic, and otherwise to the configured backend. + */ +public class LegacyAwareTopicPoliciesService implements TopicPoliciesService { + + private final AsyncLoadingCache<NamespaceName, Boolean> isLegacyNamespace; + @VisibleForTesting + final SystemTopicBasedTopicPoliciesService systemTopicService; + private final TopicPoliciesService configuredService; + + public LegacyAwareTopicPoliciesService(PulsarService pulsar, + SystemTopicBasedTopicPoliciesService systemTopicService, + TopicPoliciesService configuredService) { + // Generally, we only need to check if the __change_events topic exists once because the __change_events topic + // should only be created by broker before the upgrade, where `SystemTopicBasedTopicPoliciesService` is + // configured as the topic policies service. + this.isLegacyNamespace = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(1)) + .buildAsync(new AsyncCacheLoader<>() { + @NonNull + @Override + public CompletableFuture<? extends Boolean> asyncLoad(NamespaceName key, + @NonNull Executor executor) { + return NamespaceEventsSystemTopicFactory.checkSystemTopicExists(key, EventType.TOPIC_POLICY, + pulsar); + } + }); + this.systemTopicService = systemTopicService; + this.configuredService = configuredService; + if (configuredService instanceof SystemTopicBasedTopicPoliciesService) { + throw new IllegalArgumentException( + "configuredService should not be an instance of SystemTopicBasedTopicPoliciesService"); + } + } + + @Override + public void start(PulsarService pulsarService) { + // We should not call `systemTopicService.start()`, which just registers a namespace bundle listener to create + // a reader on `<namespace>/__change_events` when the namespace's bundle is loaded firstly. It's just an + // optimization to create the reader before loading any topic. However, it could create a reader on a namespace + // that does not even have the __change_events topic. + configuredService.start(pulsarService); + } + + @Override + public void close() throws Exception { + try { + configuredService.close(); + } finally { + systemTopicService.close(); + } + } + + @Override + public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type) { + return resolveService(topicName.getNamespaceObject()) + .thenCompose(service -> service.getTopicPoliciesAsync(topicName, type)); + } + + @Override + public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer<TopicPolicies> policyUpdater) { + return resolveService(topicName.getNamespaceObject()) + .thenCompose(service -> service.updateTopicPoliciesAsync(topicName, isGlobalPolicy, + skipUpdateWhenTopicPolicyDoesntExist, policyUpdater)); + } + + @Override + public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) { + return resolveService(topicName.getNamespaceObject()) + .thenCompose(service -> service.deleteTopicPoliciesAsync(topicName)); + } + + @Override + public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName, + boolean keepGlobalPoliciesAfterDeleting) { + return resolveService(topicName.getNamespaceObject()) + .thenCompose(service -> service.deleteTopicPoliciesAsync(topicName, + keepGlobalPoliciesAfterDeleting)); + } + + @Override + public CompletableFuture<Boolean> registerListenerAsync(TopicName topicName, TopicPolicyListener listener) { + return resolveService(topicName.getNamespaceObject()) + .thenCompose(service -> service.registerListenerAsync(topicName, listener)); + } + + @Override + public boolean registerListener(TopicName topicName, TopicPolicyListener listener) { + throw new RuntimeException("should not be called"); + } + + @Override + public void unregisterListener(TopicName topicName, TopicPolicyListener listener) { + configuredService.unregisterListener(topicName, listener); + systemTopicService.unregisterListener(topicName, listener); + } + + @VisibleForTesting + CompletableFuture<TopicPoliciesService> resolveService(NamespaceName namespace) { + return isLegacyNamespace.get(namespace) + .thenApply(isLegacy -> isLegacy ? systemTopicService : configuredService); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java new file mode 100644 index 00000000000..218c37472a3 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.jspecify.annotations.Nullable; + +/** + * Topic policies service backed by Pulsar metadata stores. + */ +@Slf4j +public class MetadataStoreTopicPoliciesService implements TopicPoliciesService { + + public static final String GLOBAL_POLICIES_ROOT = "/admin/topic-policies/global"; + public static final String LOCAL_POLICIES_ROOT = "/admin/topic-policies/local"; + + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Map<TopicName, List<TopicPolicyListener>> listeners = new ConcurrentHashMap<>(); + private MetadataCache<TopicPolicies> localPoliciesCache; + private MetadataCache<TopicPolicies> globalPoliciesCache; + + @Override + public void start(PulsarService pulsar) { + MetadataStore localStore = pulsar.getLocalMetadataStore(); + MetadataStore configurationStore = pulsar.getConfigurationMetadataStore(); + this.localPoliciesCache = localStore.getMetadataCache(TopicPolicies.class); + this.globalPoliciesCache = configurationStore.getMetadataCache(TopicPolicies.class); + localStore.registerListener(notification -> handleNotification(notification, false)); + configurationStore.registerListener(notification -> handleNotification(notification, true)); + } + + @Override + public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) { + return deleteTopicPoliciesAsync(topicName, false); + } + + @Override + public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName, + boolean keepGlobalPoliciesAfterDeleting) { + TopicName partitionedTopicName = normalizeTopicName(topicName); + if (NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject())) { + return CompletableFuture.completedFuture(null); + } + if (closed.get()) { + return CompletableFuture.failedFuture(new BrokerServiceException(getClass().getName() + " is closed.")); + } + CompletableFuture<Void> deleteLocal = + deleteIfExists(localPoliciesCache, pathFor(partitionedTopicName, false)); + if (keepGlobalPoliciesAfterDeleting) { + return deleteLocal; + } + CompletableFuture<Void> deleteGlobal = + deleteIfExists(globalPoliciesCache, pathFor(partitionedTopicName, true)); + return CompletableFuture.allOf(deleteLocal, deleteGlobal); + } + + @Override + public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer<TopicPolicies> policyUpdater) { + TopicName partitionedTopicName = normalizeTopicName(topicName); + if (NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject())) { + return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException( + "Not allowed to update topic policy for the heartbeat topic")); + } + if (closed.get()) { + return CompletableFuture.failedFuture(new BrokerServiceException(getClass().getName() + " is closed.")); + } + MetadataCache<TopicPolicies> cache = cache(isGlobalPolicy); + String path = pathFor(partitionedTopicName, isGlobalPolicy); + CompletableFuture<TopicPolicies> updateFuture; + if (skipUpdateWhenTopicPolicyDoesntExist) { + updateFuture = cache.readModifyUpdate(path, + current -> updatePolicies(Optional.of(current), isGlobalPolicy, policyUpdater)); + } else { + updateFuture = cache.readModifyUpdateOrCreate(path, + current -> updatePolicies(current, isGlobalPolicy, policyUpdater)); + } + return updateFuture.thenAccept(__ -> { }).exceptionally(error -> { + if (skipUpdateWhenTopicPolicyDoesntExist + && FutureUtil.unwrapCompletionException(error) instanceof NotFoundException) { + return null; + } + throw FutureUtil.wrapToCompletionException(error); + }); + } + + @Override + public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type) { + TopicName partitionedTopicName = normalizeTopicName(topicName); + if (NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject())) { + return CompletableFuture.completedFuture(Optional.empty()); + } + if (closed.get()) { + return CompletableFuture.completedFuture(Optional.empty()); + } + boolean global = type == GetType.GLOBAL_ONLY; + return cache(global).get(pathFor(partitionedTopicName, global)) + .thenApply(policies -> policies.map(policy -> cloneWithScope(policy, global))); + } + + @Override + public boolean registerListener(TopicName topicName, TopicPolicyListener listener) { + listeners.compute(normalizeTopicName(topicName), (__, topicListeners) -> { + if (topicListeners == null) { + topicListeners = new CopyOnWriteArrayList<>(); + } + topicListeners.add(listener); + return topicListeners; + }); + return true; + } + + @Override + public void unregisterListener(TopicName topicName, TopicPolicyListener listener) { + listeners.computeIfPresent(normalizeTopicName(topicName), (__, topicListeners) -> { + topicListeners.remove(listener); + return topicListeners.isEmpty() ? null : topicListeners; + }); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + listeners.clear(); + if (localPoliciesCache != null) { + localPoliciesCache.invalidateAll(); + } + if (globalPoliciesCache != null) { + globalPoliciesCache.invalidateAll(); + } + } + } + + private MetadataCache<TopicPolicies> cache(boolean isGlobalPolicy) { + return isGlobalPolicy ? globalPoliciesCache : localPoliciesCache; + } + + private CompletableFuture<Void> deleteIfExists(MetadataCache<TopicPolicies> cache, String path) { + return cache.delete(path).handle((__, error) -> { + cache.invalidate(path); + if (error == null || FutureUtil.unwrapCompletionException(error) instanceof NotFoundException) { + return null; + } + throw FutureUtil.wrapToCompletionException(error); + }); + } + + private static TopicPolicies updatePolicies(Optional<TopicPolicies> currentPolicies, + boolean isGlobalPolicy, + Consumer<TopicPolicies> policyUpdater) { + TopicPolicies policies = currentPolicies.map(TopicPolicies::clone).orElseGet(TopicPolicies::new); + policies.setIsGlobal(isGlobalPolicy); + policyUpdater.accept(policies); + return policies; + } + + private void handleNotification(Notification notification, boolean isGlobalPolicy) { + if (closed.get() + || (notification.getType() != NotificationType.Created + && notification.getType() != NotificationType.Modified + && notification.getType() != NotificationType.Deleted)) { + return; + } + String path = notification.getPath(); + String root = isGlobalPolicy ? GLOBAL_POLICIES_ROOT : LOCAL_POLICIES_ROOT; + Optional<TopicName> topicName = topicNameFromPath(root, path); + if (topicName.isEmpty()) { + return; + } + MetadataCache<TopicPolicies> cache = cache(isGlobalPolicy); + cache.invalidate(path); + if (notification.getType() == NotificationType.Deleted) { + notifyListeners(topicName.get(), null); + return; + } + cache.get(path).whenComplete((policies, error) -> { + if (error != null) { + log.warn("[{}] Failed to refresh topic policies after metadata notification", path, error); + return; + } + notifyListeners(topicName.get(), + policies.map(policy -> cloneWithScope(policy, isGlobalPolicy)).orElse(null)); + }); + } + + private void notifyListeners(TopicName topicName, @Nullable TopicPolicies policies) { + List<TopicPolicyListener> topicListeners = listeners.get(topicName); + if (topicListeners == null) { + return; + } + for (TopicPolicyListener listener : topicListeners) { + try { + listener.onUpdate(policies == null ? null : policies.clone()); + } catch (Throwable error) { + log.error("[{}] Call topic policy listener error", topicName, error); + } + } + } + + private static TopicName normalizeTopicName(TopicName topicName) { + return TopicName.get(topicName.getPartitionedTopicName()); + } + + private static TopicPolicies cloneWithScope(TopicPolicies policies, boolean isGlobalPolicy) { + TopicPolicies cloned = policies.clone(); + cloned.setIsGlobal(isGlobalPolicy); + return cloned; + } + + @VisibleForTesting + public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesDirectFromStore(TopicName topicName, + boolean isGlobal) { + String path = pathFor(topicName, isGlobal); + MetadataCache<TopicPolicies> c = cache(isGlobal); + c.invalidate(path); + return c.get(path).thenApply(opt -> opt.map(p -> cloneWithScope(p, isGlobal))); + } + + @VisibleForTesting + static String pathFor(TopicName topicName, boolean isGlobalPolicy) { + TopicName partitionedTopicName = normalizeTopicName(topicName); + return (isGlobalPolicy ? GLOBAL_POLICIES_ROOT : LOCAL_POLICIES_ROOT) + + "/" + partitionedTopicName.getTenant() + + "/" + partitionedTopicName.getNamespacePortion() + + "/" + partitionedTopicName.getDomain() + + "/" + partitionedTopicName.getEncodedLocalName(); + } + + @VisibleForTesting + private static Optional<TopicName> topicNameFromPath(String root, String path) { + if (!path.startsWith(root + "/")) { + return Optional.empty(); + } + String[] parts = path.substring(root.length() + 1).split("/", 4); + if (parts.length != 4) { + return Optional.empty(); + } + return Optional.of(TopicName.get(parts[2], parts[0], parts[1], Codec.decode(parts[3]))); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index ad18af308e2..f8a5d593661 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -647,7 +647,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return systemTopicClient.newReaderAsync(); } - private void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); if (NamespaceService.isHeartbeatNamespace(namespace)) { return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 239c1d3d9ba..7f49686a17f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -100,6 +100,15 @@ public interface TopicPoliciesService extends AutoCloseable { default void close() throws Exception { } + + /** + * @implNote This method is never called unless by the default implementation of + * {@link TopicPoliciesService#registerListenerAsync(TopicName, TopicPolicyListener)}, which is actually called + * internally. This method is only retained for backward compatibility on custom implementations. + */ + @Deprecated + boolean registerListener(TopicName topicName, TopicPolicyListener listener); + /** * Registers a listener for topic policies updates. * @@ -110,10 +119,10 @@ public interface TopicPoliciesService extends AutoCloseable { * guaranteed to be received by the listener. * In summary, the listener is guaranteed to receive only the latest value. * </p> - * - * @return true if the listener is registered successfully */ - boolean registerListener(TopicName topicName, TopicPolicyListener listener); + default CompletableFuture<Boolean> registerListenerAsync(TopicName topicName, TopicPolicyListener listener) { + return CompletableFuture.completedFuture(registerListener(topicName, listener)); + } /** * Unregister the topic policies listener. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7b7faac3a7a..df01fcfce89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4714,7 +4714,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal protected CompletableFuture<Void> initTopicPolicy() { final var topicPoliciesService = brokerService.pulsar().getTopicPoliciesService(); final var partitionedTopicName = TopicName.getPartitionedTopicName(topic); - if (topicPoliciesService.registerListener(partitionedTopicName, this)) { + return topicPoliciesService.registerListenerAsync(partitionedTopicName, this).thenCompose(registered -> { + if (!registered) { + return CompletableFuture.completedFuture(null); + } if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } @@ -4726,8 +4729,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal TopicPoliciesService.GetType.LOCAL_ONLY)) .thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate), brokerService.getTopicOrderedExecutor()); - } - return CompletableFuture.completedFuture(null); + }); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java new file mode 100644 index 00000000000..e7fefa16497 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin; + +import org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class MetadataStoreTopicPoliciesTest extends TopicPoliciesTest { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName()); + super.setup(); + } + + @Override + protected void clearTopicPoliciesCache() { + } + + @Test(enabled = false) + @Override + public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Exception { + // This test is specific to SystemTopicBasedTopicPoliciesService (uses getPoliciesCacheInit). + // Not applicable to MetadataStoreTopicPoliciesService. + } + + @Test(enabled = false) + @Override + public void testSystemTopicShouldBeCompacted() throws Exception { + // Relies on __change_events system topic, which does not exist with MetadataStoreTopicPoliciesService. + } + + @Test(enabled = false) + @Override + public void testPoliciesCanBeDeletedWithTopic() throws Exception { + // Directly accesses __change_events PersistentTopic for compaction. + // Not applicable to MetadataStoreTopicPoliciesService. + } + + @Test(enabled = false) + @Override + public void testProduceChangesWithEncryptionRequired() throws Exception { + // Checks __change_events LAC, which does not exist with MetadataStoreTopicPoliciesService. + } + + @Test(enabled = false) + @Override + public void testTopicPoliciesAfterCompaction(String reloadPolicyType) throws Exception { + // The "Recreate_Service" variant creates a new SystemTopicBasedTopicPoliciesService, + // which is not applicable to MetadataStoreTopicPoliciesService. + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index c5537897e7b..0a1c76ff226 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -117,7 +117,9 @@ import org.glassfish.jersey.client.JerseyClient; import org.glassfish.jersey.client.JerseyClientBuilder; import org.mockito.Mockito; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -142,10 +144,11 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { private final int testTopicPartitions = 2; - @BeforeMethod + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { this.conf.setDefaultNumberOfNamespaceBundles(1); + this.conf.setForceDeleteNamespaceAllowed(true); super.internalSetup(); admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); @@ -156,15 +159,48 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().createPartitionedTopic(testTopic, testTopicPartitions); Producer producer = pulsarClient.newProducer().topic(testTopic).create(); producer.close(); - waitForZooKeeperWatchers(); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { super.internalCleanup(); } + @BeforeMethod + void setupTestTopic() throws Exception { + // Recreate namespace to clear any policies set by previous tests + try { + admin.topics().deletePartitionedTopic(testTopic, true); + } catch (PulsarAdminException.NotFoundException e) { + // topic may already be deleted + } + try { + admin.namespaces().deleteNamespace(myNamespace, true); + } catch (PulsarAdminException.NotFoundException e) { + // namespace may already be deleted + } + try { + admin.namespaces().deleteNamespace(myNamespaceV1, true); + } catch (PulsarAdminException.NotFoundException e) { + // namespace may already be deleted + } + admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of("test")); + admin.namespaces().createNamespace(myNamespaceV1); + admin.topics().createPartitionedTopic(testTopic, testTopicPartitions); + // Acquire namespace bundle ownership so tests that call getOrCreateTopic() directly succeed. + // Without this, services that don't create a __change_events reader (e.g. MetadataStoreTopicPoliciesService) + // leave the bundle unowned after namespace recreation and the first broker-side topic load fails. + admin.lookups().lookupTopic(testTopic + "-partition-0"); + } + + @AfterMethod(alwaysRun = true) + void afterMethodCleanup() throws Exception{ + admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", "0"); + admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "0"); + clearTopicPoliciesCache(); + } + @Test public void updatePropertiesForAutoCreatedTopicTest() throws Exception { TopicName topicName = TopicName.get( @@ -483,8 +519,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test(dataProvider = "clientRequestType") public void testPriorityOfGlobalPolicies(String clientRequestType) throws Exception { - final SystemTopicBasedTopicPoliciesService topicPoliciesService = - (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + final TopicPoliciesService topicPoliciesService = + pulsar.getTopicPoliciesService(); final JerseyClient httpClient = JerseyClientBuilder.createClient(); // create topic and load it up. final String namespace = myNamespace; @@ -564,8 +600,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test(dataProvider = "clientRequestType") public void testPriorityOfGlobalPolicies2(String clientRequestType) throws Exception { - final SystemTopicBasedTopicPoliciesService topicPoliciesService = - (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + final TopicPoliciesService topicPoliciesService = + pulsar.getTopicPoliciesService(); final JerseyClient httpClient = JerseyClientBuilder.createClient(); // create topic and load it up. final String namespace = myNamespace; @@ -651,8 +687,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final TopicName topicName = TopicName.get(topic); admin.topics().createNonPartitionedTopic(topic); pulsarClient.newProducer().topic(topic).create().close(); - final SystemTopicBasedTopicPoliciesService topicPoliciesService = - (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + final TopicPoliciesService topicPoliciesService = + pulsar.getTopicPoliciesService(); // Set non-global policy of the limitation of max consumers. // Set global policy of the limitation of max producers. @@ -693,8 +729,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final TopicName topicName = TopicName.get(topic); admin.topics().createNonPartitionedTopic(topic); pulsarClient.newProducer().topic(topic).create().close(); - final SystemTopicBasedTopicPoliciesService topicPoliciesService = - (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + final TopicPoliciesService topicPoliciesService = + pulsar.getTopicPoliciesService(); // Set non-global policy of the limitation of max consumers. // Set global policy of the persistence policies. @@ -2482,10 +2518,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test public void testPublishRateInDifferentLevelPolicy() throws Exception { - cleanup(); - conf.setMaxPublishRatePerTopicInMessages(5); - conf.setMaxPublishRatePerTopicInBytes(50L); - setup(); + admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", "5"); + admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "50"); final String topicName = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); pulsarClient.newProducer().topic(topicName).create().close(); @@ -2776,9 +2810,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception { - cleanup(); - conf.setMaxUnackedMessagesPerSubscription(30); - setup(); + restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(30)); final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache @Cleanup @@ -2841,6 +2873,9 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { && admin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic) == null); messages = getMsgReceived(consumer1, Integer.MAX_VALUE); assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker); + + // restore default config + restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(4 * 50000)); } private void produceMsg(Producer producer, int msgNum) throws Exception{ @@ -3025,14 +3060,16 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test(timeOut = 30000) public void testAutoCreationDisabled() throws Exception { - cleanup(); - conf.setAllowAutoTopicCreation(false); - setup(); + admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "false"); + final String topic = testTopic + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); pulsarClient.newProducer().topic(topic).create().close(); //should not fail assertNull(admin.topicPolicies().getMessageTTL(topic)); + + // restore default + admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "true"); } @Test @@ -3156,6 +3193,12 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { pulsarClient.newConsumer().topic(topic) .subscriptionType(SubscriptionType.Shared).subscriptionName("test") .subscribe().close(); + + // restore dynamic broker config and conf object + pulsar.getConfiguration().setSubscriptionTypesEnabled( + Set.of("Exclusive", "Shared", "Failover", "Key_Shared")); + admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled", + "Exclusive,Shared,Failover,Key_Shared"); } @Test(timeOut = 20000) @@ -3486,7 +3529,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test - public void testDoNotCreateSystemTopicForHeartbeatNamespace() { + public void testDoNotCreateSystemTopicForHeartbeatNamespace() throws Exception { + initEventsTopicAndPartitions(); assertTrue(pulsar.getBrokerService().getTopics().size() > 0); pulsar.getBrokerService().getTopics().forEach((k, v) -> { TopicName topicName = TopicName.get(k); @@ -3548,8 +3592,13 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception { - PersistentTopic tp = - (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + Optional<Topic> topicOpt = + pulsar.getBrokerService().getTopic(topicName, false).join(); + if (topicOpt.isEmpty()) { + // Topic doesn't exist (e.g., when not using system-topic-based policies service), nothing to compact. + return; + } + PersistentTopic tp = (PersistentTopic) topicOpt.get(); // Wait for the old task finish. Awaitility.await().untilAsserted(() -> { CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction"); @@ -3568,7 +3617,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { * It is not a thread safety method, something will go to a wrong pointer if there is a task is trying to load a * topic policies. */ - private void clearTopicPoliciesCache() { + protected void clearTopicPoliciesCache() { TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); if (topicPoliciesService instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) { return; @@ -3798,8 +3847,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .isNull()); admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(1, 2)); - SystemTopicBasedTopicPoliciesService topicPoliciesService = - (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + TopicPoliciesService topicPoliciesService = + pulsar.getTopicPoliciesService(); // check global topic policies can be added correctly. Awaitility.await().untilAsserted(() -> assertNotNull( @@ -3843,6 +3892,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test public void testMaxMessageSizeWithChunking() throws Exception { + final var maxMessageSize = this.conf.getMaxMessageSize(); this.conf.setMaxMessageSize(1000); @Cleanup @@ -3871,6 +3921,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { // chunk message send success producer.send(new byte[2000]); + this.conf.setMaxMessageSize(maxMessageSize); } @Test(timeOut = 30000) @@ -3924,6 +3975,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test public void testProduceChangesWithEncryptionRequired() throws Exception { + initEventsTopicAndPartitions(); final String beforeLac = admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry; admin.namespaces().setEncryptionRequiredStatus(myNamespace, true); // just an update to trigger writes on __change_events @@ -4188,4 +4240,10 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) (1024 * 1024 * 10L), "Should inherit offload threshold from legacy namespace policy"); } + + private void initEventsTopicAndPartitions() throws Exception { + try (Producer<?> producer = pulsarClient.newProducer().topic(testTopic).create()) { + // No-op. Creating the producer initializes the events topic and partitions. + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java new file mode 100644 index 00000000000..47a7de0528d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.awaitility.Awaitility; +import org.awaitility.core.ThrowingRunnable; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test order: testUpgrade() -> other tests (with MetadataStoreTopicPoliciesService configured) -> testDowngrade(). + */ +@Test(groups = "broker") +public class LegacyAwareTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest { + + private static final String metaNamespace = "public/meta-ns"; + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(priority = -1) + public void testUpgrade() throws Exception { + final var topic = "test-upgrade"; + admin.topics().createNonPartitionedTopic(topic); + admin.topicPolicies().setCompactionThreshold(topic, 100); + waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100)); + + restartBroker(conf -> { + conf.setSystemTopicEnabled(false); + conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName()); + }); + // The policies will be lost because when system topic is disabled, it will not try to read policies from the + // __change_events topic + assertNull(admin.topicPolicies().getCompactionThreshold(topic)); + + restartBroker(conf -> conf.setSystemTopicEnabled(true)); + // The default namespace still read policies from the __change_events topic if it exists + assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100); + assertFalse(pulsar.getLocalMetadataStore().exists(MetadataStoreTopicPoliciesService.LOCAL_POLICIES_ROOT).get()); + + // The global policies are still stored in the __change_events topic + admin.topicPolicies(true).setCompactionThreshold(topic, 200); + waitUntilAssert(() -> assertEquals(admin.topicPolicies(true).getCompactionThreshold(topic), 200)); + assertFalse(pulsar.getConfigurationMetadataStore() + .exists(MetadataStoreTopicPoliciesService.GLOBAL_POLICIES_ROOT).get()); + + admin.topicPolicies().deleteTopicPolicies(topic); + waitUntilAssert(() -> assertNull(admin.topicPolicies().getCompactionThreshold(topic))); + + admin.namespaces().createNamespace(metaNamespace); + } + + @Test(priority = 1) + public void testDowngrade() throws Exception { + final var topic1 = "downgrade"; // in default namespace + admin.topics().createNonPartitionedTopic(topic1); + admin.topicPolicies().setCompactionThreshold(topic1, 1); + waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1)); + + final var topic2 = metaNamespace + "/downgrade"; + admin.topics().createNonPartitionedTopic(topic2); + admin.topicPolicies().setCompactionThreshold(topic2, 2); + waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic2), 2)); + + restartBroker(conf -> + conf.setTopicPoliciesServiceClassName(SystemTopicBasedTopicPoliciesService.class.getName())); + assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1); + // The policies will be lost because they are not stored in the __change_events topic + assertNull(admin.topicPolicies().getCompactionThreshold(topic2)); + } + + @DataProvider + public Object[][] namespaces() { + return new Object[][] { + { "public/default" }, + { metaNamespace } + }; + } + + @Test(dataProvider = "namespaces") + public void testPoliciesOperations(String namespace) throws Exception { + final var topicName = TopicName.get(namespace + "/test-policies-operations"); + final var topic = topicName.toString(); + admin.topics().createNonPartitionedTopic(topic); + + final var compactionThreshold = new AtomicLong(0); + // Verify the exception thrown from one listener does not affect other listeners + pulsar.getTopicPoliciesService().registerListenerAsync(topicName, __ -> { + throw new RuntimeException("injected failure"); + }).get(); + pulsar.getTopicPoliciesService().registerListenerAsync(topicName, policies -> + Optional.ofNullable(policies).map(TopicPolicies::getCompactionThreshold).ifPresentOrElse( + compactionThreshold::set, () -> compactionThreshold.set(-1))).get(); + + // Verify Created events are handled + admin.topicPolicies(false).setCompactionThreshold(topic, 100); + waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 100)); + final var localStore = pulsar.getLocalMetadataStore(); + final var configurationStore = pulsar.getConfigurationMetadataStore(); + + if (namespace.equals(metaNamespace)) { + assertTrue(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, false)).get()); + assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, true)).get()); + } + + admin.topicPolicies(true).setCompactionThreshold(topic, 200); + waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 200)); + if (namespace.equals(metaNamespace)) { + assertTrue(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, true)).get()); + } + + // Verify Modified events are handled + admin.topicPolicies(false).setCompactionThreshold(topic, 300); + waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 300)); + + admin.topicPolicies(true).setCompactionThreshold(topic, 400); + waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 400)); + + final var readerNamespaces = ((LegacyAwareTopicPoliciesService) pulsar.getTopicPoliciesService()) + .systemTopicService.getReaderCaches().keySet(); + assertFalse(readerNamespaces.contains(NamespaceName.get(metaNamespace))); + + // Verify Deleted events are handled + admin.topicPolicies(false).deleteTopicPolicies(topic); + waitUntilAssert(() -> assertEquals(compactionThreshold.get(), -1)); + if (namespace.equals(metaNamespace)) { + assertFalse(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, false)).get()); + assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, true)).get()); + } + } + + @Test + public void testUserCreatedEventsTopicAreIgnored() throws Exception { + final var topic = TopicName.get(metaNamespace + "/" + System.currentTimeMillis()).toString(); + admin.topics().createNonPartitionedTopic(topic); + admin.topicPolicies().setCompactionThreshold(topic, 1); + waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1)); + + final var eventsTopic = metaNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + admin.topics().createNonPartitionedTopic(eventsTopic); + // Even if the __change_events topic is created, since it has detected the namespace didn't have the events + // topic before, it will be ignored and the policies are still read from metadata store. + waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1)); + admin.topics().delete(eventsTopic); + } + + private static void waitUntilAssert(ThrowingRunnable assertion) { + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(assertion); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index ad02b4707a3..8d3b16723ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -110,7 +110,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic CompletableFuture<Void> f = CompletableFuture.completedFuture(null).thenRunAsync(() -> { for (int i = 0; i < 100; i++) { TopicPolicyListener listener = new TopicPolicyListenerImpl(); - systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); + systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener); Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener); @@ -119,7 +119,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic for (int i = 0; i < 100; i++) { TopicPolicyListener listener = new TopicPolicyListenerImpl(); - systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); + systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener); Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java index 6b9735d59b2..7e9c697fb5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java @@ -72,6 +72,13 @@ public class TopicPolicyTestUtils { public static Optional<TopicPolicies> getTopicPoliciesBypassCache(TopicPoliciesService topicPoliciesService, TopicName topicName, boolean isGlobal) throws Exception { + if (topicPoliciesService instanceof LegacyAwareTopicPoliciesService legacyService) { + TopicPoliciesService resolved = legacyService.resolveService(topicName.getNamespaceObject()).get(); + return getTopicPoliciesBypassCache(resolved, topicName, isGlobal); + } + if (topicPoliciesService instanceof MetadataStoreTopicPoliciesService metadataStoreService) { + return metadataStoreService.getTopicPoliciesDirectFromStore(topicName, isGlobal).get(); + } @Cleanup final var reader = ((SystemTopicBasedTopicPoliciesService) topicPoliciesService) .getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject())
