This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e7eb92f1 feat(pulsaradmin): add scoped topic policies support (#1471)
e7eb92f1 is described below

commit e7eb92f148f55fbb437d499611f509eb55d72113
Author: Rui Fu <[email protected]>
AuthorDate: Mon Mar 23 22:42:49 2026 +0800

    feat(pulsaradmin): add scoped topic policies support (#1471)
    
    * feat(pulsaradmin): add scoped topic policies support
    
    * style(topic_policies): improve code readability with formatting
    
    * refactor(topic-policies): consolidate API by making context parameter mandatory
---
 pulsaradmin/README.md                        |  35 +
 pulsaradmin/alias.go                         |   9 +
 pulsaradmin/pkg/admin/topic.go               | 260 ++++----
 pulsaradmin/pkg/admin/topic_policies.go      | 928 +++++++++++++++++++++++++++
 pulsaradmin/pkg/admin/topic_policies_test.go | 241 +++++++
 5 files changed, 1329 insertions(+), 144 deletions(-)

diff --git a/pulsaradmin/README.md b/pulsaradmin/README.md
index bf5ca739..f3795b26 100644
--- a/pulsaradmin/README.md
+++ b/pulsaradmin/README.md
@@ -103,6 +103,41 @@ func main() {
 }
 ```
 
+- Read scoped topic policies
+
+```go
+import (
+    "context"
+    "github.com/apache/pulsar-client-go/pulsaradmin"
+    "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+)
+
+func main() {
+    cfg := &pulsaradmin.Config{}
+    admin, err := pulsaradmin.NewClient(cfg)
+    if err != nil {
+        panic(err)
+    }
+
+    topic, _ := utils.GetTopicName("persistent://public/default/example")
+
+    localPolicies, err := pulsaradmin.TopicPoliciesOf(admin, false)
+    if err != nil {
+        panic(err)
+    }
+
+    ttl, err := localPolicies.GetMessageTTL(context.Background(), *topic, 
false)
+    if err != nil {
+        panic(err)
+    }
+    if ttl == nil {
+        return
+    }
+
+    _, _ = pulsaradmin.TopicPoliciesOf(admin, true)
+}
+```
+
 ## Contributing
 
 Contributions are warmly welcomed and greatly appreciated!
diff --git a/pulsaradmin/alias.go b/pulsaradmin/alias.go
index 53c8d93e..36cd0a54 100644
--- a/pulsaradmin/alias.go
+++ b/pulsaradmin/alias.go
@@ -25,10 +25,19 @@ import (
 // Client contains all admin interfaces for operating pulsar resources
 type Client = admin.Client
 
+// TopicPolicies contains scoped admin interfaces for topic-level policies.
+type TopicPolicies = admin.TopicPolicies
+
+// TopicPoliciesProvider provides scoped TopicPolicies instances.
+type TopicPoliciesProvider = admin.TopicPoliciesProvider
+
 // Config are the arguments for creating a new admin Client
 type Config = config.Config
 
 var (
        // NewClient returns a new admin Client for operating pulsar resources
        NewClient = admin.New
+
+       // TopicPoliciesOf returns a scoped TopicPolicies client from an admin client when available.
+       TopicPoliciesOf = admin.TopicPoliciesOf
 )
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 14662ddc..3101607b 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -1101,6 +1101,40 @@ func (c *pulsarClient) Topics() Topics {
        }
 }
 
+func (t *topics) localTopicPolicies() TopicPolicies {
+       return t.pulsar.TopicPolicies(false)
+}
+
+func legacyTopicPolicyInt(value *int) int {
+       if value == nil {
+               return -1
+       }
+       return *value
+}
+
+func legacyTopicPolicyInt64(value *int64) int64 {
+       if value == nil {
+               return -1
+       }
+       return *value
+}
+
+func legacyTopicPolicyBool(value *bool) bool {
+       if value == nil {
+               return false
+       }
+       return *value
+}
+
+func legacyTopicSchemaCompatibilityStrategy(
+       value *utils.SchemaCompatibilityStrategy,
+) utils.SchemaCompatibilityStrategy {
+       if value == nil {
+               return utils.SchemaCompatibilityStrategyUndefined
+       }
+       return *value
+}
+
 func (t *topics) Create(topic utils.TopicName, partitions int) error {
        return t.CreateWithContext(context.Background(), topic, partitions)
 }
@@ -1512,10 +1546,11 @@ func (t *topics) GetMessageTTL(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMessageTTLWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var ttl = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"messageTTL")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &ttl)
-       return ttl, err
+       ttl, err := t.localTopicPolicies().GetMessageTTL(ctx, topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(ttl), nil
 }
 
 func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error {
@@ -1547,10 +1582,11 @@ func (t *topics) GetMaxProducers(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMaxProducersWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var maxProducers = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxProducers")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxProducers)
-       return maxProducers, err
+       maxProducers, err := t.localTopicPolicies().GetMaxProducers(ctx, topic, 
false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxProducers), nil
 }
 
 func (t *topics) SetMaxProducers(topic utils.TopicName, maxProducers int) 
error {
@@ -1578,10 +1614,11 @@ func (t *topics) GetMaxConsumers(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMaxConsumersWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var maxConsumers = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxConsumers")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
-       return maxConsumers, err
+       maxConsumers, err := t.localTopicPolicies().GetMaxConsumers(ctx, topic, 
false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxConsumers), nil
 }
 
 func (t *topics) SetMaxConsumers(topic utils.TopicName, maxConsumers int) 
error {
@@ -1609,10 +1646,11 @@ func (t *topics) GetMaxUnackMessagesPerConsumer(topic 
utils.TopicName) (int, err
 }
 
 func (t *topics) GetMaxUnackMessagesPerConsumerWithContext(ctx 
context.Context, topic utils.TopicName) (int, error) {
-       var maxNum = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxUnackedMessagesOnConsumer")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
-       return maxNum, err
+       maxNum, err := 
t.localTopicPolicies().GetMaxUnackMessagesPerConsumer(ctx, topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxNum), nil
 }
 
 func (t *topics) SetMaxUnackMessagesPerConsumer(topic utils.TopicName, 
maxUnackedNum int) error {
@@ -1645,10 +1683,11 @@ func (t *topics) 
GetMaxUnackMessagesPerSubscriptionWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (int, error) {
-       var maxNum = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxUnackedMessagesOnSubscription")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
-       return maxNum, err
+       maxNum, err := 
t.localTopicPolicies().GetMaxUnackMessagesPerSubscription(ctx, topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxNum), nil
 }
 
 func (t *topics) SetMaxUnackMessagesPerSubscription(topic utils.TopicName, 
maxUnackedNum int) error {
@@ -1678,13 +1717,7 @@ func (t *topics) GetPersistence(topic utils.TopicName) 
(*utils.PersistenceData,
 }
 
 func (t *topics) GetPersistenceWithContext(ctx context.Context, topic 
utils.TopicName) (*utils.PersistenceData, error) {
-       var persistenceData utils.PersistenceData
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"persistence")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&persistenceData)
-       if body != nil {
-               return &persistenceData, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetPersistence(ctx, topic, false)
 }
 
 func (t *topics) SetPersistence(topic utils.TopicName, persistenceData 
utils.PersistenceData) error {
@@ -1717,13 +1750,7 @@ func (t *topics) GetDelayedDeliveryWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (*utils.DelayedDeliveryData, error) {
-       var delayedDeliveryData utils.DelayedDeliveryData
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"delayedDelivery")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&delayedDeliveryData)
-       if body != nil {
-               return &delayedDeliveryData, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetDelayedDelivery(ctx, topic, false)
 }
 
 func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData 
utils.DelayedDeliveryData) error {
@@ -1756,13 +1783,7 @@ func (t *topics) GetDispatchRateWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (*utils.DispatchRateData, error) {
-       var dispatchRateData utils.DispatchRateData
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"dispatchRate")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&dispatchRateData)
-       if body != nil {
-               return &dispatchRateData, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetDispatchRate(ctx, topic, false)
 }
 
 func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData 
utils.DispatchRateData) error {
@@ -1792,13 +1813,7 @@ func (t *topics) GetPublishRate(topic utils.TopicName) 
(*utils.PublishRateData,
 }
 
 func (t *topics) GetPublishRateWithContext(ctx context.Context, topic 
utils.TopicName) (*utils.PublishRateData, error) {
-       var publishRateData utils.PublishRateData
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"publishRate")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&publishRateData)
-       if body != nil {
-               return &publishRateData, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetPublishRate(ctx, topic, false)
 }
 
 func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData 
utils.PublishRateData) error {
@@ -1828,10 +1843,11 @@ func (t *topics) GetDeduplicationStatus(topic 
utils.TopicName) (bool, error) {
 }
 
 func (t *topics) GetDeduplicationStatusWithContext(ctx context.Context, topic 
utils.TopicName) (bool, error) {
-       var enabled bool
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"deduplicationEnabled")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &enabled)
-       return enabled, err
+       enabled, err := t.localTopicPolicies().GetDeduplicationStatus(ctx, 
topic, false)
+       if err != nil {
+               return false, err
+       }
+       return legacyTopicPolicyBool(enabled), nil
 }
 
 func (t *topics) SetDeduplicationStatus(topic utils.TopicName, enabled bool) 
error {
@@ -1861,15 +1877,7 @@ func (t *topics) GetRetentionWithContext(
        topic utils.TopicName,
        applied bool,
 ) (*utils.RetentionPolicies, error) {
-       var policy utils.RetentionPolicies
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"retention")
-       body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, 
endpoint, &policy, map[string]string{
-               "applied": strconv.FormatBool(applied),
-       }, true)
-       if body != nil {
-               return &policy, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetRetention(ctx, topic, applied)
 }
 
 func (t *topics) RemoveRetention(topic utils.TopicName) error {
@@ -1903,12 +1911,11 @@ func (t *topics) GetCompactionThresholdWithContext(
        topic utils.TopicName,
        applied bool,
 ) (int64, error) {
-       var threshold int64 = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"compactionThreshold")
-       _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, 
&threshold, map[string]string{
-               "applied": strconv.FormatBool(applied),
-       }, true)
-       return threshold, err
+       threshold, err := t.localTopicPolicies().GetCompactionThreshold(ctx, 
topic, applied)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt64(threshold), nil
 }
 
 func (t *topics) SetCompactionThreshold(topic utils.TopicName, threshold 
int64) error {
@@ -1942,13 +1949,7 @@ func (t *topics) GetBacklogQuotaMapWithContext(
        applied bool,
 ) (map[utils.BacklogQuotaType]utils.BacklogQuota,
        error) {
-       var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"backlogQuotaMap")
-
-       queryParams := map[string]string{"applied": strconv.FormatBool(applied)}
-       _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, 
&backlogQuotaMap, queryParams, true)
-
-       return backlogQuotaMap, err
+       return t.localTopicPolicies().GetBacklogQuotaMap(ctx, topic, applied)
 }
 
 func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota 
utils.BacklogQuota,
@@ -1988,12 +1989,14 @@ func (t *topics) GetInactiveTopicPoliciesWithContext(
        topic utils.TopicName,
        applied bool,
 ) (utils.InactiveTopicPolicies, error) {
-       var out utils.InactiveTopicPolicies
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"inactiveTopicPolicies")
-       _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, 
&out, map[string]string{
-               "applied": strconv.FormatBool(applied),
-       }, true)
-       return out, err
+       policies, err := t.localTopicPolicies().GetInactiveTopicPolicies(ctx, 
topic, applied)
+       if err != nil {
+               return utils.InactiveTopicPolicies{}, err
+       }
+       if policies == nil {
+               return utils.InactiveTopicPolicies{}, nil
+       }
+       return *policies, nil
 }
 
 func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error {
@@ -2032,10 +2035,7 @@ func (t *topics) GetReplicationClusters(topic 
utils.TopicName) ([]string, error)
 }
 
 func (t *topics) GetReplicationClustersWithContext(ctx context.Context, topic 
utils.TopicName) ([]string, error) {
-       var data []string
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"replication")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &data)
-       return data, err
+       return t.localTopicPolicies().GetReplicationClusters(ctx, topic, false)
 }
 
 func (t *topics) RemoveReplicationClusters(topic utils.TopicName) error {
@@ -2052,13 +2052,7 @@ func (t *topics) GetSubscribeRate(topic utils.TopicName) 
(*utils.SubscribeRate,
 }
 
 func (t *topics) GetSubscribeRateWithContext(ctx context.Context, topic 
utils.TopicName) (*utils.SubscribeRate, error) {
-       var subscribeRate utils.SubscribeRate
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"subscribeRate")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&subscribeRate)
-       if body != nil {
-               return &subscribeRate, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetSubscribeRate(ctx, topic, false)
 }
 
 func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate 
utils.SubscribeRate) error {
@@ -2091,13 +2085,7 @@ func (t *topics) GetSubscriptionDispatchRateWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (*utils.DispatchRateData, error) {
-       var dispatchRate utils.DispatchRateData
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"subscriptionDispatchRate")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&dispatchRate)
-       if body != nil {
-               return &dispatchRate, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetSubscriptionDispatchRate(ctx, topic, 
false)
 }
 
 func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, 
dispatchRate utils.DispatchRateData) error {
@@ -2127,10 +2115,11 @@ func (t *topics) GetMaxConsumersPerSubscription(topic 
utils.TopicName) (int, err
 }
 
 func (t *topics) GetMaxConsumersPerSubscriptionWithContext(ctx 
context.Context, topic utils.TopicName) (int, error) {
-       var maxConsumers = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxConsumersPerSubscription")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
-       return maxConsumers, err
+       maxConsumers, err := 
t.localTopicPolicies().GetMaxConsumersPerSubscription(ctx, topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxConsumers), nil
 }
 
 func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName, 
maxConsumers int) error {
@@ -2160,10 +2149,11 @@ func (t *topics) GetMaxMessageSize(topic 
utils.TopicName) (int, error) {
 }
 
 func (t *topics) GetMaxMessageSizeWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var maxMessageSize = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxMessageSize")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxMessageSize)
-       return maxMessageSize, err
+       maxMessageSize, err := t.localTopicPolicies().GetMaxMessageSize(ctx, 
topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxMessageSize), nil
 }
 
 func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int) 
error {
@@ -2189,10 +2179,11 @@ func (t *topics) GetMaxSubscriptionsPerTopic(topic 
utils.TopicName) (int, error)
 }
 
 func (t *topics) GetMaxSubscriptionsPerTopicWithContext(ctx context.Context, 
topic utils.TopicName) (int, error) {
-       var maxSubscriptions = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxSubscriptionsPerTopic")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxSubscriptions)
-       return maxSubscriptions, err
+       maxSubscriptions, err := 
t.localTopicPolicies().GetMaxSubscriptionsPerTopic(ctx, topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(maxSubscriptions), nil
 }
 
 func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName, 
maxSubscriptions int) error {
@@ -2222,10 +2213,11 @@ func (t *topics) GetSchemaValidationEnforced(topic 
utils.TopicName) (bool, error
 }
 
 func (t *topics) GetSchemaValidationEnforcedWithContext(ctx context.Context, 
topic utils.TopicName) (bool, error) {
-       var enabled bool
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"schemaValidationEnforced")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &enabled)
-       return enabled, err
+       enabled, err := t.localTopicPolicies().GetSchemaValidationEnforced(ctx, 
topic, false)
+       if err != nil {
+               return false, err
+       }
+       return legacyTopicPolicyBool(enabled), nil
 }
 
 func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled 
bool) error {
@@ -2255,10 +2247,11 @@ func (t *topics) GetDeduplicationSnapshotInterval(topic 
utils.TopicName) (int, e
 }
 
 func (t *topics) GetDeduplicationSnapshotIntervalWithContext(ctx 
context.Context, topic utils.TopicName) (int, error) {
-       var interval = -1
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"deduplicationSnapshotInterval")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &interval)
-       return interval, err
+       interval, err := 
t.localTopicPolicies().GetDeduplicationSnapshotInterval(ctx, topic, false)
+       if err != nil {
+               return -1, err
+       }
+       return legacyTopicPolicyInt(interval), nil
 }
 
 func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName, 
interval int) error {
@@ -2291,13 +2284,7 @@ func (t *topics) GetReplicatorDispatchRateWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (*utils.DispatchRateData, error) {
-       var dispatchRate utils.DispatchRateData
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"replicatorDispatchRate")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&dispatchRate)
-       if body != nil {
-               return &dispatchRate, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetReplicatorDispatchRate(ctx, topic, 
false)
 }
 
 func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate 
utils.DispatchRateData) error {
@@ -2330,13 +2317,7 @@ func (t *topics) GetAutoSubscriptionCreationWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (*utils.AutoSubscriptionCreationOverride, error) {
-       var autoSubCreation utils.AutoSubscriptionCreationOverride
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"autoSubscriptionCreation")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&autoSubCreation)
-       if body != nil {
-               return &autoSubCreation, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetAutoSubscriptionCreation(ctx, topic, 
false)
 }
 
 func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
@@ -2382,14 +2363,11 @@ func (t *topics) 
GetSchemaCompatibilityStrategyAppliedWithContext(
        topic utils.TopicName,
        applied bool,
 ) (utils.SchemaCompatibilityStrategy, error) {
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"schemaCompatibilityStrategy")
-       body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, 
endpoint, nil, map[string]string{
-               "applied": strconv.FormatBool(applied),
-       }, false)
+       strategy, err := 
t.localTopicPolicies().GetSchemaCompatibilityStrategy(ctx, topic, applied)
        if err != nil {
                return "", err
        }
-       return parseTopicSchemaCompatibilityStrategy(body)
+       return legacyTopicSchemaCompatibilityStrategy(strategy), nil
 }
 
 func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
@@ -2429,13 +2407,7 @@ func (t *topics) GetOffloadPoliciesWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (*utils.OffloadPolicies, error) {
-       var offloadPolicies utils.OffloadPolicies
-       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"offloadPolicies")
-       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&offloadPolicies)
-       if body != nil {
-               return &offloadPolicies, err
-       }
-       return nil, err
+       return t.localTopicPolicies().GetOffloadPolicies(ctx, topic, false)
 }
 
 func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies 
utils.OffloadPolicies) error {
diff --git a/pulsaradmin/pkg/admin/topic_policies.go 
b/pulsaradmin/pkg/admin/topic_policies.go
new file mode 100644
index 00000000..a754e428
--- /dev/null
+++ b/pulsaradmin/pkg/admin/topic_policies.go
@@ -0,0 +1,928 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package admin
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "strconv"
+       "strings"
+
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+)
+
+// TopicPolicies is the scoped admin interface for topic-level policies.
+type TopicPolicies interface {
+       GetMessageTTL(context.Context, utils.TopicName, bool) (*int, error)
+       SetMessageTTL(context.Context, utils.TopicName, int) error
+       RemoveMessageTTL(context.Context, utils.TopicName) error
+
+       GetMaxProducers(context.Context, utils.TopicName, bool) (*int, error)
+       SetMaxProducers(context.Context, utils.TopicName, int) error
+       RemoveMaxProducers(context.Context, utils.TopicName) error
+
+       GetMaxConsumers(context.Context, utils.TopicName, bool) (*int, error)
+       SetMaxConsumers(context.Context, utils.TopicName, int) error
+       RemoveMaxConsumers(context.Context, utils.TopicName) error
+
+       GetMaxUnackMessagesPerConsumer(context.Context, utils.TopicName, bool) 
(*int, error)
+       SetMaxUnackMessagesPerConsumer(context.Context, utils.TopicName, int) 
error
+       RemoveMaxUnackMessagesPerConsumer(context.Context, utils.TopicName) 
error
+
+       GetMaxUnackMessagesPerSubscription(context.Context, utils.TopicName, 
bool) (*int, error)
+       SetMaxUnackMessagesPerSubscription(context.Context, utils.TopicName, 
int) error
+       RemoveMaxUnackMessagesPerSubscription(context.Context, utils.TopicName) 
error
+
+       GetPersistence(context.Context, utils.TopicName, bool) 
(*utils.PersistenceData, error)
+       SetPersistence(context.Context, utils.TopicName, utils.PersistenceData) 
error
+       RemovePersistence(context.Context, utils.TopicName) error
+
+       GetDelayedDelivery(context.Context, utils.TopicName, bool) 
(*utils.DelayedDeliveryData, error)
+       SetDelayedDelivery(context.Context, utils.TopicName, 
utils.DelayedDeliveryData) error
+       RemoveDelayedDelivery(context.Context, utils.TopicName) error
+
+       GetDispatchRate(context.Context, utils.TopicName, bool) 
(*utils.DispatchRateData, error)
+       SetDispatchRate(context.Context, utils.TopicName, 
utils.DispatchRateData) error
+       RemoveDispatchRate(context.Context, utils.TopicName) error
+
+       GetPublishRate(context.Context, utils.TopicName, bool) 
(*utils.PublishRateData, error)
+       SetPublishRate(context.Context, utils.TopicName, utils.PublishRateData) 
error
+       RemovePublishRate(context.Context, utils.TopicName) error
+
+       GetDeduplicationStatus(context.Context, utils.TopicName, bool) (*bool, 
error)
+       SetDeduplicationStatus(context.Context, utils.TopicName, bool) error
+       RemoveDeduplicationStatus(context.Context, utils.TopicName) error
+
+       GetRetention(context.Context, utils.TopicName, bool) 
(*utils.RetentionPolicies, error)
+       SetRetention(context.Context, utils.TopicName, utils.RetentionPolicies) 
error
+       RemoveRetention(context.Context, utils.TopicName) error
+
+       GetCompactionThreshold(context.Context, utils.TopicName, bool) (*int64, 
error)
+       SetCompactionThreshold(context.Context, utils.TopicName, int64) error
+       RemoveCompactionThreshold(context.Context, utils.TopicName) error
+
+       GetBacklogQuotaMap(context.Context, utils.TopicName, bool) 
(map[utils.BacklogQuotaType]utils.BacklogQuota, error)
+       SetBacklogQuota(context.Context, utils.TopicName, utils.BacklogQuota, 
utils.BacklogQuotaType) error
+       RemoveBacklogQuota(context.Context, utils.TopicName, 
utils.BacklogQuotaType) error
+
+       GetInactiveTopicPolicies(context.Context, utils.TopicName, bool) 
(*utils.InactiveTopicPolicies, error)
+       SetInactiveTopicPolicies(context.Context, utils.TopicName, 
utils.InactiveTopicPolicies) error
+       RemoveInactiveTopicPolicies(context.Context, utils.TopicName) error
+
+       GetReplicationClusters(context.Context, utils.TopicName, bool) 
([]string, error)
+       SetReplicationClusters(context.Context, utils.TopicName, []string) error
+       RemoveReplicationClusters(context.Context, utils.TopicName) error
+
+       GetSubscribeRate(context.Context, utils.TopicName, bool) 
(*utils.SubscribeRate, error)
+       SetSubscribeRate(context.Context, utils.TopicName, utils.SubscribeRate) 
error
+       RemoveSubscribeRate(context.Context, utils.TopicName) error
+
+       GetSubscriptionDispatchRate(context.Context, utils.TopicName, bool) 
(*utils.DispatchRateData, error)
+       SetSubscriptionDispatchRate(context.Context, utils.TopicName, 
utils.DispatchRateData) error
+       RemoveSubscriptionDispatchRate(context.Context, utils.TopicName) error
+
+       GetMaxConsumersPerSubscription(context.Context, utils.TopicName, bool) 
(*int, error)
+       SetMaxConsumersPerSubscription(context.Context, utils.TopicName, int) 
error
+       RemoveMaxConsumersPerSubscription(context.Context, utils.TopicName) 
error
+
+       GetMaxMessageSize(context.Context, utils.TopicName, bool) (*int, error)
+       SetMaxMessageSize(context.Context, utils.TopicName, int) error
+       RemoveMaxMessageSize(context.Context, utils.TopicName) error
+
+       GetMaxSubscriptionsPerTopic(context.Context, utils.TopicName, bool) 
(*int, error)
+       SetMaxSubscriptionsPerTopic(context.Context, utils.TopicName, int) error
+       RemoveMaxSubscriptionsPerTopic(context.Context, utils.TopicName) error
+
+       GetSchemaValidationEnforced(context.Context, utils.TopicName, bool) 
(*bool, error)
+       SetSchemaValidationEnforced(context.Context, utils.TopicName, bool) 
error
+       RemoveSchemaValidationEnforced(context.Context, utils.TopicName) error
+
+       GetDeduplicationSnapshotInterval(context.Context, utils.TopicName, 
bool) (*int, error)
+       SetDeduplicationSnapshotInterval(context.Context, utils.TopicName, int) 
error
+       RemoveDeduplicationSnapshotInterval(context.Context, utils.TopicName) 
error
+
+       GetReplicatorDispatchRate(context.Context, utils.TopicName, bool) 
(*utils.DispatchRateData, error)
+       SetReplicatorDispatchRate(context.Context, utils.TopicName, 
utils.DispatchRateData) error
+       RemoveReplicatorDispatchRate(context.Context, utils.TopicName) error
+
+       GetOffloadPolicies(context.Context, utils.TopicName, bool) 
(*utils.OffloadPolicies, error)
+       SetOffloadPolicies(context.Context, utils.TopicName, 
utils.OffloadPolicies) error
+       RemoveOffloadPolicies(context.Context, utils.TopicName) error
+
+       GetAutoSubscriptionCreation(context.Context, utils.TopicName, bool) 
(*utils.AutoSubscriptionCreationOverride, error)
+       SetAutoSubscriptionCreation(context.Context, utils.TopicName, 
utils.AutoSubscriptionCreationOverride) error
+       RemoveAutoSubscriptionCreation(context.Context, utils.TopicName) error
+
+       GetSchemaCompatibilityStrategy(context.Context, utils.TopicName, bool) 
(*utils.SchemaCompatibilityStrategy, error)
+       SetSchemaCompatibilityStrategy(context.Context, utils.TopicName, 
utils.SchemaCompatibilityStrategy) error
+       RemoveSchemaCompatibilityStrategy(context.Context, utils.TopicName) 
error
+}
+
+// TopicPoliciesProvider provides access to scoped topic policy clients.
+type TopicPoliciesProvider interface {
+       TopicPolicies(isGlobal bool) TopicPolicies
+}
+
+// TopicPoliciesOf returns a scoped topic policy client when the provided admin client supports it.
+func TopicPoliciesOf(client Client, isGlobal bool) (TopicPolicies, error) {
+       provider, ok := client.(TopicPoliciesProvider)
+       if !ok {
+               return nil, fmt.Errorf("admin client does not implement 
TopicPoliciesProvider")
+       }
+       return provider.TopicPolicies(isGlobal), nil
+}
+
+// topicPolicies is the TopicPolicies implementation backed by a pulsarClient.
+type topicPolicies struct {
+       // pulsar builds endpoints and issues the HTTP requests.
+       pulsar *pulsarClient
+       // basePath is passed as the first argument to pulsar.endpoint.
+       basePath string
+       // isGlobal makes scopedQueryParams add isGlobal=true to every request.
+       isGlobal bool
+}
+
+// Compile-time checks that the concrete types satisfy their interfaces.
+var (
+       _ TopicPolicies         = (*topicPolicies)(nil)
+       _ TopicPoliciesProvider = (*pulsarClient)(nil)
+)
+
+// TopicPolicies returns a topic policy client bound to c. The basePath is
+// left empty and isGlobal is stored for later query-parameter scoping.
+func (c *pulsarClient) TopicPolicies(isGlobal bool) TopicPolicies {
+       scoped := topicPolicies{
+               pulsar:   c,
+               basePath: "",
+               isGlobal: isGlobal,
+       }
+       return &scoped
+}
+
+// topicEndpoint builds the endpoint for topic: the topic's REST path comes
+// first, followed by parts in order, all resolved under t.basePath.
+func (t *topicPolicies) topicEndpoint(topic utils.TopicName, parts ...string) string {
+       segments := make([]string, 1, 1+len(parts))
+       segments[0] = topic.GetRestPath()
+       segments = append(segments, parts...)
+       return t.pulsar.endpoint(t.basePath, segments...)
+}
+
+// scopedQueryParams returns a copy of params with isGlobal=true added when
+// the client is global. It returns nil when there is nothing to send, i.e.
+// the client is not global and params is empty. The input map is never
+// modified.
+func (t *topicPolicies) scopedQueryParams(params map[string]string) map[string]string {
+       global := t.isGlobal
+       if len(params) == 0 && !global {
+               return nil
+       }
+
+       merged := make(map[string]string, len(params)+1)
+       for name := range params {
+               merged[name] = params[name]
+       }
+       if global {
+               // Written last so it wins over a caller-supplied "isGlobal" entry.
+               merged["isGlobal"] = "true"
+       }
+       return merged
+}
+
+// appliedQueryParams returns the scoped query parameters for a read, with
+// "applied" set to the textual form of the flag.
+func (t *topicPolicies) appliedQueryParams(applied bool) map[string]string {
+       query := map[string]string{"applied": strconv.FormatBool(applied)}
+       return t.scopedQueryParams(query)
+}
+
+// readPolicyBodyWithContext fetches the raw response body of endpoint using
+// the scoped "applied" query parameters.
+func (t *topicPolicies) readPolicyBodyWithContext(
+       ctx context.Context,
+       endpoint string,
+       applied bool,
+) ([]byte, error) {
+       query := t.appliedQueryParams(applied)
+       return t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, query, false)
+}
+
+// scopedPostWithContext posts body to endpoint, sending params after they
+// have been passed through scopedQueryParams.
+func (t *topicPolicies) scopedPostWithContext(
+       ctx context.Context,
+       endpoint string,
+       body interface{},
+       params map[string]string,
+) error {
+       query := t.scopedQueryParams(params)
+       return t.pulsar.Client.PostWithQueryParamsWithContext(ctx, endpoint, body, query)
+}
+
+// scopedPutWithContext puts body to endpoint, sending params after they have
+// been passed through scopedQueryParams. The fourth argument of the
+// underlying client call is always nil.
+func (t *topicPolicies) scopedPutWithContext(
+       ctx context.Context,
+       endpoint string,
+       body interface{},
+       params map[string]string,
+) error {
+       query := t.scopedQueryParams(params)
+       return t.pulsar.Client.PutWithQueryParamsWithContext(ctx, endpoint, body, nil, query)
+}
+
+// scopedDeleteWithContext deletes endpoint, sending params after they have
+// been passed through scopedQueryParams.
+func (t *topicPolicies) scopedDeleteWithContext(
+       ctx context.Context,
+       endpoint string,
+       params map[string]string,
+) error {
+       query := t.scopedQueryParams(params)
+       return t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, query)
+}
+
+// decodeOptionalJSON unmarshals body into a freshly allocated T. A body that
+// isUnsetPolicyBody reports as unset yields (nil, nil); a JSON error yields
+// (nil, err).
+func decodeOptionalJSON[T any](body []byte) (*T, error) {
+       if isUnsetPolicyBody(body) {
+               return nil, nil
+       }
+
+       value := new(T)
+       if err := json.Unmarshal(body, value); err != nil {
+               return nil, err
+       }
+       return value, nil
+}
+
+// decodeOptionalSchemaCompatibilityStrategy parses body as a strategy name.
+// Surrounding whitespace and double quotes are stripped first. An unset body,
+// or one that is empty after stripping, yields (nil, nil).
+func decodeOptionalSchemaCompatibilityStrategy(body []byte) (*utils.SchemaCompatibilityStrategy, error) {
+       if isUnsetPolicyBody(body) {
+               return nil, nil
+       }
+
+       name := strings.Trim(strings.TrimSpace(string(body)), "\"")
+       if name == "" {
+               return nil, nil
+       }
+
+       parsed, err := utils.ParseSchemaCompatibilityStrategy(name)
+       if err != nil {
+               return nil, err
+       }
+       return &parsed, nil
+}
+
+// isUnsetPolicyBody reports whether body, ignoring surrounding whitespace, is
+// empty or the literal JSON null.
+func isUnsetPolicyBody(body []byte) bool {
+       switch strings.TrimSpace(string(body)) {
+       case "", "null":
+               return true
+       default:
+               return false
+       }
+}
+
+// GetMessageTTL reads the topic's "messageTTL" policy. The result is nil when
+// the response body is empty or JSON null.
+func (t *topicPolicies) GetMessageTTL(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "messageTTL")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMessageTTL posts to the topic's "messageTTL" endpoint with no body; the
+// value travels in the "messageTTL" query parameter.
+func (t *topicPolicies) SetMessageTTL(
+       ctx context.Context,
+       topic utils.TopicName,
+       messageTTL int,
+) error {
+       endpoint := t.topicEndpoint(topic, "messageTTL")
+       query := map[string]string{"messageTTL": strconv.Itoa(messageTTL)}
+       return t.scopedPostWithContext(ctx, endpoint, nil, query)
+}
+
+// RemoveMessageTTL deletes the topic's "messageTTL" policy. The request
+// carries messageTTL=0 as a query parameter.
+func (t *topicPolicies) RemoveMessageTTL(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "messageTTL")
+       query := map[string]string{"messageTTL": strconv.Itoa(0)}
+       return t.scopedDeleteWithContext(ctx, endpoint, query)
+}
+
+// GetMaxProducers reads the topic's "maxProducers" policy. The result is nil
+// when the response body is empty or JSON null.
+func (t *topicPolicies) GetMaxProducers(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxProducers")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxProducers posts maxProducers as the body of the topic's
+// "maxProducers" policy request.
+func (t *topicPolicies) SetMaxProducers(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxProducers int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxProducers")
+       return t.scopedPostWithContext(ctx, endpoint, &maxProducers, nil)
+}
+
+// RemoveMaxProducers deletes the topic's "maxProducers" policy.
+func (t *topicPolicies) RemoveMaxProducers(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxProducers")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxConsumers reads the topic's "maxConsumers" policy. The result is nil
+// when the response body is empty or JSON null.
+func (t *topicPolicies) GetMaxConsumers(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxConsumers")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxConsumers posts maxConsumers as the body of the topic's
+// "maxConsumers" policy request.
+func (t *topicPolicies) SetMaxConsumers(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxConsumers int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxConsumers")
+       return t.scopedPostWithContext(ctx, endpoint, &maxConsumers, nil)
+}
+
+// RemoveMaxConsumers deletes the topic's "maxConsumers" policy.
+func (t *topicPolicies) RemoveMaxConsumers(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxConsumers")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxUnackMessagesPerConsumer reads the topic's
+// "maxUnackedMessagesOnConsumer" policy. The result is nil when the response
+// body is empty or JSON null.
+func (t *topicPolicies) GetMaxUnackMessagesPerConsumer(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnConsumer")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxUnackMessagesPerConsumer posts maxUnackedNum as the body of the
+// topic's "maxUnackedMessagesOnConsumer" policy request.
+func (t *topicPolicies) SetMaxUnackMessagesPerConsumer(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxUnackedNum int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnConsumer")
+       return t.scopedPostWithContext(ctx, endpoint, &maxUnackedNum, nil)
+}
+
+// RemoveMaxUnackMessagesPerConsumer deletes the topic's
+// "maxUnackedMessagesOnConsumer" policy.
+func (t *topicPolicies) RemoveMaxUnackMessagesPerConsumer(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnConsumer")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxUnackMessagesPerSubscription reads the topic's
+// "maxUnackedMessagesOnSubscription" policy. The result is nil when the
+// response body is empty or JSON null.
+func (t *topicPolicies) GetMaxUnackMessagesPerSubscription(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnSubscription")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxUnackMessagesPerSubscription posts maxUnackedNum as the body of the
+// topic's "maxUnackedMessagesOnSubscription" policy request.
+func (t *topicPolicies) SetMaxUnackMessagesPerSubscription(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxUnackedNum int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnSubscription")
+       return t.scopedPostWithContext(ctx, endpoint, &maxUnackedNum, nil)
+}
+
+// RemoveMaxUnackMessagesPerSubscription deletes the topic's
+// "maxUnackedMessagesOnSubscription" policy.
+func (t *topicPolicies) RemoveMaxUnackMessagesPerSubscription(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxUnackedMessagesOnSubscription")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetPersistence reads the topic's "persistence" policy. The result is nil
+// when the response body is empty or JSON null.
+func (t *topicPolicies) GetPersistence(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.PersistenceData, error) {
+       endpoint := t.topicEndpoint(topic, "persistence")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.PersistenceData](raw)
+}
+
+// SetPersistence posts persistenceData as the body of the topic's
+// "persistence" policy request.
+func (t *topicPolicies) SetPersistence(
+       ctx context.Context,
+       topic utils.TopicName,
+       persistenceData utils.PersistenceData,
+) error {
+       endpoint := t.topicEndpoint(topic, "persistence")
+       return t.scopedPostWithContext(ctx, endpoint, &persistenceData, nil)
+}
+
+// RemovePersistence deletes the topic's "persistence" policy.
+func (t *topicPolicies) RemovePersistence(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "persistence")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDelayedDelivery reads the topic's "delayedDelivery" policy. The result
+// is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetDelayedDelivery(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.DelayedDeliveryData, error) {
+       endpoint := t.topicEndpoint(topic, "delayedDelivery")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.DelayedDeliveryData](raw)
+}
+
+// SetDelayedDelivery posts delayedDeliveryData as the body of the topic's
+// "delayedDelivery" policy request.
+func (t *topicPolicies) SetDelayedDelivery(
+       ctx context.Context,
+       topic utils.TopicName,
+       delayedDeliveryData utils.DelayedDeliveryData,
+) error {
+       endpoint := t.topicEndpoint(topic, "delayedDelivery")
+       return t.scopedPostWithContext(ctx, endpoint, &delayedDeliveryData, nil)
+}
+
+// RemoveDelayedDelivery deletes the topic's "delayedDelivery" policy.
+func (t *topicPolicies) RemoveDelayedDelivery(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "delayedDelivery")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDispatchRate reads the topic's "dispatchRate" policy. The result is nil
+// when the response body is empty or JSON null.
+func (t *topicPolicies) GetDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.DispatchRateData, error) {
+       endpoint := t.topicEndpoint(topic, "dispatchRate")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.DispatchRateData](raw)
+}
+
+// SetDispatchRate posts dispatchRateData as the body of the topic's
+// "dispatchRate" policy request.
+func (t *topicPolicies) SetDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       dispatchRateData utils.DispatchRateData,
+) error {
+       endpoint := t.topicEndpoint(topic, "dispatchRate")
+       return t.scopedPostWithContext(ctx, endpoint, &dispatchRateData, nil)
+}
+
+// RemoveDispatchRate deletes the topic's "dispatchRate" policy.
+func (t *topicPolicies) RemoveDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "dispatchRate")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetPublishRate reads the topic's "publishRate" policy. The result is nil
+// when the response body is empty or JSON null.
+func (t *topicPolicies) GetPublishRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.PublishRateData, error) {
+       endpoint := t.topicEndpoint(topic, "publishRate")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.PublishRateData](raw)
+}
+
+// SetPublishRate posts publishRateData as the body of the topic's
+// "publishRate" policy request.
+func (t *topicPolicies) SetPublishRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       publishRateData utils.PublishRateData,
+) error {
+       endpoint := t.topicEndpoint(topic, "publishRate")
+       return t.scopedPostWithContext(ctx, endpoint, &publishRateData, nil)
+}
+
+// RemovePublishRate deletes the topic's "publishRate" policy.
+func (t *topicPolicies) RemovePublishRate(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "publishRate")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDeduplicationStatus reads the topic's "deduplicationEnabled" policy. The
+// result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetDeduplicationStatus(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*bool, error) {
+       endpoint := t.topicEndpoint(topic, "deduplicationEnabled")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[bool](raw)
+}
+
+// SetDeduplicationStatus posts enabled (by value) as the body of the topic's
+// "deduplicationEnabled" policy request.
+func (t *topicPolicies) SetDeduplicationStatus(
+       ctx context.Context,
+       topic utils.TopicName,
+       enabled bool,
+) error {
+       endpoint := t.topicEndpoint(topic, "deduplicationEnabled")
+       return t.scopedPostWithContext(ctx, endpoint, enabled, nil)
+}
+
+// RemoveDeduplicationStatus deletes the topic's "deduplicationEnabled"
+// policy.
+func (t *topicPolicies) RemoveDeduplicationStatus(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "deduplicationEnabled")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetRetention reads the topic's "retention" policy. The result is nil when
+// the response body is empty or JSON null.
+func (t *topicPolicies) GetRetention(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.RetentionPolicies, error) {
+       endpoint := t.topicEndpoint(topic, "retention")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.RetentionPolicies](raw)
+}
+
+// SetRetention posts data (by value) as the body of the topic's "retention"
+// policy request.
+func (t *topicPolicies) SetRetention(
+       ctx context.Context,
+       topic utils.TopicName,
+       data utils.RetentionPolicies,
+) error {
+       endpoint := t.topicEndpoint(topic, "retention")
+       return t.scopedPostWithContext(ctx, endpoint, data, nil)
+}
+
+// RemoveRetention deletes the topic's "retention" policy.
+func (t *topicPolicies) RemoveRetention(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "retention")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetCompactionThreshold reads the topic's "compactionThreshold" policy. The
+// result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetCompactionThreshold(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int64, error) {
+       endpoint := t.topicEndpoint(topic, "compactionThreshold")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int64](raw)
+}
+
+// SetCompactionThreshold posts threshold (by value) as the body of the
+// topic's "compactionThreshold" policy request.
+func (t *topicPolicies) SetCompactionThreshold(
+       ctx context.Context,
+       topic utils.TopicName,
+       threshold int64,
+) error {
+       endpoint := t.topicEndpoint(topic, "compactionThreshold")
+       return t.scopedPostWithContext(ctx, endpoint, threshold, nil)
+}
+
+// RemoveCompactionThreshold deletes the topic's "compactionThreshold" policy.
+func (t *topicPolicies) RemoveCompactionThreshold(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "compactionThreshold")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetBacklogQuotaMap reads the topic's "backlogQuotaMap". A nil map is
+// returned when the response body is empty or JSON null.
+func (t *topicPolicies) GetBacklogQuotaMap(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) {
+       endpoint := t.topicEndpoint(topic, "backlogQuotaMap")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+
+       quotas, err := decodeOptionalJSON[map[utils.BacklogQuotaType]utils.BacklogQuota](raw)
+       if err != nil {
+               return nil, err
+       }
+       if quotas == nil {
+               return nil, nil
+       }
+       return *quotas, nil
+}
+
+// SetBacklogQuota posts backlogQuota as the body of the topic's
+// "backlogQuota" request; backlogQuotaType travels in the "backlogQuotaType"
+// query parameter.
+func (t *topicPolicies) SetBacklogQuota(
+       ctx context.Context,
+       topic utils.TopicName,
+       backlogQuota utils.BacklogQuota,
+       backlogQuotaType utils.BacklogQuotaType,
+) error {
+       endpoint := t.topicEndpoint(topic, "backlogQuota")
+       query := map[string]string{"backlogQuotaType": string(backlogQuotaType)}
+       return t.scopedPostWithContext(ctx, endpoint, &backlogQuota, query)
+}
+
+// RemoveBacklogQuota deletes the topic's "backlogQuota" entry named by the
+// "backlogQuotaType" query parameter.
+func (t *topicPolicies) RemoveBacklogQuota(
+       ctx context.Context,
+       topic utils.TopicName,
+       backlogQuotaType utils.BacklogQuotaType,
+) error {
+       endpoint := t.topicEndpoint(topic, "backlogQuota")
+       query := map[string]string{"backlogQuotaType": string(backlogQuotaType)}
+       return t.scopedDeleteWithContext(ctx, endpoint, query)
+}
+
+// GetInactiveTopicPolicies reads the topic's "inactiveTopicPolicies" policy.
+// The result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetInactiveTopicPolicies(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.InactiveTopicPolicies, error) {
+       endpoint := t.topicEndpoint(topic, "inactiveTopicPolicies")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.InactiveTopicPolicies](raw)
+}
+
+// SetInactiveTopicPolicies posts data (by value) as the body of the topic's
+// "inactiveTopicPolicies" policy request.
+func (t *topicPolicies) SetInactiveTopicPolicies(
+       ctx context.Context,
+       topic utils.TopicName,
+       data utils.InactiveTopicPolicies,
+) error {
+       endpoint := t.topicEndpoint(topic, "inactiveTopicPolicies")
+       return t.scopedPostWithContext(ctx, endpoint, data, nil)
+}
+
+// RemoveInactiveTopicPolicies deletes the topic's "inactiveTopicPolicies"
+// policy.
+func (t *topicPolicies) RemoveInactiveTopicPolicies(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "inactiveTopicPolicies")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetReplicationClusters reads the topic's "replication" policy. A nil slice
+// is returned when the response body is empty or JSON null.
+func (t *topicPolicies) GetReplicationClusters(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) ([]string, error) {
+       endpoint := t.topicEndpoint(topic, "replication")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+
+       clusters, err := decodeOptionalJSON[[]string](raw)
+       if err != nil {
+               return nil, err
+       }
+       if clusters == nil {
+               return nil, nil
+       }
+       return *clusters, nil
+}
+
+// SetReplicationClusters posts data as the body of the topic's "replication"
+// policy request.
+func (t *topicPolicies) SetReplicationClusters(
+       ctx context.Context,
+       topic utils.TopicName,
+       data []string,
+) error {
+       endpoint := t.topicEndpoint(topic, "replication")
+       return t.scopedPostWithContext(ctx, endpoint, data, nil)
+}
+
+// RemoveReplicationClusters deletes the topic's "replication" policy.
+func (t *topicPolicies) RemoveReplicationClusters(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "replication")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSubscribeRate reads the topic's "subscribeRate" policy. The result is
+// nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetSubscribeRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.SubscribeRate, error) {
+       endpoint := t.topicEndpoint(topic, "subscribeRate")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.SubscribeRate](raw)
+}
+
+// SetSubscribeRate posts subscribeRate as the body of the topic's
+// "subscribeRate" policy request.
+func (t *topicPolicies) SetSubscribeRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       subscribeRate utils.SubscribeRate,
+) error {
+       endpoint := t.topicEndpoint(topic, "subscribeRate")
+       return t.scopedPostWithContext(ctx, endpoint, &subscribeRate, nil)
+}
+
+// RemoveSubscribeRate deletes the topic's "subscribeRate" policy.
+func (t *topicPolicies) RemoveSubscribeRate(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "subscribeRate")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSubscriptionDispatchRate reads the topic's "subscriptionDispatchRate"
+// policy. The result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetSubscriptionDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.DispatchRateData, error) {
+       endpoint := t.topicEndpoint(topic, "subscriptionDispatchRate")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.DispatchRateData](raw)
+}
+
+// SetSubscriptionDispatchRate posts dispatchRate as the body of the topic's
+// "subscriptionDispatchRate" policy request.
+func (t *topicPolicies) SetSubscriptionDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       dispatchRate utils.DispatchRateData,
+) error {
+       endpoint := t.topicEndpoint(topic, "subscriptionDispatchRate")
+       return t.scopedPostWithContext(ctx, endpoint, &dispatchRate, nil)
+}
+
+// RemoveSubscriptionDispatchRate deletes the topic's
+// "subscriptionDispatchRate" policy.
+func (t *topicPolicies) RemoveSubscriptionDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "subscriptionDispatchRate")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxConsumersPerSubscription reads the topic's
+// "maxConsumersPerSubscription" policy. The result is nil when the response
+// body is empty or JSON null.
+func (t *topicPolicies) GetMaxConsumersPerSubscription(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxConsumersPerSubscription")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxConsumersPerSubscription posts maxConsumers as the body of the
+// topic's "maxConsumersPerSubscription" policy request.
+func (t *topicPolicies) SetMaxConsumersPerSubscription(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxConsumers int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxConsumersPerSubscription")
+       return t.scopedPostWithContext(ctx, endpoint, &maxConsumers, nil)
+}
+
+// RemoveMaxConsumersPerSubscription deletes the topic's
+// "maxConsumersPerSubscription" policy.
+func (t *topicPolicies) RemoveMaxConsumersPerSubscription(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxConsumersPerSubscription")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxMessageSize reads the topic's "maxMessageSize" policy. The result is
+// nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetMaxMessageSize(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxMessageSize")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxMessageSize posts maxMessageSize as the body of the topic's
+// "maxMessageSize" policy request.
+func (t *topicPolicies) SetMaxMessageSize(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxMessageSize int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxMessageSize")
+       return t.scopedPostWithContext(ctx, endpoint, &maxMessageSize, nil)
+}
+
+// RemoveMaxMessageSize deletes the topic's "maxMessageSize" policy.
+func (t *topicPolicies) RemoveMaxMessageSize(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxMessageSize")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetMaxSubscriptionsPerTopic reads the topic's "maxSubscriptionsPerTopic"
+// policy. The result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetMaxSubscriptionsPerTopic(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "maxSubscriptionsPerTopic")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetMaxSubscriptionsPerTopic posts maxSubscriptions as the body of the
+// topic's "maxSubscriptionsPerTopic" policy request.
+func (t *topicPolicies) SetMaxSubscriptionsPerTopic(
+       ctx context.Context,
+       topic utils.TopicName,
+       maxSubscriptions int,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxSubscriptionsPerTopic")
+       return t.scopedPostWithContext(ctx, endpoint, &maxSubscriptions, nil)
+}
+
+// RemoveMaxSubscriptionsPerTopic deletes the topic's
+// "maxSubscriptionsPerTopic" policy.
+func (t *topicPolicies) RemoveMaxSubscriptionsPerTopic(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "maxSubscriptionsPerTopic")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSchemaValidationEnforced reads the topic's "schemaValidationEnforced"
+// policy. The result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetSchemaValidationEnforced(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*bool, error) {
+       endpoint := t.topicEndpoint(topic, "schemaValidationEnforced")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[bool](raw)
+}
+
+// SetSchemaValidationEnforced posts enabled (by value) as the body of the
+// topic's "schemaValidationEnforced" policy request.
+func (t *topicPolicies) SetSchemaValidationEnforced(
+       ctx context.Context,
+       topic utils.TopicName,
+       enabled bool,
+) error {
+       endpoint := t.topicEndpoint(topic, "schemaValidationEnforced")
+       return t.scopedPostWithContext(ctx, endpoint, enabled, nil)
+}
+
+// RemoveSchemaValidationEnforced deletes the topic's
+// "schemaValidationEnforced" policy.
+func (t *topicPolicies) RemoveSchemaValidationEnforced(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "schemaValidationEnforced")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetDeduplicationSnapshotInterval reads the topic's
+// "deduplicationSnapshotInterval" policy. The result is nil when the response
+// body is empty or JSON null.
+func (t *topicPolicies) GetDeduplicationSnapshotInterval(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*int, error) {
+       endpoint := t.topicEndpoint(topic, "deduplicationSnapshotInterval")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[int](raw)
+}
+
+// SetDeduplicationSnapshotInterval posts interval as the body of the topic's
+// "deduplicationSnapshotInterval" policy request.
+func (t *topicPolicies) SetDeduplicationSnapshotInterval(
+       ctx context.Context,
+       topic utils.TopicName,
+       interval int,
+) error {
+       endpoint := t.topicEndpoint(topic, "deduplicationSnapshotInterval")
+       return t.scopedPostWithContext(ctx, endpoint, &interval, nil)
+}
+
+// RemoveDeduplicationSnapshotInterval deletes the topic's
+// "deduplicationSnapshotInterval" policy.
+func (t *topicPolicies) RemoveDeduplicationSnapshotInterval(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "deduplicationSnapshotInterval")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetReplicatorDispatchRate reads the topic's "replicatorDispatchRate"
+// policy. The result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetReplicatorDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.DispatchRateData, error) {
+       endpoint := t.topicEndpoint(topic, "replicatorDispatchRate")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.DispatchRateData](raw)
+}
+
+// SetReplicatorDispatchRate posts dispatchRate as the body of the topic's
+// "replicatorDispatchRate" policy request.
+func (t *topicPolicies) SetReplicatorDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+       dispatchRate utils.DispatchRateData,
+) error {
+       endpoint := t.topicEndpoint(topic, "replicatorDispatchRate")
+       return t.scopedPostWithContext(ctx, endpoint, &dispatchRate, nil)
+}
+
+// RemoveReplicatorDispatchRate deletes the topic's "replicatorDispatchRate"
+// policy.
+func (t *topicPolicies) RemoveReplicatorDispatchRate(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "replicatorDispatchRate")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetOffloadPolicies reads the topic's "offloadPolicies" policy. The result
+// is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetOffloadPolicies(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.OffloadPolicies, error) {
+       endpoint := t.topicEndpoint(topic, "offloadPolicies")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.OffloadPolicies](raw)
+}
+
+// SetOffloadPolicies posts offloadPolicies as the body of the topic's
+// "offloadPolicies" policy request.
+func (t *topicPolicies) SetOffloadPolicies(
+       ctx context.Context,
+       topic utils.TopicName,
+       offloadPolicies utils.OffloadPolicies,
+) error {
+       endpoint := t.topicEndpoint(topic, "offloadPolicies")
+       return t.scopedPostWithContext(ctx, endpoint, &offloadPolicies, nil)
+}
+
+// RemoveOffloadPolicies deletes the topic's "offloadPolicies" policy.
+func (t *topicPolicies) RemoveOffloadPolicies(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "offloadPolicies")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetAutoSubscriptionCreation reads the topic's "autoSubscriptionCreation"
+// policy. The result is nil when the response body is empty or JSON null.
+func (t *topicPolicies) GetAutoSubscriptionCreation(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.AutoSubscriptionCreationOverride, error) {
+       endpoint := t.topicEndpoint(topic, "autoSubscriptionCreation")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalJSON[utils.AutoSubscriptionCreationOverride](raw)
+}
+
+// SetAutoSubscriptionCreation posts autoSubCreation as the body of the
+// topic's "autoSubscriptionCreation" policy request.
+func (t *topicPolicies) SetAutoSubscriptionCreation(
+       ctx context.Context,
+       topic utils.TopicName,
+       autoSubCreation utils.AutoSubscriptionCreationOverride,
+) error {
+       endpoint := t.topicEndpoint(topic, "autoSubscriptionCreation")
+       return t.scopedPostWithContext(ctx, endpoint, &autoSubCreation, nil)
+}
+
+// RemoveAutoSubscriptionCreation deletes the topic's
+// "autoSubscriptionCreation" policy.
+func (t *topicPolicies) RemoveAutoSubscriptionCreation(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "autoSubscriptionCreation")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
+
+// GetSchemaCompatibilityStrategy reads the topic's
+// "schemaCompatibilityStrategy" policy and parses it with
+// decodeOptionalSchemaCompatibilityStrategy, which yields nil for an unset
+// body.
+func (t *topicPolicies) GetSchemaCompatibilityStrategy(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (*utils.SchemaCompatibilityStrategy, error) {
+       endpoint := t.topicEndpoint(topic, "schemaCompatibilityStrategy")
+       raw, err := t.readPolicyBodyWithContext(ctx, endpoint, applied)
+       if err != nil {
+               return nil, err
+       }
+       return decodeOptionalSchemaCompatibilityStrategy(raw)
+}
+
+// SetSchemaCompatibilityStrategy sends strategy as the body of the topic's
+// "schemaCompatibilityStrategy" request. Unlike the other setters here, it
+// goes through scopedPutWithContext rather than scopedPostWithContext.
+func (t *topicPolicies) SetSchemaCompatibilityStrategy(
+       ctx context.Context,
+       topic utils.TopicName,
+       strategy utils.SchemaCompatibilityStrategy,
+) error {
+       endpoint := t.topicEndpoint(topic, "schemaCompatibilityStrategy")
+       return t.scopedPutWithContext(ctx, endpoint, strategy, nil)
+}
+
+// RemoveSchemaCompatibilityStrategy deletes the topic's
+// "schemaCompatibilityStrategy" policy.
+func (t *topicPolicies) RemoveSchemaCompatibilityStrategy(
+       ctx context.Context,
+       topic utils.TopicName,
+) error {
+       endpoint := t.topicEndpoint(topic, "schemaCompatibilityStrategy")
+       return t.scopedDeleteWithContext(ctx, endpoint, nil)
+}
diff --git a/pulsaradmin/pkg/admin/topic_policies_test.go b/pulsaradmin/pkg/admin/topic_policies_test.go
new file mode 100644
index 00000000..b6eac691
--- /dev/null
+++ b/pulsaradmin/pkg/admin/topic_policies_test.go
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package admin
+
+import (
+       "context"
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "net/url"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+)
+
+// topicPolicyRequest is a snapshot of one HTTP request observed by the test
+// server, kept so assertions can run after the client call has returned.
+type topicPolicyRequest struct {
+       // method is the HTTP verb of the request.
+       method string
+       // path is the request's URL path as captured by the handler.
+       path string
+       // query holds the parsed query-string parameters.
+       query url.Values
+       // body is the full request payload as text.
+       body string
+}
+
+// newTopicPolicyTestClient starts an httptest server backed by handler, builds
+// an admin client whose WebServiceURL points at it, and returns that client
+// both as the Client interface and as its concrete *pulsarClient. The server
+// is shut down through t.Cleanup.
+func newTopicPolicyTestClient(t *testing.T, handler http.HandlerFunc) (Client, *pulsarClient) {
+       t.Helper()
+
+       srv := httptest.NewServer(handler)
+       t.Cleanup(srv.Close)
+
+       adminClient, err := New(&config.Config{WebServiceURL: srv.URL})
+       require.NoError(t, err)
+
+       // Expose the concrete type too, for tests that call unexported methods.
+       impl, isPulsarClient := adminClient.(*pulsarClient)
+       require.True(t, isPulsarClient)
+       return adminClient, impl
+}
+
+// mustTopicName parses topic with utils.GetTopicName and fails the test
+// immediately if parsing returns an error.
+func mustTopicName(t *testing.T, topic string) *utils.TopicName {
+       t.Helper()
+
+       parsed, parseErr := utils.GetTopicName(topic)
+       require.NoError(t, parseErr)
+       return parsed
+}
+
+// TestTopicPoliciesOf checks that TopicPoliciesOf returns a non-nil
+// *topicPolicies whose isGlobal flag matches the requested scope.
+func TestTopicPoliciesOf(t *testing.T) {
+       client, err := New(&config.Config{})
+       require.NoError(t, err)
+
+       // Local scope first, then global, each as its own lookup on one client.
+       for _, wantGlobal := range []bool{false, true} {
+               scoped, err := TopicPoliciesOf(client, wantGlobal)
+               require.NoError(t, err)
+               require.NotNil(t, scoped)
+               assert.Equal(t, wantGlobal, scoped.(*topicPolicies).isGlobal)
+       }
+}
+
+// TestTopicPoliciesScopeAndAppliedParams records every request the client
+// sends to a stub server and verifies the HTTP method, path, and the "applied"
+// and "isGlobal" query parameters for local and global topic policies.
+func TestTopicPoliciesScopeAndAppliedParams(t *testing.T) {
+       requests := make([]topicPolicyRequest, 0, 4)
+       client, pulsarClient := newTopicPolicyTestClient(t, func(w http.ResponseWriter, r *http.Request) {
+               // The handler runs on the server's goroutine, so it reports
+               // failures with assert: require ends in t.FailNow, which is only
+               // valid on the goroutine running the test.
+               body, err := io.ReadAll(r.Body)
+               if !assert.NoError(t, err) {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return
+               }
+
+               requests = append(requests, topicPolicyRequest{
+                       method: r.Method,
+                       path:   r.URL.EscapedPath(),
+                       query:  r.URL.Query(),
+                       body:   string(body),
+               })
+
+               // Reads get a JSON integer; every other verb gets 204.
+               if r.Method != http.MethodGet {
+                       w.WriteHeader(http.StatusNoContent)
+                       return
+               }
+               _, err = w.Write([]byte("10"))
+               assert.NoError(t, err)
+       })
+
+       topic := mustTopicName(t, "persistent://public/default/scoped-policy")
+
+       localPolicies, err := TopicPoliciesOf(client, false)
+       require.NoError(t, err)
+       globalPolicies, err := TopicPoliciesOf(client, true)
+       require.NoError(t, err)
+
+       ttl, err := localPolicies.GetMessageTTL(context.Background(), *topic, true)
+       require.NoError(t, err)
+       require.NotNil(t, ttl)
+       assert.Equal(t, 10, *ttl)
+
+       ttl, err = globalPolicies.GetMessageTTL(context.Background(), *topic, false)
+       require.NoError(t, err)
+       require.NotNil(t, ttl)
+       assert.Equal(t, 10, *ttl)
+
+       err = globalPolicies.SetMaxProducers(context.Background(), *topic, 3)
+       require.NoError(t, err)
+
+       err = globalPolicies.RemoveRetention(context.Background(), *topic)
+       require.NoError(t, err)
+
+       // Expected paths are built with the client's own endpoint helper and
+       // then path-unescaped before being compared with the recorded paths.
+       expectedMessageTTLPath := pulsarClient.endpoint("", topic.GetRestPath(), "messageTTL")
+       expectedMaxProducersPath := pulsarClient.endpoint("", topic.GetRestPath(), "maxProducers")
+       expectedRetentionPath := pulsarClient.endpoint("", topic.GetRestPath(), "retention")
+       decodedExpectedMessageTTLPath, err := url.PathUnescape(expectedMessageTTLPath)
+       require.NoError(t, err)
+       decodedExpectedMaxProducersPath, err := url.PathUnescape(expectedMaxProducersPath)
+       require.NoError(t, err)
+       decodedExpectedRetentionPath, err := url.PathUnescape(expectedRetentionPath)
+       require.NoError(t, err)
+
+       require.Len(t, requests, 4)
+
+       // Local read with applied=true: no isGlobal parameter is sent.
+       assert.Equal(t, http.MethodGet, requests[0].method)
+       assert.Equal(t, decodedExpectedMessageTTLPath, requests[0].path)
+       assert.Equal(t, "true", requests[0].query.Get("applied"))
+       assert.Empty(t, requests[0].query.Get("isGlobal"))
+
+       // Global read with applied=false.
+       assert.Equal(t, http.MethodGet, requests[1].method)
+       assert.Equal(t, decodedExpectedMessageTTLPath, requests[1].path)
+       assert.Equal(t, "false", requests[1].query.Get("applied"))
+       assert.Equal(t, "true", requests[1].query.Get("isGlobal"))
+
+       // Global write.
+       assert.Equal(t, http.MethodPost, requests[2].method)
+       assert.Equal(t, decodedExpectedMaxProducersPath, requests[2].path)
+       assert.Equal(t, "true", requests[2].query.Get("isGlobal"))
+       assert.Contains(t, requests[2].body, "3")
+
+       // Global delete.
+       assert.Equal(t, http.MethodDelete, requests[3].method)
+       assert.Equal(t, decodedExpectedRetentionPath, requests[3].path)
+       assert.Equal(t, "true", requests[3].query.Get("isGlobal"))
+}
+
+// TestTopicPoliciesNullDecodingAndLegacyDefaults serves the JSON literal null
+// for every request and verifies two things: the scoped topic-policy getters
+// return nil, and the legacy client.Topics() getters return their default
+// values for the same response.
+func TestTopicPoliciesNullDecodingAndLegacyDefaults(t *testing.T) {
+       client, _ := newTopicPolicyTestClient(t, func(w http.ResponseWriter, _ *http.Request) {
+               // The handler runs on the server's goroutine, so it reports
+               // failures with assert: require ends in t.FailNow, which is only
+               // valid on the goroutine running the test.
+               _, err := w.Write([]byte("null"))
+               assert.NoError(t, err)
+       })
+
+       topic := mustTopicName(t, "persistent://public/default/null-policy")
+       policies, err := TopicPoliciesOf(client, false)
+       require.NoError(t, err)
+
+       // Scoped API: a null body decodes to a nil result without an error.
+       ttl, err := policies.GetMessageTTL(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, ttl)
+
+       deduplication, err := policies.GetDeduplicationStatus(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, deduplication)
+
+       threshold, err := policies.GetCompactionThreshold(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, threshold)
+
+       strategy, err := policies.GetSchemaCompatibilityStrategy(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, strategy)
+
+       inactivePolicies, err := policies.GetInactiveTopicPolicies(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, inactivePolicies)
+
+       replicationClusters, err := policies.GetReplicationClusters(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, replicationClusters)
+
+       backlogQuotaMap, err := policies.GetBacklogQuotaMap(context.Background(), *topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, backlogQuotaMap)
+
+       // Legacy API: the same null body maps to each getter's default value.
+       legacyTTL, err := client.Topics().GetMessageTTL(*topic)
+       require.NoError(t, err)
+       assert.Equal(t, -1, legacyTTL)
+
+       legacyDeduplication, err := client.Topics().GetDeduplicationStatus(*topic)
+       require.NoError(t, err)
+       assert.False(t, legacyDeduplication)
+
+       legacyThreshold, err := client.Topics().GetCompactionThreshold(*topic, false)
+       require.NoError(t, err)
+       assert.Equal(t, int64(-1), legacyThreshold)
+
+       legacyStrategy, err := client.Topics().GetSchemaCompatibilityStrategyApplied(*topic, false)
+       require.NoError(t, err)
+       assert.Equal(t, utils.SchemaCompatibilityStrategyUndefined, legacyStrategy)
+
+       legacyInactivePolicies, err := client.Topics().GetInactiveTopicPolicies(*topic, false)
+       require.NoError(t, err)
+       assert.Equal(t, utils.InactiveTopicPolicies{}, legacyInactivePolicies)
+
+       legacyReplicationClusters, err := client.Topics().GetReplicationClusters(*topic)
+       require.NoError(t, err)
+       assert.Nil(t, legacyReplicationClusters)
+
+       legacyBacklogQuotaMap, err := client.Topics().GetBacklogQuotaMap(*topic, false)
+       require.NoError(t, err)
+       assert.Nil(t, legacyBacklogQuotaMap)
+}
+
+// TestLocalTopicPoliciesParityWithTopics serves the same body ("600") to both
+// APIs and verifies that the local scoped GetMessageTTL and the legacy
+// client.Topics().GetMessageTTL report the same value.
+func TestLocalTopicPoliciesParityWithTopics(t *testing.T) {
+       client, _ := newTopicPolicyTestClient(t, func(w http.ResponseWriter, _ *http.Request) {
+               // The handler runs on the server's goroutine, so it reports
+               // failures with assert: require ends in t.FailNow, which is only
+               // valid on the goroutine running the test.
+               _, err := w.Write([]byte("600"))
+               assert.NoError(t, err)
+       })
+
+       topic := mustTopicName(t, "persistent://public/default/parity-policy")
+       localPolicies, err := TopicPoliciesOf(client, false)
+       require.NoError(t, err)
+
+       newTTL, err := localPolicies.GetMessageTTL(context.Background(), *topic, false)
+       require.NoError(t, err)
+       require.NotNil(t, newTTL)
+
+       legacyTTL, err := client.Topics().GetMessageTTL(*topic)
+       require.NoError(t, err)
+
+       assert.Equal(t, legacyTTL, *newTTL)
+}

Reply via email to