This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc27fe78f9e [fix][broker]Producer with AUTO_PRODUCE schema failed to 
reconnect, which caused by schema incompatible (#25437)
fc27fe78f9e is described below

commit fc27fe78f9e73c9667d78556a43f81805f661c38
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 1 10:10:00 2026 +0800

    [fix][broker]Producer with AUTO_PRODUCE schema failed to reconnect, which 
caused by schema incompatible (#25437)
---
 ...eWayReplicatorSchemaValidationEnforcedTest.java | 24 +++++++++++++++--
 .../apache/pulsar/client/impl/ProducerImpl.java    | 11 ++++++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 31 ++++++++++++----------
 3 files changed, 50 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
index 2dd94bb1cd1..cc5a957793c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
@@ -24,7 +24,6 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
@@ -79,7 +78,7 @@ public class OneWayReplicatorSchemaValidationEnforcedTest 
extends OneWayReplicat
         admin2.schemas().createSchema(topicName, 
myClassSchema.getSchemaInfo());
 
         // consume from the remote cluster (r2)
-        Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema)
+        org.apache.pulsar.client.api.Consumer<MyClass> consumer2 = 
client2.newConsumer(myClassSchema)
                 .topic(topicName).subscriptionName("sub").subscribe();
 
         // produce to local cluster (r1)
@@ -100,4 +99,25 @@ public class OneWayReplicatorSchemaValidationEnforcedTest 
extends OneWayReplicat
         });
     }
 
+    @Test(timeOut = 30000)
+    public void testReplicationReconnectWithSchemaValidationEnforced() throws 
Exception {
+        Schema<MyClass> myClassSchema = Schema.AVRO(MyClass.class);
+        final String topicName =
+                BrokerTestUtil.newUniqueName("persistent://" + 
sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
+        // With the creation of the topic, replication will be initiated.
+        // The schema that the internal producer of replication holds will be 
"Auto_producer -> bytes".
+        admin1.topics().createNonPartitionedTopic(topicName);
+        waitReplicatorStarted(topicName);
+        // Registers the new schema on the remote side.
+        admin2.schemas().createSchema(topicName, 
myClassSchema.getSchemaInfo());
+        // After a reconnection, the schema that the internal producer of 
replication holds should be
+        // "Auto_producer -> myClassSchema".
+        ServerCnx serverCnx2 = (ServerCnx) 
pulsar2.getBrokerService().getTopic(topicName, false).get().get()
+                .getProducers().values().iterator().next().getCnx();
+        serverCnx2.ctx().channel().close();
+        // Verify: the producer reconnected successfully.
+        Thread.sleep(2000);
+        waitReplicatorStarted(topicName);
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 8b7fec005b8..89f1c81cadd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -86,6 +86,7 @@ import 
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
 import org.apache.pulsar.client.impl.metrics.Unit;
 import org.apache.pulsar.client.impl.metrics.UpDownCounter;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -2021,6 +2022,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 return null;
             }
 
+            if (cause instanceof 
PulsarClientException.IncompatibleSchemaException
+                    && schema instanceof AutoProduceBytesSchema 
autoProduceBytesSchema
+                    && !autoProduceBytesSchema.hasUserProvidedSchema()) {
+                client.reloadSchemaForAutoProduceProducer(topic, 
autoProduceBytesSchema)
+                    .whenComplete((__, throwable) -> {
+                        future.completeExceptionally(cause);
+                    });
+                return null;
+            }
+
             if (cause instanceof TimeoutException) {
                 // Creating the producer has timed out. We need to ensure the 
broker closes the producer
                 // in case it was indeed created, otherwise it might prevent 
new create producer operation,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f15dc8284ab..c7a43a912a5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -421,26 +421,29 @@ public class PulsarClientImpl implements PulsarClient {
             if (autoProduceBytesSchema.hasUserProvidedSchema()) {
                 return createProducerAsync(topic, conf, schema, interceptors);
             }
-            return lookup.getSchema(TopicName.get(conf.getTopicName()))
-                    .thenCompose(schemaInfoOptional -> {
-                        if (schemaInfoOptional.isPresent()) {
-                            SchemaInfo schemaInfo = schemaInfoOptional.get();
-                            if (schemaInfo.getType() == SchemaType.PROTOBUF) {
-                                autoProduceBytesSchema.setSchema(new 
GenericAvroSchema(schemaInfo));
-                            } else {
-                                
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
-                            }
-                        } else {
-                            autoProduceBytesSchema.setSchema(Schema.BYTES);
-                        }
-                        return createProducerAsync(topic, conf, schema, 
interceptors);
-                    });
+            return reloadSchemaForAutoProduceProducer(topic, 
autoProduceBytesSchema)
+                    .thenCompose(schemaInfoOptional -> 
createProducerAsync(topic, conf, schema, interceptors));
         } else {
             return createProducerAsync(topic, conf, schema, interceptors);
         }
 
     }
 
+    public CompletableFuture<Void> reloadSchemaForAutoProduceProducer(String 
topic, AutoProduceBytesSchema autoSchema) {
+        return 
lookup.getSchema(TopicName.get(topic)).thenAccept(schemaInfoOptional -> {
+            if (schemaInfoOptional.isPresent()) {
+                SchemaInfo schemaInfo = schemaInfoOptional.get();
+                if (schemaInfo.getType() == SchemaType.PROTOBUF) {
+                    autoSchema.setSchema(new GenericAvroSchema(schemaInfo));
+                } else {
+                    autoSchema.setSchema(Schema.getSchema(schemaInfo));
+                }
+            } else {
+                autoSchema.setSchema(Schema.BYTES);
+            }
+        });
+    }
+
     private CompletableFuture<Integer> checkPartitions(String topic, boolean 
forceNoPartitioned,
                                                        @Nullable String 
producerNameForLog) {
         CompletableFuture<Integer> checkPartitions = new CompletableFuture<>();

Reply via email to