This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d89a2ea1a [improve][broker] PIP-464: Strict Avro schema validation 
for SchemaType.JSON (#25362)
08d89a2ea1a is described below

commit 08d89a2ea1a2a566197f5db004881c4e222bb3ac
Author: Penghui Li <[email protected]>
AuthorDate: Mon Mar 23 09:10:00 2026 -0700

    [improve][broker] PIP-464: Strict Avro schema validation for 
SchemaType.JSON (#25362)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  10 +
 .../schema/JsonSchemaCompatibilityCheck.java       |  29 ++-
 .../service/schema/SchemaRegistryService.java      |  13 +-
 .../schema/validator/SchemaDataValidator.java      |  20 +-
 ...hemaRegistryServiceWithSchemaDataValidator.java |  18 +-
 .../validator/StructSchemaDataValidator.java       |  33 ++--
 .../admin/AdminApiSchemaJsonValidationTest.java    | 217 +++++++++++++++++++++
 .../schema/JsonSchemaCompatibilityCheckTest.java   |  66 ++++++-
 .../schema/validator/SchemaDataValidatorTest.java  |  99 ++++++++++
 ...RegistryServiceWithSchemaDataValidatorTest.java |  72 +++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |   3 +
 11 files changed, 548 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 10d1a089c8b..0931aa64114 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3423,6 +3423,16 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.FULL;
 
+    @FieldContext(
+        category = CATEGORY_SCHEMA,
+        doc = "Whether to allow legacy Jackson JsonSchema format for 
SchemaType.JSON schema definitions. "
+            + "When false (default), only valid Apache Avro schema format is 
accepted for SchemaType.JSON, "
+            + "consistent with what the consumer side requires. When true, the 
pre-2.1 backward-compatible "
+            + "behavior is preserved for deployments that still have topics 
with legacy-format schemas. "
+            + "See PIP-464 for details."
+    )
+    private boolean schemaJsonAllowLegacyJacksonFormat = false;
+
     /**** --- WebSocket. --- ****/
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
index 94afdd14620..dc8be3e0651 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
@@ -36,11 +36,21 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 @SuppressWarnings("unused")
 public class JsonSchemaCompatibilityCheck extends 
AvroSchemaBasedCompatibilityCheck {
 
+    private volatile boolean allowLegacyJacksonFormat = false;
+
     @Override
     public SchemaType getSchemaType() {
         return SchemaType.JSON;
     }
 
+    /**
+     * Set whether to allow legacy Jackson JsonSchema format for backward 
compatibility.
+     * When false (default), only valid Avro schema format is accepted 
(PIP-464).
+     */
+    public void setAllowLegacyJacksonFormat(boolean allowLegacyJacksonFormat) {
+        this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
+    }
+
     @Override
     public void checkCompatible(SchemaData from, SchemaData to, 
SchemaCompatibilityStrategy strategy)
             throws IncompatibleSchemaException {
@@ -48,14 +58,14 @@ public class JsonSchemaCompatibilityCheck extends 
AvroSchemaBasedCompatibilityCh
             if (isAvroSchema(to)) {
                 // if both producer and broker have the schema in avro format
                 super.checkCompatible(from, to, strategy);
-            } else if (isJsonSchema(to)) {
+            } else if (allowLegacyJacksonFormat && isJsonSchema(to)) {
                 // if broker have the schema in avro format but producer sent 
a schema in the old json format
-                // allow old schema format for backwards compatibility
+                // allow old schema format for backwards compatibility (only 
when legacy format is enabled)
             } else {
-                // unknown schema format
-                throw new IncompatibleSchemaException("Unknown schema format");
+                throw new IncompatibleSchemaException(
+                        "Incompatible schema: expected Avro schema format for 
SchemaType.JSON");
             }
-        } else if (isJsonSchema(from)){
+        } else if (allowLegacyJacksonFormat && isJsonSchema(from)) {
 
             if (isAvroSchema(to)) {
                 // if broker have the schema in old json format but producer 
sent a schema in the avro format
@@ -64,9 +74,14 @@ public class JsonSchemaCompatibilityCheck extends 
AvroSchemaBasedCompatibilityCh
                 // if both producer and broker have the schema in old json 
format
                 isCompatibleJsonSchema(from, to);
             } else {
-                // unknown schema format
-                throw new IncompatibleSchemaException("Unknown schema format");
+                throw new IncompatibleSchemaException(
+                        "Incompatible schema: expected Avro schema format for 
SchemaType.JSON");
             }
+        } else if (!allowLegacyJacksonFormat && !isAvroSchema(from)) {
+            // When legacy format is disabled, the existing schema must be 
valid Avro.
+            // If it's not, this is a defense-in-depth rejection (PIP-464).
+            throw new IncompatibleSchemaException(
+                    "Incompatible schema: existing schema is not in valid Avro 
format for SchemaType.JSON");
         } else {
             // broker has schema format with unknown format
             // maybe corrupted?
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 2a2467d3947..aa6ac6580f2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -49,8 +49,19 @@ public interface SchemaRegistryService extends 
SchemaRegistry {
             try {
                 Map<SchemaType, SchemaCompatibilityCheck> checkers = 
getCheckers(schemaRegistryCompatibilityCheckers);
                 checkers.put(SchemaType.KEY_VALUE, new 
KeyValueSchemaCompatibilityCheck(checkers));
+
+                // PIP-464: propagate schemaJsonAllowLegacyJacksonFormat to 
JsonSchemaCompatibilityCheck
+                boolean allowLegacyJacksonFormat =
+                        
pulsarService.getConfiguration().isSchemaJsonAllowLegacyJacksonFormat();
+                SchemaCompatibilityCheck jsonCheck = 
checkers.get(SchemaType.JSON);
+                if (jsonCheck instanceof JsonSchemaCompatibilityCheck) {
+                    ((JsonSchemaCompatibilityCheck) jsonCheck)
+                            
.setAllowLegacyJacksonFormat(allowLegacyJacksonFormat);
+                }
+
                 return SchemaRegistryServiceWithSchemaDataValidator.of(
-                        new SchemaRegistryServiceImpl(schemaStorage, checkers, 
pulsarService));
+                        new SchemaRegistryServiceImpl(schemaStorage, checkers, 
pulsarService),
+                        allowLegacyJacksonFormat);
             } catch (Exception e) {
                 LOG.warn("Unable to create schema registry storage, defaulting 
to empty storage", e);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index a26cc4434b2..b872016ee7c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -34,16 +34,30 @@ public interface SchemaDataValidator {
 
     /**
      * Validate if the schema data is well formed.
+     * Uses strict Avro-only validation for SchemaType.JSON (no legacy Jackson 
fallback).
      *
      * @param schemaData schema data to validate
      * @throws InvalidSchemaDataException if the schema data is not in a valid 
form.
      */
     static void validateSchemaData(SchemaData schemaData) throws 
InvalidSchemaDataException {
+        validateSchemaData(schemaData, false);
+    }
+
+    /**
+     * Validate if the schema data is well formed.
+     *
+     * @param schemaData schema data to validate
+     * @param allowLegacyJacksonFormat if true, allows legacy Jackson 
JsonSchema format for SchemaType.JSON
+     *                                  for backward compatibility with 
pre-2.1 schemas (PIP-464)
+     * @throws InvalidSchemaDataException if the schema data is not in a valid 
form.
+     */
+    static void validateSchemaData(SchemaData schemaData,
+                                   boolean allowLegacyJacksonFormat) throws 
InvalidSchemaDataException {
         switch (schemaData.getType()) {
             case AVRO:
             case JSON:
             case PROTOBUF:
-                StructSchemaDataValidator.of().validate(schemaData);
+                
StructSchemaDataValidator.of(allowLegacyJacksonFormat).validate(schemaData);
                 break;
             case PROTOBUF_NATIVE:
                 ProtobufNativeSchemaDataValidator.of().validate(schemaData);
@@ -80,8 +94,8 @@ public interface SchemaDataValidator {
             case KEY_VALUE:
                 KeyValue<SchemaData, SchemaData> kvSchema =
                     
KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData);
-                validateSchemaData(kvSchema.getKey());
-                validateSchemaData(kvSchema.getValue());
+                validateSchemaData(kvSchema.getKey(), 
allowLegacyJacksonFormat);
+                validateSchemaData(kvSchema.getValue(), 
allowLegacyJacksonFormat);
                 break;
             default:
                 throw new InvalidSchemaDataException("Unknown schema type : " 
+ schemaData.getType());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
index b3032d5498f..980993dba6b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -33,13 +33,21 @@ import org.apache.pulsar.common.util.FutureUtil;
 public class SchemaRegistryServiceWithSchemaDataValidator implements 
SchemaRegistryService {
 
     public static SchemaRegistryServiceWithSchemaDataValidator 
of(SchemaRegistryService service) {
-        return new SchemaRegistryServiceWithSchemaDataValidator(service);
+        return new SchemaRegistryServiceWithSchemaDataValidator(service, 
false);
+    }
+
+    public static SchemaRegistryServiceWithSchemaDataValidator 
of(SchemaRegistryService service,
+                                                                   boolean 
allowLegacyJacksonFormat) {
+        return new SchemaRegistryServiceWithSchemaDataValidator(service, 
allowLegacyJacksonFormat);
     }
 
     private final SchemaRegistryService service;
+    private final boolean allowLegacyJacksonFormat;
 
-    private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService 
service) {
+    private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService 
service,
+                                                         boolean 
allowLegacyJacksonFormat) {
         this.service = service;
+        this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
     }
 
     @Override
@@ -89,7 +97,7 @@ public class SchemaRegistryServiceWithSchemaDataValidator 
implements SchemaRegis
                                                               SchemaData 
schema,
                                                               
SchemaCompatibilityStrategy strategy) {
         try {
-            SchemaDataValidator.validateSchemaData(schema);
+            SchemaDataValidator.validateSchemaData(schema, 
allowLegacyJacksonFormat);
         } catch (InvalidSchemaDataException e) {
             return FutureUtil.failedFuture(e);
         }
@@ -115,7 +123,7 @@ public class SchemaRegistryServiceWithSchemaDataValidator 
implements SchemaRegis
     public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData 
schema,
                                                    SchemaCompatibilityStrategy 
strategy) {
         try {
-            SchemaDataValidator.validateSchemaData(schema);
+            SchemaDataValidator.validateSchemaData(schema, 
allowLegacyJacksonFormat);
         } catch (InvalidSchemaDataException e) {
             return FutureUtil.failedFuture(e);
         }
@@ -127,7 +135,7 @@ public class SchemaRegistryServiceWithSchemaDataValidator 
implements SchemaRegis
                                                    SchemaData schema,
                                                    SchemaCompatibilityStrategy 
strategy) {
         try {
-            SchemaDataValidator.validateSchemaData(schema);
+            SchemaDataValidator.validateSchemaData(schema, 
allowLegacyJacksonFormat);
         } catch (InvalidSchemaDataException e) {
             return FutureUtil.failedFuture(e);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
index 8202c5720c2..7417e06c2d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import java.io.IOException;
 import org.apache.avro.NameValidator;
 import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -39,10 +38,21 @@ public class StructSchemaDataValidator implements 
SchemaDataValidator {
         return INSTANCE;
     }
 
-    private static final StructSchemaDataValidator INSTANCE = new 
StructSchemaDataValidator();
+    public static StructSchemaDataValidator of(boolean 
allowLegacyJacksonFormat) {
+        return allowLegacyJacksonFormat ? LEGACY_INSTANCE : INSTANCE;
+    }
+
+    // Default instance: strict Avro-only validation for SchemaType.JSON 
(PIP-464)
+    private static final StructSchemaDataValidator INSTANCE = new 
StructSchemaDataValidator(false);
+    // Legacy instance: allows Jackson JsonSchema fallback for backward 
compatibility
+    private static final StructSchemaDataValidator LEGACY_INSTANCE = new 
StructSchemaDataValidator(true);
     public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new 
CompatibleNameValidator();
 
-    private StructSchemaDataValidator() {}
+    private final boolean allowLegacyJacksonFormat;
+
+    private StructSchemaDataValidator(boolean allowLegacyJacksonFormat) {
+        this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
+    }
 
     private static final ObjectReader JSON_SCHEMA_READER =
             ObjectMapperFactory.getMapper().reader().forType(JsonSchema.class);
@@ -57,11 +67,14 @@ public class StructSchemaDataValidator implements 
SchemaDataValidator {
             if (SchemaType.AVRO.equals(schemaData.getType())) {
                 checkAvroSchemaTypeSupported(schema);
             }
-        } catch (SchemaParseException e) {
-            if (schemaData.getType() == SchemaType.JSON) {
-                // we used JsonSchema for storing the definition of a JSON 
schema
-                // hence for backward compatibility consideration, we need to 
try
-                // to use JsonSchema to decode the schema data
+        } catch (InvalidSchemaDataException invalidSchemaDataException) {
+            throw invalidSchemaDataException;
+        } catch (Exception e) {
+            // Avro 1.12.0 may throw NullPointerException (not 
SchemaParseException) for
+            // non-Avro schemas, so the legacy fallback must be in the general 
catch block.
+            if (schemaData.getType() == SchemaType.JSON && 
allowLegacyJacksonFormat) {
+                // For backward compatibility with pre-2.1 schemas: try 
Jackson JsonSchema parsing.
+                // This fallback is only enabled when 
schemaJsonAllowLegacyJacksonFormat=true (PIP-464).
                 try {
                     JSON_SCHEMA_READER.readValue(data);
                 } catch (IOException ioe) {
@@ -70,10 +83,6 @@ public class StructSchemaDataValidator implements 
SchemaDataValidator {
             } else {
                 throwInvalidSchemaDataException(schemaData, e);
             }
-        } catch (InvalidSchemaDataException invalidSchemaDataException) {
-            throw invalidSchemaDataException;
-        } catch (Exception e) {
-            throwInvalidSchemaDataException(schemaData, e);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaJsonValidationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaJsonValidationTest.java
new file mode 100644
index 00000000000..880cdae3d74
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaJsonValidationTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+import java.util.HashMap;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for PIP-464: Strict Avro schema validation for 
SchemaType.JSON.
+ * Tests that the broker correctly accepts/rejects schema definitions based on
+ * the schemaJsonAllowLegacyJacksonFormat configuration.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaJsonValidationTest extends 
MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
+        admin.tenants().createTenant("schema-json-validation", tenantInfo);
+        admin.namespaces().createNamespace("schema-json-validation/test-ns", 
Set.of("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAvroFormatJsonSchemaAccepted() throws Exception {
+        // Valid Avro schema for SchemaType.JSON should always be accepted
+        String topicName = 
"persistent://schema-json-validation/test-ns/avro-format-accepted";
+        String avroSchema = 
"{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.example\","
+                + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
+                + "{\"name\":\"field2\",\"type\":\"int\"}]}";
+
+        PostSchemaPayload payload = new PostSchemaPayload("JSON", avroSchema, 
new HashMap<>());
+        admin.schemas().createSchema(topicName, payload);
+
+        // Verify the schema was stored
+        assertNotNull(admin.schemas().getSchemaInfo(topicName));
+        
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "JSON");
+    }
+
+    @Test
+    public void testAvroFormatJsonSchemaViaProducerAccepted() throws Exception 
{
+        // Java client's JSONSchema.of() generates Avro format — should always 
be accepted
+        String topicName = 
"persistent://schema-json-validation/test-ns/avro-format-producer-accepted";
+
+        try (var producer = pulsarClient.newProducer(
+                
Schema.JSON(SchemaDefinition.builder().withPojo(TestPojo.class).build()))
+                .topic(topicName).create()) {
+            producer.send(new TestPojo("hello", 42));
+        }
+
+        // Verify the schema was stored
+        assertNotNull(admin.schemas().getSchemaInfo(topicName));
+        
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "JSON");
+    }
+
+    @Test
+    public void testJsonSchemaDraftRejectedByDefault() throws Exception {
+        // JSON Schema Draft 2020-12 (the problematic case from non-Java 
clients) should be rejected
+        // when schemaJsonAllowLegacyJacksonFormat=false (default)
+        String topicName = 
"persistent://schema-json-validation/test-ns/json-draft-rejected";
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"},"
+                + "\"field2\":{\"type\":\"integer\"}}}";
+
+        PostSchemaPayload payload = new PostSchemaPayload("JSON", 
jsonSchemaDraft, new HashMap<>());
+        try {
+            admin.schemas().createSchema(topicName, payload);
+            fail("Should reject JSON Schema Draft format when 
schemaJsonAllowLegacyJacksonFormat=false");
+        } catch (PulsarAdminException e) {
+            log.info("Expected rejection: {}", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testJacksonJsonSchemaRejectedByDefault() throws Exception {
+        // Old Jackson JsonSchema format should be rejected when 
schemaJsonAllowLegacyJacksonFormat=false (default)
+        String topicName = 
"persistent://schema-json-validation/test-ns/jackson-rejected";
+        String jacksonSchema = 
"{\"type\":\"object\",\"id\":\"urn:jsonschema:org:example:TestRecord\","
+                + 
"\"properties\":{\"field1\":{\"type\":\"string\"},\"field2\":{\"type\":\"integer\"}}}";
+
+        PostSchemaPayload payload = new PostSchemaPayload("JSON", 
jacksonSchema, new HashMap<>());
+        try {
+            admin.schemas().createSchema(topicName, payload);
+            fail("Should reject Jackson JsonSchema format when 
schemaJsonAllowLegacyJacksonFormat=false");
+        } catch (PulsarAdminException e) {
+            log.info("Expected rejection: {}", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testJacksonJsonSchemaAcceptedWhenLegacyEnabled() throws 
Exception {
+        // Restart broker with legacy format enabled
+        cleanup();
+        conf.setSchemaJsonAllowLegacyJacksonFormat(true);
+        setup();
+
+        String topicName = 
"persistent://schema-json-validation/test-ns/jackson-accepted-legacy";
+        String jacksonSchema = 
"{\"type\":\"object\",\"id\":\"urn:jsonschema:org:example:TestRecord\","
+                + 
"\"properties\":{\"field1\":{\"type\":\"string\"},\"field2\":{\"type\":\"integer\"}}}";
+
+        PostSchemaPayload payload = new PostSchemaPayload("JSON", 
jacksonSchema, new HashMap<>());
+        admin.schemas().createSchema(topicName, payload);
+
+        // Verify the schema was stored
+        assertNotNull(admin.schemas().getSchemaInfo(topicName));
+        
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "JSON");
+    }
+
+    @Test
+    public void testJsonSchemaDraftAcceptedWhenLegacyEnabled() throws 
Exception {
+        // Restart broker with legacy format enabled
+        cleanup();
+        conf.setSchemaJsonAllowLegacyJacksonFormat(true);
+        setup();
+
+        String topicName = 
"persistent://schema-json-validation/test-ns/json-draft-accepted-legacy";
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"},"
+                + "\"field2\":{\"type\":\"integer\"}}}";
+
+        PostSchemaPayload payload = new PostSchemaPayload("JSON", 
jsonSchemaDraft, new HashMap<>());
+        admin.schemas().createSchema(topicName, payload);
+
+        // Verify the schema was stored
+        assertNotNull(admin.schemas().getSchemaInfo(topicName));
+    }
+
+    @Test
+    public void testAvroSchemaTypeUnaffectedByConfig() throws Exception {
+        // SchemaType.AVRO should always require Avro format, regardless of 
the JSON legacy config
+        String topicName = 
"persistent://schema-json-validation/test-ns/avro-type-unaffected";
+        String avroSchema = 
"{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.example\","
+                + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+
+        PostSchemaPayload payload = new PostSchemaPayload("AVRO", avroSchema, 
new HashMap<>());
+        admin.schemas().createSchema(topicName, payload);
+
+        // Verify the schema was stored
+        assertNotNull(admin.schemas().getSchemaInfo(topicName));
+        
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "AVRO");
+    }
+
+    @Test
+    public void testSchemaCompatibilityRejectsNonAvroByDefault() throws 
Exception {
+        // First register a valid Avro schema
+        String topicName = 
"persistent://schema-json-validation/test-ns/compat-rejects-non-avro";
+        String avroSchemaV1 = 
"{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.example\","
+                + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+
+        PostSchemaPayload payload1 = new PostSchemaPayload("JSON", 
avroSchemaV1, new HashMap<>());
+        admin.schemas().createSchema(topicName, payload1);
+
+        // Now try to register a JSON Schema Draft as v2 — should be rejected
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"}}}";
+
+        PostSchemaPayload payload2 = new PostSchemaPayload("JSON", 
jsonSchemaDraft, new HashMap<>());
+        try {
+            admin.schemas().createSchema(topicName, payload2);
+            fail("Should reject JSON Schema Draft as incompatible with 
existing Avro schema");
+        } catch (PulsarAdminException e) {
+            log.info("Expected rejection on compatibility check: {}", 
e.getMessage());
+        }
+    }
+
+    private static class TestPojo {
+        public String field1;
+        public int field2;
+
+        public TestPojo() {}
+
+        public TestPojo(String field1, int field2) {
+            this.field1 = field1;
+            this.field2 = field2;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index b192110c2d7..3664c93559d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -47,18 +47,76 @@ public class JsonSchemaCompatibilityCheckTest extends 
BaseAvroSchemaCompatibilit
     }
 
     @Test
-    public void testJsonSchemaBackwardsCompatibility() throws 
JsonProcessingException {
+    public void testJsonSchemaBackwardsCompatibilityWithLegacyEnabled() throws 
JsonProcessingException {
+        // When legacy format is enabled, backward compatibility between old 
and new formats should work
+        JsonSchemaCompatibilityCheck check = new 
JsonSchemaCompatibilityCheck();
+        check.setAllowLegacyJacksonFormat(true);
+
+        SchemaData from = 
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+        SchemaData to = 
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder()
+                
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+        Assert.assertTrue(check.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+
+        from = 
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
+                
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+        to = 
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+        Assert.assertTrue(check.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+    }
 
+    @Test
+    public void testJsonSchemaBackwardsCompatibilityRejectedByDefault() throws 
JsonProcessingException {
+        // When legacy format is disabled (default), mixed old/new format 
should be rejected
+        JsonSchemaCompatibilityCheck check = new 
JsonSchemaCompatibilityCheck();
+        // allowLegacyJacksonFormat defaults to false
+
+        // Old format (Jackson) -> New format (Avro): should be rejected 
because 'from' is not valid Avro
         SchemaData from = 
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         SchemaData to = 
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder()
                 
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
-        JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new 
JsonSchemaCompatibilityCheck();
-        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+        Assert.assertFalse(check.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
 
+        // New format (Avro) -> Old format (Jackson): should be rejected 
because 'to' is not valid Avro
         from = 
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
                 
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
         to = 
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
-        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+        Assert.assertFalse(check.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testAvroToAvroCompatibilityUnaffectedByConfig() throws 
JsonProcessingException {
+        // Both Avro schemas: should work the same regardless of legacy flag
+        JsonSchemaCompatibilityCheck strictCheck = new 
JsonSchemaCompatibilityCheck();
+        JsonSchemaCompatibilityCheck legacyCheck = new 
JsonSchemaCompatibilityCheck();
+        legacyCheck.setAllowLegacyJacksonFormat(true);
+
+        SchemaData from = 
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
+                
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+        SchemaData to = 
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
+                
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+
+        Assert.assertTrue(strictCheck.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+        Assert.assertTrue(legacyCheck.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testJsonSchemaDraftRejectedByDefault() {
+        // JSON Schema Draft 2020-12 format should be rejected when legacy is 
disabled
+        JsonSchemaCompatibilityCheck check = new 
JsonSchemaCompatibilityCheck();
+
+        String avroSchema = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.example\","
+                + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"}}}";
+
+        // Avro -> JSON Schema Draft: should be rejected
+        SchemaData from = 
SchemaData.builder().data(avroSchema.getBytes(UTF_8)).build();
+        SchemaData to = 
SchemaData.builder().data(jsonSchemaDraft.getBytes(UTF_8)).build();
+        Assert.assertFalse(check.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
+
+        // JSON Schema Draft -> Avro: should be rejected (existing schema not 
valid Avro)
+        from = 
SchemaData.builder().data(jsonSchemaDraft.getBytes(UTF_8)).build();
+        to = SchemaData.builder().data(avroSchema.getBytes(UTF_8)).build();
+        Assert.assertFalse(check.isCompatible(from, to, 
SchemaCompatibilityStrategy.FULL));
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index a69bb649e7c..079405407f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -153,6 +153,105 @@ public class SchemaDataValidatorTest {
         }
     }
 
+    // PIP-464: Strict Avro validation tests for SchemaType.JSON
+
+    @Test
+    public void testJsonSchemaWithAvroFormatAcceptedRegardlessOfConfig() 
throws Exception {
+        // Valid Avro schema should be accepted whether legacy format is 
allowed or not
+        Schema<Foo> schema = Schema.AVRO(Foo.class);
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(schema.getSchemaInfo().getSchema())
+            .build();
+        // Strict mode (default)
+        SchemaDataValidator.validateSchemaData(data, false);
+        // Legacy mode
+        SchemaDataValidator.validateSchemaData(data, true);
+    }
+
+    @Test(expectedExceptions = InvalidSchemaDataException.class)
+    public void testJsonSchemaWithJacksonFormatRejectedByDefault() throws 
Exception {
+        // Jackson JsonSchema format should be rejected when 
allowLegacyJacksonFormat=false (default)
+        ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(mapper.writeValueAsBytes(new 
JsonSchemaGenerator(mapper).generateSchema(Foo.class)))
+            .build();
+        SchemaDataValidator.validateSchemaData(data, false);
+    }
+
+    @Test
+    public void testJsonSchemaWithJacksonFormatAcceptedWhenLegacyEnabled() 
throws Exception {
+        // Jackson JsonSchema format should be accepted when 
allowLegacyJacksonFormat=true
+        ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(mapper.writeValueAsBytes(new 
JsonSchemaGenerator(mapper).generateSchema(Foo.class)))
+            .build();
+        SchemaDataValidator.validateSchemaData(data, true);
+    }
+
+    @Test(expectedExceptions = InvalidSchemaDataException.class)
+    public void testJsonSchemaWithJsonSchemaDraftRejectedByDefault() throws 
Exception {
+        // JSON Schema Draft 2020-12 format (the problematic case from 
non-Java clients)
+        // should be rejected when allowLegacyJacksonFormat=false
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field\":{\"type\":\"integer\"}}}";
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(jsonSchemaDraft.getBytes(UTF_8))
+            .build();
+        SchemaDataValidator.validateSchemaData(data, false);
+    }
+
+    @Test
+    public void testJsonSchemaWithJsonSchemaDraftAcceptedWhenLegacyEnabled() 
throws Exception {
+        // JSON Schema Draft format should be accepted when 
allowLegacyJacksonFormat=true
+        // (this is the problematic behavior we're fixing by default)
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field\":{\"type\":\"integer\"}}}";
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(jsonSchemaDraft.getBytes(UTF_8))
+            .build();
+        SchemaDataValidator.validateSchemaData(data, true);
+    }
+
+    @Test(expectedExceptions = InvalidSchemaDataException.class)
+    public void testJsonSchemaWithArbitraryJsonRejectedInBothModes() throws 
Exception {
+        // Arbitrary JSON that is neither Avro nor Jackson JsonSchema should 
be rejected
+        String arbitraryJson = "{\"foo\":\"bar\",\"baz\":123}";
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(arbitraryJson.getBytes(UTF_8))
+            .build();
+        // Even in legacy mode, this should fail (Jackson JsonSchema parsing 
rejects it)
+        SchemaDataValidator.validateSchemaData(data, true);
+    }
+
+    @Test
+    public void testAvroSchemaTypeUnaffectedByLegacyFlag() throws Exception {
+        // The legacy flag should only affect SchemaType.JSON, not AVRO
+        Schema<Foo> schema = Schema.AVRO(Foo.class);
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.AVRO)
+            .data(schema.getSchemaInfo().getSchema())
+            .build();
+        SchemaDataValidator.validateSchemaData(data, false);
+        SchemaDataValidator.validateSchemaData(data, true);
+    }
+
+    @Test(expectedExceptions = InvalidSchemaDataException.class)
+    public void testAvroSchemaTypeWithJacksonFormatRejectedRegardlessOfFlag() 
throws Exception {
+        // Jackson format for SchemaType.AVRO should always be rejected (flag 
only applies to JSON)
+        ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.AVRO)
+            .data(mapper.writeValueAsBytes(new 
JsonSchemaGenerator(mapper).generateSchema(Foo.class)))
+            .build();
+        SchemaDataValidator.validateSchemaData(data, true);
+    }
+
     @Test
     public void testCompatibleNameValidatorValidNames() {
         CompatibleNameValidator validator = new CompatibleNameValidator();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
index 9d94242068a..d4397b700e3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.schema.validator;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -28,6 +29,8 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import java.util.concurrent.CompletableFuture;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -36,6 +39,7 @@ import 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -159,4 +163,72 @@ public class 
SchemaRegistryServiceWithSchemaDataValidatorTest {
             .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy));
     }
 
+    // PIP-464: Tests for legacy Jackson format handling
+
+    @Test
+    public void testPutSchemaRejectsJacksonFormatByDefault() throws Exception {
+        // Default (strict mode) should reject Jackson JSON schema format
+        SchemaRegistryServiceWithSchemaDataValidator strictService =
+                
SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
+
+        ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(mapper.writeValueAsBytes(new 
JsonSchemaGenerator(mapper).generateSchema(Object.class)))
+            .build();
+
+        try {
+            strictService.putSchemaIfAbsent("test", schemaData, 
SchemaCompatibilityStrategy.FULL).get();
+            fail("Should fail with InvalidSchemaDataException");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+        }
+        verify(underlyingService, times(0))
+            .putSchemaIfAbsent(eq("test"), any(SchemaData.class), 
any(SchemaCompatibilityStrategy.class));
+    }
+
+    @Test
+    public void testPutSchemaAcceptsJacksonFormatWhenLegacyEnabled() throws 
Exception {
+        // Legacy mode should accept Jackson JSON schema format
+        SchemaRegistryServiceWithSchemaDataValidator legacyService =
+                
SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService, true);
+
+        ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(mapper.writeValueAsBytes(new 
JsonSchemaGenerator(mapper).generateSchema(Object.class)))
+            .build();
+
+        CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
+        when(underlyingService.putSchemaIfAbsent(eq("test"), 
any(SchemaData.class),
+                eq(SchemaCompatibilityStrategy.FULL))).thenReturn(future);
+
+        assertSame(future, legacyService.putSchemaIfAbsent("test", schemaData, 
SchemaCompatibilityStrategy.FULL));
+        verify(underlyingService, times(1))
+            .putSchemaIfAbsent(eq("test"), same(schemaData), 
eq(SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testPutSchemaRejectsJsonSchemaDraftByDefault() throws 
Exception {
+        // JSON Schema Draft 2020-12 should be rejected in strict mode
+        SchemaRegistryServiceWithSchemaDataValidator strictService =
+                
SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
+
+        String jsonSchemaDraft = 
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",";
+                + 
"\"type\":\"object\",\"properties\":{\"field\":{\"type\":\"integer\"}}}";
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(jsonSchemaDraft.getBytes(UTF_8))
+            .build();
+
+        try {
+            strictService.putSchemaIfAbsent("test", schemaData, 
SchemaCompatibilityStrategy.FULL).get();
+            fail("Should fail with InvalidSchemaDataException");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+        }
+        verify(underlyingService, times(0))
+            .putSchemaIfAbsent(eq("test"), any(SchemaData.class), 
any(SchemaCompatibilityStrategy.class));
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 62e8373e2d7..4119fc8b6ed 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1935,6 +1935,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     if 
(Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion()))
 {
                         schemaInfo = schema.getSchemaInfo();
                     } else if (schema instanceof JSONSchema) {
+                        // Deprecated (PIP-464): This backward-compatible path 
sends old Jackson JsonSchema
+                        // format to brokers below protocol v13 (Pulsar < 
2.1). Scheduled for removal in a
+                        // future major release.
                         JSONSchema jsonSchema = (JSONSchema) schema;
                         schemaInfo = 
jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
                     } else {


Reply via email to