This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bc8449deb3adcf4b5e8ba39eb01d2fd3e5743f44
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 15 17:34:26 2026 +0300

    Revert "[fix][broker] Fix backlog clearing for unloaded namespace bundles (#25272)"
    
    This reverts commit 24e7814cbd49a94007456b6be2492bc8c1881b5e.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 273 +++++++++++----------
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  57 ++---
 .../pulsar/broker/namespace/NamespaceService.java  |  19 --
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 126 ----------
 4 files changed, 166 insertions(+), 309 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 8e1d11b15f3..f28b07d8a9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
@@ -1567,108 +1568,139 @@ public abstract class NamespacesBase extends AdminResource {
         );
     }
 
-    protected CompletableFuture<Void> 
internalClearNamespaceBacklogAsync(boolean authoritative) {
-        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
-                .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                        .getBundlesAsync(namespaceName))
-                .thenCompose(bundles -> {
-                    final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
-                    for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                        try {
-                            futures.add(pulsar().getAdminClient().namespaces()
-                                    
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
-                                            nsBundle.getBundleRange()));
-                        } catch (PulsarServerException e) {
-                            return CompletableFuture.failedFuture(e);
-                        }
-                    }
-                    return FutureUtil.waitForAll(futures);
-                }).thenRun(() -> log.info("[{}] Successfully cleared backlog 
on all the bundles for namespace {}",
-                        clientAppId(), namespaceName));
+    protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, 
boolean authoritative) {
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
+
+        final List<CompletableFuture<Void>> futures = new ArrayList<>();
+        try {
+            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                    .getBundles(namespaceName);
+            for (NamespaceBundle nsBundle : bundles.getBundles()) {
+                // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
+                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
+                    futures.add(pulsar().getAdminClient().namespaces()
+                            
.clearNamespaceBundleBacklogAsync(namespaceName.toString(), 
nsBundle.getBundleRange()));
+                }
+            }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                log.warn("[{}] Failed to clear backlog on the bundles for 
namespace {}: {}", clientAppId(),
+                        namespaceName, exception.getCause().getMessage());
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
+                } else {
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
+                }
+            }
+            log.info("[{}] Successfully cleared backlog on all the bundles for 
namespace {}", clientAppId(),
+                    namespaceName);
+            asyncResponse.resume(Response.noContent().build());
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
-    protected CompletableFuture<Void> 
internalClearNamespaceBundleBacklogAsync(String bundleRange,
-                                                                               
boolean authoritative) {
-        if (bundleRange == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+    protected void internalClearNamespaceBundleBacklog(String bundleRange, 
boolean authoritative) {
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
+        checkNotNull(bundleRange, "BundleRange should not be null");
+
+        Policies policies = getNamespacePolicies(namespaceName);
+
+        if (namespaceName.isGlobal()) {
+            // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
+            validateGlobalNamespaceOwnership(namespaceName);
+        } else {
+            validateClusterOwnership(namespaceName.getCluster());
+            validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
         }
 
-        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
-                .thenCompose(__ -> {
-                    // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
-                    return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
-                })
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies ->
-                        // Allow acquiring ownership for an unassigned bundle 
so backlog can be cleared
-                        // even if not loaded.
-                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
-                                authoritative, false))
-                .thenCompose(bundle -> clearBacklogAsync(bundle, null))
-                .thenRun(() -> log.info("[{}] Successfully cleared backlog on 
namespace bundle {}/{}", clientAppId(),
-                        namespaceName, bundleRange));
+        validateNamespaceBundleOwnership(namespaceName, policies.bundles, 
bundleRange, authoritative, true);
+
+        clearBacklog(namespaceName, bundleRange, null);
+        log.info("[{}] Successfully cleared backlog on namespace bundle 
{}/{}", clientAppId(), namespaceName,
+                bundleRange);
     }
 
-    protected CompletableFuture<Void> 
internalClearNamespaceBacklogForSubscriptionAsync(String subscription,
-                                                                               
         boolean authoritative) {
-        if (subscription == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+    protected void internalClearNamespaceBacklogForSubscription(AsyncResponse 
asyncResponse, String subscription,
+                                                                boolean 
authoritative) {
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
+        checkNotNull(subscription, "Subscription should not be null");
+
+        final List<CompletableFuture<Void>> futures = new ArrayList<>();
+        try {
+            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                    .getBundles(namespaceName);
+            for (NamespaceBundle nsBundle : bundles.getBundles()) {
+                // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
+                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
+                    
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
+                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
+                }
+            }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
         }
 
-        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
-                .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                        .getBundlesAsync(namespaceName))
-                .thenCompose(bundles -> {
-                    final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
-                    for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                        try {
-                            futures.add(pulsar().getAdminClient().namespaces()
-                                    
.clearNamespaceBundleBacklogForSubscriptionAsync(
-                                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
-                        } catch (PulsarServerException e) {
-                            return CompletableFuture.failedFuture(e);
-                        }
-                    }
-                    return FutureUtil.waitForAll(futures);
-                }).thenRun(() -> log.info(
-                        "[{}] Successfully cleared backlog for subscription {} 
on all the bundles for namespace {}",
-                        clientAppId(), subscription, namespaceName));
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                log.warn("[{}] Failed to clear backlog for subscription {} on 
the bundles for namespace {}: {}",
+                        clientAppId(), subscription, namespaceName, 
exception.getCause().getMessage());
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
+                } else {
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
+                }
+            }
+            log.info("[{}] Successfully cleared backlog for subscription {} on 
all the bundles for namespace {}",
+                    clientAppId(), subscription, namespaceName);
+            asyncResponse.resume(Response.noContent().build());
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
-    protected CompletableFuture<Void> 
internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription,
-                                                                               
               String bundleRange,
-                                                                               
               boolean authoritative) {
-        if (subscription == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
-        }
-        if (bundleRange == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+    protected void internalClearNamespaceBundleBacklogForSubscription(String 
subscription, String bundleRange,
+                                                                      boolean 
authoritative) {
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG);
+        checkNotNull(subscription, "Subscription should not be null");
+        checkNotNull(bundleRange, "BundleRange should not be null");
+
+        Policies policies = getNamespacePolicies(namespaceName);
+
+        if (namespaceName.isGlobal()) {
+            // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
+            validateGlobalNamespaceOwnership(namespaceName);
+        } else {
+            validateClusterOwnership(namespaceName.getCluster());
+            validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
         }
 
-        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CLEAR_BACKLOG)
-                .thenCompose(__ -> {
-                    // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
-                    return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
-                })
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies ->
-                        // Allow acquiring ownership for an unassigned bundle 
so backlog can be cleared
-                        // even if not loaded.
-                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
-                                authoritative, false))
-                .thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
-                .thenRun(() -> log.info(
-                        "[{}] Successfully cleared backlog for subscription {} 
on namespace bundle {}/{}",
-                        clientAppId(), subscription, namespaceName, 
bundleRange));
+        validateNamespaceBundleOwnership(namespaceName, policies.bundles, 
bundleRange, authoritative, true);
+
+        clearBacklog(namespaceName, bundleRange, subscription);
+        log.info("[{}] Successfully cleared backlog for subscription {} on 
namespace bundle {}/{}", clientAppId(),
+                subscription, namespaceName, bundleRange);
     }
 
     protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String 
subscription,
                                                                         
boolean authoritative) {
-        if (subscription == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
-        }
+        checkNotNull(subscription, "Subscription should not be null");
 
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
                 .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
@@ -1691,12 +1723,8 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     protected CompletableFuture<Void> 
internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange,
                                                                               
boolean authoritative) {
-        if (subscription == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
-        }
-        if (bundleRange == null) {
-            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
-        }
+        checkNotNull(subscription, "Subscription should not be null");
+        checkNotNull(bundleRange, "BundleRange should not be null");
 
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
                 .thenCompose(__ -> {
@@ -1904,47 +1932,38 @@ public abstract class NamespacesBase extends AdminResource {
         return checkBacklogQuota(quota, retention);
     }
 
-    private CompletableFuture<Void> clearBacklogAsync(NamespaceBundle bundle, 
String subscription) {
-        return 
pulsar().getNamespaceService().getOwnedPersistentTopicListForNamespaceBundle(bundle)
-                .thenCompose(topicsInBundle -> {
-                    List<CompletableFuture<Void>> futures = new ArrayList<>();
-                    String effectiveSubscription = subscription;
-                    if (effectiveSubscription != null
-                            && 
effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix()))
 {
-                        effectiveSubscription = 
PersistentReplicator.getRemoteCluster(effectiveSubscription);
-                    }
-                    final String finalSubscription = effectiveSubscription;
+    private void clearBacklog(NamespaceName nsName, String bundleRange, String 
subscription) {
+        try {
+            List<Topic> topicList = 
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
+                    nsName.toString() + "/" + bundleRange);
 
-                    for (String topic : topicsInBundle) {
-                        TopicName topicName = TopicName.get(topic);
-                        if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                            continue;
-                        }
-                        
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
-                                .thenCompose(optTopic -> {
-                                    if (optTopic.isEmpty()) {
-                                        return 
CompletableFuture.completedFuture(null);
-                                    }
-                                    Topic loaded = optTopic.get();
-                                    if (!(loaded instanceof PersistentTopic 
persistentTopic)) {
-                                        return 
CompletableFuture.completedFuture(null);
-                                    }
-                                    return finalSubscription != null
-                                            ? 
persistentTopic.clearBacklog(finalSubscription)
-                                            : persistentTopic.clearBacklog();
-                                }));
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            if (subscription != null) {
+                if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
+                    subscription = 
PersistentReplicator.getRemoteCluster(subscription);
+                }
+                for (Topic topic : topicList) {
+                    if (topic instanceof PersistentTopic
+                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
+                        futures.add(((PersistentTopic) 
topic).clearBacklog(subscription));
                     }
-
-                    return FutureUtil.waitForAll(futures);
-                }).exceptionally(ex -> {
-                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                    if (cause instanceof RestException) {
-                        throw (RestException) cause;
+                }
+            } else {
+                for (Topic topic : topicList) {
+                    if (topic instanceof PersistentTopic
+                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
+                        futures.add(((PersistentTopic) topic).clearBacklog());
                     }
-                    throw new RestException(cause);
-                });
-    }
+                }
+            }
 
+            FutureUtil.waitForAll(futures).get();
+        } catch (Exception e) {
+            log.error("[{}] Failed to clear backlog for namespace {}/{}, 
subscription: {}", clientAppId(),
+                    nsName.toString(), bundleRange, subscription, e);
+            throw new RestException(e);
+        }
+    }
 
     private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle, 
String subscription) {
         if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index a997e1d898a..5058eccc401 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1512,14 +1512,14 @@ public class Namespaces extends NamespacesBase {
     public void clearNamespaceBacklog(@Suspended final AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBacklogAsync(authoritative)
-                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
-                .exceptionally(ex -> {
-                    log.error("[{}] Failed to clear backlog on namespace {}", 
clientAppId(), namespaceName, ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalClearNamespaceBacklog(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
@@ -1530,19 +1530,11 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace"),
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse 
asyncResponse,
-            @PathParam("tenant") String tenant,
+    public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("bundle") 
String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative)
-                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
-                .exceptionally(ex -> {
-                    log.error("[{}] Failed to clear backlog on namespace 
bundle {}/{}", clientAppId(),
-                            namespaceName, bundleRange, ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+        internalClearNamespaceBundleBacklog(bundleRange, authoritative);
     }
 
     @POST
@@ -1556,15 +1548,14 @@ public class Namespaces extends NamespacesBase {
             @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
             @PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBacklogForSubscriptionAsync(subscription, 
authoritative)
-                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
-                .exceptionally(ex -> {
-                    log.error("[{}] Failed to clear backlog for subscription 
{} on namespace {}", clientAppId(),
-                            subscription, namespaceName, ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalClearNamespaceBacklogForSubscription(asyncResponse, 
subscription, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
@@ -1575,20 +1566,12 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace"),
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBundleBacklogForSubscription(@Suspended final 
AsyncResponse asyncResponse,
-            @PathParam("tenant") String tenant,
+    public void 
clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, 
bundleRange, authoritative)
-                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
-                .exceptionally(ex -> {
-                    log.error("[{}] Failed to clear backlog for subscription 
{} on namespace bundle {}/{}",
-                            clientAppId(), subscription, namespaceName, 
bundleRange, ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+        internalClearNamespaceBundleBacklogForSubscription(subscription, 
bundleRange, authoritative);
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index cda619b7018..b5dbddc2ea3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1380,25 +1380,6 @@ public class NamespaceService implements AutoCloseable {
                 });
     }
 
-    public CompletableFuture<List<String>> 
getOwnedPersistentTopicListForNamespaceBundle(NamespaceBundle bundle) {
-        return 
getListOfPersistentTopics(bundle.getNamespaceObject()).thenCompose(topics ->
-                        CompletableFuture.completedFuture(
-                                topics.stream()
-                                        .filter(topic -> 
bundle.includes(TopicName.get(topic)))
-                                        .collect(Collectors.toList())))
-                .thenCombine(getPartitions(bundle.getNamespaceObject(), 
TopicDomain.persistent).thenCompose(topics ->
-                        CompletableFuture.completedFuture(
-                                topics.stream().filter(topic -> 
bundle.includes(TopicName.get(topic)))
-                                        .collect(Collectors.toList()))), 
(left, right) -> {
-                    for (String topic : right) {
-                        if (!left.contains(topic)) {
-                            left.add(topic);
-                        }
-                    }
-                    return left;
-                });
-    }
-
     /***
      * Checks whether the topic exists( partitioned or non-partitioned ).
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 9a5c72ff240..2c5d045f758 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2284,132 +2284,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(backlog, 0);
     }
 
-    @Test(dataProvider = "numBundles")
-    public void testClearNamespaceBundleBacklogOnUnloadedBundle(Integer 
numBundles) throws Exception {
-        String namespace = "prop-xyz/ns1-bundles";
-        admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
-
-        String topic = "persistent://" + namespace + "/t1";
-        String subscription = "sub1";
-
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(subscription)
-                .subscribe();
-
-        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-                .topic(topic)
-                .enableBatching(false)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition)
-                .create();
-        for (int i = 0; i < 10; i++) {
-            producer.send(("message-" + i).getBytes());
-        }
-        producer.close();
-        consumer.close();
-
-        long backlog = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
-        assertEquals(backlog, 10);
-
-        NamespaceBundle bundle =
-                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
-        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
-
-        Awaitility.await()
-                .atMost(30, TimeUnit.SECONDS)
-                .pollInterval(200, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
-                            "Bundle should not be owned by current broker");
-                    
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
-                            "Bundle should not be owned by other broker");
-                });
-
-        admin.namespaces().clearNamespaceBundleBacklog(namespace, 
bundle.getBundleRange());
-
-        @Cleanup
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(subscription)
-                .subscribe();
-
-        long backlogAfter = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
-        assertEquals(backlogAfter, 0);
-    }
-
-    @Test(dataProvider = "numBundles")
-    public void 
testClearNamespaceBundleBacklogForSubscriptionOnUnloadedBundle(Integer 
numBundles) throws Exception {
-        String namespace = "prop-xyz/ns1-bundles";
-        admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
-
-        String topic = "persistent://" + namespace + "/t1";
-        String subscription = "sub1";
-        String otherSubscription = "sub2";
-
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(subscription)
-                .subscribe();
-
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(otherSubscription)
-                .subscribe();
-
-        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-                .topic(topic)
-                .enableBatching(false)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition)
-                .create();
-        for (int i = 0; i < 10; i++) {
-            producer.send(("message-" + i).getBytes());
-        }
-        producer.close();
-        consumer1.close();
-        consumer2.close();
-
-        long backlog = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
-        assertEquals(backlog, 10);
-        long otherBacklog = 
admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog();
-        assertEquals(otherBacklog, 10);
-
-        NamespaceBundle bundle =
-                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
-        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
-
-        Awaitility.await()
-                .atMost(30, TimeUnit.SECONDS)
-                .pollInterval(200, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
-                            "Bundle should not be owned by current broker");
-                    
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
-                            "Bundle should not be owned by other broker");
-                });
-
-        
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, 
bundle.getBundleRange(),
-                subscription);
-
-        @Cleanup
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(subscription)
-                .subscribe();
-
-        @Cleanup
-        Consumer<byte[]> consumer4 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(otherSubscription)
-                .subscribe();
-
-        long backlogAfter = 
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
-        assertEquals(backlogAfter, 0);
-        long otherBacklogAfter =
-                
admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog();
-        assertEquals(otherBacklogAfter, 10);
-    }
-
-
     @Test(dataProvider = "bundling")
     public void testUnsubscribeOnNamespace(Integer numBundles) throws 
Exception {
         admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);

Reply via email to