This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 125d1d194f4084c7eaa4a32e073096da5ac6e1bd Author: Oneby Wang <[email protected]> AuthorDate: Wed Dec 17 19:16:20 2025 +0800 [fix][admin] Refactor namespace anti affinity group sync operations to async in rest api (#25086) Co-authored-by: oneby-wang <[email protected]> (cherry picked from commit e041fab73c1b9a6bb93457d186fc0a937beafd53) --- .../broker/resources/LocalPoliciesResources.java | 6 +- .../apache/pulsar/broker/admin/AdminResource.java | 17 +- .../pulsar/broker/admin/impl/NamespacesBase.java | 186 +++++++-------------- .../apache/pulsar/broker/admin/v1/Namespaces.java | 55 ++++-- .../apache/pulsar/broker/admin/v2/Namespaces.java | 58 +++++-- .../apache/pulsar/broker/admin/NamespacesTest.java | 80 +++++++++ .../pulsar/broker/admin/NamespacesV2Test.java | 95 ++++++++++- 7 files changed, 332 insertions(+), 165 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index b7ef19ccbe8..8e7b0ab0b1e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -37,9 +37,9 @@ public class LocalPoliciesResources extends BaseResources<LocalPolicies> { super(localStore, LocalPolicies.class, operationTimeoutSec); } - public void setLocalPolicies(NamespaceName ns, Function<LocalPolicies, LocalPolicies> modifyFunction) - throws MetadataStoreException { - set(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction); + public CompletableFuture<Void> setLocalPoliciesAsync(NamespaceName ns, + Function<LocalPolicies, LocalPolicies> modifyFunction) { + return setAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction); } public Optional<LocalPolicies> getLocalPolicies(NamespaceName ns) throws MetadataStoreException{ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index d35ea51d283..103181a7c73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.commons.lang3.StringUtils.isBlank; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -497,14 +498,12 @@ public abstract class AdminResource extends PulsarWebResource { }); } - protected void validateClusterExists(String cluster) { - try { - if (!clusterResources().getCluster(cluster).isPresent()) { + protected CompletableFuture<Void> validateClusterExistsAsync(String cluster) { + return clusterResources().clusterExistsAsync(cluster).thenAccept(clusterExist -> { + if (!clusterExist) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); } - } catch (Exception e) { - throw new RestException(e); - } + }); } protected Policies getNamespacePolicies(String tenant, String cluster, String namespace) { @@ -853,6 +852,12 @@ public abstract class AdminResource extends PulsarWebResource { } } + protected void checkNotBlank(String str, String errorMessage) { + if (isBlank(str)) { + throw new RestException(Status.PRECONDITION_FAILED, errorMessage); + } + } + protected boolean isManagedLedgerNotFoundException(Throwable cause) { return cause instanceof ManagedLedgerException.MetadataNotFoundException || cause instanceof MetadataStoreException.NotFoundException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f28b07d8a9a..b9ec275f79c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin.impl; -import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles; import com.google.common.collect.Sets; import java.lang.reflect.Field; @@ -1038,35 +1037,6 @@ public abstract class NamespacesBase extends AdminResource { return internalSetBookieAffinityGroupAsync(null); } - @Deprecated - protected BookieAffinityGroupData internalGetBookieAffinityGroup() { - validateSuperUserAccess(); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - try { - final BookieAffinityGroupData bookkeeperAffinityGroup = getLocalPolicies().getLocalPolicies(namespaceName) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Namespace local-policies does not exist")).bookieAffinityGroup; - return bookkeeperAffinityGroup; - } catch (NotFoundException e) { - log.warn("[{}] Failed to get local-policy configuration for namespace {}: does not exist", - clientAppId(), namespaceName); - throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist"); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to get local-policy configuration for namespace {}", clientAppId(), - namespaceName, e); - throw new RestException(e); - } - } - protected CompletableFuture<BookieAffinityGroupData> internalGetBookieAffinityGroupAsync() { return validateSuperUserAccessAsync().thenCompose(__ -> { if (namespaceName.isGlobal()) { @@ -1077,9 +1047,8 @@ public abstract class NamespacesBase extends AdminResource { unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster())); } }).thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName)) - .thenApply(policies -> policies.orElseThrow( - () -> new RestException(Status.NOT_FOUND, "Namespace local-policies does not exist")) - .bookieAffinityGroup); + .thenApply(policies -> policies.orElseThrow(() -> new RestException(Status.NOT_FOUND, + "Namespace local-policies does not exist")).bookieAffinityGroup); } private CompletableFuture<Void> validateLeaderBrokerAsync() { @@ -1823,104 +1792,69 @@ public abstract class NamespacesBase extends AdminResource { internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies); } - protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); - checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null"); - validatePoliciesReadOnlyAccess(); - - log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName); - - if (isBlank(antiAffinityGroup)) { - throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty"); - } - - try { - getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)-> - lp.map(policies -> new LocalPolicies(policies.bundles, - policies.bookieAffinityGroup, - antiAffinityGroup, - policies.migrated)) - .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup)) - ); - log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), - namespaceName, antiAffinityGroup); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, - e); - throw new RestException(e); - } - } - - protected String internalGetNamespaceAntiAffinityGroup() { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); - - try { - return getLocalPolicies() - .getLocalPolicies(namespaceName) - .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()) - , null, null)).namespaceAntiAffinityGroup; - } catch (Exception e) { - log.error("[{}] Failed to get the antiAffinityGroup of namespace {}", clientAppId(), namespaceName, e); - throw new RestException(Status.NOT_FOUND, "Couldn't find namespace policies"); - } - } - - protected void internalRemoveNamespaceAntiAffinityGroup() { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - - log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName); - - try { - getLocalPolicies().setLocalPolicies(namespaceName, (policies)-> - new LocalPolicies(policies.bundles, - policies.bookieAffinityGroup, - null, - policies.migrated)); - log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); - throw new RestException(e); - } + protected CompletableFuture<Void> internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) { + checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null"); + checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty"); + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()).thenCompose( + __ -> getDefaultBundleDataAsync().thenCompose( + defaultBundleData -> getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName, + oldPolicies -> oldPolicies.map(policies -> new LocalPolicies(policies.bundles, + policies.bookieAffinityGroup, antiAffinityGroup, + policies.migrated)) + .orElseGet(() -> new LocalPolicies(defaultBundleData, null, + antiAffinityGroup))))) + .thenAccept(__ -> log.info( + "[{}] Successfully updated namespace anti-affinity group, namespace={}, anti-affinity" + + " group={}", clientAppId(), namespaceName, antiAffinityGroup)); + } + + protected CompletableFuture<String> internalGetNamespaceAntiAffinityGroupAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ) + .thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName) + .thenApply(policiesOpt -> policiesOpt.map(localPolicies -> localPolicies.namespaceAntiAffinityGroup) + .orElse(null))); + } + + protected CompletableFuture<Void> internalRemoveNamespaceAntiAffinityGroupAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> { + log.info("[{}] Removing anti-affinity group for namespace: {}", clientAppId(), namespaceName); + return getLocalPolicies().setLocalPoliciesAsync(namespaceName, + (policies) -> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, null, + policies.migrated)); + }) + .thenAccept(__ -> log.info("[{}] Successfully removed anti-affinity group for namespace: {}", + clientAppId(), namespaceName)); } - protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup, - String tenant) { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); + protected CompletableFuture<List<String>> internalGetAntiAffinityNamespacesAsync(String cluster, + String antiAffinityGroup, + String tenant) { checkNotNull(cluster, "Cluster should not be null"); - checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null"); + checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null"); checkNotNull(tenant, "Tenant should not be null"); + checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty"); - log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster); - - if (isBlank(antiAffinityGroup)) { - throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty."); - } - validateClusterExists(cluster); - - try { - List<String> namespaces = tenantResources().getListOfNamespaces(tenant); - - return namespaces.stream().filter(ns -> { - Optional<LocalPolicies> policies; - try { - policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - String storedAntiAffinityGroup = policies.orElseGet(() -> - new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), - null, null)).namespaceAntiAffinityGroup; - return antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup); - }).collect(Collectors.toList()); - - } catch (Exception e) { - log.warn("Failed to list of properties/namespace from global-zk", e); - throw new RestException(e); - } + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ) + .thenCompose(__ -> validateClusterExistsAsync(cluster)) + .thenCompose(__ -> { + log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, + cluster); + return tenantResources().getListOfNamespacesAsync(tenant).thenCompose(namespaces -> { + List<CompletableFuture<String>> nsFutures = namespaces.stream() + .map(ns -> getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns)) + .thenApply(policiesOpt -> policiesOpt.map( + localPolicies -> localPolicies.namespaceAntiAffinityGroup).orElse(null)) + .thenApply(antiAffinityGroup::equalsIgnoreCase) + .thenApply(equals -> equals ? ns : null)).toList(); + CompletableFuture<Void> allFuture = FutureUtil.waitForAll(nsFutures); + return allFuture.thenApply( + unused -> nsFutures.stream().map(CompletableFuture::join).filter(Objects::nonNull) + .toList()); + }); + }); } private boolean checkQuotas(Policies policies, RetentionPolicies retention) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 276ea457d2c..4ba8d6aa1da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -583,10 +583,19 @@ public class Namespaces extends NamespacesBase { @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") }) - public void setNamespaceAntiAffinityGroup(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, String antiAffinityGroup) { + public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, String antiAffinityGroup) { validateNamespaceName(property, cluster, namespace); - internalSetNamespaceAntiAffinityGroup(antiAffinityGroup); + internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set namespace anti-affinity group, tenant: {}, namespace: {}, " + + "antiAffinityGroup: {}", clientAppId(), property, namespace, antiAffinityGroup, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -594,10 +603,19 @@ public class Namespaces extends NamespacesBase { @ApiOperation(value = "Get anti-affinity group of a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public String getNamespaceAntiAffinityGroup(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetNamespaceAntiAffinityGroup(); + internalGetNamespaceAntiAffinityGroupAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), property, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -606,10 +624,19 @@ public class Namespaces extends NamespacesBase { + " api can be only accessed by admin of any of the existing property") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.")}) - public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String cluster, + public void getAntiAffinityNamespaces(@Suspended AsyncResponse asyncResponse, + @PathParam("cluster") String cluster, @PathParam("group") String antiAffinityGroup, @QueryParam("property") String property) { - return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, property); + internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup, property) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get all namespaces in cluster of given anti-affinity group, cluster: {}, " + + "tenant: {}, antiAffinityGroup: {}", clientAppId(), cluster, property, antiAffinityGroup, + ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -618,11 +645,19 @@ public class Namespaces extends NamespacesBase { @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification")}) - public void removeNamespaceAntiAffinityGroup(@PathParam("property") String property, + public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - internalRemoveNamespaceAntiAffinityGroup(); + internalRemoveNamespaceAntiAffinityGroupAsync() + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to remove namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), property, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 5058eccc401..732cb315604 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2144,13 +2144,21 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid antiAffinityGroup")}) - public void setNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, + public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Anti-affinity group for the specified namespace", required = true) - String antiAffinityGroup) { + String antiAffinityGroup) { validateNamespaceName(tenant, namespace); - internalSetNamespaceAntiAffinityGroup(antiAffinityGroup); + internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set namespace anti-affinity group, tenant: {}, namespace: {}, " + + "antiAffinityGroup: {}", clientAppId(), tenant, namespace, antiAffinityGroup, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -2158,10 +2166,18 @@ public class Namespaces extends NamespacesBase { @ApiOperation(value = "Get anti-affinity group of a namespace.", response = String.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public String getNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetNamespaceAntiAffinityGroup(); + internalGetNamespaceAntiAffinityGroupAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), tenant, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -2172,10 +2188,18 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public void removeNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalRemoveNamespaceAntiAffinityGroup(); + internalRemoveNamespaceAntiAffinityGroupAsync() + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to remove namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), tenant, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -2185,9 +2209,19 @@ public class Namespaces extends NamespacesBase { response = String.class, responseContainer = "List") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.")}) - public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String cluster, - @PathParam("group") String antiAffinityGroup, @QueryParam("tenant") String tenant) { - return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, tenant); + public void getAntiAffinityNamespaces(@Suspended AsyncResponse asyncResponse, + @PathParam("cluster") String cluster, + @PathParam("group") String antiAffinityGroup, + @QueryParam("tenant") String tenant) { + internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup, tenant) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get all namespaces in cluster of given anti-affinity group, cluster: {}, " + + "tenant: {}, antiAffinityGroup: {}", clientAppId(), cluster, tenant, antiAffinityGroup, + ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 2aaa5676d5b..2f5b99bfd9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2433,4 +2433,84 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { setBookieAffinityGroupNs)); assertNull(bookieAffinityGroupDataResp); } + + @Test + public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception { + // 1. create namespace with empty policies, namespace anti affinity group should be null + String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns"; + asyncRequests(response -> namespaces.createNamespace(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs, (Policies) null)); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + + // 2.set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); + + // 3.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + + // 4.delete namespace anti affinity group + asyncRequests(response -> namespaces.removeNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + + // 5.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + } + + @Test + public void testGetClusterAntiAffinityNamespaces() throws Exception { + // create 5 namespaces, 3 namespaces are set to the same namespace anti affinity group, + // 2 namespaces are not set to any anti affinity group + String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1"; + String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2"; + String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3"; + String namespaceWithoutAntiAffinity1 = "namespace-without-anti-affinity-1"; + String namespaceWithoutAntiAffinity2 = "namespace-without-anti-affinity-2"; + + // create namespaces + List<String> allNamespaces = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3, + namespaceWithoutAntiAffinity1, namespaceWithoutAntiAffinity2); + for (String namespace : allNamespaces) { + asyncRequests(response -> namespaces.createNamespace(response, testTenant, testLocalCluster, namespace, + (Policies) null)); + } + + // set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + List<String> namespacesWithAntiAffinityGroup = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3); + for (String namespace : namespacesWithAntiAffinityGroup) { + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + namespace, namespaceAntiAffinityGroupReq)); + } + + // assert namespace anti affinity group + for (String namespace : namespacesWithAntiAffinityGroup) { + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + namespace)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + } + + // get namespaces in cluster of given anti affinity group + List<String> namespacesResp = (List<String>) asyncRequests( + response -> namespaces.getAntiAffinityNamespaces(response, testLocalCluster, + namespaceAntiAffinityGroupReq, testTenant)); + List<String> namespacesWithFullPath = + namespacesWithAntiAffinityGroup.stream().map(ns -> NamespaceName.get(testTenant, testLocalCluster, ns)) + .map(NamespaceName::toString).toList(); + assertEquals(namespacesResp, namespacesWithFullPath); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java index 629e92c056f..596cfa3f396 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java @@ -305,8 +305,8 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { // 2.set namespace anti affinity group String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; - namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs, - namespaceAntiAffinityGroupReq); + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); // 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles() BundlesData bundlesData = (BundlesData) asyncRequests( @@ -314,8 +314,9 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); // 4.assert namespace anti affinity group - String namespaceAntiAffinityGroupResp = - namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); } @@ -330,8 +331,8 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { // 2.set namespace anti affinity group String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; - namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs, - namespaceAntiAffinityGroupReq); + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); // 3.query namespace num bundles, should be policies.bundles, which we set before BundlesData bundlesData = (BundlesData) asyncRequests( @@ -339,8 +340,9 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { assertEquals(bundlesData, policies.bundles); // 4.assert namespace anti affinity group - String namespaceAntiAffinityGroupResp = - namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); } @@ -418,4 +420,81 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { assertNull(bookieAffinityGroupDataResp); } + @Test + public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception { + // 1. create namespace with empty policies, namespace anti affinity group should be null + String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns"; + asyncRequests( + response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, null)); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + + // 2.set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); + + // 3.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + + // 4.delete namespace anti affinity group + asyncRequests(response -> namespaces.removeNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + + // 5.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + } + + @Test + public void testGetClusterAntiAffinityNamespaces() throws Exception { + // create 5 namespaces, 3 namespaces are set to the same namespace anti affinity group, + // 2 namespaces are not set to any anti affinity group + String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1"; + String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2"; + String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3"; + String namespaceWithoutAntiAffinity1 = "namespace-without-anti-affinity-1"; + String namespaceWithoutAntiAffinity2 = "namespace-without-anti-affinity-2"; + + // create namespaces + List<String> allNamespaces = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3, + namespaceWithoutAntiAffinity1, namespaceWithoutAntiAffinity2); + for (String namespace : allNamespaces) { + asyncRequests(response -> namespaces.createNamespace(response, testTenant, namespace, null)); + } + + // set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + List<String> namespacesWithAntiAffinityGroup = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3); + for (String namespace : namespacesWithAntiAffinityGroup) { + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, namespace, + namespaceAntiAffinityGroupReq)); + } + + // assert namespace anti affinity group + for (String namespace : namespacesWithAntiAffinityGroup) { + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, namespace)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + } + + // get namespaces in cluster of given anti affinity group + List<String> namespacesResp = (List<String>) asyncRequests( + response -> namespaces.getAntiAffinityNamespaces(response, testLocalCluster, + namespaceAntiAffinityGroupReq, testTenant)); + List<String> namespacesWithFullPath = + namespacesWithAntiAffinityGroup.stream().map(ns -> NamespaceName.get(testTenant, ns)) + .map(NamespaceName::toString).toList(); + assertEquals(namespacesResp, namespacesWithFullPath); + } + }
