This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 71462d3151b [improve][broker]Part-3 of PIP-433: always allow
replicator to register a new compatible schema (#25461)
71462d3151b is described below
commit 71462d3151b2dceb776db08b8dc974d779d462e1
Author: fengyubiao <[email protected]>
AuthorDate: Thu Apr 30 11:16:38 2026 +0800
[improve][broker]Part-3 of PIP-433: always allow replicator to register a
new compatible schema (#25461)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 7 +-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 10 +-
.../pulsar/broker/service/AbstractTopic.java | 22 ++-
.../apache/pulsar/broker/service/ServerCnx.java | 19 +-
.../org/apache/pulsar/broker/service/Topic.java | 8 +
.../service/nonpersistent/NonPersistentTopic.java | 3 +
.../broker/service/persistent/PersistentTopic.java | 2 +
.../apache/pulsar/broker/admin/AdminApi2Test.java | 25 ++-
.../broker/admin/AdminApiSchemaAutoUpdateTest.java | 66 +++++++
.../pulsar/broker/admin/NamespaceAuthZTest.java | 2 +-
.../broker/service/OneWayReplicatorTest.java | 203 +++++++++++++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 8 +
.../service/OneWayReplicatorUsingGlobalZKTest.java | 7 +
.../client/impl/ClientImplInternalSetter.java | 27 +++
.../pulsar/client/impl/NegativeAcksTest.java | 2 +-
.../SchemaCompatibilityCheckTest.java | 8 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 14 +-
.../pulsar/common/policies/data/Policies.java | 6 +
.../client/admin/internal/NamespacesImpl.java | 12 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 11 +-
.../apache/pulsar/admin/cli/TestCmdNamespaces.java | 26 +++
.../apache/pulsar/client/impl/ProducerImpl.java | 2 +-
.../apache/pulsar/common/protocol/Commands.java | 4 +-
pulsar-common/src/main/proto/PulsarApi.proto | 7 +-
24 files changed, 467 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 76ecfd05d38..c94c59df2bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2758,13 +2758,18 @@ public abstract class NamespacesBase extends
AdminResource {
"schemaValidationEnforced");
}
- protected void internalSetIsAllowAutoUpdateSchema(boolean
isAllowAutoUpdateSchema) {
+ protected void internalSetIsAllowAutoUpdateSchema(boolean
isAllowAutoUpdateSchema,
+ Boolean
isAllowAutoUpdateSchemaWithReplicator) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
policies.is_allow_auto_update_schema =
isAllowAutoUpdateSchema;
+ if (isAllowAutoUpdateSchemaWithReplicator != null) {
+ policies.is_allow_auto_update_schema_with_replicator =
+ isAllowAutoUpdateSchemaWithReplicator;
+ }
return policies;
}, (policies) -> policies.is_allow_auto_update_schema,
"isAllowAutoUpdateSchema");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 626c4b314a0..e87c34ba769 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2813,10 +2813,18 @@ public class Namespaces extends NamespacesBase {
public void setIsAllowAutoUpdateSchema(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ @QueryParam("allowAutoUpdateSchemaWithReplicator")
+ @ApiParam(value = "Allow replicator to auto update schema")
+ Boolean allowAutoUpdateSchemaWithReplicator,
@ApiParam(value = "Flag of whether to allow auto update schema",
required = true)
boolean isAllowAutoUpdateSchema) {
validateNamespaceName(tenant, namespace);
- internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema);
+ if (isAllowAutoUpdateSchema && allowAutoUpdateSchemaWithReplicator !=
null
+ && !allowAutoUpdateSchemaWithReplicator) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Can not
enable for all producers but denies for"
+ + " replicators, which is meaningless");
+ }
+ internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema,
allowAutoUpdateSchemaWithReplicator);
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 09ab12d3906..4d29c751f03 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -141,6 +141,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
protected volatile boolean isEncryptionRequired = false;
protected volatile Boolean isAllowAutoUpdateSchema;
+ @Getter
+ protected volatile Boolean isAllowAutoUpdateSchemaWithReplicator;
protected volatile PublishRateLimiter topicPublishRateLimiter;
protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
@@ -733,6 +735,11 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+ return addSchema(schema, false);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema,
boolean isReplicatorProducer) {
if (schema == null) {
return CompletableFuture.completedFuture(SchemaVersion.Empty);
}
@@ -740,7 +747,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
String id = getSchemaId();
SchemaRegistryService schemaRegistryService =
brokerService.pulsar().getSchemaRegistryService();
- if (allowAutoUpdateSchema()) {
+ if (allowAutoUpdateSchema(isReplicatorProducer)) {
return schemaRegistryService.putSchemaIfAbsent(id, schema,
getSchemaCompatibilityStrategy());
} else {
return
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
->
@@ -756,14 +763,19 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
}
}
- private boolean allowAutoUpdateSchema() {
+ private boolean allowAutoUpdateSchema(boolean isReplicatorProducer) {
if (brokerService.isSystemTopic(topic)) {
return true;
}
- if (isAllowAutoUpdateSchema == null) {
- return
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
+ // Allowed auto updating.
+ boolean allowSchemaAutoUpdate = isAllowAutoUpdateSchema == null
+ ?
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled()
+ : isAllowAutoUpdateSchema;
+ if (allowSchemaAutoUpdate) {
+ return true;
}
- return isAllowAutoUpdateSchema;
+ // Allowed replicator to update schemas.
+ return isReplicatorProducer && isAllowAutoUpdateSchemaWithReplicator;
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4f130e28961..944148c5dc2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1999,8 +1999,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
disableTcpNoDelayIfNeeded(topicName.toString(),
producerName);
-
- CompletableFuture<SchemaVersion> schemaVersionFuture =
tryAddSchema(topic, schema);
+ boolean isReplicatorProducer =
Producer.isRemoteOrShadow(producerName,
+
getBrokerService().getPulsar().getConfig().getReplicatorPrefix());
+ CompletableFuture<SchemaVersion> schemaVersionFuture =
tryAddSchema(topic, schema,
+ isReplicatorProducer);
schemaVersionFuture.exceptionallyAsync(exception -> {
if (producerFuture.completeExceptionally(exception)) {
@@ -3125,7 +3127,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
if (topicOpt.isPresent()) {
Topic topic = topicOpt.get();
- CompletableFuture<SchemaVersion> schemaVersionFuture =
tryAddSchema(topic, schema);
+ boolean isReplicatorProducer = false;
+ if (commandGetOrCreateSchema.hasProducerName()) {
+ isReplicatorProducer =
Producer.isRemoteOrShadow(commandGetOrCreateSchema.getProducerName(),
+
getBrokerService().getPulsar().getConfig().getReplicatorPrefix());
+ }
+ CompletableFuture<SchemaVersion> schemaVersionFuture =
+ tryAddSchema(topic, schema, isReplicatorProducer);
schemaVersionFuture.exceptionally(ex -> {
ServerError errorCode =
BrokerServiceException.getClientErrorCode(ex);
String message = ex.getMessage();
@@ -3610,9 +3618,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
}
- private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic,
SchemaData schema) {
+ private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic,
SchemaData schema,
+ boolean
isReplicatorProducer) {
if (schema != null) {
- return topic.addSchema(schema);
+ return topic.addSchema(schema, isReplicatorProducer);
} else {
return topic.hasSchema().thenCompose((hasSchema) -> {
log.debug()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fd3ffb1a34c..684e5c6c83a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -309,6 +309,14 @@ public interface Topic {
*/
CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
+ /**
+ * Add a schema to the topic, with an optional override for replication
schema auto-update.
+ * The default implementation preserves existing behavior.
+ */
+ default CompletableFuture<SchemaVersion> addSchema(SchemaData schema,
boolean isReplicatorProducer) {
+ return addSchema(schema);
+ }
+
/**
* Delete the schema if this topic has a schema defined for it.
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c1b60ce49e8..8043199897a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -180,6 +180,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
+ isAllowAutoUpdateSchemaWithReplicator =
+
policies.is_allow_auto_update_schema_with_replicator;
}
updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
@@ -1157,6 +1159,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
+ isAllowAutoUpdateSchemaWithReplicator =
data.is_allow_auto_update_schema_with_replicator;
List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db5ab64ad11..2e76254dc97 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -500,6 +500,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
this.isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
+ isAllowAutoUpdateSchemaWithReplicator =
policies.is_allow_auto_update_schema_with_replicator;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenCompose(ignore -> removeOrphanReplicationCursors())
@@ -3806,6 +3807,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
+ isAllowAutoUpdateSchemaWithReplicator =
data.is_allow_auto_update_schema_with_replicator;
// Apply policies for components.
List<CompletableFuture<Void>> applyPolicyTasks =
applyUpdatedTopicPolicies();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index fd323fa50c0..661877532b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -26,6 +26,7 @@ import static
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnl
import static
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.changed;
import static
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.none;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -40,6 +41,7 @@ import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocal;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
@@ -3305,6 +3307,13 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
}
}
+ static final FastThreadLocal<Boolean>
COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY = new FastThreadLocal<>() {
+ @Override
+ protected Boolean initialValue() throws Exception {
+ return false;
+ }
+ };
+
private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
final var topics = pulsar.getBrokerService().getTopics();
AbstractTopic topic = (AbstractTopic)
topics.get(topicName).join().get();
@@ -3314,9 +3323,23 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
counter.incrementAndGet();
- return invocation.callRealMethod();
+ COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.set(true);
+ try {
+ return invocation.callRealMethod();
+ } finally {
+ COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.set(false);
+ }
}
}).when(spyTopic).addSchema(any(SchemaData.class));
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ if (!COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.get()) {
+ counter.incrementAndGet();
+ }
+ return invocation.callRealMethod();
+ }
+ }).when(spyTopic).addSchema(any(SchemaData.class), anyBoolean());
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
index 43d7d77c7ca..93965571870 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -22,11 +22,15 @@ import java.util.Set;
import lombok.CustomLog;
import org.apache.avro.reflect.AvroAlias;
import org.apache.avro.reflect.AvroDefault;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
import
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
@@ -280,4 +284,66 @@ public class AdminApiSchemaAutoUpdateTest extends
MockedPulsarServiceBaseTest {
testAutoUpdateDisabled("prop-xyz/ns1",
"persistent://prop-xyz/ns1/disabled");
testAutoUpdateDisabled("prop-xyz/ns2",
"non-persistent://prop-xyz/ns2/disabled-np");
}
+
+ @Test(timeOut = 60_000)
+ @SuppressWarnings("deprecation")
+ public void testIsAllowAutoUpdateSchemaWithReplicator() throws Exception {
+ final String namespace = BrokerTestUtil.newUniqueName("prop-xyz/ns");
+ admin.namespaces().createNamespace(namespace);
+ final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
+ admin.topics().createNonPartitionedTopic(topic);
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+
+ // By default, it is true.
+ Awaitility.await().untilAsserted(() -> {
+
Assert.assertTrue(admin.namespaces().getPolicies(namespace).is_allow_auto_update_schema_with_replicator);
+
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+ });
+
+ try {
+ admin.namespaces().setIsAllowAutoUpdateSchema(
+ namespace, true, false);
+ Assert.fail("Should have thrown exception");
+ } catch (PulsarAdminException e) {
+ Assert.assertTrue(e.getMessage().contains("Can not enable for all
producers but denies for replicators,"
+ + " which is meaningless"));
+ }
+
+ admin.namespaces().setIsAllowAutoUpdateSchema(
+ namespace, false, false);
+ Awaitility.await().untilAsserted(() -> {
+ // namespace level.
+ Policies policies = admin.namespaces().getPolicies(namespace);
+ Assert.assertFalse(policies.is_allow_auto_update_schema);
+
Assert.assertFalse(policies.is_allow_auto_update_schema_with_replicator);
+ // topic level.
+
Assert.assertFalse(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+ });
+
+ admin.namespaces().setIsAllowAutoUpdateSchema(
+ namespace, true, true);
+ Awaitility.await().untilAsserted(() -> {
+ // namespace level.
+ Policies policies = admin.namespaces().getPolicies(namespace);
+ Assert.assertTrue(policies.is_allow_auto_update_schema);
+
Assert.assertTrue(policies.is_allow_auto_update_schema_with_replicator);
+ // topic level.
+
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+ });
+
+ admin.namespaces().setIsAllowAutoUpdateSchema(
+ namespace, false, true);
+ Awaitility.await().untilAsserted(() -> {
+ // namespace level.
+ Policies policies = admin.namespaces().getPolicies(namespace);
+ Assert.assertFalse(policies.is_allow_auto_update_schema);
+
Assert.assertTrue(policies.is_allow_auto_update_schema_with_replicator);
+ // topic level.
+
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
+ });
+
+ // cleanup.
+ admin.topics().delete(topic);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index ca9f07c474b..674ea80fadd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -2141,7 +2141,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
execFlag = setAuthorizationPolicyOperationChecker(subject,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.WRITE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true));
+ () ->
subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true, true));
Assert.assertTrue(execFlag.get());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index dab4da9b738..b0b12bd8f2c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -40,11 +40,14 @@ import io.netty.channel.Channel;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -67,6 +70,7 @@ import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.CustomLog;
import lombok.Data;
+import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -96,16 +100,21 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ClientImplInternalSetter;
+import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -113,6 +122,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
@@ -233,6 +243,199 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
}
+ @DataProvider
+ public Object[][] autoUpdateSchemaParams() {
+ return new Object[][] {
+ {true, true},
+ {true, null},
+ {false, true},
+ {false, false},
+ {false, null},
+ };
+ }
+
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @Data
+ private static class Customer {
+ String name;
+ int age;
+ }
+
+ @Test(dataProvider = "autoUpdateSchemaParams")
+ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
+ Boolean
allowAutoUpdateSchemaWithReplicator) throws Exception {
+ final String ns = BrokerTestUtil.newUniqueName("public/ns");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ ns + "/tp_123");
+ final String subscribeName = "s1";
+ admin1.namespaces().createNamespace(ns);
+ admin2.namespaces().createNamespace(ns);
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.namespaces().setNamespaceReplicationClusters(ns,
+ new HashSet<>(Arrays.asList(cluster1, cluster2)), true);
+ waitReplicatorStarted(topicName);
+ admin1.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+ admin2.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+ admin1.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, true, null);
+ admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns,
isAllowAutoUpdateSchema, null);
+ RetentionPolicies retentionPolicies = new RetentionPolicies(10, 1);
+ admin1.namespaces().setRetention(ns, retentionPolicies);
+ admin2.namespaces().setRetention(ns, retentionPolicies);
+ PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName,
false).join().get();
+ PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName,
false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+ HierarchyTopicPolicies policies1 =
topic1.getHierarchyTopicPolicies();
+ HierarchyTopicPolicies policies2 =
topic2.getHierarchyTopicPolicies();
+ assertEquals(policies1.getSchemaCompatibilityStrategy().get(),
+ SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+ assertEquals(policies2.getSchemaCompatibilityStrategy().get(),
+ SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+ assertTrue(topic1.isAllowAutoUpdateSchema);
+ assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+ assertEquals(topic2.isAllowAutoUpdateSchema,
isAllowAutoUpdateSchema);
+ assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
+
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
+
assertEquals(policies2.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
+ });
+ // Build different schemas.
+ HashMap<String, String> schemaProps = new HashMap<>();
+ schemaProps.put("__jsr310ConversionEnabled", "false");
+ schemaProps.put("__alwaysAllowNull", "true");
+ SchemaInfoImpl schemaInfoV1 = new SchemaInfoImpl("", """
+ {
+ "type" : "record",
+ "name" : "Student",
+ "namespace" :
"org.apache.pulsar.broker.service.OneWayReplicatorTest",
+ "fields" : [ {
+ "name" : "age",
+ "type" : "int"
+ }]
+ }
+ """.getBytes(StandardCharsets.UTF_8), SchemaType.AVRO, 0, schemaProps);
+ SchemaInfoImpl schemaInfoV2 = new SchemaInfoImpl("", """
+ {
+ "type" : "record",
+ "name" : "Student",
+ "namespace" :
"org.apache.pulsar.broker.service.OneWayReplicatorTest",
+ "fields" : [ {
+ "name" : "age",
+ "type" : "int"
+ }, {
+ "name" : "name",
+ "type" : [ "null", "string" ],
+ "default" : null
+ } ]
+ }
+ """.getBytes(StandardCharsets.UTF_8), SchemaType.AVRO, 0, schemaProps);
+ admin1.schemas().createSchema(topicName, schemaInfoV1);
+ admin1.schemas().createSchema(topicName, schemaInfoV2);
+ long longSchemaVersion1 =
admin1.schemas().getVersionBySchemaAsync(topicName, schemaInfoV1)
+ .get(2, TimeUnit.SECONDS);
+ long longSchemaVersion2 =
admin1.schemas().getVersionBySchemaAsync(topicName, schemaInfoV2)
+ .get(2, TimeUnit.SECONDS);
+ LongSchemaVersion schemaVersion1 = new
LongSchemaVersion(longSchemaVersion1);
+ LongSchemaVersion schemaVersion2 = new
LongSchemaVersion(longSchemaVersion2);
+
+ // Publish messages with different schemas.
+ ProducerImpl<byte[]> producer1 =
+ (ProducerImpl<byte[]>)
client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+ TypedMessageBuilderImpl typedMessageBuilder1 =
(TypedMessageBuilderImpl) producer1
+ .newMessage(Schema.AVRO(Customer.class)).value(new
Customer(null, 16));
+ MessageImpl message1 = (MessageImpl) typedMessageBuilder1.getMessage();
+ message1.getMessageBuilder().setSchemaVersion(schemaVersion1.bytes());
+ ClientImplInternalSetter.setMessageSchemaState(message1, "Ready");
+ producer1.send(message1);
+ Awaitility.await().untilAsserted(() -> {
+ TopicStats topicStats = admin1.topics().getStats(topicName);
+
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(),
0);
+ });
+
+ // Change policies.
+ admin1.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, true, null);
+ admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns,
isAllowAutoUpdateSchema,
+ allowAutoUpdateSchemaWithReplicator);
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(topic1.isAllowAutoUpdateSchema);
+ assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+ assertEquals(topic2.isAllowAutoUpdateSchema,
isAllowAutoUpdateSchema);
+ if (allowAutoUpdateSchemaWithReplicator != null &&
!allowAutoUpdateSchemaWithReplicator) {
+ assertFalse(topic2.isAllowAutoUpdateSchemaWithReplicator);
+ } else {
+ assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
+ }
+ });
+
+ TypedMessageBuilderImpl typedMessageBuilder2 =
(TypedMessageBuilderImpl) producer1
+ .newMessage(Schema.AVRO(Customer.class)).value(new
Customer("Apache", 26));
+ MessageImpl message2 = (MessageImpl) typedMessageBuilder2.getMessage();
+ message2.getMessageBuilder().setSchemaVersion(schemaVersion2.bytes());
+ ClientImplInternalSetter.setMessageSchemaState(message2, "Ready");
+ producer1.send(message2);
+ if (allowAutoUpdateSchemaWithReplicator != null &&
!allowAutoUpdateSchemaWithReplicator) {
+ Thread.sleep(3000);
+ // The message can not be replicated to the remote side.
+ TopicStats topicStats = admin1.topics().getStats(topicName);
+
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(),
1);
+ producer1.close();
+ return;
+ }
+ Awaitility.await().untilAsserted(() -> {
+ TopicStats topicStats = admin1.topics().getStats(topicName);
+
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(),
0);
+ });
+
+ // Verify: the messages were built successfully.
+ admin1.topics().createSubscription(topicName, subscribeName,
MessageId.earliest);
+ Consumer<Customer> consumer1 =
client1.newConsumer(Schema.AVRO(Customer.class))
+ .subscriptionName(subscribeName).topic(topicName).subscribe();
+ Message<Customer> msg1 = consumer1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg1);
+ byte[] bytesVersion1 = msg1.getSchemaVersion();
+ assertEquals(ByteBuffer.wrap(bytesVersion1).getLong(),
longSchemaVersion1);
+ assertNull(msg1.getValue().getName());
+ assertEquals(msg1.getValue().getAge(), 16);
+ consumer1.acknowledge(msg1);
+ Message<Customer> msg2 = consumer1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg2);
+ byte[] bytesVersion2 = msg2.getSchemaVersion();
+ assertEquals(ByteBuffer.wrap(bytesVersion2).getLong(),
longSchemaVersion2);
+ assertEquals(msg2.getValue().getName(), "Apache");
+ assertEquals(msg2.getValue().getAge(), 26);
+ consumer1.acknowledge(msg2);
+
+ admin2.topics().createSubscription(topicName, subscribeName,
MessageId.earliest);
+ Consumer<Customer> consumer2 =
client2.newConsumer(Schema.AVRO(Customer.class))
+ .subscriptionName(subscribeName).topic(topicName).subscribe();
+ Message<Customer> msg21 = consumer2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg21);
+ byte[] bytesVersion21 = msg21.getSchemaVersion();
+ assertEquals(ByteBuffer.wrap(bytesVersion21).getLong(), 0);
+ assertNull(msg21.getValue().getName());
+ assertEquals(msg21.getValue().getAge(), 16);
+ consumer2.acknowledge(msg21);
+ Message<Customer> msg22 = consumer2.receive(5, TimeUnit.SECONDS);
+ if (allowAutoUpdateSchemaWithReplicator != null &&
!allowAutoUpdateSchemaWithReplicator) {
+ assertNull(msg22);
+ } else {
+ assertNotNull(msg22);
+ byte[] bytesVersion22 = msg22.getSchemaVersion();
+ assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
+ assertEquals(msg22.getValue().getName(), "Apache");
+ assertEquals(msg22.getValue().getAge(), 26);
+ consumer2.acknowledge(msg22);
+ }
+
+ // cleanup.
+ consumer1.close();
+ consumer2.close();
+ producer1.close();
+ admin1.namespaces().setNamespaceReplicationClusters(ns,
+ new HashSet<>(Arrays.asList(cluster1)), true);
+ waitReplicatorStopped(pulsar1, pulsar2, topicName);
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ }
+
@Test
public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_123");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 336d17460f1..7f6124e62a7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -73,6 +73,7 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
config.setDefaultNumPartitions(1);
}
+ @Override
@Test(enabled = false)
public void testDeleteTopicWhenReplicating() throws Exception {
super.testDeleteTopicWhenReplicating();
@@ -84,6 +85,13 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
+ Boolean
allowAutoUpdateSchemaWithReplicator) throws Exception {
+ super.testDeleteTopicWhenReplicating();
+ }
+
@Override
@Test(enabled = false)
public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 2a9b0ac4896..de7e0cf0a3e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -100,6 +100,13 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
+ Boolean
allowAutoUpdateSchemaWithReplicator) throws Exception {
+ super.testDeleteTopicWhenReplicating();
+ }
+
@Test(dataProvider = "isPartitioned")
public void testReplicatorCreateTopic(boolean isPartitioned) throws
Exception {
super.testReplicatorCreateTopic(isPartitioned);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientImplInternalSetter.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientImplInternalSetter.java
new file mode 100644
index 00000000000..bbbd4c26153
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientImplInternalSetter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+public class ClientImplInternalSetter {
+
+ public static void setMessageSchemaState(MessageImpl message, String
state) {
+ message.setSchemaState(MessageImpl.SchemaState.valueOf(state));
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 47ef8b5f4ce..c885e4a1941 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -332,7 +332,7 @@ public class NegativeAcksTest extends SharedPulsarBaseTest {
*/
@Test
public void testNegativeAcksWithBatch() throws Exception {
- admin.namespaces().setIsAllowAutoUpdateSchema(getNamespace(), true);
+ admin.namespaces().setIsAllowAutoUpdateSchema(getNamespace(), true,
true);
String topic = newTopicName();
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 7f31fa8a704..52c6628d37c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -347,7 +347,7 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(),
schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn,
Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
-
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false,
true);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
@@ -359,7 +359,7 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(e.getMessage().contains("Schema not found and
schema auto updating is disabled."));
}
-
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
+
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true,
true);
ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder =
pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
@@ -382,7 +382,7 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
producer.close();
consumerTwo.close();
-
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false,
true);
producer = producerThreeBuilder.create();
consumerTwo = comsumerBuilder.subscribe();
@@ -427,7 +427,7 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
SchemaInfo schemaInfo =
SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
admin.schemas().createSchema(fqtn, schemaInfo);
-
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false,
true);
ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(fqtn);
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 85a8d1d744e..cc3f5a92964 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -4105,6 +4105,7 @@ public interface Namespaces {
*
* @param namespace pulsar namespace name
* @param isAllowAutoUpdateSchema flag to enable or disable auto update
schema
+ * @param allowAutoUpdateSchemaWithReplicator whether replicator can auto
update schema
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
@@ -4112,19 +4113,20 @@ public interface Namespaces {
* @throws PulsarAdminException
* Unexpected error
*/
- void setIsAllowAutoUpdateSchema(String namespace, boolean
isAllowAutoUpdateSchema)
+ void setIsAllowAutoUpdateSchema(String namespace, boolean
isAllowAutoUpdateSchema,
+ Boolean
allowAutoUpdateSchemaWithReplicator)
throws PulsarAdminException;
+
/**
- * Set whether to allow automatic schema updates asynchronously.
- * <p/>
- * The flag is when producer bring a new schema and the schema pass
compatibility check
- * whether allow schema auto registered
+ * Set whether to allow automatic schema updates and replicator schema
updates in one call asynchronously.
*
* @param namespace pulsar namespace name
* @param isAllowAutoUpdateSchema flag to enable or disable auto update
schema
+ * @param allowAutoUpdateSchemaWithReplicator whether replicator can auto
update schema
*/
- CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace,
boolean isAllowAutoUpdateSchema);
+ CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace,
boolean isAllowAutoUpdateSchema,
+ Boolean
allowAutoUpdateSchemaWithReplicator);
/**
* Set the offload configuration for all the topics in a namespace.
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 99c39dec417..db0aa0620d5 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -118,6 +118,9 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public Boolean is_allow_auto_update_schema = null;
+ @SuppressWarnings("checkstyle:MemberName")
+ public Boolean is_allow_auto_update_schema_with_replicator = true;
+
@SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;
@@ -169,6 +172,7 @@ public class Policies {
schema_validation_enforced,
schema_compatibility_strategy,
is_allow_auto_update_schema,
+ is_allow_auto_update_schema_with_replicator,
offload_policies,
subscription_types_enabled,
allowed_topic_property_keys_for_metrics,
@@ -218,6 +222,8 @@ public class Policies {
&& schema_validation_enforced ==
other.schema_validation_enforced
&& schema_compatibility_strategy ==
other.schema_compatibility_strategy
&& is_allow_auto_update_schema ==
other.is_allow_auto_update_schema
+ &&
Objects.equals(is_allow_auto_update_schema_with_replicator,
+ other.is_allow_auto_update_schema_with_replicator)
&& Objects.equals(offload_policies, other.offload_policies)
&& Objects.equals(subscription_types_enabled,
other.subscription_types_enabled)
&& Objects.equals(allowed_topic_property_keys_for_metrics,
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c62ecd0b205..c77e515f990 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1783,15 +1783,21 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
- public void setIsAllowAutoUpdateSchema(String namespace, boolean
isAllowAutoUpdateSchema)
+ public void setIsAllowAutoUpdateSchema(String namespace, boolean
isAllowAutoUpdateSchema,
+ Boolean
allowAutoUpdateSchemaWithReplicator)
throws PulsarAdminException {
- sync(() -> setIsAllowAutoUpdateSchemaAsync(namespace,
isAllowAutoUpdateSchema));
+ sync(() -> setIsAllowAutoUpdateSchemaAsync(namespace,
isAllowAutoUpdateSchema,
+ allowAutoUpdateSchemaWithReplicator));
}
@Override
- public CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String
namespace, boolean isAllowAutoUpdateSchema) {
+ public CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(
+ String namespace, boolean isAllowAutoUpdateSchema, Boolean
allowAutoUpdateSchemaWithReplicator) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
+ if (allowAutoUpdateSchemaWithReplicator != null) {
+ path = path.queryParam("allowAutoUpdateSchemaWithReplicator",
allowAutoUpdateSchemaWithReplicator);
+ }
return asyncPostRequest(path, Entity.entity(isAllowAutoUpdateSchema,
MediaType.APPLICATION_JSON));
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 1030ef0c99b..abcd47fab35 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2163,6 +2163,12 @@ public class CmdNamespaces extends CmdBase {
@Option(names = { "--disable", "-d" }, description = "Disable schema
validation enforced")
private boolean disable = false;
+ @Option(names = { "--enable-for-replicator" },
+ description = "By default, brokers always allow replicator to
register new compatible schemas even"
+ + " when auto updates are disabled, if you want to disable
replicators to register compatible schemas,"
+ + " please set it to false")
+ private Boolean enableForReplicator;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(namespaceName);
@@ -2170,7 +2176,10 @@ public class CmdNamespaces extends CmdBase {
if (enable == disable) {
throw new ParameterException("Need to specify either --enable
or --disable");
}
- getAdmin().namespaces().setIsAllowAutoUpdateSchema(namespace,
enable);
+ if (enable && enableForReplicator != null && !enableForReplicator)
{
+ throw new ParameterException("Can not enable for all producers
but denies for replicators");
+ }
+ getAdmin().namespaces().setIsAllowAutoUpdateSchema(namespace,
enable, enableForReplicator);
}
}
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
index cc506e6fe93..80de891952b 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdNamespaces.java
@@ -50,4 +50,30 @@ public class TestCmdNamespaces {
verify(namespaces, times(1)).setRetention("public/default",
new RetentionPolicies(200 * 24 * 60, 2 * 1024 * 1024));
}
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testSetIsAllowAutoUpdateSchemaCmd() throws Exception {
+ Namespaces namespaces = mock(Namespaces.class);
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.namespaces()).thenReturn(namespaces);
+
+ CmdNamespaces cmd = new CmdNamespaces(() -> admin);
+
+ cmd.run("set-is-allow-auto-update-schema public/default --disable"
+ .split("\\s+"));
+ verify(namespaces,
times(1)).setIsAllowAutoUpdateSchema("public/default", false, null);
+
+ cmd.run("set-is-allow-auto-update-schema public/default --enable"
+ .split("\\s+"));
+ verify(namespaces,
times(1)).setIsAllowAutoUpdateSchema("public/default", true, null);
+
+ cmd.run("set-is-allow-auto-update-schema public/default --disable
--enable-for-replicator"
+ .split("\\s+"));
+ verify(namespaces,
times(1)).setIsAllowAutoUpdateSchema("public/default", false, Boolean.TRUE);
+
+ cmd.run("set-is-allow-auto-update-schema public/default --enable
--enable-for-replicator"
+ .split("\\s+"));
+ verify(namespaces,
times(1)).setIsAllowAutoUpdateSchema("public/default", true, Boolean.TRUE);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index d83bceb9d51..cf5ba97f436 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -977,7 +977,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
cnx.getRemoteEndpointProtocolVersion(),
producerName, topic)));
}
long requestId = client.newRequestId();
- ByteBuf request = Commands.newGetOrCreateSchema(requestId, topic,
schemaInfo);
+ ByteBuf request = Commands.newGetOrCreateSchema(requestId, topic,
producerName, schemaInfo);
log.info("GetOrCreateSchema request");
return cnx.sendGetOrCreateSchema(request, requestId);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 523f47b4805..02e03a491d0 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1372,10 +1372,12 @@ public class Commands {
return serializeWithSize(newGetSchemaResponseErrorCommand(requestId,
error, errorMessage));
}
- public static ByteBuf newGetOrCreateSchema(long requestId, String topic,
SchemaInfo schemaInfo) {
+ public static ByteBuf newGetOrCreateSchema(long requestId, String topic,
String producerName,
+ SchemaInfo schemaInfo) {
BaseCommand cmd = localCmd(Type.GET_OR_CREATE_SCHEMA);
Schema schema = cmd.setGetOrCreateSchema()
.setRequestId(requestId)
+ .setProducerName(producerName)
.setTopic(topic)
.setSchema();
convertSchema(schemaInfo, schema);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 195fc597ecc..20bd8ab8185 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -957,9 +957,10 @@ message CommandGetSchemaResponse {
}
message CommandGetOrCreateSchema {
- required uint64 request_id = 1;
- required string topic = 2;
- required Schema schema = 3;
+ required uint64 request_id = 1;
+ required string topic = 2;
+ required Schema schema = 3;
+ optional string producerName = 4;
}
message CommandGetOrCreateSchemaResponse {