This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8e37e3ee63d [fix][broker] Fix backlog clearing for unloaded namespace bundles (#25272)
8e37e3ee63d is described below
commit 8e37e3ee63d8fcebed493daa6e577fdc08591746
Author: Hao Zhang <[email protected]>
AuthorDate: Tue Apr 7 11:36:30 2026 +0800
[fix][broker] Fix backlog clearing for unloaded namespace bundles (#25272)
Co-authored-by: 张浩 <[email protected]>
---
.../apache/pulsar/broker/admin/AdminResource.java | 7 -
.../pulsar/broker/admin/impl/NamespacesBase.java | 295 +++++++++++----------
.../apache/pulsar/broker/admin/v2/Namespaces.java | 57 ++--
.../pulsar/broker/namespace/NamespaceService.java | 19 ++
.../apache/pulsar/broker/admin/AdminApiTest.java | 126 +++++++++
5 files changed, 333 insertions(+), 171 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 21008e4c77c..0c7c11db97e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
@@ -821,12 +820,6 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
- protected void checkNotBlank(String str, String errorMessage) {
- if (isBlank(str)) {
- throw new RestException(Status.PRECONDITION_FAILED, errorMessage);
- }
- }
-
protected boolean isManagedLedgerNotFoundException(Throwable cause) {
return cause instanceof
ManagedLedgerException.MetadataNotFoundException
|| cause instanceof MetadataStoreException.NotFoundException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2fd572da431..89050fbd288 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
-import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
@@ -1825,129 +1824,108 @@ public abstract class NamespacesBase extends
AdminResource {
);
}
- protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse,
boolean authoritative) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.CLEAR_BACKLOG);
-
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- try {
- NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
- for (NamespaceBundle nsBundle : bundles.getBundles()) {
- // check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to clear
- if
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
- futures.add(pulsar().getAdminClient().namespaces()
-
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
nsBundle.getBundleRange()));
- }
- }
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- log.warn("[{}] Failed to clear backlog on the bundles for
namespace {}: {}", clientAppId(),
- namespaceName, exception.getCause().getMessage());
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
- return null;
- } else {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return null;
- }
- }
- log.info("[{}] Successfully cleared backlog on all the bundles for
namespace {}", clientAppId(),
- namespaceName);
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
+ protected CompletableFuture<Void>
internalClearNamespaceBacklogAsync(boolean authoritative) {
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.CLEAR_BACKLOG)
+ .thenCompose(__ ->
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundlesAsync(namespaceName))
+ .thenCompose(bundles -> {
+ final List<CompletableFuture<Void>> futures = new
ArrayList<>();
+ for (NamespaceBundle nsBundle : bundles.getBundles()) {
+ try {
+ futures.add(pulsar().getAdminClient().namespaces()
+
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
+ nsBundle.getBundleRange()));
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures);
+ }).thenRun(() -> log.info("[{}] Successfully cleared backlog
on all the bundles for namespace {}",
+ clientAppId(), namespaceName));
}
@SuppressWarnings("deprecation")
- protected void internalClearNamespaceBundleBacklog(String bundleRange,
boolean authoritative) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.CLEAR_BACKLOG);
- checkNotNull(bundleRange, "BundleRange should not be null");
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- // check cluster ownership for a given global namespace: redirect if
peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
-
- validateNamespaceBundleOwnership(namespaceName, policies.bundles,
bundleRange, authoritative, true);
+ protected CompletableFuture<Void>
internalClearNamespaceBundleBacklogAsync(String bundleRange,
+
boolean authoritative) {
+ if (bundleRange == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+ }
- clearBacklog(namespaceName, bundleRange, null);
- log.info("[{}] Successfully cleared backlog on namespace bundle
{}/{}", clientAppId(), namespaceName,
- bundleRange);
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.CLEAR_BACKLOG)
+ .thenCompose(__ -> {
+ // check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ })
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies ->
+ // Allow acquiring ownership for an unassigned bundle
so backlog can be cleared
+ // even if not loaded.
+ validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ authoritative, false))
+ .thenCompose(bundle -> clearBacklogAsync(bundle, null))
+ .thenRun(() -> log.info("[{}] Successfully cleared backlog on
namespace bundle {}/{}", clientAppId(),
+ namespaceName, bundleRange));
}
- protected void internalClearNamespaceBacklogForSubscription(AsyncResponse
asyncResponse, String subscription,
- boolean
authoritative) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.CLEAR_BACKLOG);
- checkNotNull(subscription, "Subscription should not be null");
-
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- try {
- NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
- for (NamespaceBundle nsBundle : bundles.getBundles()) {
- // check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to clear
- if
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
-
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
- namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
- }
- }
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return;
+ protected CompletableFuture<Void>
internalClearNamespaceBacklogForSubscriptionAsync(String subscription,
+
boolean authoritative) {
+ if (subscription == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
}
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- log.warn("[{}] Failed to clear backlog for subscription {} on
the bundles for namespace {}: {}",
- clientAppId(), subscription, namespaceName,
exception.getCause().getMessage());
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
- return null;
- } else {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return null;
- }
- }
- log.info("[{}] Successfully cleared backlog for subscription {} on
all the bundles for namespace {}",
- clientAppId(), subscription, namespaceName);
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.CLEAR_BACKLOG)
+ .thenCompose(__ ->
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundlesAsync(namespaceName))
+ .thenCompose(bundles -> {
+ final List<CompletableFuture<Void>> futures = new
ArrayList<>();
+ for (NamespaceBundle nsBundle : bundles.getBundles()) {
+ try {
+ futures.add(pulsar().getAdminClient().namespaces()
+
.clearNamespaceBundleBacklogForSubscriptionAsync(
+ namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures);
+ }).thenRun(() -> log.info(
+ "[{}] Successfully cleared backlog for subscription {}
on all the bundles for namespace {}",
+ clientAppId(), subscription, namespaceName));
}
@SuppressWarnings("deprecation")
- protected void internalClearNamespaceBundleBacklogForSubscription(String
subscription, String bundleRange,
- boolean
authoritative) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.CLEAR_BACKLOG);
- checkNotNull(subscription, "Subscription should not be null");
- checkNotNull(bundleRange, "BundleRange should not be null");
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- // check cluster ownership for a given global namespace: redirect if
peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
-
- validateNamespaceBundleOwnership(namespaceName, policies.bundles,
bundleRange, authoritative, true);
+ protected CompletableFuture<Void>
internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription,
+
String bundleRange,
+
boolean authoritative) {
+ if (subscription == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+ }
+ if (bundleRange == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+ }
- clearBacklog(namespaceName, bundleRange, subscription);
- log.info("[{}] Successfully cleared backlog for subscription {} on
namespace bundle {}/{}", clientAppId(),
- subscription, namespaceName, bundleRange);
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.CLEAR_BACKLOG)
+ .thenCompose(__ -> {
+ // check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ })
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies ->
+ // Allow acquiring ownership for an unassigned bundle
so backlog can be cleared
+ // even if not loaded.
+ validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ authoritative, false))
+ .thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
+ .thenRun(() -> log.info(
+ "[{}] Successfully cleared backlog for subscription {}
on namespace bundle {}/{}",
+ clientAppId(), subscription, namespaceName,
bundleRange));
}
protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String
subscription,
boolean authoritative) {
- checkNotNull(subscription, "Subscription should not be null");
+ if (subscription == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+ }
return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.UNSUBSCRIBE)
.thenCompose(__ ->
pulsar().getNamespaceService().getNamespaceBundleFactory()
@@ -1970,8 +1948,12 @@ public abstract class NamespacesBase extends
AdminResource {
@SuppressWarnings("deprecation")
protected CompletableFuture<Void>
internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange,
boolean authoritative) {
- checkNotNull(subscription, "Subscription should not be null");
- checkNotNull(bundleRange, "BundleRange should not be null");
+ if (subscription == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Subscription should not be null"));
+ }
+ if (bundleRange == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "BundleRange should not be null"));
+ }
return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.UNSUBSCRIBE)
.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
@@ -2064,8 +2046,14 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected CompletableFuture<Void>
internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) {
- checkNotNull(antiAffinityGroup, "Anti-affinity group should not be
null");
- checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+ if (antiAffinityGroup == null) {
+ return FutureUtil.failedFuture(
+ new RestException(Status.BAD_REQUEST, "Anti-affinity group
should not be null"));
+ }
+ if (StringUtils.isBlank(antiAffinityGroup)) {
+ return FutureUtil.failedFuture(
+ new RestException(Status.PRECONDITION_FAILED,
"Anti-affinity group can't be empty"));
+ }
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
.thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync()).thenCompose(
__ -> getDefaultBundleDataAsync().thenCompose(
@@ -2103,10 +2091,20 @@ public abstract class NamespacesBase extends
AdminResource {
protected CompletableFuture<List<String>>
internalGetAntiAffinityNamespacesAsync(String cluster,
String antiAffinityGroup,
String tenant) {
- checkNotNull(cluster, "Cluster should not be null");
- checkNotNull(antiAffinityGroup, "Anti-affinity group should not be
null");
- checkNotNull(tenant, "Tenant should not be null");
- checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+ if (cluster == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Cluster should not be null"));
+ }
+ if (antiAffinityGroup == null) {
+ return FutureUtil.failedFuture(
+ new RestException(Status.BAD_REQUEST, "Anti-affinity group
should not be null"));
+ }
+ if (tenant == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Tenant should not be null"));
+ }
+ if (StringUtils.isBlank(antiAffinityGroup)) {
+ return FutureUtil.failedFuture(
+ new RestException(Status.PRECONDITION_FAILED,
"Anti-affinity group can't be empty"));
+ }
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
.thenCompose(__ -> validateClusterExistsAsync(cluster))
@@ -2137,39 +2135,48 @@ public abstract class NamespacesBase extends
AdminResource {
return checkBacklogQuota(quota, retention);
}
- private void clearBacklog(NamespaceName nsName, String bundleRange, String
subscription) {
- try {
- List<Topic> topicList =
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (subscription != null) {
- if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- subscription =
PersistentReplicator.getRemoteCluster(subscription);
- }
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic)
topic).clearBacklog(subscription));
+ private CompletableFuture<Void> clearBacklogAsync(NamespaceBundle bundle,
String subscription) {
+ return
pulsar().getNamespaceService().getOwnedPersistentTopicListForNamespaceBundle(bundle)
+ .thenCompose(topicsInBundle -> {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ String effectiveSubscription = subscription;
+ if (effectiveSubscription != null
+ &&
effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix()))
{
+ effectiveSubscription =
PersistentReplicator.getRemoteCluster(effectiveSubscription);
}
- }
- } else {
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic) topic).clearBacklog());
+ final String finalSubscription = effectiveSubscription;
+
+ for (String topic : topicsInBundle) {
+ TopicName topicName = TopicName.get(topic);
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ continue;
+ }
+
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
+ .thenCompose(optTopic -> {
+ if (optTopic.isEmpty()) {
+ return
CompletableFuture.completedFuture(null);
+ }
+ Topic loaded = optTopic.get();
+ if (!(loaded instanceof PersistentTopic
persistentTopic)) {
+ return
CompletableFuture.completedFuture(null);
+ }
+ return finalSubscription != null
+ ?
persistentTopic.clearBacklog(finalSubscription)
+ : persistentTopic.clearBacklog();
+ }));
}
- }
- }
- FutureUtil.waitForAll(futures).get();
- } catch (Exception e) {
- log.error("[{}] Failed to clear backlog for namespace {}/{},
subscription: {}", clientAppId(),
- nsName.toString(), bundleRange, subscription, e);
- throw new RestException(e);
- }
+ return FutureUtil.waitForAll(futures);
+ }).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof RestException) {
+ throw (RestException) cause;
+ }
+ throw new RestException(cause);
+ });
}
+
private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle,
String subscription) {
if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
return CompletableFuture.failedFuture(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 1eedfb564f9..b691559cb8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1517,14 +1517,14 @@ public class Namespaces extends NamespacesBase {
public void clearNamespaceBacklog(@Suspended final AsyncResponse
asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateNamespaceName(tenant, namespace);
- internalClearNamespaceBacklog(asyncResponse, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalClearNamespaceBacklogAsync(authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to clear backlog on namespace {}",
clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1535,11 +1535,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace"),
@ApiResponse(code = 403, message = "Don't have admin or operate
permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant,
+ public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("bundle")
String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateNamespaceName(tenant, namespace);
- internalClearNamespaceBundleBacklog(bundleRange, authoritative);
+ internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to clear backlog on namespace
bundle {}/{}", clientAppId(),
+ namespaceName, bundleRange, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1553,14 +1561,15 @@ public class Namespaces extends NamespacesBase {
@PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateNamespaceName(tenant, namespace);
- internalClearNamespaceBacklogForSubscription(asyncResponse,
subscription, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalClearNamespaceBacklogForSubscriptionAsync(subscription,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to clear backlog for subscription
{} on namespace {}", clientAppId(),
+ subscription, namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1571,12 +1580,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace"),
@ApiResponse(code = 403, message = "Don't have admin or operate
permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void
clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant,
+ public void clearNamespaceBundleBacklogForSubscription(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateNamespaceName(tenant, namespace);
- internalClearNamespaceBundleBacklogForSubscription(subscription,
bundleRange, authoritative);
+ internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription,
bundleRange, authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to clear backlog for subscription
{} on namespace bundle {}/{}",
+ clientAppId(), subscription, namespaceName,
bundleRange, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d9e076a0002..54ccc5a0022 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1381,6 +1381,25 @@ public class NamespaceService implements AutoCloseable {
});
}
+ public CompletableFuture<List<String>>
getOwnedPersistentTopicListForNamespaceBundle(NamespaceBundle bundle) {
+ return
getListOfPersistentTopics(bundle.getNamespaceObject()).thenCompose(topics ->
+ CompletableFuture.completedFuture(
+ topics.stream()
+ .filter(topic ->
bundle.includes(TopicName.get(topic)))
+ .collect(Collectors.toList())))
+ .thenCombine(getPartitions(bundle.getNamespaceObject(),
TopicDomain.persistent).thenCompose(topics ->
+ CompletableFuture.completedFuture(
+ topics.stream().filter(topic ->
bundle.includes(TopicName.get(topic)))
+ .collect(Collectors.toList()))),
(left, right) -> {
+ for (String topic : right) {
+ if (!left.contains(topic)) {
+ left.add(topic);
+ }
+ }
+ return left;
+ });
+ }
+
/***
* Checks whether the topic exists( partitioned or non-partitioned ).
*/
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c7cbb63dc54..6d45dd9080a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2291,6 +2291,132 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(backlog, 0);
}
+ @Test(dataProvider = "numBundles")
+ public void testClearNamespaceBundleBacklogOnUnloadedBundle(Integer
numBundles) throws Exception {
+ String namespace = "prop-xyz/ns1-bundles";
+ admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
+
+ String topic = "persistent://" + namespace + "/t1";
+ String subscription = "sub1";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+ producer.close();
+ consumer.close();
+
+ long backlog =
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+ assertEquals(backlog, 10);
+
+ NamespaceBundle bundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle.getBundleRange());
+
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by current broker");
+
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by other broker");
+ });
+
+ admin.namespaces().clearNamespaceBundleBacklog(namespace,
bundle.getBundleRange());
+
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ long backlogAfter =
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+ assertEquals(backlogAfter, 0);
+ }
+
+ @Test(dataProvider = "numBundles")
+ public void
testClearNamespaceBundleBacklogForSubscriptionOnUnloadedBundle(Integer
numBundles) throws Exception {
+ String namespace = "prop-xyz/ns1-bundles";
+ admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
+
+ String topic = "persistent://" + namespace + "/t1";
+ String subscription = "sub1";
+ String otherSubscription = "sub2";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+ producer.close();
+ consumer1.close();
+ consumer2.close();
+
+ long backlog =
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+ assertEquals(backlog, 10);
+ long otherBacklog =
admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog();
+ assertEquals(otherBacklog, 10);
+
+ NamespaceBundle bundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle.getBundleRange());
+
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by current broker");
+
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by other broker");
+ });
+
+
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace,
bundle.getBundleRange(),
+ subscription);
+
+ @Cleanup
+ Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ @Cleanup
+ Consumer<byte[]> consumer4 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+
+ long backlogAfter =
admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
+ assertEquals(backlogAfter, 0);
+ long otherBacklogAfter =
+
admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog();
+ assertEquals(otherBacklogAfter, 10);
+ }
+
+
@Test(dataProvider = "bundling")
public void testUnsubscribeOnNamespace(Integer numBundles) throws
Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);