This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new b54302c1849 [fix][broker]Producer with AUTO_PRODUCE schema failed to
reconnect, which caused by schema incompatible (#25437)
b54302c1849 is described below
commit b54302c18494f7ecc042da08867077251a804afc
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 1 10:58:23 2026 +0800
[fix][broker]Producer with AUTO_PRODUCE schema failed to reconnect, which
caused by schema incompatible (#25437)
(cherry picked from commit fc27fe78f9e73c9667d78556a43f81805f661c38)
---
...eWayReplicatorSchemaValidationEnforcedTest.java | 25 +++++++++++++++--
.../apache/pulsar/client/impl/ProducerImpl.java | 11 ++++++++
.../pulsar/client/impl/PulsarClientImpl.java | 31 ++++++++++++----------
3 files changed, 51 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
index 2dd94bb1cd1..ed8f8ac479d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
@@ -24,7 +24,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
@@ -79,7 +78,7 @@ public class OneWayReplicatorSchemaValidationEnforcedTest
extends OneWayReplicat
admin2.schemas().createSchema(topicName,
myClassSchema.getSchemaInfo());
// consume from the remote cluster (r2)
- Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema)
+ org.apache.pulsar.client.api.Consumer<MyClass> consumer2 =
client2.newConsumer(myClassSchema)
.topic(topicName).subscriptionName("sub").subscribe();
// produce to local cluster (r1)
@@ -100,4 +99,26 @@ public class OneWayReplicatorSchemaValidationEnforcedTest
extends OneWayReplicat
});
}
+ @Test(timeOut = 30000)
+ public void testReplicationReconnectWithSchemaValidationEnforced() throws
Exception {
+ Schema<MyClass> myClassSchema = Schema.AVRO(MyClass.class);
+ final String topicName =
+ BrokerTestUtil.newUniqueName("persistent://" +
sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
+ // With the creation of the topic, replication will be initiated.
+ // The schema that the internal producer of replication holds will be
"Auto_producer -> bytes".
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createNonPartitionedTopic(topicName);
+ waitReplicatorStarted(topicName);
+ // Registers a new schema on the remote side.
+ admin2.schemas().createSchema(topicName,
myClassSchema.getSchemaInfo());
+ // After a reconnection, the schema that the internal producer of
replication holds should be
+ // "Auto_producer -> myClassSchema".
+ ServerCnx serverCnx2 = (ServerCnx)
pulsar2.getBrokerService().getTopic(topicName, false).get().get()
+ .getProducers().values().iterator().next().getCnx();
+ serverCnx2.ctx().channel().close();
+ // Verify: the producer reconnected successfully.
+ Thread.sleep(2000);
+ waitReplicatorStarted(topicName);
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 4fd01387ddc..1622a6cd8a4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -85,6 +85,7 @@ import
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -2012,6 +2013,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return null;
}
+ if (cause instanceof
PulsarClientException.IncompatibleSchemaException
+ && schema instanceof AutoProduceBytesSchema
autoProduceBytesSchema
+ && !autoProduceBytesSchema.hasUserProvidedSchema()) {
+ client.reloadSchemaForAutoProduceProducer(topic,
autoProduceBytesSchema)
+ .whenComplete((__, throwable) -> {
+ future.completeExceptionally(cause);
+ });
+ return null;
+ }
+
if (cause instanceof TimeoutException) {
// Creating the producer has timed out. We need to ensure the
broker closes the producer
// in case it was indeed created, otherwise it might prevent
new create producer operation,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 85077c22c4c..d8b27e7264b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -428,26 +428,29 @@ public class PulsarClientImpl implements PulsarClient {
if (autoProduceBytesSchema.hasUserProvidedSchema()) {
return createProducerAsync(topic, conf, schema, interceptors);
}
- return lookup.getSchema(TopicName.get(conf.getTopicName()))
- .thenCompose(schemaInfoOptional -> {
- if (schemaInfoOptional.isPresent()) {
- SchemaInfo schemaInfo = schemaInfoOptional.get();
- if (schemaInfo.getType() == SchemaType.PROTOBUF) {
- autoProduceBytesSchema.setSchema(new
GenericAvroSchema(schemaInfo));
- } else {
-
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
- }
- } else {
- autoProduceBytesSchema.setSchema(Schema.BYTES);
- }
- return createProducerAsync(topic, conf, schema,
interceptors);
- });
+ return reloadSchemaForAutoProduceProducer(topic,
autoProduceBytesSchema)
+ .thenCompose(schemaInfoOptional ->
createProducerAsync(topic, conf, schema, interceptors));
} else {
return createProducerAsync(topic, conf, schema, interceptors);
}
}
+ public CompletableFuture<Void> reloadSchemaForAutoProduceProducer(String
topic, AutoProduceBytesSchema autoSchema) {
+ return
lookup.getSchema(TopicName.get(topic)).thenAccept(schemaInfoOptional -> {
+ if (schemaInfoOptional.isPresent()) {
+ SchemaInfo schemaInfo = schemaInfoOptional.get();
+ if (schemaInfo.getType() == SchemaType.PROTOBUF) {
+ autoSchema.setSchema(new GenericAvroSchema(schemaInfo));
+ } else {
+ autoSchema.setSchema(Schema.getSchema(schemaInfo));
+ }
+ } else {
+ autoSchema.setSchema(Schema.BYTES);
+ }
+ });
+ }
+
private CompletableFuture<Integer> checkPartitions(String topic, boolean
forceNoPartitioned,
@Nullable String
producerNameForLog) {
CompletableFuture<Integer> checkPartitions = new CompletableFuture<>();