This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eb79b2d013028e3144aaa2feb34f105283e1ebae Author: Hao Zhang <[email protected]> AuthorDate: Tue Apr 7 11:36:30 2026 +0800 [fix][broker] Fix backlog clearing for unloaded namespace bundles (#25272) Co-authored-by: 张浩 <[email protected]> (cherry picked from commit 8e37e3ee63d8fcebed493daa6e577fdc08591746) --- .../apache/pulsar/broker/admin/AdminResource.java | 7 - .../pulsar/broker/admin/impl/NamespacesBase.java | 302 ++++++++++----------- .../apache/pulsar/broker/admin/v1/Namespaces.java | 45 ++- .../apache/pulsar/broker/admin/v2/Namespaces.java | 57 ++-- .../pulsar/broker/namespace/NamespaceService.java | 19 ++ .../apache/pulsar/broker/admin/AdminApiTest.java | 126 +++++++++ 6 files changed, 370 insertions(+), 186 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 103181a7c73..dd45ac3c94c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import static org.apache.commons.lang3.StringUtils.isBlank; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -852,12 +851,6 @@ public abstract class AdminResource extends PulsarWebResource { } } - protected void checkNotBlank(String str, String errorMessage) { - if (isBlank(str)) { - throw new RestException(Status.PRECONDITION_FAILED, errorMessage); - } - } - protected boolean isManagedLedgerNotFoundException(Throwable cause) { return cause instanceof ManagedLedgerException.MetadataNotFoundException || cause instanceof MetadataStoreException.NotFoundException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 56f4fc1400c..b0006e459bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -70,7 +70,6 @@ import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; @@ -78,7 +77,6 @@ import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; -import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -1538,139 +1536,108 @@ public abstract class NamespacesBase extends AdminResource { ); } - protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); - - final List<CompletableFuture<Void>> futures = new ArrayList<>(); - try { - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear - if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { - futures.add(pulsar().getAdminClient().namespaces() - .clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange())); - } - } - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - log.warn("[{}] Failed to clear backlog on the bundles for namespace {}: {}", clientAppId(), - namespaceName, exception.getCause().getMessage()); - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - return null; - } else { - asyncResponse.resume(new RestException(exception.getCause())); - return null; - } - } - log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), - namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }); + protected CompletableFuture<Void> internalClearNamespaceBacklogAsync(boolean authoritative) { + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespaceName)) + .thenCompose(bundles -> { + final List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + try { + futures.add(pulsar().getAdminClient().namespaces() + .clearNamespaceBundleBacklogAsync(namespaceName.toString(), + nsBundle.getBundleRange())); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + return FutureUtil.waitForAll(futures); + }).thenRun(() -> log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", + clientAppId(), namespaceName)); } @SuppressWarnings("deprecation") - protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); - checkNotNull(bundleRange, "BundleRange should not be null"); - - Policies policies = getNamespacePolicies(namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); + protected CompletableFuture<Void> internalClearNamespaceBundleBacklogAsync(String bundleRange, + boolean authoritative) { + if (bundleRange == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "BundleRange should not be null")); } - validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - - clearBacklog(namespaceName, bundleRange, null); - log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), namespaceName, - bundleRange); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> + // Allow acquiring ownership for an unassigned bundle so backlog can be cleared + // even if not loaded. + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) + .thenCompose(bundle -> clearBacklogAsync(bundle, null)) + .thenRun(() -> log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange)); } - protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription, - boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); - checkNotNull(subscription, "Subscription should not be null"); - - final List<CompletableFuture<Void>> futures = new ArrayList<>(); - try { - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear - if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { - futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync( - namespaceName.toString(), nsBundle.getBundleRange(), subscription)); - } - } - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; + protected CompletableFuture<Void> internalClearNamespaceBacklogForSubscriptionAsync(String subscription, + boolean authoritative) { + if (subscription == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - log.warn("[{}] Failed to clear backlog for subscription {} on the bundles for namespace {}: {}", - clientAppId(), subscription, namespaceName, exception.getCause().getMessage()); - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - return null; - } else { - asyncResponse.resume(new RestException(exception.getCause())); - return null; - } - } - log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", - clientAppId(), subscription, namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespaceName)) + .thenCompose(bundles -> { + final List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + try { + futures.add(pulsar().getAdminClient().namespaces() + .clearNamespaceBundleBacklogForSubscriptionAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + return FutureUtil.waitForAll(futures); + }).thenRun(() -> log.info( + "[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", + clientAppId(), subscription, namespaceName)); } @SuppressWarnings("deprecation") - protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange, - boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); - checkNotNull(subscription, "Subscription should not be null"); - checkNotNull(bundleRange, "BundleRange should not be null"); - - Policies policies = getNamespacePolicies(namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); + protected CompletableFuture<Void> internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription, + String bundleRange, + boolean authoritative) { + if (subscription == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); + } + if (bundleRange == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "BundleRange should not be null")); } - validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - - clearBacklog(namespaceName, bundleRange, subscription); - log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(), - subscription, namespaceName, bundleRange); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> + // Allow acquiring ownership for an unassigned bundle so backlog can be cleared + // even if not loaded. + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) + .thenCompose(bundle -> clearBacklogAsync(bundle, subscription)) + .thenRun(() -> log.info( + "[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange)); } protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String subscription, boolean authoritative) { - checkNotNull(subscription, "Subscription should not be null"); + if (subscription == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); + } return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() @@ -1693,8 +1660,12 @@ public abstract class NamespacesBase extends AdminResource { @SuppressWarnings("deprecation") protected CompletableFuture<Void> internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange, boolean authoritative) { - checkNotNull(subscription, "Subscription should not be null"); - checkNotNull(bundleRange, "BundleRange should not be null"); + if (subscription == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); + } + if (bundleRange == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "BundleRange should not be null")); + } return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) .thenCompose(__ -> { @@ -1794,8 +1765,14 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture<Void> internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) { - checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null"); - checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty"); + if (antiAffinityGroup == null) { + return FutureUtil.failedFuture( + new RestException(Status.BAD_REQUEST, "Anti-affinity group should not be null")); + } + if (StringUtils.isBlank(antiAffinityGroup)) { + return FutureUtil.failedFuture( + new RestException(Status.PRECONDITION_FAILED, "Anti-affinity group can't be empty")); + } return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()).thenCompose( __ -> getDefaultBundleDataAsync().thenCompose( @@ -1833,10 +1810,20 @@ public abstract class NamespacesBase extends AdminResource { protected CompletableFuture<List<String>> internalGetAntiAffinityNamespacesAsync(String cluster, String antiAffinityGroup, String tenant) { - checkNotNull(cluster, "Cluster should not be null"); - checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null"); - checkNotNull(tenant, "Tenant should not be null"); - checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty"); + if (cluster == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Cluster should not be null")); + } + if (antiAffinityGroup == null) { + return FutureUtil.failedFuture( + new RestException(Status.BAD_REQUEST, "Anti-affinity group should not be null")); + } + if (tenant == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null")); + } + if (StringUtils.isBlank(antiAffinityGroup)) { + return FutureUtil.failedFuture( + new RestException(Status.PRECONDITION_FAILED, "Anti-affinity group can't be empty")); + } return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ) .thenCompose(__ -> validateClusterExistsAsync(cluster)) @@ -1867,39 +1854,48 @@ public abstract class NamespacesBase extends AdminResource { return checkBacklogQuota(quota, retention); } - private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) { - try { - List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), - nsName.toString() + "/" + bundleRange); - - List<CompletableFuture<Void>> futures = new ArrayList<>(); - if (subscription != null) { - if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { - subscription = PersistentReplicator.getRemoteCluster(subscription); - } - for (Topic topic : topicList) { - if (topic instanceof PersistentTopic - && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) { - futures.add(((PersistentTopic) topic).clearBacklog(subscription)); + private CompletableFuture<Void> clearBacklogAsync(NamespaceBundle bundle, String subscription) { + return pulsar().getNamespaceService().getOwnedPersistentTopicListForNamespaceBundle(bundle) + .thenCompose(topicsInBundle -> { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + String effectiveSubscription = subscription; + if (effectiveSubscription != null + && effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { + effectiveSubscription = PersistentReplicator.getRemoteCluster(effectiveSubscription); } - } - } else { - for (Topic topic : topicList) { - if (topic instanceof PersistentTopic - && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) { - futures.add(((PersistentTopic) topic).clearBacklog()); + final String finalSubscription = effectiveSubscription; + + for (String topic : topicsInBundle) { + TopicName topicName = TopicName.get(topic); + if (pulsar().getBrokerService().isSystemTopic(topicName)) { + continue; + } + futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + Topic loaded = optTopic.get(); + if (!(loaded instanceof PersistentTopic persistentTopic)) { + return CompletableFuture.completedFuture(null); + } + return finalSubscription != null + ? persistentTopic.clearBacklog(finalSubscription) + : persistentTopic.clearBacklog(); + })); } - } - } - FutureUtil.waitForAll(futures).get(); - } catch (Exception e) { - log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(), - nsName.toString(), bundleRange, subscription, e); - throw new RestException(e); - } + return FutureUtil.waitForAll(futures); + }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof RestException) { + throw (RestException) cause; + } + throw new RestException(cause); + }); } + private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle, String subscription) { if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { return CompletableFuture.failedFuture( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 1c736d749ee..8e29256c2ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1375,12 +1375,20 @@ public class Namespaces extends NamespacesBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBacklog(asyncResponse, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); + return; } catch (Exception e) { asyncResponse.resume(new RestException(e)); + return; } + internalClearNamespaceBacklogAsync(authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1390,12 +1398,20 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklog(@PathParam("property") String property, + public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBundleBacklog(bundleRange, authoritative); + internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1410,12 +1426,21 @@ public class Namespaces extends NamespacesBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); + return; } catch (Exception e) { asyncResponse.resume(new RestException(e)); + return; } + internalClearNamespaceBacklogForSubscriptionAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1425,12 +1450,20 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklogForSubscription(@PathParam("property") String property, + public void clearNamespaceBundleBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative); + internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 836d2699e18..8e6f4fb62a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1512,14 +1512,14 @@ public class Namespaces extends NamespacesBase { public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklog(asyncResponse, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklogAsync(authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1530,11 +1530,19 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant, + public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalClearNamespaceBundleBacklog(bundleRange, authoritative); + internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1548,14 +1556,15 @@ public class Namespaces extends NamespacesBase { @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklogForSubscriptionAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1566,12 +1575,20 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant, + public void clearNamespaceBundleBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative); + internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index f17526416e5..2a40cfa4464 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1388,6 +1388,25 @@ public class NamespaceService implements AutoCloseable { }); } + public CompletableFuture<List<String>> getOwnedPersistentTopicListForNamespaceBundle(NamespaceBundle bundle) { + return getListOfPersistentTopics(bundle.getNamespaceObject()).thenCompose(topics -> + CompletableFuture.completedFuture( + topics.stream() + .filter(topic -> bundle.includes(TopicName.get(topic))) + .collect(Collectors.toList()))) + .thenCombine(getPartitions(bundle.getNamespaceObject(), TopicDomain.persistent).thenCompose(topics -> + CompletableFuture.completedFuture( + topics.stream().filter(topic -> bundle.includes(TopicName.get(topic))) + .collect(Collectors.toList()))), (left, right) -> { + for (String topic : right) { + if (!left.contains(topic)) { + left.add(topic); + } + } + return left; + }); + } + /*** * Checks whether the topic exists( partitioned or non-partitioned ). */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 2c5d045f758..9a5c72ff240 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2284,6 +2284,132 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(backlog, 0); } + @Test(dataProvider = "numBundles") + public void testClearNamespaceBundleBacklogOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns1-bundles"; + admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + for (int i = 0; i < 10; i++) { + producer.send(("message-" + i).getBytes()); + } + producer.close(); + consumer.close(); + + long backlog = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlog, 10); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().clearNamespaceBundleBacklog(namespace, bundle.getBundleRange()); + + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + long backlogAfter = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlogAfter, 0); + } + + @Test(dataProvider = "numBundles") + public void testClearNamespaceBundleBacklogForSubscriptionOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns1-bundles"; + admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer<byte[]> consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + for (int i = 0; i < 10; i++) { + producer.send(("message-" + i).getBytes()); + } + producer.close(); + consumer1.close(); + consumer2.close(); + + long backlog = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlog, 10); + long otherBacklog = admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog(); + assertEquals(otherBacklog, 10); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle.getBundleRange(), + subscription); + + @Cleanup + Consumer<byte[]> consumer3 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + @Cleanup + Consumer<byte[]> consumer4 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + long backlogAfter = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlogAfter, 0); + long otherBacklogAfter = + admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog(); + assertEquals(otherBacklogAfter, 10); + } + + @Test(dataProvider = "bundling") public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
