poorbarcode commented on code in PR #25443:
URL: https://github.com/apache/pulsar/pull/25443#discussion_r3020344506
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2010,29 +2013,82 @@ private CompletableFuture<Void>
checkTopicAlreadyMigrated(TopicName topicName) {
return result;
}
- public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@NonNull TopicName topicName) {
+ /**
+ * @return Triple [namespace policies, global topic policies, topic
policies].
+ */
+ public CompletableFuture<Boolean> isAllowedCurrentClusterAccess(@NonNull
TopicName topicName) {
+ final String cluster = getPulsar().getConfig().getClusterName();
+ return getCombinedTopicPolicies(topicName).thenApply(triple -> {
+ Optional<TopicPolicies> topicP = triple.getRight();
+ Optional<TopicPolicies> globalTopicP = triple.getMiddle();
+ Optional<Policies> nsPolicies = triple.getLeft();
+ // Disabled a cluster for a namespace manually.
+ if (nsPolicies.isPresent() &&
!nsPolicies.get().allowed_clusters.isEmpty()
+ && !nsPolicies.get().allowed_clusters.contains(cluster)) {
+ return false;
+ }
+ // Manually enabled topic-level replication, which can skip to set
a namespace-level replication.
+ if (topicP.isPresent() &&
CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) {
+ if (topicP.get().getReplicationClusters().contains(cluster)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ if (globalTopicP.isPresent() &&
CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) {
+ if
(globalTopicP.get().getReplicationClusters().contains(cluster)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ // No settings for replication/allowed_clusters.
+ if (nsPolicies.isEmpty()) {
+ return true;
+ }
+ // Namespace level settings.
+ return nsPolicies.get().replication_clusters.isEmpty()
+ || nsPolicies.get().replication_clusters.contains(cluster);
+ });
+ }
Review Comment:
> Don't we already have a mechanism for retrieving the applied policies for
a topic? We should use it to avoid writing duplicate code.
No, we don't have such a method. There are currently two methods
- Persistent topic has a `HierarchyTopicPolicies` of merged policies, and
this result is relied upon for inspection.
- Check when the namespace modifies replication/allowed-cluster, see also
`BrokerService.isCurrentClusterAllowed(Namespace)`
The newly added method has maximised the reuse of code (by merging the
namespace check method `BrokerService.isCurrentClusterAllowed(Namespace`).
BTW, the method newly added will become the method you mentioned: `a
mechanism for retrieving the applied policies for a topic`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]