This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bc8449deb3adcf4b5e8ba39eb01d2fd3e5743f44 Author: Lari Hotari <[email protected]> AuthorDate: Wed Apr 15 17:34:26 2026 +0300 Revert "[fix][broker] Fix backlog clearing for unloaded namespace bundles (#25272)" This reverts commit 24e7814cbd49a94007456b6be2492bc8c1881b5e. --- .../pulsar/broker/admin/impl/NamespacesBase.java | 273 +++++++++++---------- .../apache/pulsar/broker/admin/v2/Namespaces.java | 57 ++--- .../pulsar/broker/namespace/NamespaceService.java | 19 -- .../apache/pulsar/broker/admin/AdminApiTest.java | 126 ---------- 4 files changed, 166 insertions(+), 309 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 8e1d11b15f3..f28b07d8a9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -79,6 +79,7 @@ import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -1567,108 +1568,139 @@ public abstract class NamespacesBase extends AdminResource { ); } - protected CompletableFuture<Void> internalClearNamespaceBacklogAsync(boolean authoritative) { - return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) - .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundlesAsync(namespaceName)) - .thenCompose(bundles -> { - final List<CompletableFuture<Void>> futures = new ArrayList<>(); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - try { - futures.add(pulsar().getAdminClient().namespaces() - .clearNamespaceBundleBacklogAsync(namespaceName.toString(), - nsBundle.getBundleRange())); - } catch (PulsarServerException e) { - return CompletableFuture.failedFuture(e); - } - } - return FutureUtil.waitForAll(futures); - }).thenRun(() -> log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", - clientAppId(), namespaceName)); + protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) { + validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + + final List<CompletableFuture<Void>> futures = new ArrayList<>(); + try { + NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear + if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { + futures.add(pulsar().getAdminClient().namespaces() + .clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange())); + } + } + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + log.warn("[{}] Failed to clear backlog on the bundles for namespace {}: {}", clientAppId(), + namespaceName, exception.getCause().getMessage()); + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; + } else { + asyncResponse.resume(new RestException(exception.getCause())); + return null; + } + } + log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), + namespaceName); + asyncResponse.resume(Response.noContent().build()); + return null; + }); } @SuppressWarnings("deprecation") - protected CompletableFuture<Void> internalClearNamespaceBundleBacklogAsync(String bundleRange, - boolean authoritative) { - if (bundleRange == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "BundleRange should not be null")); + protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) { + validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + checkNotNull(bundleRange, "BundleRange should not be null"); + + Policies policies = getNamespacePolicies(namespaceName); + + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + validateGlobalNamespaceOwnership(namespaceName); + } else { + validateClusterOwnership(namespaceName.getCluster()); + validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); } - return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) - .thenCompose(__ -> { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - return validateGlobalNamespaceOwnershipAsync(namespaceName); - }) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> - // Allow acquiring ownership for an unassigned bundle so backlog can be cleared - // even if not loaded. - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, false)) - .thenCompose(bundle -> clearBacklogAsync(bundle, null)) - .thenRun(() -> log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), - namespaceName, bundleRange)); + validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + + clearBacklog(namespaceName, bundleRange, null); + log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), namespaceName, + bundleRange); } - protected CompletableFuture<Void> internalClearNamespaceBacklogForSubscriptionAsync(String subscription, - boolean authoritative) { - if (subscription == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); + protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription, + boolean authoritative) { + validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + checkNotNull(subscription, "Subscription should not be null"); + + final List<CompletableFuture<Void>> futures = new ArrayList<>(); + try { + NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear + if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { + futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); + } + } + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; } - return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) - .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundlesAsync(namespaceName)) - .thenCompose(bundles -> { - final List<CompletableFuture<Void>> futures = new ArrayList<>(); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - try { - futures.add(pulsar().getAdminClient().namespaces() - .clearNamespaceBundleBacklogForSubscriptionAsync( - namespaceName.toString(), nsBundle.getBundleRange(), subscription)); - } catch (PulsarServerException e) { - return CompletableFuture.failedFuture(e); - } - } - return FutureUtil.waitForAll(futures); - }).thenRun(() -> log.info( - "[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", - clientAppId(), subscription, namespaceName)); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + log.warn("[{}] Failed to clear backlog for subscription {} on the bundles for namespace {}: {}", + clientAppId(), subscription, namespaceName, exception.getCause().getMessage()); + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; + } else { + asyncResponse.resume(new RestException(exception.getCause())); + return null; + } + } + log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", + clientAppId(), subscription, namespaceName); + asyncResponse.resume(Response.noContent().build()); + return null; + }); } @SuppressWarnings("deprecation") - protected CompletableFuture<Void> internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription, - String bundleRange, - boolean authoritative) { - if (subscription == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); - } - if (bundleRange == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "BundleRange should not be null")); + protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange, + boolean authoritative) { + validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + checkNotNull(subscription, "Subscription should not be null"); + checkNotNull(bundleRange, "BundleRange should not be null"); + + Policies policies = getNamespacePolicies(namespaceName); + + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + validateGlobalNamespaceOwnership(namespaceName); + } else { + validateClusterOwnership(namespaceName.getCluster()); + validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); } - return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) - .thenCompose(__ -> { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - return validateGlobalNamespaceOwnershipAsync(namespaceName); - }) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> - // Allow acquiring ownership for an unassigned bundle so backlog can be cleared - // even if not loaded. - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, false)) - .thenCompose(bundle -> clearBacklogAsync(bundle, subscription)) - .thenRun(() -> log.info( - "[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", - clientAppId(), subscription, namespaceName, bundleRange)); + validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + + clearBacklog(namespaceName, bundleRange, subscription); + log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(), + subscription, namespaceName, bundleRange); } protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String subscription, boolean authoritative) { - if (subscription == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); - } + checkNotNull(subscription, "Subscription should not be null"); return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() @@ -1691,12 +1723,8 @@ public abstract class NamespacesBase extends AdminResource { @SuppressWarnings("deprecation") protected CompletableFuture<Void> internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange, boolean authoritative) { - if (subscription == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Subscription should not be null")); - } - if (bundleRange == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "BundleRange should not be null")); - } + checkNotNull(subscription, "Subscription should not be null"); + checkNotNull(bundleRange, "BundleRange should not be null"); return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) .thenCompose(__ -> { @@ -1904,47 +1932,38 @@ public abstract class NamespacesBase extends AdminResource { return checkBacklogQuota(quota, retention); } - private CompletableFuture<Void> clearBacklogAsync(NamespaceBundle bundle, String subscription) { - return pulsar().getNamespaceService().getOwnedPersistentTopicListForNamespaceBundle(bundle) - .thenCompose(topicsInBundle -> { - List<CompletableFuture<Void>> futures = new ArrayList<>(); - String effectiveSubscription = subscription; - if (effectiveSubscription != null - && effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { - effectiveSubscription = PersistentReplicator.getRemoteCluster(effectiveSubscription); - } - final String finalSubscription = effectiveSubscription; + private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) { + try { + List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), + nsName.toString() + "/" + bundleRange); - for (String topic : topicsInBundle) { - TopicName topicName = TopicName.get(topic); - if (pulsar().getBrokerService().isSystemTopic(topicName)) { - continue; - } - futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false) - .thenCompose(optTopic -> { - if (optTopic.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - Topic loaded = optTopic.get(); - if (!(loaded instanceof PersistentTopic persistentTopic)) { - return CompletableFuture.completedFuture(null); - } - return finalSubscription != null - ? persistentTopic.clearBacklog(finalSubscription) - : persistentTopic.clearBacklog(); - })); + List<CompletableFuture<Void>> futures = new ArrayList<>(); + if (subscription != null) { + if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { + subscription = PersistentReplicator.getRemoteCluster(subscription); + } + for (Topic topic : topicList) { + if (topic instanceof PersistentTopic + && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) { + futures.add(((PersistentTopic) topic).clearBacklog(subscription)); } - - return FutureUtil.waitForAll(futures); - }).exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof RestException) { - throw (RestException) cause; + } + } else { + for (Topic topic : topicList) { + if (topic instanceof PersistentTopic + && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) { + futures.add(((PersistentTopic) topic).clearBacklog()); } - throw new RestException(cause); - }); - } + } + } + FutureUtil.waitForAll(futures).get(); + } catch (Exception e) { + log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(), + nsName.toString(), bundleRange, subscription, e); + throw new RestException(e); + } + } private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle, String subscription) { if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index a997e1d898a..5058eccc401 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1512,14 +1512,14 @@ public class Namespaces extends NamespacesBase { public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklogAsync(authoritative) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("[{}] Failed to clear backlog on namespace {}", clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + try { + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklog(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST @@ -1530,19 +1530,11 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, + public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("[{}] Failed to clear backlog on namespace bundle {}/{}", clientAppId(), - namespaceName, bundleRange, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalClearNamespaceBundleBacklog(bundleRange, authoritative); } @POST @@ -1556,15 +1548,14 @@ public class Namespaces extends NamespacesBase { @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklogForSubscriptionAsync(subscription, authoritative) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("[{}] Failed to clear backlog for subscription {} on namespace {}", clientAppId(), - subscription, namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + try { + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST @@ -1575,20 +1566,12 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, + public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, bundleRange, authoritative) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("[{}] Failed to clear backlog for subscription {} on namespace bundle {}/{}", - clientAppId(), subscription, namespaceName, bundleRange, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index cda619b7018..b5dbddc2ea3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1380,25 +1380,6 @@ public class NamespaceService implements AutoCloseable { }); } - public CompletableFuture<List<String>> getOwnedPersistentTopicListForNamespaceBundle(NamespaceBundle bundle) { - return getListOfPersistentTopics(bundle.getNamespaceObject()).thenCompose(topics -> - CompletableFuture.completedFuture( - topics.stream() - .filter(topic -> bundle.includes(TopicName.get(topic))) - .collect(Collectors.toList()))) - .thenCombine(getPartitions(bundle.getNamespaceObject(), TopicDomain.persistent).thenCompose(topics -> - CompletableFuture.completedFuture( - topics.stream().filter(topic -> bundle.includes(TopicName.get(topic))) - .collect(Collectors.toList()))), (left, right) -> { - for (String topic : right) { - if (!left.contains(topic)) { - left.add(topic); - } - } - return left; - }); - } - /*** * Checks whether the topic exists( partitioned or non-partitioned ). */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 9a5c72ff240..2c5d045f758 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2284,132 +2284,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(backlog, 0); } - @Test(dataProvider = "numBundles") - public void testClearNamespaceBundleBacklogOnUnloadedBundle(Integer numBundles) throws Exception { - String namespace = "prop-xyz/ns1-bundles"; - admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); - - String topic = "persistent://" + namespace + "/t1"; - String subscription = "sub1"; - - Consumer<byte[]> consumer = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(subscription) - .subscribe(); - - Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) - .topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - for (int i = 0; i < 10; i++) { - producer.send(("message-" + i).getBytes()); - } - producer.close(); - consumer.close(); - - long backlog = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); - assertEquals(backlog, 10); - - NamespaceBundle bundle = - pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); - admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); - - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(200, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), - "Bundle should not be owned by current broker"); - assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), - "Bundle should not be owned by other broker"); - }); - - admin.namespaces().clearNamespaceBundleBacklog(namespace, bundle.getBundleRange()); - - @Cleanup - Consumer<byte[]> consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(subscription) - .subscribe(); - - long backlogAfter = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); - assertEquals(backlogAfter, 0); - } - - @Test(dataProvider = "numBundles") - public void testClearNamespaceBundleBacklogForSubscriptionOnUnloadedBundle(Integer numBundles) throws Exception { - String namespace = "prop-xyz/ns1-bundles"; - admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); - - String topic = "persistent://" + namespace + "/t1"; - String subscription = "sub1"; - String otherSubscription = "sub2"; - - Consumer<byte[]> consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(subscription) - .subscribe(); - - Consumer<byte[]> consumer2 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(otherSubscription) - .subscribe(); - - Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) - .topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - for (int i = 0; i < 10; i++) { - producer.send(("message-" + i).getBytes()); - } - producer.close(); - consumer1.close(); - consumer2.close(); - - long backlog = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); - assertEquals(backlog, 10); - long otherBacklog = admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog(); - assertEquals(otherBacklog, 10); - - NamespaceBundle bundle = - pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); - admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); - - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(200, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), - "Bundle should not be owned by current broker"); - assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), - "Bundle should not be owned by other broker"); - }); - - admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle.getBundleRange(), - subscription); - - @Cleanup - Consumer<byte[]> consumer3 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(subscription) - .subscribe(); - - @Cleanup - Consumer<byte[]> consumer4 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(otherSubscription) - .subscribe(); - - long backlogAfter = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); - assertEquals(backlogAfter, 0); - long otherBacklogAfter = - admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog(); - assertEquals(otherBacklogAfter, 10); - } - - @Test(dataProvider = "bundling") public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
