This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cf58aa84e63 [feat] PIP-468: Test schema evolution coverage on scalable topics (#25637)
cf58aa84e63 is described below

commit cf58aa84e63eda55f13c59257c2fada97abab062
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 06:07:11 2026 -0700

    [feat] PIP-468: Test schema evolution coverage on scalable topics (#25637)
---
 .../client/api/v5/V5SchemaEvolutionTest.java       | 278 +++++++++++++++++++++
 1 file changed, 278 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaEvolutionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaEvolutionTest.java
new file mode 100644
index 00000000000..7c8ac2d669e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaEvolutionTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletionException;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Schema evolution coverage for V5 scalable topics.
+ *
+ * <p>Schemas on a scalable topic are scoped at the parent topic, not per-segment:
+ * {@link org.apache.pulsar.common.naming.TopicName#getSchemaName()} strips the segment
+ * descriptor (since {@code segment://t/ns/parent/desc} parses with {@code localName} =
+ * {@code parent}), so every segment of the same scalable topic shares one schema id —
+ * {@code t/ns/parent}. The schema registry therefore holds a single version history per
+ * scalable topic, and split / merge segments inherit it transparently.
+ *
+ * <p>These tests pin that contract:
+ * <ul>
+ *   <li>{@link #testSingleSchemaIdAcrossSegments()} — produce v1, split into two children,
+ *       produce v1 again across the children. Only one schema entry exists when looked up
+ *       at the parent name; the children re-use it rather than registering their own.</li>
+ *   <li>{@link #testCompatibleSchemaUpgradeAcrossSplit()} — produce v1, split, produce a
+ *       BACKWARD-compatible v2 across the new children. The compatibility check sees the
+ *       v1 history that pre-dated the split and accepts v2; the registry ends up with both
+ *       versions and the consumer reads every message.</li>
+ *   <li>{@link #testIncompatibleSchemaAfterSplitFails()} — same setup, but the post-split
+ *       producer attempts an incompatible schema. The pre-split history is still in scope,
+ *       so the registration is rejected.</li>
+ * </ul>
+ */
+public class V5SchemaEvolutionTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSingleSchemaIdAcrossSegments() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<V1> producer = v5Client.newProducer(Schema.avro(V1.class))
+                .topic(topic)
+                .create();
+        producer.newMessage().key("k-pre").value(new V1(1)).send();
+
+        // Split forces a second active segment to come into play.
+        long parentSegment = singleActiveSegmentId(topic);
+        admin.scalableTopics().splitSegment(topic, parentSegment);
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 2,
+                "split must produce 2 active children"));
+
+        // Send across the new active segments. Same schema, but two distinct underlying
+        // segment topics — if schemas were per-segment we'd see one entry per segment.
+        for (int i = 0; i < 20; i++) {
+            producer.newMessage().key("k-" + i).value(new V1(100 + i)).send();
+        }
+
+        // The schemas admin endpoint resolves by tenant/namespace/topic-local-name; the
+        // local name strips the segment descriptor, so this query lines up with the
+        // schema id the broker wrote. One id, one version.
+        var schemas = admin.schemas().getAllSchemas(topic);
+        assertEquals(schemas.size(), 1,
+                "single schema id should be shared across all segments, got " + schemas);
+    }
+
+    @Test
+    public void testCompatibleSchemaUpgradeAcrossSplit() throws Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(getNamespace(),
+                SchemaCompatibilityStrategy.BACKWARD);
+
+        String topic = newScalableTopic(1);
+
+        // Phase 1: schema v1 on the only initial segment.
+        Producer<V1> p1 = v5Client.newProducer(Schema.avro(V1.class))
+                .topic(topic)
+                .create();
+        p1.newMessage().key("v1-a").value(new V1(1)).send();
+        p1.newMessage().key("v1-b").value(new V1(2)).send();
+        p1.close();
+
+        // Split: parent sealed, two new active children.
+        long parentSegment = singleActiveSegmentId(topic);
+        admin.scalableTopics().splitSegment(topic, parentSegment);
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 2));
+
+        // Phase 2: a NEW producer attaches with schema v2. v2 adds a nullable field on
+        // top of v1 — backward compatible. The compatibility check has to consult v1 (in
+        // the pre-split history) for this to pass; if the children carried their own
+        // empty schema state it would just be accepted as a new schema v0.
+        @Cleanup
+        Producer<V2> p2 = v5Client.newProducer(Schema.avro(V2.class))
+                .topic(topic)
+                .create();
+        p2.newMessage().key("v2-a").value(new V2(3, 30)).send();
+        p2.newMessage().key("v2-b").value(new V2(4, 40)).send();
+
+        // Two schema versions registered under the same scalable-topic id.
+        var schemas = admin.schemas().getAllSchemas(topic);
+        assertEquals(schemas.size(), 2,
+                "expected v1 and v2 to coexist under one scalable-topic schema id, got " + schemas);
+
+        // Consumer reads everything as v2 — v1 messages decode into V2 with j == null
+        // (the nullable default), v2 messages decode straight through.
+        @Cleanup
+        QueueConsumer<V2> consumer = v5Client.newQueueConsumer(Schema.avro(V2.class))
+                .topic(topic)
+                .subscriptionName("evolution-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int seenV1 = 0;
+        int seenV2 = 0;
+        for (int i = 0; i < 4; i++) {
+            Message<V2> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missing message #" + i);
+            V2 v = msg.value();
+            if (v.j == null) {
+                seenV1++;
+                assertTrue(v.i == 1 || v.i == 2, "unexpected v1-decoded payload: " + v);
+            } else {
+                seenV2++;
+                assertTrue(v.i == 3 || v.i == 4, "unexpected v2 payload: " + v);
+            }
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(seenV1, 2, "expected exactly the two v1 messages to decode with j == null");
+        assertEquals(seenV2, 2, "expected exactly the two v2 messages to decode with j set");
+    }
+
+    @Test
+    public void testIncompatibleSchemaAfterSplitFails() throws Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(getNamespace(),
+                SchemaCompatibilityStrategy.BACKWARD);
+
+        String topic = newScalableTopic(1);
+
+        Producer<V1> p1 = v5Client.newProducer(Schema.avro(V1.class))
+                .topic(topic)
+                .create();
+        p1.newMessage().key("v1").value(new V1(1)).send();
+        p1.close();
+
+        long parentSegment = singleActiveSegmentId(topic);
+        admin.scalableTopics().splitSegment(topic, parentSegment);
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 2));
+
+        // VBad has the same field shape as V2 but with a non-nullable extra field — that
+        // breaks BACKWARD compatibility against V1 (an old reader can't fill a required
+        // field from a v1 record). The V5 producer creates v4 segment producers lazily on
+        // the first send, so the schema registration / compatibility check happens then;
+        // exercise it explicitly here.
+        @Cleanup
+        Producer<VBad> bad = v5Client.newProducer(Schema.avro(VBad.class))
+                .topic(topic)
+                .create();
+        try {
+            bad.newMessage().key("bad").value(new VBad(5, 50)).send();
+            fail("expected incompatible schema to be rejected post-split");
+        } catch (PulsarClientException expected) {
+            // good — the broker preserved v1 history across the split and rejected
+            // the registration.
+        } catch (CompletionException ce) {
+            // Some V5 builders may surface the error wrapped — accept either form.
+            assertTrue(ce.getCause() instanceof PulsarClientException,
+                    "unexpected wrapped error: " + ce);
+        }
+
+        // The registry still shows just v1 — the failed registration didn't leak through.
+        var schemas = admin.schemas().getAllSchemas(topic);
+        assertEquals(schemas.size(), 1,
+                "incompatible schema must not be added to the scalable-topic registry, got "
+                        + schemas);
+    }
+
+    // --- Helpers ---
+
+    private long singleActiveSegmentId(String topic) throws Exception {
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                return seg.getSegmentId();
+            }
+        }
+        throw new AssertionError("no active segment for " + topic);
+    }
+
+    private int activeSegmentCount(String topic) throws Exception {
+        int active = 0;
+        for (var seg : admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+            if (seg.isActive()) {
+                active++;
+            }
+        }
+        return active;
+    }
+
+    // --- Schema POJOs ---
+
+    /** v1 schema: single int field. */
+    public static class V1 {
+        public int i;
+
+        public V1() {
+        }
+
+        public V1(int i) {
+            this.i = i;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof V1 v && v.i == i;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(i);
+        }
+    }
+
+    /** v2 schema: adds a nullable Integer — backward-compatible upgrade. */
+    public static class V2 {
+        public int i;
+        public Integer j;
+
+        public V2() {
+        }
+
+        public V2(int i, Integer j) {
+            this.i = i;
+            this.j = j;
+        }
+
+        @Override
+        public String toString() {
+            return "V2{i=" + i + ", j=" + j + "}";
+        }
+    }
+
+    /** Incompatible upgrade: the extra field is required, not nullable. */
+    public static class VBad {
+        public int i;
+        public int j;
+
+        public VBad() {
+        }
+
+        public VBad(int i, int j) {
+            this.i = i;
+            this.j = j;
+        }
+    }
+}

Reply via email to