This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e37e3ee63d [fix][broker] Fix backlog clearing for unloaded namespace 
bundles (#25272)
8e37e3ee63d is described below

commit 8e37e3ee63d8fcebed493daa6e577fdc08591746
Author: Hao Zhang <[email protected]>
AuthorDate: Tue Apr 7 11:36:30 2026 +0800

    [fix][broker] Fix backlog clearing for unloaded namespace bundles (#25272)
    
    Co-authored-by: 张浩 <[email protected]>
---
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 -
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 295 +++++++++++----------
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  57 ++--
 .../pulsar/broker/namespace/NamespaceService.java  |  19 ++
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 126 +++++++++
 5 files changed, 333 insertions(+), 171 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 21008e4c77c..0c7c11db97e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
@@ -821,12 +820,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
-    protected void checkNotBlank(String str, String errorMessage) {
-        if (isBlank(str)) {
-            throw new RestException(Status.PRECONDITION_FAILED, errorMessage);
-        }
-    }
-
     protected boolean isManagedLedgerNotFoundException(Throwable cause) {
         return cause instanceof 
ManagedLedgerException.MetadataNotFoundException
                 || cause instanceof MetadataStoreException.NotFoundException;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2fd572da431..89050fbd288 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
-import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
@@ -1825,129 +1824,108 @@ public abstract class NamespacesBase extends 
AdminResource {
         );
     }
 
-    protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, 
boolean authoritative) {
-        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
-
-        final List<CompletableFuture<Void>> futures = new ArrayList<>();
-        try {
-            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                    .getBundles(namespaceName);
-            for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
-                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
-                    futures.add(pulsar().getAdminClient().namespaces()
-                            
.clearNamespaceBundleBacklogAsync(namespaceName.toString(), 
nsBundle.getBundleRange()));
-                }
-            }
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-            return;
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-            return;
-        }
-
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                log.warn("[{}] Failed to clear backlog on the bundles for 
namespace {}: {}", clientAppId(),
-                        namespaceName, exception.getCause().getMessage());
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                    return null;
-                } else {
-                    asyncResponse.resume(new 
RestException(exception.getCause()));
-                    return null;
-                }
-            }
-            log.info("[{}] Successfully cleared backlog on all the bundles for 
namespace {}", clientAppId(),
-                    namespaceName);
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        });
+    protected CompletableFuture<Void> 
internalClearNamespaceBacklogAsync(boolean authoritative) {
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
+                .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                        .getBundlesAsync(namespaceName))
+                .thenCompose(bundles -> {
+                    final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
+                    for (NamespaceBundle nsBundle : bundles.getBundles()) {
+                        try {
+                            futures.add(pulsar().getAdminClient().namespaces()
+                                    
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
+                                            nsBundle.getBundleRange()));
+                        } catch (PulsarServerException e) {
+                            return CompletableFuture.failedFuture(e);
+                        }
+                    }
+                    return FutureUtil.waitForAll(futures);
+                }).thenRun(() -> log.info("[{}] Successfully cleared backlog 
on all the bundles for namespace {}",
+                        clientAppId(), namespaceName));
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalClearNamespaceBundleBacklog(String bundleRange, 
boolean authoritative) {
-        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
-        checkNotNull(bundleRange, "BundleRange should not be null");
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        // check cluster ownership for a given global namespace: redirect if 
peer-cluster owns it
-        validateGlobalNamespaceOwnership(namespaceName);
-
-        validateNamespaceBundleOwnership(namespaceName, policies.bundles, 
bundleRange, authoritative, true);
+    protected CompletableFuture<Void> 
internalClearNamespaceBundleBacklogAsync(String bundleRange,
+                                                                               
boolean authoritative) {
+        if (bundleRange == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+        }
 
-        clearBacklog(namespaceName, bundleRange, null);
-        log.info("[{}] Successfully cleared backlog on namespace bundle 
{}/{}", clientAppId(), namespaceName,
-                bundleRange);
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
+                .thenCompose(__ -> {
+                    // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
+                    return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                })
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies ->
+                        // Allow acquiring ownership for an unassigned bundle 
so backlog can be cleared
+                        // even if not loaded.
+                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
+                                authoritative, false))
+                .thenCompose(bundle -> clearBacklogAsync(bundle, null))
+                .thenRun(() -> log.info("[{}] Successfully cleared backlog on 
namespace bundle {}/{}", clientAppId(),
+                        namespaceName, bundleRange));
     }
 
-    protected void internalClearNamespaceBacklogForSubscription(AsyncResponse 
asyncResponse, String subscription,
-                                                                boolean 
authoritative) {
-        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
-        checkNotNull(subscription, "Subscription should not be null");
-
-        final List<CompletableFuture<Void>> futures = new ArrayList<>();
-        try {
-            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                    .getBundles(namespaceName);
-            for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
-                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
-                    
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
-                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
-                }
-            }
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-            return;
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-            return;
+    protected CompletableFuture<Void> 
internalClearNamespaceBacklogForSubscriptionAsync(String subscription,
+                                                                               
         boolean authoritative) {
+        if (subscription == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
         }
 
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                log.warn("[{}] Failed to clear backlog for subscription {} on 
the bundles for namespace {}: {}",
-                        clientAppId(), subscription, namespaceName, 
exception.getCause().getMessage());
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                    return null;
-                } else {
-                    asyncResponse.resume(new 
RestException(exception.getCause()));
-                    return null;
-                }
-            }
-            log.info("[{}] Successfully cleared backlog for subscription {} on 
all the bundles for namespace {}",
-                    clientAppId(), subscription, namespaceName);
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        });
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
+                .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                        .getBundlesAsync(namespaceName))
+                .thenCompose(bundles -> {
+                    final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
+                    for (NamespaceBundle nsBundle : bundles.getBundles()) {
+                        try {
+                            futures.add(pulsar().getAdminClient().namespaces()
+                                    
.clearNamespaceBundleBacklogForSubscriptionAsync(
+                                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
+                        } catch (PulsarServerException e) {
+                            return CompletableFuture.failedFuture(e);
+                        }
+                    }
+                    return FutureUtil.waitForAll(futures);
+                }).thenRun(() -> log.info(
+                        "[{}] Successfully cleared backlog for subscription {} 
on all the bundles for namespace {}",
+                        clientAppId(), subscription, namespaceName));
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalClearNamespaceBundleBacklogForSubscription(String 
subscription, String bundleRange,
-                                                                      boolean 
authoritative) {
-        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
-        checkNotNull(subscription, "Subscription should not be null");
-        checkNotNull(bundleRange, "BundleRange should not be null");
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        // check cluster ownership for a given global namespace: redirect if 
peer-cluster owns it
-        validateGlobalNamespaceOwnership(namespaceName);
-
-        validateNamespaceBundleOwnership(namespaceName, policies.bundles, 
bundleRange, authoritative, true);
+    protected CompletableFuture<Void> 
internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription,
+                                                                               
               String bundleRange,
+                                                                               
               boolean authoritative) {
+        if (subscription == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+        }
+        if (bundleRange == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+        }
 
-        clearBacklog(namespaceName, bundleRange, subscription);
-        log.info("[{}] Successfully cleared backlog for subscription {} on 
namespace bundle {}/{}", clientAppId(),
-                subscription, namespaceName, bundleRange);
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
+                .thenCompose(__ -> {
+                    // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
+                    return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                })
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies ->
+                        // Allow acquiring ownership for an unassigned bundle 
so backlog can be cleared
+                        // even if not loaded.
+                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
+                                authoritative, false))
+                .thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
+                .thenRun(() -> log.info(
+                        "[{}] Successfully cleared backlog for subscription {} 
on namespace bundle {}/{}",
+                        clientAppId(), subscription, namespaceName, 
bundleRange));
     }
 
     protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String 
subscription,
                                                                         
boolean authoritative) {
-        checkNotNull(subscription, "Subscription should not be null");
+        if (subscription == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+        }
 
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
                 .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
@@ -1970,8 +1948,12 @@ public abstract class NamespacesBase extends 
AdminResource {
     @SuppressWarnings("deprecation")
     protected CompletableFuture<Void> 
internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange,
                                                                               
boolean authoritative) {
-        checkNotNull(subscription, "Subscription should not be null");
-        checkNotNull(bundleRange, "BundleRange should not be null");
+        if (subscription == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+        }
+        if (bundleRange == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+        }
 
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
                 .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
@@ -2064,8 +2046,14 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) {
-        checkNotNull(antiAffinityGroup, "Anti-affinity group should not be 
null");
-        checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+        if (antiAffinityGroup == null) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.BAD_REQUEST, "Anti-affinity group 
should not be null"));
+        }
+        if (StringUtils.isBlank(antiAffinityGroup)) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.PRECONDITION_FAILED, 
"Anti-affinity group can't be empty"));
+        }
         return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
                 .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync()).thenCompose(
                         __ -> getDefaultBundleDataAsync().thenCompose(
@@ -2103,10 +2091,20 @@ public abstract class NamespacesBase extends 
AdminResource {
     protected CompletableFuture<List<String>> 
internalGetAntiAffinityNamespacesAsync(String cluster,
                                                                                
      String antiAffinityGroup,
                                                                                
      String tenant) {
-        checkNotNull(cluster, "Cluster should not be null");
-        checkNotNull(antiAffinityGroup, "Anti-affinity group should not be 
null");
-        checkNotNull(tenant, "Tenant should not be null");
-        checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+        if (cluster == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Cluster should not be null"));
+        }
+        if (antiAffinityGroup == null) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.BAD_REQUEST, "Anti-affinity group 
should not be null"));
+        }
+        if (tenant == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Tenant should not be null"));
+        }
+        if (StringUtils.isBlank(antiAffinityGroup)) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.PRECONDITION_FAILED, 
"Anti-affinity group can't be empty"));
+        }
 
         return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
                 .thenCompose(__ -> validateClusterExistsAsync(cluster))
@@ -2137,39 +2135,48 @@ public abstract class NamespacesBase extends 
AdminResource {
         return checkBacklogQuota(quota, retention);
     }
 
-    private void clearBacklog(NamespaceName nsName, String bundleRange, String 
subscription) {
-        try {
-            List<Topic> topicList = 
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
-                    nsName.toString() + "/" + bundleRange);
-
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            if (subscription != null) {
-                if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
-                    subscription = 
PersistentReplicator.getRemoteCluster(subscription);
-                }
-                for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic
-                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
-                        futures.add(((PersistentTopic) 
topic).clearBacklog(subscription));
+    private CompletableFuture<Void> clearBacklogAsync(NamespaceBundle bundle, 
String subscription) {
+        return 
pulsar().getNamespaceService().getOwnedPersistentTopicListForNamespaceBundle(bundle)
+                .thenCompose(topicsInBundle -> {
+                    List<CompletableFuture<Void>> futures = new ArrayList<>();
+                    String effectiveSubscription = subscription;
+                    if (effectiveSubscription != null
+                            && 
effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix()))
 {
+                        effectiveSubscription = 
PersistentReplicator.getRemoteCluster(effectiveSubscription);
                     }
-                }
-            } else {
-                for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic
-                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
-                        futures.add(((PersistentTopic) topic).clearBacklog());
+                    final String finalSubscription = effectiveSubscription;
+
+                    for (String topic : topicsInBundle) {
+                        TopicName topicName = TopicName.get(topic);
+                        if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                            continue;
+                        }
+                        
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
+                                .thenCompose(optTopic -> {
+                                    if (optTopic.isEmpty()) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                    Topic loaded = optTopic.get();
+                                    if (!(loaded instanceof PersistentTopic 
persistentTopic)) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                    return finalSubscription != null
+                                            ? 
persistentTopic.clearBacklog(finalSubscription)
+                                            : persistentTopic.clearBacklog();
+                                }));
                     }
-                }
-            }
 
-            FutureUtil.waitForAll(futures).get();
-        } catch (Exception e) {
-            log.error("[{}] Failed to clear backlog for namespace {}/{}, 
subscription: {}", clientAppId(),
-                    nsName.toString(), bundleRange, subscription, e);
-            throw new RestException(e);
-        }
+                    return FutureUtil.waitForAll(futures);
+                }).exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof RestException) {
+                        throw (RestException) cause;
+                    }
+                    throw new RestException(cause);
+                });
     }
 
+
     private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle, 
String subscription) {
         if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
             return CompletableFuture.failedFuture(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 1eedfb564f9..b691559cb8b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1517,14 +1517,14 @@ public class Namespaces extends NamespacesBase {
     public void clearNamespaceBacklog(@Suspended final AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalClearNamespaceBacklog(asyncResponse, authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalClearNamespaceBacklogAsync(authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to clear backlog on namespace {}", 
clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1535,11 +1535,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace"),
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant,
+    public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("bundle") 
String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBundleBacklog(bundleRange, authoritative);
+        internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to clear backlog on namespace 
bundle {}/{}", clientAppId(),
+                            namespaceName, bundleRange, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1553,14 +1561,15 @@ public class Namespaces extends NamespacesBase {
             @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
             @PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalClearNamespaceBacklogForSubscription(asyncResponse, 
subscription, authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalClearNamespaceBacklogForSubscriptionAsync(subscription, 
authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to clear backlog for subscription 
{} on namespace {}", clientAppId(),
+                            subscription, namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1571,12 +1580,20 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace"),
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void 
clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant,
+    public void clearNamespaceBundleBacklogForSubscription(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBundleBacklogForSubscription(subscription, 
bundleRange, authoritative);
+        internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, 
bundleRange, authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to clear backlog for subscription 
{} on namespace bundle {}/{}",
+                            clientAppId(), subscription, namespaceName, 
bundleRange, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d9e076a0002..54ccc5a0022 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1381,6 +1381,25 @@ public class NamespaceService implements AutoCloseable {
                 });
     }
 
+    public CompletableFuture<List<String>> 
getOwnedPersistentTopicListForNamespaceBundle(NamespaceBundle bundle) {
+        return 
getListOfPersistentTopics(bundle.getNamespaceObject()).thenCompose(topics ->
+                        CompletableFuture.completedFuture(
+                                topics.stream()
+                                        .filter(topic -> 
bundle.includes(TopicName.get(topic)))
+                                        .collect(Collectors.toList())))
+                .thenCombine(getPartitions(bundle.getNamespaceObject(), 
TopicDomain.persistent).thenCompose(topics ->
+                        CompletableFuture.completedFuture(
+                                topics.stream().filter(topic -> 
bundle.includes(TopicName.get(topic)))
+                                        .collect(Collectors.toList()))), 
(left, right) -> {
+                    for (String topic : right) {
+                        if (!left.contains(topic)) {
+                            left.add(topic);
+                        }
+                    }
+                    return left;
+                });
+    }
+
     /***
      * Checks whether the topic exists( partitioned or non-partitioned ).
      */
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c7cbb63dc54..6d45dd9080a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2291,6 +2291,132 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(backlog, 0);
     }
 
+    @Test(dataProvider = "numBundles")
+    public void testClearNamespaceBundleBacklogOnUnloadedBundle(Integer 
numBundles) throws Exception {
+        String namespace = "prop-xyz/ns1-bundles";
+        admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
+
+        String topic = "persistent://" + namespace + "/t1";
+        String subscription = "sub1";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+        producer.close();
+        consumer.close();
+
+        long backlog = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+        assertEquals(backlog, 10);
+
+        NamespaceBundle bundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
+
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by current broker");
+                    
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by other broker");
+                });
+
+        admin.namespaces().clearNamespaceBundleBacklog(namespace, 
bundle.getBundleRange());
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long backlogAfter = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+        assertEquals(backlogAfter, 0);
+    }
+
+    @Test(dataProvider = "numBundles")
+    public void 
testClearNamespaceBundleBacklogForSubscriptionOnUnloadedBundle(Integer 
numBundles) throws Exception {
+        String namespace = "prop-xyz/ns1-bundles";
+        admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
+
+        String topic = "persistent://" + namespace + "/t1";
+        String subscription = "sub1";
+        String otherSubscription = "sub2";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+        producer.close();
+        consumer1.close();
+        consumer2.close();
+
+        long backlog = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+        assertEquals(backlog, 10);
+        long otherBacklog = 
admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog();
+        assertEquals(otherBacklog, 10);
+
+        NamespaceBundle bundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
+
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by current broker");
+                    
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by other broker");
+                });
+
+        
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, 
bundle.getBundleRange(),
+                subscription);
+
+        @Cleanup
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> consumer4 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+
+        long backlogAfter = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+        assertEquals(backlogAfter, 0);
+        long otherBacklogAfter =
+                
admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog();
+        assertEquals(otherBacklogAfter, 10);
+    }
+
+
     @Test(dataProvider = "bundling")
     public void testUnsubscribeOnNamespace(Integer numBundles) throws 
Exception {
         admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);


Reply via email to