This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08d89a2ea1a [improve][broker] PIP-464: Strict Avro schema validation for SchemaType.JSON (#25362)
08d89a2ea1a is described below
commit 08d89a2ea1a2a566197f5db004881c4e222bb3ac
Author: Penghui Li <[email protected]>
AuthorDate: Mon Mar 23 09:10:00 2026 -0700
[improve][broker] PIP-464: Strict Avro schema validation for SchemaType.JSON (#25362)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 10 +
.../schema/JsonSchemaCompatibilityCheck.java | 29 ++-
.../service/schema/SchemaRegistryService.java | 13 +-
.../schema/validator/SchemaDataValidator.java | 20 +-
...hemaRegistryServiceWithSchemaDataValidator.java | 18 +-
.../validator/StructSchemaDataValidator.java | 33 ++--
.../admin/AdminApiSchemaJsonValidationTest.java | 217 +++++++++++++++++++++
.../schema/JsonSchemaCompatibilityCheckTest.java | 66 ++++++-
.../schema/validator/SchemaDataValidatorTest.java | 99 ++++++++++
...RegistryServiceWithSchemaDataValidatorTest.java | 72 +++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 3 +
11 files changed, 548 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 10d1a089c8b..0931aa64114 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3423,6 +3423,16 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "Whether to allow legacy Jackson JsonSchema format for
SchemaType.JSON schema definitions. "
+ + "When false (default), only valid Apache Avro schema format is
accepted for SchemaType.JSON, "
+ + "consistent with what the consumer side requires. When true, the
pre-2.1 backward-compatible "
+ + "behavior is preserved for deployments that still have topics
with legacy-format schemas. "
+ + "See PIP-464 for details."
+ )
+ private boolean schemaJsonAllowLegacyJacksonFormat = false;
+
/**** --- WebSocket. --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
index 94afdd14620..dc8be3e0651 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
@@ -36,11 +36,21 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
@SuppressWarnings("unused")
public class JsonSchemaCompatibilityCheck extends
AvroSchemaBasedCompatibilityCheck {
+ private volatile boolean allowLegacyJacksonFormat = false;
+
@Override
public SchemaType getSchemaType() {
return SchemaType.JSON;
}
+ /**
+ * Set whether to allow legacy Jackson JsonSchema format for backward
compatibility.
+ * When false (default), only valid Avro schema format is accepted
(PIP-464).
+ */
+ public void setAllowLegacyJacksonFormat(boolean allowLegacyJacksonFormat) {
+ this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
+ }
+
@Override
public void checkCompatible(SchemaData from, SchemaData to,
SchemaCompatibilityStrategy strategy)
throws IncompatibleSchemaException {
@@ -48,14 +58,14 @@ public class JsonSchemaCompatibilityCheck extends
AvroSchemaBasedCompatibilityCh
if (isAvroSchema(to)) {
// if both producer and broker have the schema in avro format
super.checkCompatible(from, to, strategy);
- } else if (isJsonSchema(to)) {
+ } else if (allowLegacyJacksonFormat && isJsonSchema(to)) {
// if broker have the schema in avro format but producer sent
a schema in the old json format
- // allow old schema format for backwards compatibility
+ // allow old schema format for backwards compatibility (only
when legacy format is enabled)
} else {
- // unknown schema format
- throw new IncompatibleSchemaException("Unknown schema format");
+ throw new IncompatibleSchemaException(
+ "Incompatible schema: expected Avro schema format for
SchemaType.JSON");
}
- } else if (isJsonSchema(from)){
+ } else if (allowLegacyJacksonFormat && isJsonSchema(from)) {
if (isAvroSchema(to)) {
// if broker have the schema in old json format but producer
sent a schema in the avro format
@@ -64,9 +74,14 @@ public class JsonSchemaCompatibilityCheck extends
AvroSchemaBasedCompatibilityCh
// if both producer and broker have the schema in old json
format
isCompatibleJsonSchema(from, to);
} else {
- // unknown schema format
- throw new IncompatibleSchemaException("Unknown schema format");
+ throw new IncompatibleSchemaException(
+ "Incompatible schema: expected Avro schema format for
SchemaType.JSON");
}
+ } else if (!allowLegacyJacksonFormat && !isAvroSchema(from)) {
+ // When legacy format is disabled, the existing schema must be
valid Avro.
+ // If it's not, this is a defense-in-depth rejection (PIP-464).
+ throw new IncompatibleSchemaException(
+ "Incompatible schema: existing schema is not in valid Avro
format for SchemaType.JSON");
} else {
// broker has schema format with unknown format
// maybe corrupted?
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 2a2467d3947..aa6ac6580f2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -49,8 +49,19 @@ public interface SchemaRegistryService extends
SchemaRegistry {
try {
Map<SchemaType, SchemaCompatibilityCheck> checkers =
getCheckers(schemaRegistryCompatibilityCheckers);
checkers.put(SchemaType.KEY_VALUE, new
KeyValueSchemaCompatibilityCheck(checkers));
+
+ // PIP-464: propagate schemaJsonAllowLegacyJacksonFormat to
JsonSchemaCompatibilityCheck
+ boolean allowLegacyJacksonFormat =
+
pulsarService.getConfiguration().isSchemaJsonAllowLegacyJacksonFormat();
+ SchemaCompatibilityCheck jsonCheck =
checkers.get(SchemaType.JSON);
+ if (jsonCheck instanceof JsonSchemaCompatibilityCheck) {
+ ((JsonSchemaCompatibilityCheck) jsonCheck)
+
.setAllowLegacyJacksonFormat(allowLegacyJacksonFormat);
+ }
+
return SchemaRegistryServiceWithSchemaDataValidator.of(
- new SchemaRegistryServiceImpl(schemaStorage, checkers,
pulsarService));
+ new SchemaRegistryServiceImpl(schemaStorage, checkers,
pulsarService),
+ allowLegacyJacksonFormat);
} catch (Exception e) {
LOG.warn("Unable to create schema registry storage, defaulting
to empty storage", e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index a26cc4434b2..b872016ee7c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -34,16 +34,30 @@ public interface SchemaDataValidator {
/**
* Validate if the schema data is well formed.
+ * Uses strict Avro-only validation for SchemaType.JSON (no legacy Jackson
fallback).
*
* @param schemaData schema data to validate
* @throws InvalidSchemaDataException if the schema data is not in a valid
form.
*/
static void validateSchemaData(SchemaData schemaData) throws
InvalidSchemaDataException {
+ validateSchemaData(schemaData, false);
+ }
+
+ /**
+ * Validate if the schema data is well formed.
+ *
+ * @param schemaData schema data to validate
+ * @param allowLegacyJacksonFormat if true, allows legacy Jackson
JsonSchema format for SchemaType.JSON
+ * for backward compatibility with
pre-2.1 schemas (PIP-464)
+ * @throws InvalidSchemaDataException if the schema data is not in a valid
form.
+ */
+ static void validateSchemaData(SchemaData schemaData,
+ boolean allowLegacyJacksonFormat) throws
InvalidSchemaDataException {
switch (schemaData.getType()) {
case AVRO:
case JSON:
case PROTOBUF:
- StructSchemaDataValidator.of().validate(schemaData);
+
StructSchemaDataValidator.of(allowLegacyJacksonFormat).validate(schemaData);
break;
case PROTOBUF_NATIVE:
ProtobufNativeSchemaDataValidator.of().validate(schemaData);
@@ -80,8 +94,8 @@ public interface SchemaDataValidator {
case KEY_VALUE:
KeyValue<SchemaData, SchemaData> kvSchema =
KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData);
- validateSchemaData(kvSchema.getKey());
- validateSchemaData(kvSchema.getValue());
+ validateSchemaData(kvSchema.getKey(),
allowLegacyJacksonFormat);
+ validateSchemaData(kvSchema.getValue(),
allowLegacyJacksonFormat);
break;
default:
throw new InvalidSchemaDataException("Unknown schema type : "
+ schemaData.getType());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
index b3032d5498f..980993dba6b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -33,13 +33,21 @@ import org.apache.pulsar.common.util.FutureUtil;
public class SchemaRegistryServiceWithSchemaDataValidator implements
SchemaRegistryService {
public static SchemaRegistryServiceWithSchemaDataValidator
of(SchemaRegistryService service) {
- return new SchemaRegistryServiceWithSchemaDataValidator(service);
+ return new SchemaRegistryServiceWithSchemaDataValidator(service,
false);
+ }
+
+ public static SchemaRegistryServiceWithSchemaDataValidator
of(SchemaRegistryService service,
+ boolean
allowLegacyJacksonFormat) {
+ return new SchemaRegistryServiceWithSchemaDataValidator(service,
allowLegacyJacksonFormat);
}
private final SchemaRegistryService service;
+ private final boolean allowLegacyJacksonFormat;
- private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService
service) {
+ private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService
service,
+ boolean
allowLegacyJacksonFormat) {
this.service = service;
+ this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
}
@Override
@@ -89,7 +97,7 @@ public class SchemaRegistryServiceWithSchemaDataValidator
implements SchemaRegis
SchemaData
schema,
SchemaCompatibilityStrategy strategy) {
try {
- SchemaDataValidator.validateSchemaData(schema);
+ SchemaDataValidator.validateSchemaData(schema,
allowLegacyJacksonFormat);
} catch (InvalidSchemaDataException e) {
return FutureUtil.failedFuture(e);
}
@@ -115,7 +123,7 @@ public class SchemaRegistryServiceWithSchemaDataValidator
implements SchemaRegis
public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData
schema,
SchemaCompatibilityStrategy
strategy) {
try {
- SchemaDataValidator.validateSchemaData(schema);
+ SchemaDataValidator.validateSchemaData(schema,
allowLegacyJacksonFormat);
} catch (InvalidSchemaDataException e) {
return FutureUtil.failedFuture(e);
}
@@ -127,7 +135,7 @@ public class SchemaRegistryServiceWithSchemaDataValidator
implements SchemaRegis
SchemaData schema,
SchemaCompatibilityStrategy
strategy) {
try {
- SchemaDataValidator.validateSchemaData(schema);
+ SchemaDataValidator.validateSchemaData(schema,
allowLegacyJacksonFormat);
} catch (InvalidSchemaDataException e) {
return FutureUtil.failedFuture(e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
index 8202c5720c2..7417e06c2d4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import java.io.IOException;
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
@@ -39,10 +38,21 @@ public class StructSchemaDataValidator implements
SchemaDataValidator {
return INSTANCE;
}
- private static final StructSchemaDataValidator INSTANCE = new
StructSchemaDataValidator();
+ public static StructSchemaDataValidator of(boolean
allowLegacyJacksonFormat) {
+ return allowLegacyJacksonFormat ? LEGACY_INSTANCE : INSTANCE;
+ }
+
+ // Default instance: strict Avro-only validation for SchemaType.JSON
(PIP-464)
+ private static final StructSchemaDataValidator INSTANCE = new
StructSchemaDataValidator(false);
+ // Legacy instance: allows Jackson JsonSchema fallback for backward
compatibility
+ private static final StructSchemaDataValidator LEGACY_INSTANCE = new
StructSchemaDataValidator(true);
public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new
CompatibleNameValidator();
- private StructSchemaDataValidator() {}
+ private final boolean allowLegacyJacksonFormat;
+
+ private StructSchemaDataValidator(boolean allowLegacyJacksonFormat) {
+ this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
+ }
private static final ObjectReader JSON_SCHEMA_READER =
ObjectMapperFactory.getMapper().reader().forType(JsonSchema.class);
@@ -57,11 +67,14 @@ public class StructSchemaDataValidator implements
SchemaDataValidator {
if (SchemaType.AVRO.equals(schemaData.getType())) {
checkAvroSchemaTypeSupported(schema);
}
- } catch (SchemaParseException e) {
- if (schemaData.getType() == SchemaType.JSON) {
- // we used JsonSchema for storing the definition of a JSON
schema
- // hence for backward compatibility consideration, we need to
try
- // to use JsonSchema to decode the schema data
+ } catch (InvalidSchemaDataException invalidSchemaDataException) {
+ throw invalidSchemaDataException;
+ } catch (Exception e) {
+ // Avro 1.12.0 may throw NullPointerException (not
SchemaParseException) for
+ // non-Avro schemas, so the legacy fallback must be in the general
catch block.
+ if (schemaData.getType() == SchemaType.JSON &&
allowLegacyJacksonFormat) {
+ // For backward compatibility with pre-2.1 schemas: try
Jackson JsonSchema parsing.
+ // This fallback is only enabled when
schemaJsonAllowLegacyJacksonFormat=true (PIP-464).
try {
JSON_SCHEMA_READER.readValue(data);
} catch (IOException ioe) {
@@ -70,10 +83,6 @@ public class StructSchemaDataValidator implements
SchemaDataValidator {
} else {
throwInvalidSchemaDataException(schemaData, e);
}
- } catch (InvalidSchemaDataException invalidSchemaDataException) {
- throw invalidSchemaDataException;
- } catch (Exception e) {
- throwInvalidSchemaDataException(schemaData, e);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaJsonValidationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaJsonValidationTest.java
new file mode 100644
index 00000000000..880cdae3d74
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaJsonValidationTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+import java.util.HashMap;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for PIP-464: Strict Avro schema validation for
SchemaType.JSON.
+ * Tests that the broker correctly accepts/rejects schema definitions based on
+ * the schemaJsonAllowLegacyJacksonFormat configuration.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaJsonValidationTest extends
MockedPulsarServiceBaseTest {
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+ admin.clusters().createCluster("test",
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1",
"role2"), Set.of("test"));
+ admin.tenants().createTenant("schema-json-validation", tenantInfo);
+ admin.namespaces().createNamespace("schema-json-validation/test-ns",
Set.of("test"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testAvroFormatJsonSchemaAccepted() throws Exception {
+ // Valid Avro schema for SchemaType.JSON should always be accepted
+ String topicName =
"persistent://schema-json-validation/test-ns/avro-format-accepted";
+ String avroSchema =
"{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.example\","
+ + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
+ + "{\"name\":\"field2\",\"type\":\"int\"}]}";
+
+ PostSchemaPayload payload = new PostSchemaPayload("JSON", avroSchema,
new HashMap<>());
+ admin.schemas().createSchema(topicName, payload);
+
+ // Verify the schema was stored
+ assertNotNull(admin.schemas().getSchemaInfo(topicName));
+
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "JSON");
+ }
+
+ @Test
+ public void testAvroFormatJsonSchemaViaProducerAccepted() throws Exception
{
+ // Java client's JSONSchema.of() generates Avro format — should always
be accepted
+ String topicName =
"persistent://schema-json-validation/test-ns/avro-format-producer-accepted";
+
+ try (var producer = pulsarClient.newProducer(
+
Schema.JSON(SchemaDefinition.builder().withPojo(TestPojo.class).build()))
+ .topic(topicName).create()) {
+ producer.send(new TestPojo("hello", 42));
+ }
+
+ // Verify the schema was stored
+ assertNotNull(admin.schemas().getSchemaInfo(topicName));
+
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "JSON");
+ }
+
+ @Test
+ public void testJsonSchemaDraftRejectedByDefault() throws Exception {
+ // JSON Schema Draft 2020-12 (the problematic case from non-Java
clients) should be rejected
+ // when schemaJsonAllowLegacyJacksonFormat=false (default)
+ String topicName =
"persistent://schema-json-validation/test-ns/json-draft-rejected";
+ String jsonSchemaDraft =
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+ +
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"},"
+ + "\"field2\":{\"type\":\"integer\"}}}";
+
+ PostSchemaPayload payload = new PostSchemaPayload("JSON",
jsonSchemaDraft, new HashMap<>());
+ try {
+ admin.schemas().createSchema(topicName, payload);
+ fail("Should reject JSON Schema Draft format when
schemaJsonAllowLegacyJacksonFormat=false");
+ } catch (PulsarAdminException e) {
+ log.info("Expected rejection: {}", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJacksonJsonSchemaRejectedByDefault() throws Exception {
+ // Old Jackson JsonSchema format should be rejected when
schemaJsonAllowLegacyJacksonFormat=false (default)
+ String topicName =
"persistent://schema-json-validation/test-ns/jackson-rejected";
+ String jacksonSchema =
"{\"type\":\"object\",\"id\":\"urn:jsonschema:org:example:TestRecord\","
+ +
"\"properties\":{\"field1\":{\"type\":\"string\"},\"field2\":{\"type\":\"integer\"}}}";
+
+ PostSchemaPayload payload = new PostSchemaPayload("JSON",
jacksonSchema, new HashMap<>());
+ try {
+ admin.schemas().createSchema(topicName, payload);
+ fail("Should reject Jackson JsonSchema format when
schemaJsonAllowLegacyJacksonFormat=false");
+ } catch (PulsarAdminException e) {
+ log.info("Expected rejection: {}", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJacksonJsonSchemaAcceptedWhenLegacyEnabled() throws
Exception {
+ // Restart broker with legacy format enabled
+ cleanup();
+ conf.setSchemaJsonAllowLegacyJacksonFormat(true);
+ setup();
+
+ String topicName =
"persistent://schema-json-validation/test-ns/jackson-accepted-legacy";
+ String jacksonSchema =
"{\"type\":\"object\",\"id\":\"urn:jsonschema:org:example:TestRecord\","
+ +
"\"properties\":{\"field1\":{\"type\":\"string\"},\"field2\":{\"type\":\"integer\"}}}";
+
+ PostSchemaPayload payload = new PostSchemaPayload("JSON",
jacksonSchema, new HashMap<>());
+ admin.schemas().createSchema(topicName, payload);
+
+ // Verify the schema was stored
+ assertNotNull(admin.schemas().getSchemaInfo(topicName));
+
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "JSON");
+ }
+
+ @Test
+ public void testJsonSchemaDraftAcceptedWhenLegacyEnabled() throws
Exception {
+ // Restart broker with legacy format enabled
+ cleanup();
+ conf.setSchemaJsonAllowLegacyJacksonFormat(true);
+ setup();
+
+ String topicName =
"persistent://schema-json-validation/test-ns/json-draft-accepted-legacy";
+ String jsonSchemaDraft =
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+ +
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"},"
+ + "\"field2\":{\"type\":\"integer\"}}}";
+
+ PostSchemaPayload payload = new PostSchemaPayload("JSON",
jsonSchemaDraft, new HashMap<>());
+ admin.schemas().createSchema(topicName, payload);
+
+ // Verify the schema was stored
+ assertNotNull(admin.schemas().getSchemaInfo(topicName));
+ }
+
+ @Test
+ public void testAvroSchemaTypeUnaffectedByConfig() throws Exception {
+ // SchemaType.AVRO should always require Avro format, regardless of
the JSON legacy config
+ String topicName =
"persistent://schema-json-validation/test-ns/avro-type-unaffected";
+ String avroSchema =
"{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.example\","
+ + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+
+ PostSchemaPayload payload = new PostSchemaPayload("AVRO", avroSchema,
new HashMap<>());
+ admin.schemas().createSchema(topicName, payload);
+
+ // Verify the schema was stored
+ assertNotNull(admin.schemas().getSchemaInfo(topicName));
+
assertEquals(admin.schemas().getSchemaInfo(topicName).getType().name(), "AVRO");
+ }
+
+ @Test
+ public void testSchemaCompatibilityRejectsNonAvroByDefault() throws
Exception {
+ // First register a valid Avro schema
+ String topicName =
"persistent://schema-json-validation/test-ns/compat-rejects-non-avro";
+ String avroSchemaV1 =
"{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.example\","
+ + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+
+ PostSchemaPayload payload1 = new PostSchemaPayload("JSON",
avroSchemaV1, new HashMap<>());
+ admin.schemas().createSchema(topicName, payload1);
+
+ // Now try to register a JSON Schema Draft as v2 — should be rejected
+ String jsonSchemaDraft =
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+ +
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"}}}";
+
+ PostSchemaPayload payload2 = new PostSchemaPayload("JSON",
jsonSchemaDraft, new HashMap<>());
+ try {
+ admin.schemas().createSchema(topicName, payload2);
+ fail("Should reject JSON Schema Draft as incompatible with
existing Avro schema");
+ } catch (PulsarAdminException e) {
+ log.info("Expected rejection on compatibility check: {}",
e.getMessage());
+ }
+ }
+
+ private static class TestPojo {
+ public String field1;
+ public int field2;
+
+ public TestPojo() {}
+
+ public TestPojo(String field1, int field2) {
+ this.field1 = field1;
+ this.field2 = field2;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index b192110c2d7..3664c93559d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -47,18 +47,76 @@ public class JsonSchemaCompatibilityCheckTest extends
BaseAvroSchemaCompatibilit
}
@Test
- public void testJsonSchemaBackwardsCompatibility() throws
JsonProcessingException {
+ public void testJsonSchemaBackwardsCompatibilityWithLegacyEnabled() throws
JsonProcessingException {
+ // When legacy format is enabled, backward compatibility between old
and new formats should work
+ JsonSchemaCompatibilityCheck check = new
JsonSchemaCompatibilityCheck();
+ check.setAllowLegacyJacksonFormat(true);
+
+ SchemaData from =
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+ SchemaData to =
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder()
+
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+ Assert.assertTrue(check.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+
+ from =
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
+
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+ to =
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+ Assert.assertTrue(check.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ }
+ @Test
+ public void testJsonSchemaBackwardsCompatibilityRejectedByDefault() throws
JsonProcessingException {
+ // When legacy format is disabled (default), mixed old/new format
should be rejected
+ JsonSchemaCompatibilityCheck check = new
JsonSchemaCompatibilityCheck();
+ // allowLegacyJacksonFormat defaults to false
+
+ // Old format (Jackson) -> New format (Avro): should be rejected
because 'from' is not valid Avro
SchemaData from =
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
SchemaData to =
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder()
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
- JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new
JsonSchemaCompatibilityCheck();
- Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ Assert.assertFalse(check.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ // New format (Avro) -> Old format (Jackson): should be rejected
because 'to' is not valid Avro
from =
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
to =
SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
- Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ Assert.assertFalse(check.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testAvroToAvroCompatibilityUnaffectedByConfig() throws
JsonProcessingException {
+ // Both Avro schemas: should work the same regardless of legacy flag
+ JsonSchemaCompatibilityCheck strictCheck = new
JsonSchemaCompatibilityCheck();
+ JsonSchemaCompatibilityCheck legacyCheck = new
JsonSchemaCompatibilityCheck();
+ legacyCheck.setAllowLegacyJacksonFormat(true);
+
+ SchemaData from =
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
+
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+ SchemaData to =
SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder()
+
.withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
+
+ Assert.assertTrue(strictCheck.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ Assert.assertTrue(legacyCheck.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testJsonSchemaDraftRejectedByDefault() {
+ // JSON Schema Draft 2020-12 format should be rejected when legacy is
disabled
+ JsonSchemaCompatibilityCheck check = new
JsonSchemaCompatibilityCheck();
+
+ String avroSchema =
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.example\","
+ + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+ String jsonSchemaDraft =
"{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+ +
"\"type\":\"object\",\"properties\":{\"field1\":{\"type\":\"string\"}}}";
+
+ // Avro -> JSON Schema Draft: should be rejected
+ SchemaData from =
SchemaData.builder().data(avroSchema.getBytes(UTF_8)).build();
+ SchemaData to =
SchemaData.builder().data(jsonSchemaDraft.getBytes(UTF_8)).build();
+ Assert.assertFalse(check.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
+
+ // JSON Schema Draft -> Avro: should be rejected (existing schema not
valid Avro)
+ from =
SchemaData.builder().data(jsonSchemaDraft.getBytes(UTF_8)).build();
+ to = SchemaData.builder().data(avroSchema.getBytes(UTF_8)).build();
+ Assert.assertFalse(check.isCompatible(from, to,
SchemaCompatibilityStrategy.FULL));
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index a69bb649e7c..079405407f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -153,6 +153,105 @@ public class SchemaDataValidatorTest {
}
}
+ // PIP-464: Strict Avro validation tests for SchemaType.JSON
+
+    @Test
+    public void testJsonSchemaWithAvroFormatAcceptedRegardlessOfConfig() throws Exception {
+        // An Avro-encoded definition registered as SchemaType.JSON must validate both with
+        // the legacy Jackson format disallowed (strict, checked first) and allowed.
+        byte[] avroDefinition = Schema.AVRO(Foo.class).getSchemaInfo().getSchema();
+        SchemaData jsonTypedAvro = SchemaData.builder()
+                .type(SchemaType.JSON)
+                .data(avroDefinition)
+                .build();
+        for (boolean allowLegacyJacksonFormat : new boolean[] {false, true}) {
+            SchemaDataValidator.validateSchemaData(jsonTypedAvro, allowLegacyJacksonFormat);
+        }
+    }
+
+    @Test(expectedExceptions = InvalidSchemaDataException.class)
+    public void testJsonSchemaWithJacksonFormatRejectedByDefault() throws Exception {
+        // Strict mode (allowLegacyJacksonFormat=false, the default) must refuse a definition
+        // produced by Jackson's JsonSchemaGenerator.
+        ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
+        byte[] jacksonDefinition =
+                objectMapper.writeValueAsBytes(new JsonSchemaGenerator(objectMapper).generateSchema(Foo.class));
+        SchemaDataValidator.validateSchemaData(
+                SchemaData.builder().type(SchemaType.JSON).data(jacksonDefinition).build(), false);
+    }
+
+    @Test
+    public void testJsonSchemaWithJacksonFormatAcceptedWhenLegacyEnabled() throws Exception {
+        // Opting in with allowLegacyJacksonFormat=true lets a Jackson JsonSchemaGenerator
+        // definition pass validation.
+        ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
+        byte[] jacksonDefinition =
+                objectMapper.writeValueAsBytes(new JsonSchemaGenerator(objectMapper).generateSchema(Foo.class));
+        SchemaDataValidator.validateSchemaData(
+                SchemaData.builder().type(SchemaType.JSON).data(jacksonDefinition).build(), true);
+    }
+
+    @Test(expectedExceptions = InvalidSchemaDataException.class)
+    public void testJsonSchemaWithJsonSchemaDraftRejectedByDefault() throws Exception {
+        // A JSON Schema Draft 2020-12 document (the problematic payload from non-Java clients)
+        // must be refused when allowLegacyJacksonFormat=false.
+        byte[] draftDocument = ("{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+                + "\"type\":\"object\",\"properties\":{\"field\":{\"type\":\"integer\"}}}").getBytes(UTF_8);
+        SchemaData draftData = SchemaData.builder().type(SchemaType.JSON).data(draftDocument).build();
+        SchemaDataValidator.validateSchemaData(draftData, false);
+    }
+
+    @Test
+    public void testJsonSchemaWithJsonSchemaDraftAcceptedWhenLegacyEnabled() throws Exception {
+        // Documents the lenient legacy behaviour: with allowLegacyJacksonFormat=true a JSON Schema
+        // Draft document still passes. Strict mode, the default, is what closes this gap.
+        byte[] draftDocument = ("{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+                + "\"type\":\"object\",\"properties\":{\"field\":{\"type\":\"integer\"}}}").getBytes(UTF_8);
+        SchemaData draftData = SchemaData.builder().type(SchemaType.JSON).data(draftDocument).build();
+        SchemaDataValidator.validateSchemaData(draftData, true);
+    }
+
+    @Test
+    public void testJsonSchemaWithArbitraryJsonRejectedInBothModes() throws Exception {
+        // Arbitrary JSON that is neither Avro nor Jackson JsonSchema must be rejected in strict
+        // mode and in legacy mode alike. Each mode gets its own assertion: with
+        // @Test(expectedExceptions = ...) the first throw ends the test, so only one mode
+        // could ever be exercised.
+        String arbitraryJson = "{\"foo\":\"bar\",\"baz\":123}";
+        SchemaData data = SchemaData.builder()
+                .type(SchemaType.JSON)
+                .data(arbitraryJson.getBytes(UTF_8))
+                .build();
+        // Strict mode: the payload is not an Avro schema.
+        org.testng.Assert.assertThrows(InvalidSchemaDataException.class,
+                () -> SchemaDataValidator.validateSchemaData(data, false));
+        // Legacy mode: Jackson JsonSchema parsing rejects it as well.
+        org.testng.Assert.assertThrows(InvalidSchemaDataException.class,
+                () -> SchemaDataValidator.validateSchemaData(data, true));
+    }
+
+    @Test
+    public void testAvroSchemaTypeUnaffectedByLegacyFlag() throws Exception {
+        // allowLegacyJacksonFormat only concerns SchemaType.JSON; a genuine AVRO schema must
+        // validate with the flag off and with it on.
+        SchemaData avroData = SchemaData.builder()
+                .type(SchemaType.AVRO)
+                .data(Schema.AVRO(Foo.class).getSchemaInfo().getSchema())
+                .build();
+        boolean strict = false;
+        boolean legacy = true;
+        SchemaDataValidator.validateSchemaData(avroData, strict);
+        SchemaDataValidator.validateSchemaData(avroData, legacy);
+    }
+
+    @Test
+    public void testAvroSchemaTypeWithJacksonFormatRejectedRegardlessOfFlag() throws Exception {
+        // A Jackson JsonSchema definition registered as SchemaType.AVRO must be rejected whatever
+        // allowLegacyJacksonFormat says: the flag only relaxes SchemaType.JSON. Both flag values
+        // are asserted explicitly; expectedExceptions would stop at the first throw and could
+        // only ever cover one of them.
+        ObjectMapper mapper = ObjectMapperFactory.getMapper().getObjectMapper();
+        SchemaData data = SchemaData.builder()
+                .type(SchemaType.AVRO)
+                .data(mapper.writeValueAsBytes(new JsonSchemaGenerator(mapper).generateSchema(Foo.class)))
+                .build();
+        org.testng.Assert.assertThrows(InvalidSchemaDataException.class,
+                () -> SchemaDataValidator.validateSchemaData(data, false));
+        org.testng.Assert.assertThrows(InvalidSchemaDataException.class,
+                () -> SchemaDataValidator.validateSchemaData(data, true));
+    }
+
@Test
public void testCompatibleNameValidatorValidNames() {
CompatibleNameValidator validator = new CompatibleNameValidator();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
index 9d94242068a..d4397b700e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.schema.validator;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -28,6 +29,8 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import java.util.concurrent.CompletableFuture;
import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -36,6 +39,7 @@ import
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -159,4 +163,72 @@ public class
SchemaRegistryServiceWithSchemaDataValidatorTest {
.putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy));
}
+ // PIP-464: Tests for legacy Jackson format handling
+
+    @Test
+    public void testPutSchemaRejectsJacksonFormatByDefault() throws Exception {
+        // The wrapper built without the legacy flag runs in strict mode and must refuse a
+        // Jackson JsonSchema definition without forwarding it to the underlying registry.
+        SchemaRegistryServiceWithSchemaDataValidator strictService =
+                SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
+
+        ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
+        byte[] jacksonDefinition =
+                objectMapper.writeValueAsBytes(new JsonSchemaGenerator(objectMapper).generateSchema(Object.class));
+        SchemaData jacksonSchema = SchemaData.builder().type(SchemaType.JSON).data(jacksonDefinition).build();
+
+        try {
+            strictService.putSchemaIfAbsent("test", jacksonSchema, SchemaCompatibilityStrategy.FULL).get();
+            fail("Should fail with InvalidSchemaDataException");
+        } catch (Exception failure) {
+            // The validation error is expected as the cause of the exception raised here.
+            assertTrue(failure.getCause() instanceof InvalidSchemaDataException);
+        }
+        // Nothing may have reached the wrapped registry.
+        verify(underlyingService, times(0))
+                .putSchemaIfAbsent(eq("test"), any(SchemaData.class), any(SchemaCompatibilityStrategy.class));
+    }
+
+    @Test
+    public void testPutSchemaAcceptsJacksonFormatWhenLegacyEnabled() throws Exception {
+        // With the legacy flag set, the wrapper must hand a Jackson JsonSchema definition
+        // straight to the underlying registry and return the registry's future unchanged.
+        SchemaRegistryServiceWithSchemaDataValidator legacyService =
+                SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService, true);
+
+        ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
+        byte[] jacksonDefinition =
+                objectMapper.writeValueAsBytes(new JsonSchemaGenerator(objectMapper).generateSchema(Object.class));
+        SchemaData jacksonSchema = SchemaData.builder().type(SchemaType.JSON).data(jacksonDefinition).build();
+
+        CompletableFuture<SchemaVersion> pending = new CompletableFuture<>();
+        when(underlyingService.putSchemaIfAbsent(eq("test"), any(SchemaData.class),
+                eq(SchemaCompatibilityStrategy.FULL))).thenReturn(pending);
+
+        assertSame(pending, legacyService.putSchemaIfAbsent("test", jacksonSchema, SchemaCompatibilityStrategy.FULL));
+        // Exactly one forwarded call, carrying the very same SchemaData instance.
+        verify(underlyingService, times(1))
+                .putSchemaIfAbsent(eq("test"), same(jacksonSchema), eq(SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testPutSchemaRejectsJsonSchemaDraftByDefault() throws Exception {
+        // In strict mode a JSON Schema Draft 2020-12 document must be refused before it ever
+        // reaches the underlying registry.
+        SchemaRegistryServiceWithSchemaDataValidator strictService =
+                SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
+
+        byte[] draftDocument = ("{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+                + "\"type\":\"object\",\"properties\":{\"field\":{\"type\":\"integer\"}}}").getBytes(UTF_8);
+        SchemaData draftSchema = SchemaData.builder().type(SchemaType.JSON).data(draftDocument).build();
+
+        try {
+            strictService.putSchemaIfAbsent("test", draftSchema, SchemaCompatibilityStrategy.FULL).get();
+            fail("Should fail with InvalidSchemaDataException");
+        } catch (Exception failure) {
+            // The validation error is expected as the cause of the exception raised here.
+            assertTrue(failure.getCause() instanceof InvalidSchemaDataException);
+        }
+        // Nothing may have reached the wrapped registry.
+        verify(underlyingService, times(0))
+                .putSchemaIfAbsent(eq("test"), any(SchemaData.class), any(SchemaCompatibilityStrategy.class));
+    }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 62e8373e2d7..4119fc8b6ed 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1935,6 +1935,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if
(Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion()))
{
schemaInfo = schema.getSchemaInfo();
} else if (schema instanceof JSONSchema) {
+ // Deprecated (PIP-464): This backward-compatible path
sends old Jackson JsonSchema
+ // format to brokers below protocol v13 (Pulsar <
2.1). Scheduled for removal in a
+ // future major release.
JSONSchema jsonSchema = (JSONSchema) schema;
schemaInfo =
jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
} else {