This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e7eb92f1 feat(pulsaradmin): add scoped topic policies support (#1471)
e7eb92f1 is described below
commit e7eb92f148f55fbb437d499611f509eb55d72113
Author: Rui Fu <[email protected]>
AuthorDate: Mon Mar 23 22:42:49 2026 +0800
feat(pulsaradmin): add scoped topic policies support (#1471)
* feat(pulsaradmin): add scoped topic policies support
* style(topic_policies): improve code readability with formatting
* refactor(topic-policies): consolidate API by making context parameter mandatory
---
pulsaradmin/README.md | 35 +
pulsaradmin/alias.go | 9 +
pulsaradmin/pkg/admin/topic.go | 260 ++++----
pulsaradmin/pkg/admin/topic_policies.go | 928 +++++++++++++++++++++++++++
pulsaradmin/pkg/admin/topic_policies_test.go | 241 +++++++
5 files changed, 1329 insertions(+), 144 deletions(-)
diff --git a/pulsaradmin/README.md b/pulsaradmin/README.md
index bf5ca739..f3795b26 100644
--- a/pulsaradmin/README.md
+++ b/pulsaradmin/README.md
@@ -103,6 +103,41 @@ func main() {
}
```
+- Read scoped topic policies
+
+```go
+import (
+ "context"
+ "github.com/apache/pulsar-client-go/pulsaradmin"
+ "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+)
+
+func main() {
+ cfg := &pulsaradmin.Config{}
+ admin, err := pulsaradmin.NewClient(cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ topic, _ := utils.GetTopicName("persistent://public/default/example")
+
+ localPolicies, err := pulsaradmin.TopicPoliciesOf(admin, false)
+ if err != nil {
+ panic(err)
+ }
+
+ ttl, err := localPolicies.GetMessageTTL(context.Background(), *topic, false)
+ if err != nil {
+ panic(err)
+ }
+ if ttl == nil {
+ return
+ }
+
+ _, _ = pulsaradmin.TopicPoliciesOf(admin, true)
+}
+```
+
## Contributing
Contributions are warmly welcomed and greatly appreciated!
diff --git a/pulsaradmin/alias.go b/pulsaradmin/alias.go
index 53c8d93e..36cd0a54 100644
--- a/pulsaradmin/alias.go
+++ b/pulsaradmin/alias.go
@@ -25,10 +25,19 @@ import (
// Client contains all admin interfaces for operating pulsar resources
type Client = admin.Client
+// TopicPolicies contains scoped admin interfaces for topic-level policies.
+type TopicPolicies = admin.TopicPolicies
+
+// TopicPoliciesProvider provides scoped TopicPolicies instances.
+type TopicPoliciesProvider = admin.TopicPoliciesProvider
+
// Config are the arguments for creating a new admin Client
type Config = config.Config
var (
// NewClient returns a new admin Client for operating pulsar resources
NewClient = admin.New
+
+ // TopicPoliciesOf returns a scoped TopicPolicies client from an admin client when available.
+ TopicPoliciesOf = admin.TopicPoliciesOf
)
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 14662ddc..3101607b 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -1101,6 +1101,40 @@ func (c *pulsarClient) Topics() Topics {
}
}
+// localTopicPolicies returns the TopicPolicies client obtained from
+// t.pulsar.TopicPolicies with isGlobal set to false.
+func (t *topics) localTopicPolicies() TopicPolicies {
+	const isGlobal = false
+	return t.pulsar.TopicPolicies(isGlobal)
+}
+
+// legacyTopicPolicyInt unwraps an optional int policy value, yielding -1
+// when value is nil.
+func legacyTopicPolicyInt(value *int) int {
+	if value != nil {
+		return *value
+	}
+	return -1
+}
+
+// legacyTopicPolicyInt64 unwraps an optional int64 policy value, yielding -1
+// when value is nil.
+func legacyTopicPolicyInt64(value *int64) int64 {
+	if value != nil {
+		return *value
+	}
+	return -1
+}
+
+// legacyTopicPolicyBool unwraps an optional bool policy value, yielding
+// false when value is nil.
+func legacyTopicPolicyBool(value *bool) bool {
+	return value != nil && *value
+}
+
+// legacyTopicSchemaCompatibilityStrategy unwraps an optional strategy,
+// yielding utils.SchemaCompatibilityStrategyUndefined when value is nil.
+func legacyTopicSchemaCompatibilityStrategy(
+	value *utils.SchemaCompatibilityStrategy,
+) utils.SchemaCompatibilityStrategy {
+	if value != nil {
+		return *value
+	}
+	return utils.SchemaCompatibilityStrategyUndefined
+}
+
func (t *topics) Create(topic utils.TopicName, partitions int) error {
return t.CreateWithContext(context.Background(), topic, partitions)
}
@@ -1512,10 +1546,11 @@ func (t *topics) GetMessageTTL(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMessageTTLWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var ttl = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"messageTTL")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &ttl)
- return ttl, err
+ ttl, err := t.localTopicPolicies().GetMessageTTL(ctx, topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(ttl), nil
}
func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error {
@@ -1547,10 +1582,11 @@ func (t *topics) GetMaxProducers(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMaxProducersWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var maxProducers = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxProducers")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxProducers)
- return maxProducers, err
+ maxProducers, err := t.localTopicPolicies().GetMaxProducers(ctx, topic,
false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxProducers), nil
}
func (t *topics) SetMaxProducers(topic utils.TopicName, maxProducers int)
error {
@@ -1578,10 +1614,11 @@ func (t *topics) GetMaxConsumers(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMaxConsumersWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var maxConsumers = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxConsumers")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
- return maxConsumers, err
+ maxConsumers, err := t.localTopicPolicies().GetMaxConsumers(ctx, topic,
false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxConsumers), nil
}
func (t *topics) SetMaxConsumers(topic utils.TopicName, maxConsumers int)
error {
@@ -1609,10 +1646,11 @@ func (t *topics) GetMaxUnackMessagesPerConsumer(topic
utils.TopicName) (int, err
}
func (t *topics) GetMaxUnackMessagesPerConsumerWithContext(ctx
context.Context, topic utils.TopicName) (int, error) {
- var maxNum = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxUnackedMessagesOnConsumer")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
- return maxNum, err
+ maxNum, err :=
t.localTopicPolicies().GetMaxUnackMessagesPerConsumer(ctx, topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxNum), nil
}
func (t *topics) SetMaxUnackMessagesPerConsumer(topic utils.TopicName,
maxUnackedNum int) error {
@@ -1645,10 +1683,11 @@ func (t *topics)
GetMaxUnackMessagesPerSubscriptionWithContext(
ctx context.Context,
topic utils.TopicName,
) (int, error) {
- var maxNum = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxUnackedMessagesOnSubscription")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
- return maxNum, err
+ maxNum, err :=
t.localTopicPolicies().GetMaxUnackMessagesPerSubscription(ctx, topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxNum), nil
}
func (t *topics) SetMaxUnackMessagesPerSubscription(topic utils.TopicName,
maxUnackedNum int) error {
@@ -1678,13 +1717,7 @@ func (t *topics) GetPersistence(topic utils.TopicName)
(*utils.PersistenceData,
}
func (t *topics) GetPersistenceWithContext(ctx context.Context, topic
utils.TopicName) (*utils.PersistenceData, error) {
- var persistenceData utils.PersistenceData
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"persistence")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&persistenceData)
- if body != nil {
- return &persistenceData, err
- }
- return nil, err
+ return t.localTopicPolicies().GetPersistence(ctx, topic, false)
}
func (t *topics) SetPersistence(topic utils.TopicName, persistenceData
utils.PersistenceData) error {
@@ -1717,13 +1750,7 @@ func (t *topics) GetDelayedDeliveryWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DelayedDeliveryData, error) {
- var delayedDeliveryData utils.DelayedDeliveryData
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"delayedDelivery")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&delayedDeliveryData)
- if body != nil {
- return &delayedDeliveryData, err
- }
- return nil, err
+ return t.localTopicPolicies().GetDelayedDelivery(ctx, topic, false)
}
func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData
utils.DelayedDeliveryData) error {
@@ -1756,13 +1783,7 @@ func (t *topics) GetDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DispatchRateData, error) {
- var dispatchRateData utils.DispatchRateData
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"dispatchRate")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&dispatchRateData)
- if body != nil {
- return &dispatchRateData, err
- }
- return nil, err
+ return t.localTopicPolicies().GetDispatchRate(ctx, topic, false)
}
func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData
utils.DispatchRateData) error {
@@ -1792,13 +1813,7 @@ func (t *topics) GetPublishRate(topic utils.TopicName)
(*utils.PublishRateData,
}
func (t *topics) GetPublishRateWithContext(ctx context.Context, topic
utils.TopicName) (*utils.PublishRateData, error) {
- var publishRateData utils.PublishRateData
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"publishRate")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&publishRateData)
- if body != nil {
- return &publishRateData, err
- }
- return nil, err
+ return t.localTopicPolicies().GetPublishRate(ctx, topic, false)
}
func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData
utils.PublishRateData) error {
@@ -1828,10 +1843,11 @@ func (t *topics) GetDeduplicationStatus(topic
utils.TopicName) (bool, error) {
}
func (t *topics) GetDeduplicationStatusWithContext(ctx context.Context, topic
utils.TopicName) (bool, error) {
- var enabled bool
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"deduplicationEnabled")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &enabled)
- return enabled, err
+ enabled, err := t.localTopicPolicies().GetDeduplicationStatus(ctx,
topic, false)
+ if err != nil {
+ return false, err
+ }
+ return legacyTopicPolicyBool(enabled), nil
}
func (t *topics) SetDeduplicationStatus(topic utils.TopicName, enabled bool)
error {
@@ -1861,15 +1877,7 @@ func (t *topics) GetRetentionWithContext(
topic utils.TopicName,
applied bool,
) (*utils.RetentionPolicies, error) {
- var policy utils.RetentionPolicies
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"retention")
- body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx,
endpoint, &policy, map[string]string{
- "applied": strconv.FormatBool(applied),
- }, true)
- if body != nil {
- return &policy, err
- }
- return nil, err
+ return t.localTopicPolicies().GetRetention(ctx, topic, applied)
}
func (t *topics) RemoveRetention(topic utils.TopicName) error {
@@ -1903,12 +1911,11 @@ func (t *topics) GetCompactionThresholdWithContext(
topic utils.TopicName,
applied bool,
) (int64, error) {
- var threshold int64 = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"compactionThreshold")
- _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint,
&threshold, map[string]string{
- "applied": strconv.FormatBool(applied),
- }, true)
- return threshold, err
+ threshold, err := t.localTopicPolicies().GetCompactionThreshold(ctx,
topic, applied)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt64(threshold), nil
}
func (t *topics) SetCompactionThreshold(topic utils.TopicName, threshold
int64) error {
@@ -1942,13 +1949,7 @@ func (t *topics) GetBacklogQuotaMapWithContext(
applied bool,
) (map[utils.BacklogQuotaType]utils.BacklogQuota,
error) {
- var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"backlogQuotaMap")
-
- queryParams := map[string]string{"applied": strconv.FormatBool(applied)}
- _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint,
&backlogQuotaMap, queryParams, true)
-
- return backlogQuotaMap, err
+ return t.localTopicPolicies().GetBacklogQuotaMap(ctx, topic, applied)
}
func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota
utils.BacklogQuota,
@@ -1988,12 +1989,14 @@ func (t *topics) GetInactiveTopicPoliciesWithContext(
topic utils.TopicName,
applied bool,
) (utils.InactiveTopicPolicies, error) {
- var out utils.InactiveTopicPolicies
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"inactiveTopicPolicies")
- _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint,
&out, map[string]string{
- "applied": strconv.FormatBool(applied),
- }, true)
- return out, err
+ policies, err := t.localTopicPolicies().GetInactiveTopicPolicies(ctx,
topic, applied)
+ if err != nil {
+ return utils.InactiveTopicPolicies{}, err
+ }
+ if policies == nil {
+ return utils.InactiveTopicPolicies{}, nil
+ }
+ return *policies, nil
}
func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error {
@@ -2032,10 +2035,7 @@ func (t *topics) GetReplicationClusters(topic
utils.TopicName) ([]string, error)
}
func (t *topics) GetReplicationClustersWithContext(ctx context.Context, topic
utils.TopicName) ([]string, error) {
- var data []string
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"replication")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &data)
- return data, err
+ return t.localTopicPolicies().GetReplicationClusters(ctx, topic, false)
}
func (t *topics) RemoveReplicationClusters(topic utils.TopicName) error {
@@ -2052,13 +2052,7 @@ func (t *topics) GetSubscribeRate(topic utils.TopicName)
(*utils.SubscribeRate,
}
func (t *topics) GetSubscribeRateWithContext(ctx context.Context, topic
utils.TopicName) (*utils.SubscribeRate, error) {
- var subscribeRate utils.SubscribeRate
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"subscribeRate")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&subscribeRate)
- if body != nil {
- return &subscribeRate, err
- }
- return nil, err
+ return t.localTopicPolicies().GetSubscribeRate(ctx, topic, false)
}
func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate
utils.SubscribeRate) error {
@@ -2091,13 +2085,7 @@ func (t *topics) GetSubscriptionDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DispatchRateData, error) {
- var dispatchRate utils.DispatchRateData
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"subscriptionDispatchRate")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&dispatchRate)
- if body != nil {
- return &dispatchRate, err
- }
- return nil, err
+ return t.localTopicPolicies().GetSubscriptionDispatchRate(ctx, topic,
false)
}
func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName,
dispatchRate utils.DispatchRateData) error {
@@ -2127,10 +2115,11 @@ func (t *topics) GetMaxConsumersPerSubscription(topic
utils.TopicName) (int, err
}
func (t *topics) GetMaxConsumersPerSubscriptionWithContext(ctx
context.Context, topic utils.TopicName) (int, error) {
- var maxConsumers = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxConsumersPerSubscription")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
- return maxConsumers, err
+ maxConsumers, err :=
t.localTopicPolicies().GetMaxConsumersPerSubscription(ctx, topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxConsumers), nil
}
func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName,
maxConsumers int) error {
@@ -2160,10 +2149,11 @@ func (t *topics) GetMaxMessageSize(topic
utils.TopicName) (int, error) {
}
func (t *topics) GetMaxMessageSizeWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var maxMessageSize = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxMessageSize")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxMessageSize)
- return maxMessageSize, err
+ maxMessageSize, err := t.localTopicPolicies().GetMaxMessageSize(ctx,
topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxMessageSize), nil
}
func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int)
error {
@@ -2189,10 +2179,11 @@ func (t *topics) GetMaxSubscriptionsPerTopic(topic
utils.TopicName) (int, error)
}
func (t *topics) GetMaxSubscriptionsPerTopicWithContext(ctx context.Context,
topic utils.TopicName) (int, error) {
- var maxSubscriptions = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxSubscriptionsPerTopic")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxSubscriptions)
- return maxSubscriptions, err
+ maxSubscriptions, err :=
t.localTopicPolicies().GetMaxSubscriptionsPerTopic(ctx, topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(maxSubscriptions), nil
}
func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName,
maxSubscriptions int) error {
@@ -2222,10 +2213,11 @@ func (t *topics) GetSchemaValidationEnforced(topic
utils.TopicName) (bool, error
}
func (t *topics) GetSchemaValidationEnforcedWithContext(ctx context.Context,
topic utils.TopicName) (bool, error) {
- var enabled bool
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"schemaValidationEnforced")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &enabled)
- return enabled, err
+ enabled, err := t.localTopicPolicies().GetSchemaValidationEnforced(ctx,
topic, false)
+ if err != nil {
+ return false, err
+ }
+ return legacyTopicPolicyBool(enabled), nil
}
func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled
bool) error {
@@ -2255,10 +2247,11 @@ func (t *topics) GetDeduplicationSnapshotInterval(topic
utils.TopicName) (int, e
}
func (t *topics) GetDeduplicationSnapshotIntervalWithContext(ctx
context.Context, topic utils.TopicName) (int, error) {
- var interval = -1
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"deduplicationSnapshotInterval")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &interval)
- return interval, err
+ interval, err :=
t.localTopicPolicies().GetDeduplicationSnapshotInterval(ctx, topic, false)
+ if err != nil {
+ return -1, err
+ }
+ return legacyTopicPolicyInt(interval), nil
}
func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName,
interval int) error {
@@ -2291,13 +2284,7 @@ func (t *topics) GetReplicatorDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DispatchRateData, error) {
- var dispatchRate utils.DispatchRateData
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"replicatorDispatchRate")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&dispatchRate)
- if body != nil {
- return &dispatchRate, err
- }
- return nil, err
+ return t.localTopicPolicies().GetReplicatorDispatchRate(ctx, topic,
false)
}
func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate
utils.DispatchRateData) error {
@@ -2330,13 +2317,7 @@ func (t *topics) GetAutoSubscriptionCreationWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.AutoSubscriptionCreationOverride, error) {
- var autoSubCreation utils.AutoSubscriptionCreationOverride
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"autoSubscriptionCreation")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&autoSubCreation)
- if body != nil {
- return &autoSubCreation, err
- }
- return nil, err
+ return t.localTopicPolicies().GetAutoSubscriptionCreation(ctx, topic,
false)
}
func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
@@ -2382,14 +2363,11 @@ func (t *topics)
GetSchemaCompatibilityStrategyAppliedWithContext(
topic utils.TopicName,
applied bool,
) (utils.SchemaCompatibilityStrategy, error) {
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"schemaCompatibilityStrategy")
- body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx,
endpoint, nil, map[string]string{
- "applied": strconv.FormatBool(applied),
- }, false)
+ strategy, err :=
t.localTopicPolicies().GetSchemaCompatibilityStrategy(ctx, topic, applied)
if err != nil {
return "", err
}
- return parseTopicSchemaCompatibilityStrategy(body)
+ return legacyTopicSchemaCompatibilityStrategy(strategy), nil
}
func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
@@ -2429,13 +2407,7 @@ func (t *topics) GetOffloadPoliciesWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.OffloadPolicies, error) {
- var offloadPolicies utils.OffloadPolicies
- endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"offloadPolicies")
- body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&offloadPolicies)
- if body != nil {
- return &offloadPolicies, err
- }
- return nil, err
+ return t.localTopicPolicies().GetOffloadPolicies(ctx, topic, false)
}
func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies
utils.OffloadPolicies) error {
diff --git a/pulsaradmin/pkg/admin/topic_policies.go
b/pulsaradmin/pkg/admin/topic_policies.go
new file mode 100644
index 00000000..a754e428
--- /dev/null
+++ b/pulsaradmin/pkg/admin/topic_policies.go
@@ -0,0 +1,928 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package admin
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+)
+
+// TopicPolicies is the scoped admin interface for topic-level policies.
+//
+// Every policy is exposed as a Get/Set/Remove group. Each method takes a
+// context.Context and the utils.TopicName it operates on; Get methods take an
+// additional bool, which the topicPolicies implementation names "applied".
+type TopicPolicies interface {
+	GetMessageTTL(context.Context, utils.TopicName, bool) (*int, error)
+	SetMessageTTL(context.Context, utils.TopicName, int) error
+	RemoveMessageTTL(context.Context, utils.TopicName) error
+
+	GetMaxProducers(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxProducers(context.Context, utils.TopicName, int) error
+	RemoveMaxProducers(context.Context, utils.TopicName) error
+
+	GetMaxConsumers(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxConsumers(context.Context, utils.TopicName, int) error
+	RemoveMaxConsumers(context.Context, utils.TopicName) error
+
+	GetMaxUnackMessagesPerConsumer(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxUnackMessagesPerConsumer(context.Context, utils.TopicName, int) error
+	RemoveMaxUnackMessagesPerConsumer(context.Context, utils.TopicName) error
+
+	GetMaxUnackMessagesPerSubscription(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxUnackMessagesPerSubscription(context.Context, utils.TopicName, int) error
+	RemoveMaxUnackMessagesPerSubscription(context.Context, utils.TopicName) error
+
+	GetPersistence(context.Context, utils.TopicName, bool) (*utils.PersistenceData, error)
+	SetPersistence(context.Context, utils.TopicName, utils.PersistenceData) error
+	RemovePersistence(context.Context, utils.TopicName) error
+
+	GetDelayedDelivery(context.Context, utils.TopicName, bool) (*utils.DelayedDeliveryData, error)
+	SetDelayedDelivery(context.Context, utils.TopicName, utils.DelayedDeliveryData) error
+	RemoveDelayedDelivery(context.Context, utils.TopicName) error
+
+	GetDispatchRate(context.Context, utils.TopicName, bool) (*utils.DispatchRateData, error)
+	SetDispatchRate(context.Context, utils.TopicName, utils.DispatchRateData) error
+	RemoveDispatchRate(context.Context, utils.TopicName) error
+
+	GetPublishRate(context.Context, utils.TopicName, bool) (*utils.PublishRateData, error)
+	SetPublishRate(context.Context, utils.TopicName, utils.PublishRateData) error
+	RemovePublishRate(context.Context, utils.TopicName) error
+
+	GetDeduplicationStatus(context.Context, utils.TopicName, bool) (*bool, error)
+	SetDeduplicationStatus(context.Context, utils.TopicName, bool) error
+	RemoveDeduplicationStatus(context.Context, utils.TopicName) error
+
+	GetRetention(context.Context, utils.TopicName, bool) (*utils.RetentionPolicies, error)
+	SetRetention(context.Context, utils.TopicName, utils.RetentionPolicies) error
+	RemoveRetention(context.Context, utils.TopicName) error
+
+	GetCompactionThreshold(context.Context, utils.TopicName, bool) (*int64, error)
+	SetCompactionThreshold(context.Context, utils.TopicName, int64) error
+	RemoveCompactionThreshold(context.Context, utils.TopicName) error
+
+	GetBacklogQuotaMap(context.Context, utils.TopicName, bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)
+	SetBacklogQuota(context.Context, utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error
+	RemoveBacklogQuota(context.Context, utils.TopicName, utils.BacklogQuotaType) error
+
+	GetInactiveTopicPolicies(context.Context, utils.TopicName, bool) (*utils.InactiveTopicPolicies, error)
+	SetInactiveTopicPolicies(context.Context, utils.TopicName, utils.InactiveTopicPolicies) error
+	RemoveInactiveTopicPolicies(context.Context, utils.TopicName) error
+
+	GetReplicationClusters(context.Context, utils.TopicName, bool) ([]string, error)
+	SetReplicationClusters(context.Context, utils.TopicName, []string) error
+	RemoveReplicationClusters(context.Context, utils.TopicName) error
+
+	GetSubscribeRate(context.Context, utils.TopicName, bool) (*utils.SubscribeRate, error)
+	SetSubscribeRate(context.Context, utils.TopicName, utils.SubscribeRate) error
+	RemoveSubscribeRate(context.Context, utils.TopicName) error
+
+	GetSubscriptionDispatchRate(context.Context, utils.TopicName, bool) (*utils.DispatchRateData, error)
+	SetSubscriptionDispatchRate(context.Context, utils.TopicName, utils.DispatchRateData) error
+	RemoveSubscriptionDispatchRate(context.Context, utils.TopicName) error
+
+	GetMaxConsumersPerSubscription(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxConsumersPerSubscription(context.Context, utils.TopicName, int) error
+	RemoveMaxConsumersPerSubscription(context.Context, utils.TopicName) error
+
+	GetMaxMessageSize(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxMessageSize(context.Context, utils.TopicName, int) error
+	RemoveMaxMessageSize(context.Context, utils.TopicName) error
+
+	GetMaxSubscriptionsPerTopic(context.Context, utils.TopicName, bool) (*int, error)
+	SetMaxSubscriptionsPerTopic(context.Context, utils.TopicName, int) error
+	RemoveMaxSubscriptionsPerTopic(context.Context, utils.TopicName) error
+
+	GetSchemaValidationEnforced(context.Context, utils.TopicName, bool) (*bool, error)
+	SetSchemaValidationEnforced(context.Context, utils.TopicName, bool) error
+	RemoveSchemaValidationEnforced(context.Context, utils.TopicName) error
+
+	GetDeduplicationSnapshotInterval(context.Context, utils.TopicName, bool) (*int, error)
+	SetDeduplicationSnapshotInterval(context.Context, utils.TopicName, int) error
+	RemoveDeduplicationSnapshotInterval(context.Context, utils.TopicName) error
+
+	GetReplicatorDispatchRate(context.Context, utils.TopicName, bool) (*utils.DispatchRateData, error)
+	SetReplicatorDispatchRate(context.Context, utils.TopicName, utils.DispatchRateData) error
+	RemoveReplicatorDispatchRate(context.Context, utils.TopicName) error
+
+	GetOffloadPolicies(context.Context, utils.TopicName, bool) (*utils.OffloadPolicies, error)
+	SetOffloadPolicies(context.Context, utils.TopicName, utils.OffloadPolicies) error
+	RemoveOffloadPolicies(context.Context, utils.TopicName) error
+
+	GetAutoSubscriptionCreation(context.Context, utils.TopicName, bool) (*utils.AutoSubscriptionCreationOverride, error)
+	SetAutoSubscriptionCreation(context.Context, utils.TopicName, utils.AutoSubscriptionCreationOverride) error
+	RemoveAutoSubscriptionCreation(context.Context, utils.TopicName) error
+
+	GetSchemaCompatibilityStrategy(context.Context, utils.TopicName, bool) (*utils.SchemaCompatibilityStrategy, error)
+	SetSchemaCompatibilityStrategy(context.Context, utils.TopicName, utils.SchemaCompatibilityStrategy) error
+	RemoveSchemaCompatibilityStrategy(context.Context, utils.TopicName) error
+}
+
+// TopicPoliciesProvider provides access to scoped topic policy clients.
+//
+// TopicPoliciesOf type-asserts an admin Client to this interface, and
+// *pulsarClient is asserted to implement it.
+type TopicPoliciesProvider interface {
+ // TopicPolicies returns a TopicPolicies client for the given isGlobal value.
+ TopicPolicies(isGlobal bool) TopicPolicies
+}
+
+// TopicPoliciesOf returns a scoped topic policy client when the provided admin
+// client supports it.
+//
+// client is type-asserted to TopicPoliciesProvider. When the assertion fails,
+// a nil TopicPolicies and a non-nil error are returned; otherwise the result
+// of provider.TopicPolicies(isGlobal) is returned with a nil error.
+func TopicPoliciesOf(client Client, isGlobal bool) (TopicPolicies, error) {
+	provider, ok := client.(TopicPoliciesProvider)
+	if !ok {
+		return nil, fmt.Errorf("admin client does not implement TopicPoliciesProvider")
+	}
+	return provider.TopicPolicies(isGlobal), nil
+}
+
+// topicPolicies is the concrete type asserted to implement TopicPolicies; it
+// is constructed by pulsarClient.TopicPolicies.
+type topicPolicies struct {
+ // pulsar is the client used to build endpoints and issue requests.
+ pulsar *pulsarClient
+ // basePath is passed as the first argument to pulsar.endpoint.
+ basePath string
+ // isGlobal, when true, makes scopedQueryParams add "isGlobal"="true".
+ isGlobal bool
+}
+
+// Compile-time checks that the concrete types satisfy the exported interfaces.
+var (
+	_ TopicPolicies         = (*topicPolicies)(nil)
+	_ TopicPoliciesProvider = (*pulsarClient)(nil)
+)
+
+// TopicPolicies returns a TopicPolicies client bound to c, with an empty base
+// path and the requested isGlobal value.
+func (c *pulsarClient) TopicPolicies(isGlobal bool) TopicPolicies {
+	scoped := topicPolicies{
+		pulsar:   c,
+		basePath: "",
+		isGlobal: isGlobal,
+	}
+	return &scoped
+}
+
+// topicEndpoint builds the endpoint for topic by passing t.basePath, the
+// topic's REST path, and then parts (in order) to t.pulsar.endpoint.
+func (t *topicPolicies) topicEndpoint(topic utils.TopicName, parts ...string) string {
+	allParts := make([]string, 0, len(parts)+1)
+	allParts = append(allParts, topic.GetRestPath())
+	allParts = append(allParts, parts...)
+	return t.pulsar.endpoint(t.basePath, allParts...)
+}
+
+// scopedQueryParams returns the query parameters to send for this client.
+//
+// It returns nil when t.isGlobal is false and params is empty. Otherwise it
+// returns a new map holding a copy of params, with "isGlobal" set to "true"
+// when t.isGlobal is true. params itself is never modified.
+func (t *topicPolicies) scopedQueryParams(params map[string]string) map[string]string {
+	if !t.isGlobal && len(params) == 0 {
+		return nil
+	}
+
+	out := make(map[string]string, len(params)+1)
+	for key, value := range params {
+		out[key] = value
+	}
+	if t.isGlobal {
+		out["isGlobal"] = "true"
+	}
+	return out
+}
+
+// appliedQueryParams returns scopedQueryParams applied to a map whose only
+// entry is "applied", set to the textual form of the applied flag.
+func (t *topicPolicies) appliedQueryParams(applied bool) map[string]string {
+	params := map[string]string{"applied": strconv.FormatBool(applied)}
+	return t.scopedQueryParams(params)
+}
+
+// readPolicyBodyWithContext calls GetWithQueryParamsWithContext on endpoint
+// with the query parameters from appliedQueryParams(applied) and returns its
+// []byte and error results unchanged. nil is passed as the third argument and
+// false as the last one.
+func (t *topicPolicies) readPolicyBodyWithContext(
+	ctx context.Context,
+	endpoint string,
+	applied bool,
+) ([]byte, error) {
+	return t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, t.appliedQueryParams(applied), false)
+}
+
+// scopedPostWithContext calls PostWithQueryParamsWithContext on endpoint with
+// body, using params after they have been passed through scopedQueryParams.
+func (t *topicPolicies) scopedPostWithContext(
+	ctx context.Context,
+	endpoint string,
+	body interface{},
+	params map[string]string,
+) error {
+	return t.pulsar.Client.PostWithQueryParamsWithContext(ctx, endpoint, body, t.scopedQueryParams(params))
+}
+
+// scopedPutWithContext calls PutWithQueryParamsWithContext on endpoint with
+// body, using params after they have been passed through scopedQueryParams.
+// nil is passed as the fourth argument.
+func (t *topicPolicies) scopedPutWithContext(
+	ctx context.Context,
+	endpoint string,
+	body interface{},
+	params map[string]string,
+) error {
+	return t.pulsar.Client.PutWithQueryParamsWithContext(ctx, endpoint, body, nil, t.scopedQueryParams(params))
+}
+
+// scopedDeleteWithContext calls DeleteWithQueryParamsWithContext on endpoint,
+// using params after they have been passed through scopedQueryParams.
+func (t *topicPolicies) scopedDeleteWithContext(
+	ctx context.Context,
+	endpoint string,
+	params map[string]string,
+) error {
+	return t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, t.scopedQueryParams(params))
+}
+
+// decodeOptionalJSON unmarshals body into a newly allocated T.
+//
+// It returns (nil, nil) when isUnsetPolicyBody reports body as unset, and
+// (nil, err) when json.Unmarshal fails.
+func decodeOptionalJSON[T any](body []byte) (*T, error) {
+	if isUnsetPolicyBody(body) {
+		return nil, nil
+	}
+
+	decoded := new(T)
+	if err := json.Unmarshal(body, decoded); err != nil {
+		return nil, err
+	}
+	return decoded, nil
+}
+
+// decodeOptionalSchemaCompatibilityStrategy parses body as a schema
+// compatibility strategy.
+//
+// It returns (nil, nil) when isUnsetPolicyBody reports body as unset, or when
+// nothing is left after trimming surrounding whitespace and double quotes.
+// Otherwise the remaining text is handed to
+// utils.ParseSchemaCompatibilityStrategy and its error, if any, is returned.
+func decodeOptionalSchemaCompatibilityStrategy(body []byte) (*utils.SchemaCompatibilityStrategy, error) {
+	if isUnsetPolicyBody(body) {
+		return nil, nil
+	}
+
+	raw := strings.TrimSpace(string(body))
+	raw = strings.Trim(raw, "\"")
+	if raw == "" {
+		return nil, nil
+	}
+
+	strategy, err := utils.ParseSchemaCompatibilityStrategy(raw)
+	if err != nil {
+		return nil, err
+	}
+	return &strategy, nil
+}
+
+// isUnsetPolicyBody reports whether body, ignoring surrounding whitespace, is
+// empty or the literal "null".
+func isUnsetPolicyBody(body []byte) bool {
+	switch strings.TrimSpace(string(body)) {
+	case "", "null":
+		return true
+	default:
+		return false
+	}
+}
+
+// GetMessageTTL reads the "messageTTL" policy of topic, forwarding applied as
+// the "applied" query parameter. An empty or null body yields a nil result.
+func (t *topicPolicies) GetMessageTTL(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "messageTTL")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMessageTTL posts to the "messageTTL" endpoint of topic with no request
+// body; the value travels as the "messageTTL" query parameter.
+func (t *topicPolicies) SetMessageTTL(ctx context.Context, topic utils.TopicName, messageTTL int) error {
+	endpoint := t.topicEndpoint(topic, "messageTTL")
+	params := map[string]string{"messageTTL": strconv.Itoa(messageTTL)}
+	return t.scopedPostWithContext(ctx, endpoint, nil, params)
+}
+
+// RemoveMessageTTL issues a scoped DELETE on the "messageTTL" endpoint of
+// topic, sending "messageTTL=0" as a query parameter.
+func (t *topicPolicies) RemoveMessageTTL(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "messageTTL")
+	params := map[string]string{"messageTTL": strconv.Itoa(0)}
+	return t.scopedDeleteWithContext(ctx, endpoint, params)
+}
+
+// GetMaxProducers reads the "maxProducers" policy of topic, forwarding applied
+// as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetMaxProducers(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxProducers")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxProducers posts maxProducers as the body of the "maxProducers" policy
+// endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetMaxProducers(ctx context.Context, topic utils.TopicName, maxProducers int) error {
+	endpoint := t.topicEndpoint(topic, "maxProducers")
+	return t.scopedPostWithContext(ctx, endpoint, &maxProducers, nil)
+}
+
+// RemoveMaxProducers issues a scoped DELETE on the "maxProducers" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveMaxProducers(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "maxProducers")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxConsumers reads the "maxConsumers" policy of topic, forwarding applied
+// as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetMaxConsumers(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxConsumers")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxConsumers posts maxConsumers as the body of the "maxConsumers" policy
+// endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetMaxConsumers(ctx context.Context, topic utils.TopicName, maxConsumers int) error {
+	endpoint := t.topicEndpoint(topic, "maxConsumers")
+	return t.scopedPostWithContext(ctx, endpoint, &maxConsumers, nil)
+}
+
+// RemoveMaxConsumers issues a scoped DELETE on the "maxConsumers" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveMaxConsumers(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "maxConsumers")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxUnackMessagesPerConsumer reads the "maxUnackedMessagesOnConsumer"
+// policy of topic, forwarding applied as the "applied" query parameter. An
+// empty or null body yields nil.
+func (t *topicPolicies) GetMaxUnackMessagesPerConsumer(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnConsumer")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxUnackMessagesPerConsumer posts maxUnackedNum as the body of the
+// "maxUnackedMessagesOnConsumer" policy endpoint of topic.
+func (t *topicPolicies) SetMaxUnackMessagesPerConsumer(
+	ctx context.Context,
+	topic utils.TopicName,
+	maxUnackedNum int,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnConsumer")
+	return t.scopedPostWithContext(ctx, endpoint, &maxUnackedNum, nil)
+}
+
+// RemoveMaxUnackMessagesPerConsumer issues a scoped DELETE on the
+// "maxUnackedMessagesOnConsumer" policy endpoint of topic.
+func (t *topicPolicies) RemoveMaxUnackMessagesPerConsumer(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnConsumer")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxUnackMessagesPerSubscription reads the
+// "maxUnackedMessagesOnSubscription" policy of topic, forwarding applied as
+// the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetMaxUnackMessagesPerSubscription(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnSubscription")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxUnackMessagesPerSubscription posts maxUnackedNum as the body of the
+// "maxUnackedMessagesOnSubscription" policy endpoint of topic.
+func (t *topicPolicies) SetMaxUnackMessagesPerSubscription(
+	ctx context.Context,
+	topic utils.TopicName,
+	maxUnackedNum int,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnSubscription")
+	return t.scopedPostWithContext(ctx, endpoint, &maxUnackedNum, nil)
+}
+
+// RemoveMaxUnackMessagesPerSubscription issues a scoped DELETE on the
+// "maxUnackedMessagesOnSubscription" policy endpoint of topic.
+func (t *topicPolicies) RemoveMaxUnackMessagesPerSubscription(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnSubscription")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetPersistence reads the "persistence" policy of topic, forwarding applied
+// as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetPersistence(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.PersistenceData, error) {
+	endpoint := t.topicEndpoint(topic, "persistence")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.PersistenceData](raw)
+}
+
+// SetPersistence posts persistenceData as the body of the "persistence" policy
+// endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetPersistence(
+	ctx context.Context,
+	topic utils.TopicName,
+	persistenceData utils.PersistenceData,
+) error {
+	endpoint := t.topicEndpoint(topic, "persistence")
+	return t.scopedPostWithContext(ctx, endpoint, &persistenceData, nil)
+}
+
+// RemovePersistence issues a scoped DELETE on the "persistence" policy
+// endpoint of topic.
+func (t *topicPolicies) RemovePersistence(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "persistence")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDelayedDelivery reads the "delayedDelivery" policy of topic, forwarding
+// applied as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetDelayedDelivery(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.DelayedDeliveryData, error) {
+	endpoint := t.topicEndpoint(topic, "delayedDelivery")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.DelayedDeliveryData](raw)
+}
+
+// SetDelayedDelivery posts delayedDeliveryData as the body of the
+// "delayedDelivery" policy endpoint of topic.
+func (t *topicPolicies) SetDelayedDelivery(
+	ctx context.Context,
+	topic utils.TopicName,
+	delayedDeliveryData utils.DelayedDeliveryData,
+) error {
+	endpoint := t.topicEndpoint(topic, "delayedDelivery")
+	return t.scopedPostWithContext(ctx, endpoint, &delayedDeliveryData, nil)
+}
+
+// RemoveDelayedDelivery issues a scoped DELETE on the "delayedDelivery" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveDelayedDelivery(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "delayedDelivery")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDispatchRate reads the "dispatchRate" policy of topic, forwarding applied
+// as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.DispatchRateData, error) {
+	endpoint := t.topicEndpoint(topic, "dispatchRate")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.DispatchRateData](raw)
+}
+
+// SetDispatchRate posts dispatchRateData as the body of the "dispatchRate"
+// policy endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	dispatchRateData utils.DispatchRateData,
+) error {
+	endpoint := t.topicEndpoint(topic, "dispatchRate")
+	return t.scopedPostWithContext(ctx, endpoint, &dispatchRateData, nil)
+}
+
+// RemoveDispatchRate issues a scoped DELETE on the "dispatchRate" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveDispatchRate(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "dispatchRate")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetPublishRate reads the "publishRate" policy of topic, forwarding applied
+// as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetPublishRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.PublishRateData, error) {
+	endpoint := t.topicEndpoint(topic, "publishRate")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.PublishRateData](raw)
+}
+
+// SetPublishRate posts publishRateData as the body of the "publishRate" policy
+// endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetPublishRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	publishRateData utils.PublishRateData,
+) error {
+	endpoint := t.topicEndpoint(topic, "publishRate")
+	return t.scopedPostWithContext(ctx, endpoint, &publishRateData, nil)
+}
+
+// RemovePublishRate issues a scoped DELETE on the "publishRate" policy
+// endpoint of topic.
+func (t *topicPolicies) RemovePublishRate(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "publishRate")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDeduplicationStatus reads the "deduplicationEnabled" policy of topic,
+// forwarding applied as the "applied" query parameter. An empty or null body
+// yields nil.
+func (t *topicPolicies) GetDeduplicationStatus(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*bool, error) {
+	endpoint := t.topicEndpoint(topic, "deduplicationEnabled")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[bool](raw)
+}
+
+// SetDeduplicationStatus posts enabled (by value) as the body of the
+// "deduplicationEnabled" policy endpoint of topic.
+func (t *topicPolicies) SetDeduplicationStatus(
+	ctx context.Context,
+	topic utils.TopicName,
+	enabled bool,
+) error {
+	endpoint := t.topicEndpoint(topic, "deduplicationEnabled")
+	return t.scopedPostWithContext(ctx, endpoint, enabled, nil)
+}
+
+// RemoveDeduplicationStatus issues a scoped DELETE on the
+// "deduplicationEnabled" policy endpoint of topic.
+func (t *topicPolicies) RemoveDeduplicationStatus(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "deduplicationEnabled")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetRetention reads the "retention" policy of topic, forwarding applied as
+// the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetRetention(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.RetentionPolicies, error) {
+	endpoint := t.topicEndpoint(topic, "retention")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.RetentionPolicies](raw)
+}
+
+// SetRetention posts data (by value) as the body of the "retention" policy
+// endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetRetention(
+	ctx context.Context,
+	topic utils.TopicName,
+	data utils.RetentionPolicies,
+) error {
+	endpoint := t.topicEndpoint(topic, "retention")
+	return t.scopedPostWithContext(ctx, endpoint, data, nil)
+}
+
+// RemoveRetention issues a scoped DELETE on the "retention" policy endpoint of
+// topic.
+func (t *topicPolicies) RemoveRetention(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "retention")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetCompactionThreshold reads the "compactionThreshold" policy of topic,
+// forwarding applied as the "applied" query parameter. An empty or null body
+// yields nil.
+func (t *topicPolicies) GetCompactionThreshold(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int64, error) {
+	endpoint := t.topicEndpoint(topic, "compactionThreshold")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int64](raw)
+}
+
+// SetCompactionThreshold posts threshold (by value) as the body of the
+// "compactionThreshold" policy endpoint of topic.
+func (t *topicPolicies) SetCompactionThreshold(
+	ctx context.Context,
+	topic utils.TopicName,
+	threshold int64,
+) error {
+	endpoint := t.topicEndpoint(topic, "compactionThreshold")
+	return t.scopedPostWithContext(ctx, endpoint, threshold, nil)
+}
+
+// RemoveCompactionThreshold issues a scoped DELETE on the
+// "compactionThreshold" policy endpoint of topic.
+func (t *topicPolicies) RemoveCompactionThreshold(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "compactionThreshold")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetBacklogQuotaMap reads the "backlogQuotaMap" endpoint of topic, forwarding
+// applied as the "applied" query parameter. It returns a nil map when the
+// body is empty or null.
+func (t *topicPolicies) GetBacklogQuotaMap(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) {
+	endpoint := t.topicEndpoint(topic, "backlogQuotaMap")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+
+	quotas, err := decodeOptionalJSON[map[utils.BacklogQuotaType]utils.BacklogQuota](raw)
+	if err != nil {
+		return nil, err
+	}
+	if quotas == nil {
+		return nil, nil
+	}
+	return *quotas, nil
+}
+
+// SetBacklogQuota posts backlogQuota as the body of the "backlogQuota"
+// endpoint of topic; backlogQuotaType travels as the "backlogQuotaType" query
+// parameter.
+func (t *topicPolicies) SetBacklogQuota(
+	ctx context.Context,
+	topic utils.TopicName,
+	backlogQuota utils.BacklogQuota,
+	backlogQuotaType utils.BacklogQuotaType,
+) error {
+	endpoint := t.topicEndpoint(topic, "backlogQuota")
+	params := map[string]string{"backlogQuotaType": string(backlogQuotaType)}
+	return t.scopedPostWithContext(ctx, endpoint, &backlogQuota, params)
+}
+
+// RemoveBacklogQuota issues a scoped DELETE on the "backlogQuota" endpoint of
+// topic, with backlogQuotaType sent as the "backlogQuotaType" query parameter.
+func (t *topicPolicies) RemoveBacklogQuota(
+	ctx context.Context,
+	topic utils.TopicName,
+	backlogQuotaType utils.BacklogQuotaType,
+) error {
+	endpoint := t.topicEndpoint(topic, "backlogQuota")
+	params := map[string]string{"backlogQuotaType": string(backlogQuotaType)}
+	return t.scopedDeleteWithContext(ctx, endpoint, params)
+}
+
+// GetInactiveTopicPolicies reads the "inactiveTopicPolicies" policy of topic,
+// forwarding applied as the "applied" query parameter. An empty or null body
+// yields nil.
+func (t *topicPolicies) GetInactiveTopicPolicies(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.InactiveTopicPolicies, error) {
+	endpoint := t.topicEndpoint(topic, "inactiveTopicPolicies")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.InactiveTopicPolicies](raw)
+}
+
+// SetInactiveTopicPolicies posts data (by value) as the body of the
+// "inactiveTopicPolicies" policy endpoint of topic.
+func (t *topicPolicies) SetInactiveTopicPolicies(
+	ctx context.Context,
+	topic utils.TopicName,
+	data utils.InactiveTopicPolicies,
+) error {
+	endpoint := t.topicEndpoint(topic, "inactiveTopicPolicies")
+	return t.scopedPostWithContext(ctx, endpoint, data, nil)
+}
+
+// RemoveInactiveTopicPolicies issues a scoped DELETE on the
+// "inactiveTopicPolicies" policy endpoint of topic.
+func (t *topicPolicies) RemoveInactiveTopicPolicies(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "inactiveTopicPolicies")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetReplicationClusters reads the "replication" endpoint of topic, forwarding
+// applied as the "applied" query parameter. It returns a nil slice when the
+// body is empty or null.
+func (t *topicPolicies) GetReplicationClusters(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) ([]string, error) {
+	endpoint := t.topicEndpoint(topic, "replication")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+
+	clusters, err := decodeOptionalJSON[[]string](raw)
+	if err != nil {
+		return nil, err
+	}
+	if clusters == nil {
+		return nil, nil
+	}
+	return *clusters, nil
+}
+
+// SetReplicationClusters posts data as the body of the "replication" endpoint
+// of topic, using the receiver's scope.
+func (t *topicPolicies) SetReplicationClusters(
+	ctx context.Context,
+	topic utils.TopicName,
+	data []string,
+) error {
+	endpoint := t.topicEndpoint(topic, "replication")
+	return t.scopedPostWithContext(ctx, endpoint, data, nil)
+}
+
+// RemoveReplicationClusters issues a scoped DELETE on the "replication"
+// endpoint of topic.
+func (t *topicPolicies) RemoveReplicationClusters(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "replication")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSubscribeRate reads the "subscribeRate" policy of topic, forwarding
+// applied as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetSubscribeRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.SubscribeRate, error) {
+	endpoint := t.topicEndpoint(topic, "subscribeRate")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.SubscribeRate](raw)
+}
+
+// SetSubscribeRate posts subscribeRate as the body of the "subscribeRate"
+// policy endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetSubscribeRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	subscribeRate utils.SubscribeRate,
+) error {
+	endpoint := t.topicEndpoint(topic, "subscribeRate")
+	return t.scopedPostWithContext(ctx, endpoint, &subscribeRate, nil)
+}
+
+// RemoveSubscribeRate issues a scoped DELETE on the "subscribeRate" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveSubscribeRate(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "subscribeRate")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSubscriptionDispatchRate reads the "subscriptionDispatchRate" policy of
+// topic, forwarding applied as the "applied" query parameter. An empty or
+// null body yields nil.
+func (t *topicPolicies) GetSubscriptionDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.DispatchRateData, error) {
+	endpoint := t.topicEndpoint(topic, "subscriptionDispatchRate")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.DispatchRateData](raw)
+}
+
+// SetSubscriptionDispatchRate posts dispatchRate as the body of the
+// "subscriptionDispatchRate" policy endpoint of topic.
+func (t *topicPolicies) SetSubscriptionDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	dispatchRate utils.DispatchRateData,
+) error {
+	endpoint := t.topicEndpoint(topic, "subscriptionDispatchRate")
+	return t.scopedPostWithContext(ctx, endpoint, &dispatchRate, nil)
+}
+
+// RemoveSubscriptionDispatchRate issues a scoped DELETE on the
+// "subscriptionDispatchRate" policy endpoint of topic.
+func (t *topicPolicies) RemoveSubscriptionDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "subscriptionDispatchRate")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxConsumersPerSubscription reads the "maxConsumersPerSubscription"
+// policy of topic, forwarding applied as the "applied" query parameter. An
+// empty or null body yields nil.
+func (t *topicPolicies) GetMaxConsumersPerSubscription(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxConsumersPerSubscription")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxConsumersPerSubscription posts maxConsumers as the body of the
+// "maxConsumersPerSubscription" policy endpoint of topic.
+func (t *topicPolicies) SetMaxConsumersPerSubscription(
+	ctx context.Context,
+	topic utils.TopicName,
+	maxConsumers int,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxConsumersPerSubscription")
+	return t.scopedPostWithContext(ctx, endpoint, &maxConsumers, nil)
+}
+
+// RemoveMaxConsumersPerSubscription issues a scoped DELETE on the
+// "maxConsumersPerSubscription" policy endpoint of topic.
+func (t *topicPolicies) RemoveMaxConsumersPerSubscription(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxConsumersPerSubscription")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxMessageSize reads the "maxMessageSize" policy of topic, forwarding
+// applied as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetMaxMessageSize(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxMessageSize")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxMessageSize posts maxMessageSize as the body of the "maxMessageSize"
+// policy endpoint of topic, using the receiver's scope.
+func (t *topicPolicies) SetMaxMessageSize(
+	ctx context.Context,
+	topic utils.TopicName,
+	maxMessageSize int,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxMessageSize")
+	return t.scopedPostWithContext(ctx, endpoint, &maxMessageSize, nil)
+}
+
+// RemoveMaxMessageSize issues a scoped DELETE on the "maxMessageSize" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveMaxMessageSize(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "maxMessageSize")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxSubscriptionsPerTopic reads the "maxSubscriptionsPerTopic" policy of
+// topic, forwarding applied as the "applied" query parameter. An empty or
+// null body yields nil.
+func (t *topicPolicies) GetMaxSubscriptionsPerTopic(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "maxSubscriptionsPerTopic")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxSubscriptionsPerTopic posts maxSubscriptions as the body of the
+// "maxSubscriptionsPerTopic" policy endpoint of topic.
+func (t *topicPolicies) SetMaxSubscriptionsPerTopic(
+	ctx context.Context,
+	topic utils.TopicName,
+	maxSubscriptions int,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxSubscriptionsPerTopic")
+	return t.scopedPostWithContext(ctx, endpoint, &maxSubscriptions, nil)
+}
+
+// RemoveMaxSubscriptionsPerTopic issues a scoped DELETE on the
+// "maxSubscriptionsPerTopic" policy endpoint of topic.
+func (t *topicPolicies) RemoveMaxSubscriptionsPerTopic(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "maxSubscriptionsPerTopic")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSchemaValidationEnforced reads the "schemaValidationEnforced" policy of
+// topic, forwarding applied as the "applied" query parameter. An empty or
+// null body yields nil.
+func (t *topicPolicies) GetSchemaValidationEnforced(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*bool, error) {
+	endpoint := t.topicEndpoint(topic, "schemaValidationEnforced")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[bool](raw)
+}
+
+// SetSchemaValidationEnforced posts enabled (by value) as the body of the
+// "schemaValidationEnforced" policy endpoint of topic.
+func (t *topicPolicies) SetSchemaValidationEnforced(
+	ctx context.Context,
+	topic utils.TopicName,
+	enabled bool,
+) error {
+	endpoint := t.topicEndpoint(topic, "schemaValidationEnforced")
+	return t.scopedPostWithContext(ctx, endpoint, enabled, nil)
+}
+
+// RemoveSchemaValidationEnforced issues a scoped DELETE on the
+// "schemaValidationEnforced" policy endpoint of topic.
+func (t *topicPolicies) RemoveSchemaValidationEnforced(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "schemaValidationEnforced")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDeduplicationSnapshotInterval reads the "deduplicationSnapshotInterval"
+// policy of topic, forwarding applied as the "applied" query parameter. An
+// empty or null body yields nil.
+func (t *topicPolicies) GetDeduplicationSnapshotInterval(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*int, error) {
+	endpoint := t.topicEndpoint(topic, "deduplicationSnapshotInterval")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[int](raw)
+}
+
+// SetDeduplicationSnapshotInterval posts interval as the body of the
+// "deduplicationSnapshotInterval" policy endpoint of topic.
+func (t *topicPolicies) SetDeduplicationSnapshotInterval(
+	ctx context.Context,
+	topic utils.TopicName,
+	interval int,
+) error {
+	endpoint := t.topicEndpoint(topic, "deduplicationSnapshotInterval")
+	return t.scopedPostWithContext(ctx, endpoint, &interval, nil)
+}
+
+// RemoveDeduplicationSnapshotInterval issues a scoped DELETE on the
+// "deduplicationSnapshotInterval" policy endpoint of topic.
+func (t *topicPolicies) RemoveDeduplicationSnapshotInterval(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "deduplicationSnapshotInterval")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetReplicatorDispatchRate reads the "replicatorDispatchRate" policy of
+// topic, forwarding applied as the "applied" query parameter. An empty or
+// null body yields nil.
+func (t *topicPolicies) GetReplicatorDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.DispatchRateData, error) {
+	endpoint := t.topicEndpoint(topic, "replicatorDispatchRate")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.DispatchRateData](raw)
+}
+
+// SetReplicatorDispatchRate posts dispatchRate as the body of the
+// "replicatorDispatchRate" policy endpoint of topic.
+func (t *topicPolicies) SetReplicatorDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+	dispatchRate utils.DispatchRateData,
+) error {
+	endpoint := t.topicEndpoint(topic, "replicatorDispatchRate")
+	return t.scopedPostWithContext(ctx, endpoint, &dispatchRate, nil)
+}
+
+// RemoveReplicatorDispatchRate issues a scoped DELETE on the
+// "replicatorDispatchRate" policy endpoint of topic.
+func (t *topicPolicies) RemoveReplicatorDispatchRate(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "replicatorDispatchRate")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetOffloadPolicies reads the "offloadPolicies" policy of topic, forwarding
+// applied as the "applied" query parameter. An empty or null body yields nil.
+func (t *topicPolicies) GetOffloadPolicies(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.OffloadPolicies, error) {
+	endpoint := t.topicEndpoint(topic, "offloadPolicies")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.OffloadPolicies](raw)
+}
+
+// SetOffloadPolicies posts offloadPolicies as the body of the
+// "offloadPolicies" policy endpoint of topic.
+func (t *topicPolicies) SetOffloadPolicies(
+	ctx context.Context,
+	topic utils.TopicName,
+	offloadPolicies utils.OffloadPolicies,
+) error {
+	endpoint := t.topicEndpoint(topic, "offloadPolicies")
+	return t.scopedPostWithContext(ctx, endpoint, &offloadPolicies, nil)
+}
+
+// RemoveOffloadPolicies issues a scoped DELETE on the "offloadPolicies" policy
+// endpoint of topic.
+func (t *topicPolicies) RemoveOffloadPolicies(ctx context.Context, topic utils.TopicName) error {
+	endpoint := t.topicEndpoint(topic, "offloadPolicies")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetAutoSubscriptionCreation reads the "autoSubscriptionCreation" policy of
+// topic, forwarding applied as the "applied" query parameter. An empty or
+// null body yields nil.
+func (t *topicPolicies) GetAutoSubscriptionCreation(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.AutoSubscriptionCreationOverride, error) {
+	endpoint := t.topicEndpoint(topic, "autoSubscriptionCreation")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalJSON[utils.AutoSubscriptionCreationOverride](raw)
+}
+
+// SetAutoSubscriptionCreation posts autoSubCreation as the body of the
+// "autoSubscriptionCreation" policy endpoint of topic.
+func (t *topicPolicies) SetAutoSubscriptionCreation(
+	ctx context.Context,
+	topic utils.TopicName,
+	autoSubCreation utils.AutoSubscriptionCreationOverride,
+) error {
+	endpoint := t.topicEndpoint(topic, "autoSubscriptionCreation")
+	return t.scopedPostWithContext(ctx, endpoint, &autoSubCreation, nil)
+}
+
+// RemoveAutoSubscriptionCreation issues a scoped DELETE on the
+// "autoSubscriptionCreation" policy endpoint of topic.
+func (t *topicPolicies) RemoveAutoSubscriptionCreation(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "autoSubscriptionCreation")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSchemaCompatibilityStrategy reads the "schemaCompatibilityStrategy"
+// policy of topic, forwarding applied as the "applied" query parameter. The
+// body is parsed by decodeOptionalSchemaCompatibilityStrategy, so an unset
+// policy yields nil.
+func (t *topicPolicies) GetSchemaCompatibilityStrategy(
+	ctx context.Context,
+	topic utils.TopicName,
+	applied bool,
+) (*utils.SchemaCompatibilityStrategy, error) {
+	endpoint := t.topicEndpoint(topic, "schemaCompatibilityStrategy")
+	raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+	if err != nil {
+		return nil, err
+	}
+	return decodeOptionalSchemaCompatibilityStrategy(raw)
+}
+
+// SetSchemaCompatibilityStrategy sends strategy as the body of a PUT (not a
+// POST like the other setters) to the "schemaCompatibilityStrategy" policy
+// endpoint of topic.
+func (t *topicPolicies) SetSchemaCompatibilityStrategy(
+	ctx context.Context,
+	topic utils.TopicName,
+	strategy utils.SchemaCompatibilityStrategy,
+) error {
+	endpoint := t.topicEndpoint(topic, "schemaCompatibilityStrategy")
+	return t.scopedPutWithContext(ctx, endpoint, strategy, nil)
+}
+
+// RemoveSchemaCompatibilityStrategy issues a scoped DELETE on the
+// "schemaCompatibilityStrategy" policy endpoint of topic.
+func (t *topicPolicies) RemoveSchemaCompatibilityStrategy(
+	ctx context.Context,
+	topic utils.TopicName,
+) error {
+	endpoint := t.topicEndpoint(topic, "schemaCompatibilityStrategy")
+	return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
diff --git a/pulsaradmin/pkg/admin/topic_policies_test.go
b/pulsaradmin/pkg/admin/topic_policies_test.go
new file mode 100644
index 00000000..b6eac691
--- /dev/null
+++ b/pulsaradmin/pkg/admin/topic_policies_test.go
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package admin
+
+import (
+ "context"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+ "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+)
+
+// topicPolicyRequest is a snapshot of one HTTP request captured by the test
+// server handler.
+type topicPolicyRequest struct {
+	method string     // HTTP method
+	path   string     // escaped URL path
+	query  url.Values // parsed query string
+	body   string     // request body as text
+}
+
+// newTopicPolicyTestClient starts an httptest server running handler and
+// returns an admin Client pointed at it, along with the same value asserted
+// to *pulsarClient. The server is closed via t.Cleanup.
+func newTopicPolicyTestClient(t *testing.T, handler http.HandlerFunc) (Client, *pulsarClient) {
+	t.Helper()
+
+	srv := httptest.NewServer(handler)
+	t.Cleanup(srv.Close)
+
+	cfg := &config.Config{WebServiceURL: srv.URL}
+	client, err := New(cfg)
+	require.NoError(t, err)
+
+	impl, isPulsarClient := client.(*pulsarClient)
+	require.True(t, isPulsarClient)
+	return client, impl
+}
+
+// mustTopicName parses topic with utils.GetTopicName and fails the test
+// immediately if parsing returns an error.
+func mustTopicName(t *testing.T, topic string) *utils.TopicName {
+	t.Helper()
+
+	parsed, err := utils.GetTopicName(topic)
+	require.NoError(t, err)
+	return parsed
+}
+
+// TestTopicPoliciesOf checks that TopicPoliciesOf returns a *topicPolicies
+// whose isGlobal field mirrors the requested scope.
+func TestTopicPoliciesOf(t *testing.T) {
+	client, err := New(&config.Config{})
+	require.NoError(t, err)
+
+	// Local scope first, then global, as two iterations of the same checks.
+	for _, isGlobal := range []bool{false, true} {
+		policies, err := TopicPoliciesOf(client, isGlobal)
+		require.NoError(t, err)
+		require.NotNil(t, policies)
+		assert.Equal(t, isGlobal, policies.(*topicPolicies).isGlobal)
+	}
+}
+
+// TestTopicPoliciesScopeAndAppliedParams records every request the scoped
+// policy client sends to a stub server and verifies the HTTP method, path and
+// the "applied" / "isGlobal" query parameters for local and global scopes.
+func TestTopicPoliciesScopeAndAppliedParams(t *testing.T) {
+	requests := make([]topicPolicyRequest, 0, 4)
+	client, impl := newTopicPolicyTestClient(t, func(w http.ResponseWriter, r *http.Request) {
+		// The handler runs on an HTTP server goroutine. require calls
+		// t.FailNow, which is only valid on the test goroutine, so failures
+		// here are reported with assert and surfaced as an HTTP error.
+		body, err := io.ReadAll(r.Body)
+		if !assert.NoError(t, err) {
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+
+		requests = append(requests, topicPolicyRequest{
+			method: r.Method,
+			path:   r.URL.EscapedPath(),
+			query:  r.URL.Query(),
+			body:   string(body),
+		})
+
+		if r.Method == http.MethodGet {
+			_, err = w.Write([]byte("10"))
+			assert.NoError(t, err)
+			return
+		}
+		w.WriteHeader(http.StatusNoContent)
+	})
+
+	ctx := context.Background()
+	topic := mustTopicName(t, "persistent://public/default/scoped-policy")
+
+	localPolicies, err := TopicPoliciesOf(client, false)
+	require.NoError(t, err)
+	globalPolicies, err := TopicPoliciesOf(client, true)
+	require.NoError(t, err)
+
+	ttl, err := localPolicies.GetMessageTTL(ctx, *topic, true)
+	require.NoError(t, err)
+	require.NotNil(t, ttl)
+	assert.Equal(t, 10, *ttl)
+
+	ttl, err = globalPolicies.GetMessageTTL(ctx, *topic, false)
+	require.NoError(t, err)
+	require.NotNil(t, ttl)
+	assert.Equal(t, 10, *ttl)
+
+	err = globalPolicies.SetMaxProducers(ctx, *topic, 3)
+	require.NoError(t, err)
+
+	err = globalPolicies.RemoveRetention(ctx, *topic)
+	require.NoError(t, err)
+
+	expectedMessageTTLPath := impl.endpoint("", topic.GetRestPath(), "messageTTL")
+	expectedMaxProducersPath := impl.endpoint("", topic.GetRestPath(), "maxProducers")
+	expectedRetentionPath := impl.endpoint("", topic.GetRestPath(), "retention")
+	decodedExpectedMessageTTLPath, err := url.PathUnescape(expectedMessageTTLPath)
+	require.NoError(t, err)
+	decodedExpectedMaxProducersPath, err := url.PathUnescape(expectedMaxProducersPath)
+	require.NoError(t, err)
+	decodedExpectedRetentionPath, err := url.PathUnescape(expectedRetentionPath)
+	require.NoError(t, err)
+
+	require.Len(t, requests, 4)
+
+	// Local read: "applied" is forwarded, no scope flag.
+	assert.Equal(t, http.MethodGet, requests[0].method)
+	assert.Equal(t, decodedExpectedMessageTTLPath, requests[0].path)
+	assert.Equal(t, "true", requests[0].query.Get("applied"))
+	assert.Empty(t, requests[0].query.Get("isGlobal"))
+
+	// Global read: both "applied" and "isGlobal" are present.
+	assert.Equal(t, http.MethodGet, requests[1].method)
+	assert.Equal(t, decodedExpectedMessageTTLPath, requests[1].path)
+	assert.Equal(t, "false", requests[1].query.Get("applied"))
+	assert.Equal(t, "true", requests[1].query.Get("isGlobal"))
+
+	// Global write.
+	assert.Equal(t, http.MethodPost, requests[2].method)
+	assert.Equal(t, decodedExpectedMaxProducersPath, requests[2].path)
+	assert.Equal(t, "true", requests[2].query.Get("isGlobal"))
+	assert.Contains(t, requests[2].body, "3")
+
+	// Global delete.
+	assert.Equal(t, http.MethodDelete, requests[3].method)
+	assert.Equal(t, decodedExpectedRetentionPath, requests[3].path)
+	assert.Equal(t, "true", requests[3].query.Get("isGlobal"))
+}
+
+// TestTopicPoliciesNullDecodingAndLegacyDefaults serves the literal body
+// "null" for every request and checks that the scoped getters return nil
+// while the legacy client.Topics() getters return the defaults asserted below.
+func TestTopicPoliciesNullDecodingAndLegacyDefaults(t *testing.T) {
+	client, _ := newTopicPolicyTestClient(t, func(w http.ResponseWriter, _ *http.Request) {
+		// Runs on a server goroutine: use assert, not require (t.FailNow is
+		// only valid on the test goroutine).
+		_, err := w.Write([]byte("null"))
+		assert.NoError(t, err)
+	})
+
+	ctx := context.Background()
+	topic := mustTopicName(t, "persistent://public/default/null-policy")
+	policies, err := TopicPoliciesOf(client, false)
+	require.NoError(t, err)
+
+	ttl, err := policies.GetMessageTTL(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, ttl)
+
+	deduplication, err := policies.GetDeduplicationStatus(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, deduplication)
+
+	threshold, err := policies.GetCompactionThreshold(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, threshold)
+
+	strategy, err := policies.GetSchemaCompatibilityStrategy(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, strategy)
+
+	inactivePolicies, err := policies.GetInactiveTopicPolicies(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, inactivePolicies)
+
+	replicationClusters, err := policies.GetReplicationClusters(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, replicationClusters)
+
+	backlogQuotaMap, err := policies.GetBacklogQuotaMap(ctx, *topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, backlogQuotaMap)
+
+	legacyTTL, err := client.Topics().GetMessageTTL(*topic)
+	require.NoError(t, err)
+	assert.Equal(t, -1, legacyTTL)
+
+	legacyDeduplication, err := client.Topics().GetDeduplicationStatus(*topic)
+	require.NoError(t, err)
+	assert.False(t, legacyDeduplication)
+
+	legacyThreshold, err := client.Topics().GetCompactionThreshold(*topic, false)
+	require.NoError(t, err)
+	assert.Equal(t, int64(-1), legacyThreshold)
+
+	legacyStrategy, err := client.Topics().GetSchemaCompatibilityStrategyApplied(*topic, false)
+	require.NoError(t, err)
+	assert.Equal(t, utils.SchemaCompatibilityStrategyUndefined, legacyStrategy)
+
+	legacyInactivePolicies, err := client.Topics().GetInactiveTopicPolicies(*topic, false)
+	require.NoError(t, err)
+	assert.Equal(t, utils.InactiveTopicPolicies{}, legacyInactivePolicies)
+
+	legacyReplicationClusters, err := client.Topics().GetReplicationClusters(*topic)
+	require.NoError(t, err)
+	assert.Nil(t, legacyReplicationClusters)
+
+	legacyBacklogQuotaMap, err := client.Topics().GetBacklogQuotaMap(*topic, false)
+	require.NoError(t, err)
+	assert.Nil(t, legacyBacklogQuotaMap)
+}
+
+// TestLocalTopicPoliciesParityWithTopics serves "600" for every request and
+// checks that the local-scope GetMessageTTL and the legacy
+// client.Topics().GetMessageTTL decode it to the same value.
+func TestLocalTopicPoliciesParityWithTopics(t *testing.T) {
+	client, _ := newTopicPolicyTestClient(t, func(w http.ResponseWriter, _ *http.Request) {
+		// Runs on a server goroutine: use assert, not require (t.FailNow is
+		// only valid on the test goroutine).
+		_, err := w.Write([]byte("600"))
+		assert.NoError(t, err)
+	})
+
+	topic := mustTopicName(t, "persistent://public/default/parity-policy")
+	localPolicies, err := TopicPoliciesOf(client, false)
+	require.NoError(t, err)
+
+	newTTL, err := localPolicies.GetMessageTTL(context.Background(), *topic, false)
+	require.NoError(t, err)
+	require.NotNil(t, newTTL)
+
+	legacyTTL, err := client.Topics().GetMessageTTL(*topic)
+	require.NoError(t, err)
+
+	assert.Equal(t, legacyTTL, *newTTL)
+}