This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71462d3151b [improve][broker]Part-3 of PIP-433:  always allow 
replicator to register a new compatible schema (#25461)
71462d3151b is described below

commit 71462d3151b2dceb776db08b8dc974d779d462e1
Author: fengyubiao <[email protected]>
AuthorDate: Thu Apr 30 11:16:38 2026 +0800

    [improve][broker]Part-3 of PIP-433:  always allow replicator to register a 
new compatible schema (#25461)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   7 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  10 +-
 .../pulsar/broker/service/AbstractTopic.java       |  22 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    |  19 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   8 +
 .../service/nonpersistent/NonPersistentTopic.java  |   3 +
 .../broker/service/persistent/PersistentTopic.java |   2 +
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  25 ++-
 .../broker/admin/AdminApiSchemaAutoUpdateTest.java |  66 +++++++
 .../pulsar/broker/admin/NamespaceAuthZTest.java    |   2 +-
 .../broker/service/OneWayReplicatorTest.java       | 203 +++++++++++++++++++++
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |   8 +
 .../service/OneWayReplicatorUsingGlobalZKTest.java |   7 +
 .../client/impl/ClientImplInternalSetter.java      |  27 +++
 .../pulsar/client/impl/NegativeAcksTest.java       |   2 +-
 .../SchemaCompatibilityCheckTest.java              |   8 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  14 +-
 .../pulsar/common/policies/data/Policies.java      |   6 +
 .../client/admin/internal/NamespacesImpl.java      |  12 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  11 +-
 .../apache/pulsar/admin/cli/TestCmdNamespaces.java |  26 +++
 .../apache/pulsar/client/impl/ProducerImpl.java    |   2 +-
 .../apache/pulsar/common/protocol/Commands.java    |   4 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   7 +-
 24 files changed, 467 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 76ecfd05d38..c94c59df2bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2758,13 +2758,18 @@ public abstract class NamespacesBase extends 
AdminResource {
                 "schemaValidationEnforced");
     }
 
-    protected void internalSetIsAllowAutoUpdateSchema(boolean 
isAllowAutoUpdateSchema) {
+    protected void internalSetIsAllowAutoUpdateSchema(boolean 
isAllowAutoUpdateSchema,
+                                                      Boolean 
isAllowAutoUpdateSchemaWithReplicator) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
                 PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
                     policies.is_allow_auto_update_schema = 
isAllowAutoUpdateSchema;
+                    if (isAllowAutoUpdateSchemaWithReplicator != null) {
+                        policies.is_allow_auto_update_schema_with_replicator =
+                                isAllowAutoUpdateSchemaWithReplicator;
+                    }
                     return policies;
                 }, (policies) -> policies.is_allow_auto_update_schema,
                 "isAllowAutoUpdateSchema");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 626c4b314a0..e87c34ba769 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2813,10 +2813,18 @@ public class Namespaces extends NamespacesBase {
     public void setIsAllowAutoUpdateSchema(
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
+            @QueryParam("allowAutoUpdateSchemaWithReplicator")
+            @ApiParam(value = "Allow replicator to auto update schema")
+                    Boolean allowAutoUpdateSchemaWithReplicator,
             @ApiParam(value = "Flag of whether to allow auto update schema", 
required = true)
                     boolean isAllowAutoUpdateSchema) {
         validateNamespaceName(tenant, namespace);
-        internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema);
+        if (isAllowAutoUpdateSchema && allowAutoUpdateSchemaWithReplicator != 
null
+                && !allowAutoUpdateSchemaWithReplicator) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Can not 
enable for all producers but denies for"
+                    + " replicators, which is meaningless");
+        }
+        internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema, 
allowAutoUpdateSchemaWithReplicator);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 09ab12d3906..4d29c751f03 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -141,6 +141,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
     protected volatile boolean isEncryptionRequired = false;
 
     protected volatile Boolean isAllowAutoUpdateSchema;
+    @Getter
+    protected volatile Boolean isAllowAutoUpdateSchemaWithReplicator;
 
     protected volatile PublishRateLimiter topicPublishRateLimiter;
     protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
@@ -733,6 +735,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
 
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        return addSchema(schema, false);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema, 
boolean isReplicatorProducer) {
         if (schema == null) {
             return CompletableFuture.completedFuture(SchemaVersion.Empty);
         }
@@ -740,7 +747,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
         String id = getSchemaId();
         SchemaRegistryService schemaRegistryService = 
brokerService.pulsar().getSchemaRegistryService();
 
-        if (allowAutoUpdateSchema()) {
+        if (allowAutoUpdateSchema(isReplicatorProducer)) {
             return schemaRegistryService.putSchemaIfAbsent(id, schema, 
getSchemaCompatibilityStrategy());
         } else {
             return 
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
 ->
@@ -756,14 +763,19 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
         }
     }
 
-    private boolean allowAutoUpdateSchema() {
+    private boolean allowAutoUpdateSchema(boolean isReplicatorProducer) {
         if (brokerService.isSystemTopic(topic)) {
             return true;
         }
-        if (isAllowAutoUpdateSchema == null) {
-            return 
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
+        // General switch: the per-topic value wins; when it is unset, use the broker config.
+        boolean allowSchemaAutoUpdate = isAllowAutoUpdateSchema == null
+                ? 
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled()
+                : isAllowAutoUpdateSchema;
+        if (allowSchemaAutoUpdate) {
+            return true;
         }
-        return isAllowAutoUpdateSchema;
+        // Replicator override: null (unset) counts as allowed (assumed default, TODO confirm); avoids NPE.
+        return isReplicatorProducer && !Boolean.FALSE.equals(isAllowAutoUpdateSchemaWithReplicator);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4f130e28961..944148c5dc2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1999,8 +1999,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     }
 
                     disableTcpNoDelayIfNeeded(topicName.toString(), 
producerName);
-
-                    CompletableFuture<SchemaVersion> schemaVersionFuture = 
tryAddSchema(topic, schema);
+                    boolean isReplicatorProducer = 
Producer.isRemoteOrShadow(producerName,
+                            
getBrokerService().getPulsar().getConfig().getReplicatorPrefix());
+                    CompletableFuture<SchemaVersion> schemaVersionFuture = 
tryAddSchema(topic, schema,
+                            isReplicatorProducer);
 
                     schemaVersionFuture.exceptionallyAsync(exception -> {
                         if (producerFuture.completeExceptionally(exception)) {
@@ -3125,7 +3127,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
             if (topicOpt.isPresent()) {
                 Topic topic = topicOpt.get();
-                CompletableFuture<SchemaVersion> schemaVersionFuture = 
tryAddSchema(topic, schema);
+                boolean isReplicatorProducer = false;
+                if (commandGetOrCreateSchema.hasProducerName()) {
+                    isReplicatorProducer = 
Producer.isRemoteOrShadow(commandGetOrCreateSchema.getProducerName(),
+                            
getBrokerService().getPulsar().getConfig().getReplicatorPrefix());
+                }
+                CompletableFuture<SchemaVersion> schemaVersionFuture =
+                        tryAddSchema(topic, schema, isReplicatorProducer);
                 schemaVersionFuture.exceptionally(ex -> {
                     ServerError errorCode = 
BrokerServiceException.getClientErrorCode(ex);
                     String message = ex.getMessage();
@@ -3610,9 +3618,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         });
     }
 
-    private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, 
SchemaData schema) {
+    private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, 
SchemaData schema,
+                                                          boolean 
isReplicatorProducer) {
         if (schema != null) {
-            return topic.addSchema(schema);
+            return topic.addSchema(schema, isReplicatorProducer);
         } else {
             return topic.hasSchema().thenCompose((hasSchema) -> {
                 log.debug()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fd3ffb1a34c..684e5c6c83a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -309,6 +309,14 @@ public interface Topic {
      */
     CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
 
+    /**
+     * Add a schema to the topic, stating whether a replicator producer asks.
+     * This default ignores the flag and calls {@link #addSchema(SchemaData)}.
+     */
+    default CompletableFuture<SchemaVersion> addSchema(SchemaData schema, 
boolean isReplicatorProducer) {
+        return addSchema(schema);
+    }
+
     /**
      * Delete the schema if this topic has a schema defined for it.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c1b60ce49e8..8043199897a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -180,6 +180,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                         updateTopicPolicyByNamespacePolicy(policies);
                         isEncryptionRequired = policies.encryption_required;
                         isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
+                        isAllowAutoUpdateSchemaWithReplicator =
+                                
policies.is_allow_auto_update_schema_with_replicator;
                     }
                     updatePublishRateLimiter();
                     updateResourceGroupLimiter(policies);
@@ -1157,6 +1159,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
         isEncryptionRequired = data.encryption_required;
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
+        isAllowAutoUpdateSchemaWithReplicator = 
data.is_allow_auto_update_schema_with_replicator;
 
         List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
         producers.values().forEach(producer -> producerCheckFutures.add(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db5ab64ad11..2e76254dc97 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -500,6 +500,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     this.isEncryptionRequired = policies.encryption_required;
 
                     isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
+                    isAllowAutoUpdateSchemaWithReplicator = 
policies.is_allow_auto_update_schema_with_replicator;
                 }, getOrderedExecutor())
                 .thenCompose(ignore -> initTopicPolicy())
                 .thenCompose(ignore -> removeOrphanReplicationCursors())
@@ -3806,6 +3807,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         checkReplicatedSubscriptionControllerState();
         isEncryptionRequired = data.encryption_required;
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
+        isAllowAutoUpdateSchemaWithReplicator = 
data.is_allow_auto_update_schema_with_replicator;
 
         // Apply policies for components.
         List<CompletableFuture<Void>> applyPolicyTasks = 
applyUpdatedTopicPolicies();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index fd323fa50c0..661877532b6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -26,6 +26,7 @@ import static 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnl
 import static 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.changed;
 import static 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.none;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -40,6 +41,7 @@ import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocal;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
@@ -3305,6 +3307,13 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    static final FastThreadLocal<Boolean> 
COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY = new FastThreadLocal<>() {
+        @Override
+        protected Boolean initialValue() throws Exception {
+            return false;
+        }
+    };
+
     private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
         final var topics = pulsar.getBrokerService().getTopics();
         AbstractTopic topic = (AbstractTopic) 
topics.get(topicName).join().get();
@@ -3314,9 +3323,23 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable 
{
                 counter.incrementAndGet();
-                return invocation.callRealMethod();
+                COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.set(true);
+                try {
+                    return invocation.callRealMethod();
+                }  finally {
+                    COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.set(false);
+                }
             }
         }).when(spyTopic).addSchema(any(SchemaData.class));
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                if (!COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.get()) {
+                    counter.incrementAndGet();
+                }
+                return invocation.callRealMethod();
+            }
+        }).when(spyTopic).addSchema(any(SchemaData.class), anyBoolean());
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable 
{
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
index 43d7d77c7ca..93965571870 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -22,11 +22,15 @@ import java.util.Set;
 import lombok.CustomLog;
 import org.apache.avro.reflect.AvroAlias;
 import org.apache.avro.reflect.AvroDefault;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
 import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
@@ -280,4 +284,66 @@ public class AdminApiSchemaAutoUpdateTest extends 
MockedPulsarServiceBaseTest {
         testAutoUpdateDisabled("prop-xyz/ns1", 
"persistent://prop-xyz/ns1/disabled");
         testAutoUpdateDisabled("prop-xyz/ns2", 
"non-persistent://prop-xyz/ns2/disabled-np");
     }
+
+    @Test(timeOut = 60_000)
+    @SuppressWarnings("deprecation")
+    public void testIsAllowAutoUpdateSchemaWithReplicator() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("prop-xyz/ns");
+        admin.namespaces().createNamespace(namespace);
+        final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+
+        // By default, it is true.
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertTrue(admin.namespaces().getPolicies(namespace).is_allow_auto_update_schema_with_replicator);
+            
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+        });
+
+        try {
+            admin.namespaces().setIsAllowAutoUpdateSchema(
+                    namespace, true, false);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarAdminException e) {
+            Assert.assertTrue(e.getMessage().contains("Can not enable for all 
producers but denies for replicators,"
+                    + " which is meaningless"));
+        }
+
+        admin.namespaces().setIsAllowAutoUpdateSchema(
+                namespace, false, false);
+        Awaitility.await().untilAsserted(() -> {
+            // namespace level.
+            Policies policies = admin.namespaces().getPolicies(namespace);
+            Assert.assertFalse(policies.is_allow_auto_update_schema);
+            
Assert.assertFalse(policies.is_allow_auto_update_schema_with_replicator);
+            // topic level.
+            
Assert.assertFalse(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+        });
+
+        admin.namespaces().setIsAllowAutoUpdateSchema(
+                namespace, true, true);
+        Awaitility.await().untilAsserted(() -> {
+            // namespace level.
+            Policies policies = admin.namespaces().getPolicies(namespace);
+            Assert.assertTrue(policies.is_allow_auto_update_schema);
+            
Assert.assertTrue(policies.is_allow_auto_update_schema_with_replicator);
+            // topic level.
+            
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+        });
+
+        admin.namespaces().setIsAllowAutoUpdateSchema(
+                namespace, false, true);
+        Awaitility.await().untilAsserted(() -> {
+            // namespace level.
+            Policies policies = admin.namespaces().getPolicies(namespace);
+            Assert.assertFalse(policies.is_allow_auto_update_schema);
+            
Assert.assertTrue(policies.is_allow_auto_update_schema_with_replicator);
+            // topic level.
+            
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+        });
+
+        // cleanup.
+        admin.topics().delete(topic);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index ca9f07c474b..674ea80fadd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -2141,7 +2141,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         execFlag = setAuthorizationPolicyOperationChecker(subject,
                 PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, 
PolicyOperation.WRITE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> 
subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true));
+                () -> 
subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true, true));
         Assert.assertTrue(execFlag.get());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index dab4da9b738..b0b12bd8f2c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -40,11 +40,14 @@ import io.netty.channel.Channel;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -67,6 +70,7 @@ import lombok.AllArgsConstructor;
 import lombok.Cleanup;
 import lombok.CustomLog;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -96,16 +100,21 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ClientImplInternalSetter;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -113,6 +122,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.TopicType;
 import 
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -233,6 +243,199 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
+    /**
+     * Parameter rows for {@code testMultipleVersionSchemas}.
+     * Each row is {isAllowAutoUpdateSchema, allowAutoUpdateSchemaWithReplicator}; the second value may be
+     * {@code null}, in which case it is passed to the admin API as {@code null}.
+     */
+    @DataProvider
+    public Object[][] autoUpdateSchemaParams() {
+        return new Object[][] {
+                {true, true},
+                {true, null},
+                {false, true},
+                {false, false},
+                {false, null},
+        };
+    }
+
+    /** Value type passed to {@code Schema.AVRO(Customer.class)} when building and consuming test messages. */
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @Data
+    private static class Customer {
+        String name;
+        int age;
+    }
+
+    /**
+     * Publishes two messages that carry different schema versions on cluster1 and checks whether they are
+     * replicated to cluster2 under the namespace-level schema auto-update flags configured on cluster2.
+     *
+     * @param isAllowAutoUpdateSchema value applied to cluster2's "allow auto update schema" flag
+     * @param allowAutoUpdateSchemaWithReplicator replicator-specific flag applied to cluster2 before the
+     *        second message is sent; when {@code false} the second message is expected to remain in the
+     *        replication backlog of cluster1
+     */
+    @Test(dataProvider = "autoUpdateSchemaParams")
+    public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
+                                           Boolean 
allowAutoUpdateSchemaWithReplicator) throws Exception {
+        final String ns = BrokerTestUtil.newUniqueName("public/ns");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ ns + "/tp_123");
+        final String subscribeName = "s1";
+        admin1.namespaces().createNamespace(ns);
+        admin2.namespaces().createNamespace(ns);
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.namespaces().setNamespaceReplicationClusters(ns,
+                new HashSet<>(Arrays.asList(cluster1, cluster2)), true);
+        waitReplicatorStarted(topicName);
+        admin1.namespaces().setSchemaCompatibilityStrategy(ns, 
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+        admin2.namespaces().setSchemaCompatibilityStrategy(ns, 
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+        // Use the synchronous variant so that a rejected policy update fails the test immediately
+        // instead of being lost with a discarded future.
+        admin1.namespaces().setIsAllowAutoUpdateSchema(ns, true, null);
+        admin2.namespaces().setIsAllowAutoUpdateSchema(ns, isAllowAutoUpdateSchema, null);
+        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 1);
+        admin1.namespaces().setRetention(ns, retentionPolicies);
+        admin2.namespaces().setRetention(ns, retentionPolicies);
+        PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName, 
false).join().get();
+        PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName, 
false).join().get();
+        // Wait until both brokers have applied the namespace policies to the loaded topics.
+        Awaitility.await().untilAsserted(() -> {
+            HierarchyTopicPolicies policies1 = 
topic1.getHierarchyTopicPolicies();
+            HierarchyTopicPolicies policies2 = 
topic2.getHierarchyTopicPolicies();
+            assertEquals(policies1.getSchemaCompatibilityStrategy().get(),
+                    SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+            assertEquals(policies2.getSchemaCompatibilityStrategy().get(),
+                    SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+            assertTrue(topic1.isAllowAutoUpdateSchema);
+            assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+            assertEquals(topic2.isAllowAutoUpdateSchema, 
isAllowAutoUpdateSchema);
+            assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
+            
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(),
 10);
+            
assertEquals(policies2.getRetentionPolicies().get().getRetentionTimeInMinutes(),
 10);
+        });
+        // Build different schemas.
+        HashMap<String, String> schemaProps = new HashMap<>();
+        schemaProps.put("__jsr310ConversionEnabled", "false");
+        schemaProps.put("__alwaysAllowNull", "true");
+        SchemaInfoImpl schemaInfoV1 = new SchemaInfoImpl("", """
+            {
+              "type" : "record",
+              "name" : "Student",
+              "namespace" : 
"org.apache.pulsar.broker.service.OneWayReplicatorTest",
+              "fields" : [ {
+                "name" : "age",
+                "type" : "int"
+              }]
+            }
+        """.getBytes(StandardCharsets.UTF_8), SchemaType.AVRO, 0, schemaProps);
+        SchemaInfoImpl schemaInfoV2 = new SchemaInfoImpl("", """
+            {
+              "type" : "record",
+              "name" : "Student",
+              "namespace" : 
"org.apache.pulsar.broker.service.OneWayReplicatorTest",
+              "fields" : [ {
+                "name" : "age",
+                "type" : "int"
+              }, {
+                "name" : "name",
+                "type" : [ "null", "string" ],
+                "default" : null
+              } ]
+            }
+        """.getBytes(StandardCharsets.UTF_8), SchemaType.AVRO, 0, schemaProps);
+        admin1.schemas().createSchema(topicName, schemaInfoV1);
+        admin1.schemas().createSchema(topicName, schemaInfoV2);
+        long longSchemaVersion1 = 
admin1.schemas().getVersionBySchemaAsync(topicName, schemaInfoV1)
+                .get(2, TimeUnit.SECONDS);
+        long longSchemaVersion2 = 
admin1.schemas().getVersionBySchemaAsync(topicName, schemaInfoV2)
+                .get(2, TimeUnit.SECONDS);
+        LongSchemaVersion schemaVersion1 = new 
LongSchemaVersion(longSchemaVersion1);
+        LongSchemaVersion schemaVersion2 = new 
LongSchemaVersion(longSchemaVersion2);
+
+        // Publish messages with different schemas.
+        // The schema version is stamped on each message by hand and the schema state is forced to
+        // "Ready" before sending.
+        ProducerImpl<byte[]> producer1 =
+                (ProducerImpl<byte[]>) 
client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+        TypedMessageBuilderImpl typedMessageBuilder1 = 
(TypedMessageBuilderImpl) producer1
+                .newMessage(Schema.AVRO(Customer.class)).value(new 
Customer(null, 16));
+        MessageImpl message1 = (MessageImpl) typedMessageBuilder1.getMessage();
+        message1.getMessageBuilder().setSchemaVersion(schemaVersion1.bytes());
+        ClientImplInternalSetter.setMessageSchemaState(message1, "Ready");
+        producer1.send(message1);
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats topicStats = admin1.topics().getStats(topicName);
+            
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 
0);
+        });
+
+        // Change policies.
+        admin1.namespaces().setIsAllowAutoUpdateSchema(ns, true, null);
+        admin2.namespaces().setIsAllowAutoUpdateSchema(ns, isAllowAutoUpdateSchema,
+                allowAutoUpdateSchemaWithReplicator);
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(topic1.isAllowAutoUpdateSchema);
+            assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+            assertEquals(topic2.isAllowAutoUpdateSchema, 
isAllowAutoUpdateSchema);
+            if (allowAutoUpdateSchemaWithReplicator != null && 
!allowAutoUpdateSchemaWithReplicator) {
+                assertFalse(topic2.isAllowAutoUpdateSchemaWithReplicator);
+            } else {
+                assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
+            }
+        });
+
+        TypedMessageBuilderImpl typedMessageBuilder2 = 
(TypedMessageBuilderImpl) producer1
+                .newMessage(Schema.AVRO(Customer.class)).value(new 
Customer("Apache", 26));
+        MessageImpl message2 = (MessageImpl) typedMessageBuilder2.getMessage();
+        message2.getMessageBuilder().setSchemaVersion(schemaVersion2.bytes());
+        ClientImplInternalSetter.setMessageSchemaState(message2, "Ready");
+        producer1.send(message2);
+        if (allowAutoUpdateSchemaWithReplicator != null && 
!allowAutoUpdateSchemaWithReplicator) {
+            // Fixed wait: there is no event to await when asserting that replication does NOT happen.
+            Thread.sleep(3000);
+            // The message can not be replicated to the remote side.
+            TopicStats topicStats = admin1.topics().getStats(topicName);
+            
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 
1);
+            // NOTE(review): this path returns without running the cleanup at the end of the method.
+            producer1.close();
+            return;
+        }
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats topicStats = admin1.topics().getStats(topicName);
+            
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 
0);
+        });
+
+        // Verify: the messages were built successfully.
+        admin1.topics().createSubscription(topicName, subscribeName, 
MessageId.earliest);
+        Consumer<Customer> consumer1 = 
client1.newConsumer(Schema.AVRO(Customer.class))
+                .subscriptionName(subscribeName).topic(topicName).subscribe();
+        Message<Customer> msg1 = consumer1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg1);
+        byte[] bytesVersion1 = msg1.getSchemaVersion();
+        assertEquals(ByteBuffer.wrap(bytesVersion1).getLong(), 
longSchemaVersion1);
+        assertNull(msg1.getValue().getName());
+        assertEquals(msg1.getValue().getAge(), 16);
+        consumer1.acknowledge(msg1);
+        Message<Customer> msg2 = consumer1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg2);
+        byte[] bytesVersion2 = msg2.getSchemaVersion();
+        assertEquals(ByteBuffer.wrap(bytesVersion2).getLong(), 
longSchemaVersion2);
+        assertEquals(msg2.getValue().getName(), "Apache");
+        assertEquals(msg2.getValue().getAge(), 26);
+        consumer1.acknowledge(msg2);
+
+        admin2.topics().createSubscription(topicName, subscribeName, 
MessageId.earliest);
+        Consumer<Customer> consumer2 = 
client2.newConsumer(Schema.AVRO(Customer.class))
+                .subscriptionName(subscribeName).topic(topicName).subscribe();
+        Message<Customer> msg21 = consumer2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg21);
+        byte[] bytesVersion21 = msg21.getSchemaVersion();
+        assertEquals(ByteBuffer.wrap(bytesVersion21).getLong(), 0);
+        assertNull(msg21.getValue().getName());
+        assertEquals(msg21.getValue().getAge(), 16);
+        consumer2.acknowledge(msg21);
+        // The case where the replicator is denied has already returned above, so the second
+        // message must have been replicated when execution reaches this point.
+        Message<Customer> msg22 = consumer2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg22);
+        byte[] bytesVersion22 = msg22.getSchemaVersion();
+        assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
+        assertEquals(msg22.getValue().getName(), "Apache");
+        assertEquals(msg22.getValue().getAge(), 26);
+        consumer2.acknowledge(msg22);
+
+        // cleanup.
+        consumer1.close();
+        consumer2.close();
+        producer1.close();
+        admin1.namespaces().setNamespaceReplicationClusters(ns,
+                new HashSet<>(Arrays.asList(cluster1)), true);
+        waitReplicatorStopped(pulsar1, pulsar2, topicName);
+        admin1.topics().delete(topicName);
+        admin2.topics().delete(topicName);
+    }
+
     @Test
     public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_123");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 336d17460f1..7f6124e62a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -73,6 +73,7 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         config.setDefaultNumPartitions(1);
     }
 
+    @Override
     @Test(enabled = false)
     public void testDeleteTopicWhenReplicating() throws Exception {
         super.testDeleteTopicWhenReplicating();
@@ -84,6 +85,13 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         super.testReplicatorProducerStatInTopic();
     }
 
+    // Disabled for this variant; if re-enabled it must run the parent test of the same name.
+    @Override
+    @Test(enabled = false)
+    public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
+                                           Boolean 
allowAutoUpdateSchemaWithReplicator) throws Exception {
+        super.testMultipleVersionSchemas(isAllowAutoUpdateSchema, allowAutoUpdateSchemaWithReplicator);
+    }
+
     @Override
     @Test(enabled = false)
     public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 2a9b0ac4896..de7e0cf0a3e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -100,6 +100,13 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.testReplicatorProducerStatInTopic();
     }
 
+    // Disabled for this variant; if re-enabled it must run the parent test of the same name.
+    @Override
+    @Test(enabled = false)
+    public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
+                                           Boolean 
allowAutoUpdateSchemaWithReplicator) throws Exception {
+        super.testMultipleVersionSchemas(isAllowAutoUpdateSchema, allowAutoUpdateSchemaWithReplicator);
+    }
+
     @Test(dataProvider = "isPartitioned")
     public void testReplicatorCreateTopic(boolean isPartitioned) throws 
Exception {
         super.testReplicatorCreateTopic(isPartitioned);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientImplInternalSetter.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientImplInternalSetter.java
new file mode 100644
index 00000000000..bbbd4c26153
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientImplInternalSetter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+/**
+ * Test helper declared in the client implementation package that exposes a setter on
+ * {@link MessageImpl} to tests in other packages (presumably because that setter is not
+ * accessible from outside this package; confirm against {@code MessageImpl}).
+ */
+public final class ClientImplInternalSetter {
+
+    private ClientImplInternalSetter() {
+        // Static utility; not meant to be instantiated.
+    }
+
+    /**
+     * Sets the schema state of the given message.
+     *
+     * @param message the message whose schema state is overwritten
+     * @param state name of a {@code MessageImpl.SchemaState} value, for example {@code "Ready"};
+     *              it is resolved with {@code SchemaState.valueOf(state)}
+     */
+    public static void setMessageSchemaState(MessageImpl<?> message, String 
state) {
+        message.setSchemaState(MessageImpl.SchemaState.valueOf(state));
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 47ef8b5f4ce..c885e4a1941 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -332,7 +332,7 @@ public class NegativeAcksTest extends SharedPulsarBaseTest {
      */
     @Test
     public void testNegativeAcksWithBatch() throws Exception {
-        admin.namespaces().setIsAllowAutoUpdateSchema(getNamespace(), true);
+        admin.namespaces().setIsAllowAutoUpdateSchema(getNamespace(), true, 
true);
         String topic = newTopicName();
 
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 7f31fa8a704..52c6628d37c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -347,7 +347,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), 
schemaCompatibilityStrategy);
         admin.schemas().createSchema(fqtn, 
Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
 
-        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false, 
true);
         ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
                 
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
                         (false).withSupportSchemaVersioning(true).
@@ -359,7 +359,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertTrue(e.getMessage().contains("Schema not found and 
schema auto updating is disabled."));
         }
 
-        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
+        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true, 
true);
         ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = 
pulsarClient.newConsumer(Schema.AVRO(
                 
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
                         (false).withSupportSchemaVersioning(true).
@@ -382,7 +382,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
         consumerTwo.close();
 
-        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false, 
true);
 
         producer = producerThreeBuilder.create();
         consumerTwo = comsumerBuilder.subscribe();
@@ -427,7 +427,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         SchemaInfo schemaInfo = 
SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
         admin.schemas().createSchema(fqtn, schemaInfo);
 
-        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+        
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false, 
true);
         ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
                 .newProducer(Schema.AVRO(Schemas.PersonOne.class))
                 .topic(fqtn);
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 85a8d1d744e..cc3f5a92964 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -4105,6 +4105,7 @@ public interface Namespaces {
      *
      * @param namespace pulsar namespace name
      * @param isAllowAutoUpdateSchema flag to enable or disable auto update 
schema
+     * @param allowAutoUpdateSchemaWithReplicator whether replicator can auto 
update schema
      * @throws NotAuthorizedException
      *             Don't have admin permission
      * @throws NotFoundException
@@ -4112,19 +4113,20 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void setIsAllowAutoUpdateSchema(String namespace, boolean 
isAllowAutoUpdateSchema)
+    void setIsAllowAutoUpdateSchema(String namespace, boolean 
isAllowAutoUpdateSchema,
+                                    Boolean 
allowAutoUpdateSchemaWithReplicator)
             throws PulsarAdminException;
 
+
     /**
-     * Set whether to allow automatic schema updates asynchronously.
-     * <p/>
-     * The flag is when producer bring a new schema and the schema pass 
compatibility check
-     * whether allow schema auto registered
+     * Set whether to allow automatic schema updates and replicator schema 
updates in one call asynchronously.
      *
      * @param namespace pulsar namespace name
      * @param isAllowAutoUpdateSchema flag to enable or disable auto update 
schema
+     * @param allowAutoUpdateSchemaWithReplicator whether replicator can auto 
update schema
      */
-    CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace, 
boolean isAllowAutoUpdateSchema);
+    CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace, 
boolean isAllowAutoUpdateSchema,
+                                                            Boolean 
allowAutoUpdateSchemaWithReplicator);
 
     /**
      * Set the offload configuration for all the topics in a namespace.
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 99c39dec417..db0aa0620d5 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -118,6 +118,9 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public Boolean is_allow_auto_update_schema = null;
 
+    @SuppressWarnings("checkstyle:MemberName")
+    public Boolean is_allow_auto_update_schema_with_replicator = true;
+
     @SuppressWarnings("checkstyle:MemberName")
     public boolean schema_validation_enforced = false;
 
@@ -169,6 +172,7 @@ public class Policies {
                 schema_validation_enforced,
                 schema_compatibility_strategy,
                 is_allow_auto_update_schema,
+                is_allow_auto_update_schema_with_replicator,
                 offload_policies,
                 subscription_types_enabled,
                 allowed_topic_property_keys_for_metrics,
@@ -218,6 +222,8 @@ public class Policies {
                     && schema_validation_enforced == 
other.schema_validation_enforced
                     && schema_compatibility_strategy == 
other.schema_compatibility_strategy
                     && is_allow_auto_update_schema == 
other.is_allow_auto_update_schema
+                    && 
Objects.equals(is_allow_auto_update_schema_with_replicator,
+                        other.is_allow_auto_update_schema_with_replicator)
                     && Objects.equals(offload_policies, other.offload_policies)
                     && Objects.equals(subscription_types_enabled, 
other.subscription_types_enabled)
                     && Objects.equals(allowed_topic_property_keys_for_metrics,
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c62ecd0b205..c77e515f990 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1783,15 +1783,21 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
-    public void setIsAllowAutoUpdateSchema(String namespace, boolean 
isAllowAutoUpdateSchema)
+    public void setIsAllowAutoUpdateSchema(String namespace, boolean 
isAllowAutoUpdateSchema,
+                                           Boolean 
allowAutoUpdateSchemaWithReplicator)
             throws PulsarAdminException {
-        sync(() -> setIsAllowAutoUpdateSchemaAsync(namespace, 
isAllowAutoUpdateSchema));
+        sync(() -> setIsAllowAutoUpdateSchemaAsync(namespace, 
isAllowAutoUpdateSchema,
+                allowAutoUpdateSchemaWithReplicator));
     }
 
     @Override
-    public CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String 
namespace, boolean isAllowAutoUpdateSchema) {
+    public CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(
+            String namespace, boolean isAllowAutoUpdateSchema, Boolean 
allowAutoUpdateSchemaWithReplicator) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
+        if (allowAutoUpdateSchemaWithReplicator != null) {
+            path = path.queryParam("allowAutoUpdateSchemaWithReplicator", 
allowAutoUpdateSchemaWithReplicator);
+        }
         return asyncPostRequest(path, Entity.entity(isAllowAutoUpdateSchema, 
MediaType.APPLICATION_JSON));
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 1030ef0c99b..abcd47fab35 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2163,6 +2163,12 @@ public class CmdNamespaces extends CmdBase {
         @Option(names = { "--disable", "-d" }, description = "Disable schema 
validation enforced")
         private boolean disable = false;
 
+        @Option(names = { "--enable-for-replicator" },
+                description = "By default, brokers always allow replicator to 
register new compatible schemas even"
+                + " when auto updates are disabled, if you want to disable 
replicators to register compatible schemas,"
+                + " please set it to false")
+        private Boolean enableForReplicator;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(namespaceName);
@@ -2170,7 +2176,10 @@ public class CmdNamespaces extends CmdBase {
             if (enable == disable) {
                 throw new ParameterException("Need to specify either --enable 
or --disable");
             }
-            getAdmin().namespaces().setIsAllowAutoUpdateSchema(namespace, 
enable);
+            if (enable && enableForReplicator != null && !enableForReplicator) 
{
+                throw new ParameterException("Can not enable for all producers 
but denies for replicators");
+            }
+            getAdmin().namespaces().setIsAllowAutoUpdateSchema(namespace, 
enable, enableForReplicator);
         }
     }
 
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
index cc506e6fe93..80de891952b 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
@@ -50,4 +50,30 @@ public class TestCmdNamespaces {
         verify(namespaces, times(1)).setRetention("public/default",
                 new RetentionPolicies(200 * 24 * 60, 2 * 1024 * 1024));
    }
+
+    /**
+     * Checks that the {@code set-is-allow-auto-update-schema} command forwards {@code --enable} /
+     * {@code --disable} and the optional {@code --enable-for-replicator} flag to
+     * {@code Namespaces#setIsAllowAutoUpdateSchema}; the third argument is {@code null} when the
+     * replicator flag is not given on the command line.
+     */
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testSetIsAllowAutoUpdateSchemaCmd() throws Exception {
+        Namespaces namespaces = mock(Namespaces.class);
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.namespaces()).thenReturn(namespaces);
+
+        CmdNamespaces cmd = new CmdNamespaces(() -> admin);
+
+        // Replicator flag omitted: the admin call receives null for it.
+        cmd.run("set-is-allow-auto-update-schema public/default --disable"
+                .split("\\s+"));
+        verify(namespaces, 
times(1)).setIsAllowAutoUpdateSchema("public/default", false, null);
+
+        cmd.run("set-is-allow-auto-update-schema public/default --enable"
+                .split("\\s+"));
+        verify(namespaces, 
times(1)).setIsAllowAutoUpdateSchema("public/default", true, null);
+
+        // Replicator flag present: the admin call receives Boolean.TRUE for it.
+        cmd.run("set-is-allow-auto-update-schema public/default --disable 
--enable-for-replicator"
+                .split("\\s+"));
+        verify(namespaces, 
times(1)).setIsAllowAutoUpdateSchema("public/default", false, Boolean.TRUE);
+
+        cmd.run("set-is-allow-auto-update-schema public/default --enable 
--enable-for-replicator"
+                .split("\\s+"));
+        verify(namespaces, 
times(1)).setIsAllowAutoUpdateSchema("public/default", true, Boolean.TRUE);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index d83bceb9d51..cf5ba97f436 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -977,7 +977,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                                     cnx.getRemoteEndpointProtocolVersion(), 
producerName, topic)));
         }
         long requestId = client.newRequestId();
-        ByteBuf request = Commands.newGetOrCreateSchema(requestId, topic, 
schemaInfo);
+        ByteBuf request = Commands.newGetOrCreateSchema(requestId, topic, 
producerName, schemaInfo);
         log.info("GetOrCreateSchema request");
         return cnx.sendGetOrCreateSchema(request, requestId);
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 523f47b4805..02e03a491d0 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1372,10 +1372,12 @@ public class Commands {
         return serializeWithSize(newGetSchemaResponseErrorCommand(requestId, 
error, errorMessage));
     }
 
-    public static ByteBuf newGetOrCreateSchema(long requestId, String topic, 
SchemaInfo schemaInfo) {
+    public static ByteBuf newGetOrCreateSchema(long requestId, String topic, 
String producerName,
+                                               SchemaInfo schemaInfo) {
         BaseCommand cmd = localCmd(Type.GET_OR_CREATE_SCHEMA);
         Schema schema = cmd.setGetOrCreateSchema()
                 .setRequestId(requestId)
+                .setProducerName(producerName)
                 .setTopic(topic)
                 .setSchema();
         convertSchema(schemaInfo, schema);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 195fc597ecc..20bd8ab8185 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -957,9 +957,10 @@ message CommandGetSchemaResponse {
 }
 
 message CommandGetOrCreateSchema {
-    required uint64 request_id = 1;
-    required string topic      = 2;
-    required Schema schema     = 3;
+    required uint64 request_id      = 1;
+    required string topic           = 2;
+    required Schema schema          = 3;
+    optional string producerName    = 4;
 }
 
 message CommandGetOrCreateSchemaResponse {

Reply via email to