This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0daa5aaeeb0 [improve][test] PIP-473: extensive v5 transaction tests on 
scalable topics (#25958)
0daa5aaeeb0 is described below

commit 0daa5aaeeb0411c775cd7212a52df1737f7e0d75
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 8 09:12:49 2026 -0700

    [improve][test] PIP-473: extensive v5 transaction tests on scalable topics 
(#25958)
---
 .../client/api/v5/V5TransactionRecoveryTest.java   | 219 ++++++++++++
 .../client/api/v5/V5TransactionScalableTest.java   | 382 +++++++++++++++++++++
 .../pulsar/client/api/v5/V5TransactionTest.java    |  13 +-
 .../transaction/TcMetadataDiscoveryTestBase.java   |  10 +-
 .../V5ScalableTopicTransactionOnOxiaTest.java      |  46 +++
 .../V5ScalableTopicTransactionTest.java            | 346 +++++++++++++++++++
 .../src/test/resources/pulsar-transaction.xml      |   2 +
 7 files changed, 1006 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionRecoveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionRecoveryTest.java
new file mode 100644
index 00000000000..64fd9e88544
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionRecoveryTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Recovery coverage for V5 transactions on scalable topics. Uses a single 
restartable broker
+ * (the mock BookKeeper + metadata stores are reused across {@link 
#restartBroker()}, so segment
+ * data and the {@code /txn} metadata layout survive a restart). After restart 
a segment topic is
+ * reloaded cold, exercising {@code MetadataTransactionBuffer} / {@code 
MetadataPendingAckStore}
+ * recovery: committed data must stay visible, aborted data must stay 
filtered, and committed
+ * transactional acks must not be redelivered. Also covers the timeout sweep 
aborting a dangling
+ * transaction.
+ */
+public class V5TransactionRecoveryTest extends MockedPulsarServiceBaseTest {
+
+    private final String myNamespace = "pulsar/txn-recovery";
+
+    private PulsarClient v5Client;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        ServiceConfiguration config = getDefaultConf();
+        config.setTransactionCoordinatorEnabled(true);
+        config.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup(config);
+
+        admin.clusters().createCluster("test",
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        // SYSTEM_NAMESPACE's tenant IS "pulsar" (pulsar/system), which also 
owns myNamespace below.
+        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+
+        v5Client = newV5Client(Duration.ofMinutes(2));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (v5Client != null) {
+            v5Client.close();
+            v5Client = null;
+        }
+        super.internalCleanup();
+    }
+
+    private PulsarClient newV5Client(Duration txnTimeout) throws Exception {
+        return PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                
.transactionPolicy(TransactionPolicy.builder().timeout(txnTimeout).build())
+                .build();
+    }
+
+    private String newScalableTopic(int numInitialSegments) throws Exception {
+        String name = "topic://" + myNamespace + "/scalable-" + 
UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(name, numInitialSegments);
+        return name;
+    }
+
+    private QueueConsumer<String> subscribe(String topic, String sub) throws 
Exception {
+        return v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(sub)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+    }
+
+    /** Restart the broker (segment data + metadata survive) and rebuild the 
V5 client. */
+    private void restartBrokerAndReconnect() throws Exception {
+        v5Client.close();
+        restartBroker();
+        v5Client = newV5Client(Duration.ofMinutes(2));
+    }
+
+    @Test
+    public void testCommittedMessagesVisibleAfterBrokerRestart() throws 
Exception {
+        // Commit a transaction, then restart so the segment topic is reloaded 
cold. The transaction
+        // buffer recovers from the /txn metadata and re-exposes the committed 
messages.
+        String topic = newScalableTopic(2);
+        @Cleanup
+        Producer<String> producer = 
v5Client.newProducer(Schema.string()).topic(topic).create();
+
+        Transaction txn = v5Client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + 
i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+        txn.commit();
+        producer.close();
+
+        restartBrokerAndReconnect();
+
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(topic, "after-restart-sub");
+        Set<String> got = new HashSet<>();
+        for (int i = 0; i < sent.size(); i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+            assertNotNull(msg, "committed message #" + i + " must survive 
restart");
+            got.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(got, sent, "every committed message must be visible after 
recovery");
+        assertNull(consumer.receive(Duration.ofMillis(500)), "no extra 
messages");
+    }
+
+    @Test
+    public void testAbortedMessagesFilteredAfterBrokerRestart() throws 
Exception {
+        // Abort a transaction and publish a non-transactional sentinel, then 
restart. Recovery must
+        // rebuild the aborted set (from the durable ABORTED header / aborted 
records) so the aborted
+        // messages stay filtered and only the sentinel is delivered.
+        String topic = newScalableTopic(1);
+        @Cleanup
+        Producer<String> producer = 
v5Client.newProducer(Schema.string()).topic(topic).create();
+
+        Transaction txn = v5Client.newTransaction();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().transaction(txn).value("aborted-" + 
i).send();
+        }
+        txn.abort();
+        producer.newMessage().value("sentinel").send();
+        producer.close();
+
+        restartBrokerAndReconnect();
+
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(topic, 
"after-restart-abort-sub");
+        Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+        assertNotNull(msg, "the non-transactional sentinel must be delivered 
after restart");
+        assertEquals(msg.value(), "sentinel", "aborted messages must stay 
filtered after recovery");
+        consumer.acknowledge(msg.id());
+        assertNull(consumer.receive(Duration.ofMillis(500)), "no aborted 
messages after the sentinel");
+    }
+
+    @Test
+    public void testTransactionalAcksSurviveBrokerRestart() throws Exception {
+        // Acknowledge messages inside a committed transaction, then restart. 
The pending-ack store
+        // recovery (and the durable cursor) must keep them acknowledged — no 
redelivery.
+        String topic = newScalableTopic(1);
+        @Cleanup
+        Producer<String> producer = 
v5Client.newProducer(Schema.string()).topic(topic).create();
+        int n = 10;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().value("m-" + i).send();
+        }
+
+        // Acknowledge all inside the transaction, keeping the consumer open 
through commit so the
+        // async ack operations complete before commit (closing it early fails 
them).
+        Transaction txn = v5Client.newTransaction();
+        QueueConsumer<String> consumer = subscribe(topic, "ack-sub");
+        for (int i = 0; i < n; i++) {
+            Message<String> m = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(m, "delivery #" + i);
+            consumer.acknowledge(m.id(), txn);
+        }
+        txn.commit();
+        consumer.close();
+
+        // Pre-restart: confirm the committed acks have materialised (a fresh 
consumer on the same
+        // subscription sees nothing), so the restart genuinely exercises 
recovery of acked state.
+        {
+            @Cleanup
+            QueueConsumer<String> check = subscribe(topic, "ack-sub");
+            assertNull(check.receive(Duration.ofSeconds(10)), "committed acks 
must materialise before restart");
+        }
+
+        restartBrokerAndReconnect();
+
+        @Cleanup
+        QueueConsumer<String> after = subscribe(topic, "ack-sub");
+        assertNull(after.receive(Duration.ofSeconds(10)),
+                "committed transactional acks must not be redelivered after 
restart");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionScalableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionScalableTest.java
new file mode 100644
index 00000000000..bf8ef8e84eb
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionScalableTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Extended coverage for V5 transactions on scalable topics: multi-segment 
producers, many
+ * concurrent transactions with mixed commit/abort, abort across layout 
changes (split / merge),
+ * large transactions, transactional acknowledgement lifecycle, 
per-subscription isolation, and
+ * read-committed visibility while a transaction is still open.
+ *
+ * <p>Complements {@link V5TransactionTest} (single-segment happy paths + 
commit-across-split/merge).
+ */
+public class V5TransactionScalableTest extends V5ClientBaseTest {
+
+    private PulsarClient newTxnClient() throws Exception {
+        return track(PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(2)).build())
+                .build());
+    }
+
+    private QueueConsumer<String> subscribe(PulsarClient client, String topic, 
String sub) throws Exception {
+        return client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(sub)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+    }
+
+    /** Receive exactly {@code count} messages (acking each), failing if any 
is missing. */
+    private Set<String> receiveValues(QueueConsumer<String> consumer, int 
count, Duration perMessage)
+            throws Exception {
+        Set<String> values = new HashSet<>();
+        for (int i = 0; i < count; i++) {
+            Message<String> msg = consumer.receive(perMessage);
+            assertNotNull(msg, "missing message #" + i + " (received so far: " 
+ values.size() + ")");
+            values.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        return values;
+    }
+
+    private long firstActiveSegment(String topic) throws Exception {
+        ScalableTopicMetadata meta = admin.scalableTopics().getMetadata(topic);
+        for (ScalableTopicMetadata.SegmentInfo seg : 
meta.getSegments().values()) {
+            if (seg.isActive()) {
+                return seg.getSegmentId();
+            }
+        }
+        throw new IllegalStateException("no active segment for " + topic);
+    }
+
+    private List<Long> activeSegments(String topic) throws Exception {
+        ScalableTopicMetadata meta = admin.scalableTopics().getMetadata(topic);
+        List<Long> ids = new ArrayList<>();
+        for (ScalableTopicMetadata.SegmentInfo seg : 
meta.getSegments().values()) {
+            if (seg.isActive()) {
+                ids.add(seg.getSegmentId());
+            }
+        }
+        return ids;
+    }
+
+    private void awaitActiveSegmentCount(String topic, int expected) {
+        Awaitility.await().untilAsserted(() -> 
assertEquals(activeSegments(topic).size(), expected));
+    }
+
+    @Test
+    public void testMultiSegmentTransactionCommit() throws Exception {
+        // A single transaction whose keyed writes spread across all segments 
of a multi-segment
+        // topic commits atomically: every message becomes visible together.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(3);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"multi-seg-sub");
+
+        Transaction txn = client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 60; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + 
i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+        assertNull(consumer.receive(Duration.ofMillis(500)), "nothing visible 
before commit");
+
+        txn.commit();
+        assertEquals(receiveValues(consumer, sent.size(), 
Duration.ofSeconds(10)), sent,
+                "every committed message across all segments must be 
delivered");
+        assertNull(consumer.receive(Duration.ofMillis(500)), "no extra 
messages");
+    }
+
+    @Test
+    public void testConcurrentTransactionsMixedCommitAbort() throws Exception {
+        // Many open transactions on one topic, interleaved; half commit, half 
abort. Only the
+        // committed transactions' messages are ever delivered. While any 
transaction stays open the
+        // buffer pins visibility, so nothing is visible until all resolve.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"concurrent-sub");
+
+        int numTxns = 6;
+        int perTxn = 10;
+        List<Transaction> txns = new ArrayList<>();
+        for (int t = 0; t < numTxns; t++) {
+            txns.add(client.newTransaction());
+        }
+        Set<String> committedExpected = new HashSet<>();
+        for (int i = 0; i < perTxn; i++) {
+            for (int t = 0; t < numTxns; t++) {
+                String v = "t" + t + "-m" + i;
+                producer.newMessage().key("t" + 
t).transaction(txns.get(t)).value(v).send();
+                if (t % 2 == 0) {
+                    committedExpected.add(v);
+                }
+            }
+        }
+        assertNull(consumer.receive(Duration.ofMillis(500)), "nothing visible 
while transactions are open");
+
+        for (int t = 0; t < numTxns; t++) {
+            if (t % 2 == 0) {
+                txns.get(t).commit();
+            } else {
+                txns.get(t).abort();
+            }
+        }
+
+        assertEquals(receiveValues(consumer, committedExpected.size(), 
Duration.ofSeconds(10)),
+                committedExpected, "exactly the committed transactions' 
messages must be delivered");
+        assertNull(consumer.receive(Duration.ofSeconds(1)), "aborted 
transactions' messages must never appear");
+    }
+
+    @Test
+    public void testAbortSpansSplit() throws Exception {
+        // A transaction spanning a split that aborts must leave nothing 
visible from either the
+        // sealed parent or the new children. A non-transactional sentinel 
proves the consumer is
+        // live and that only it is delivered.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"abort-split-sub");
+
+        Transaction txn = client.newTransaction();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().key("k-" + 
i).transaction(txn).value("before-split-" + i).send();
+        }
+        admin.scalableTopics().splitSegment(topic, firstActiveSegment(topic));
+        awaitActiveSegmentCount(topic, 2);
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().key("k-after-" + 
i).transaction(txn).value("after-split-" + i).send();
+        }
+        txn.abort();
+
+        producer.newMessage().value("sentinel").send();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "non-transactional sentinel must be delivered");
+        assertEquals(msg.value(), "sentinel", "aborted txn messages must not 
precede the sentinel");
+        consumer.acknowledge(msg.id());
+        assertNull(consumer.receive(Duration.ofMillis(500)), "no aborted 
messages after the sentinel");
+    }
+
+    @Test
+    public void testAbortSpansMerge() throws Exception {
+        // Same as the split case but across a merge: an aborted transaction 
whose lifetime spans a
+        // merge leaves nothing visible.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"abort-merge-sub");
+
+        Transaction txn = client.newTransaction();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key("k-" + 
i).transaction(txn).value("before-merge-" + i).send();
+        }
+        List<Long> active = activeSegments(topic);
+        assertEquals(active.size(), 2, "expected 2 active segments before 
merge");
+        admin.scalableTopics().mergeSegments(topic, active.get(0), 
active.get(1));
+        awaitActiveSegmentCount(topic, 1);
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().key("k-after-" + 
i).transaction(txn).value("after-merge-" + i).send();
+        }
+        txn.abort();
+
+        producer.newMessage().value("sentinel").send();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "non-transactional sentinel must be delivered");
+        assertEquals(msg.value(), "sentinel");
+        consumer.acknowledge(msg.id());
+        assertNull(consumer.receive(Duration.ofMillis(500)), "no aborted 
messages after the sentinel");
+    }
+
+    @Test
+    public void testLargeTransaction() throws Exception {
+        // A large transaction commits atomically and every message is 
delivered exactly once.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"large-txn-sub");
+
+        int n = 500;
+        Transaction txn = client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + (i % 
50)).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+        txn.commit();
+
+        assertEquals(receiveValues(consumer, n, Duration.ofSeconds(15)), sent,
+                "all messages in the large transaction must be delivered");
+        assertNull(consumer.receive(Duration.ofSeconds(1)), "no extra 
messages");
+    }
+
+    @Test
+    public void testTransactionalAckAbortRedeliversThenCommitSticks() throws 
Exception {
+        // Acknowledge messages inside a transaction that aborts -> the acks 
roll back and the
+        // messages are redelivered. Acknowledging the redelivered batch in a 
committed transaction
+        // makes the acks durable -> no further redelivery.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        int n = 10;
+        Set<String> produced = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "m-" + i;
+            producer.newMessage().value(v).send();
+            produced.add(v);
+        }
+
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"ack-lifecycle-sub");
+
+        // Consume all, ack inside a transaction we then abort.
+        List<Message<String>> first = new ArrayList<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> m = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(m, "initial delivery #" + i);
+            first.add(m);
+        }
+        Transaction abortTxn = client.newTransaction();
+        for (Message<String> m : first) {
+            consumer.acknowledge(m.id(), abortTxn);
+        }
+        abortTxn.abort();
+
+        // After abort the messages must be redelivered. Collect them in a 
committed transaction.
+        Transaction commitTxn = client.newTransaction();
+        Set<String> redelivered = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000;
+        while (redelivered.size() < n && System.currentTimeMillis() < 
deadline) {
+            Message<String> m = consumer.receive(Duration.ofSeconds(2));
+            if (m == null) {
+                continue;
+            }
+            redelivered.add(m.value());
+            consumer.acknowledge(m.id(), commitTxn);
+        }
+        assertEquals(redelivered, produced, "aborting a transactional ack must 
redeliver every message");
+        commitTxn.commit();
+
+        assertNull(consumer.receive(Duration.ofSeconds(2)),
+                "messages acked in a committed transaction must not be 
redelivered");
+    }
+
+    @Test
+    public void testMultipleSubscriptionsIndependentTransactionalAcks() throws 
Exception {
+        // A committed transactional ack on one subscription must not affect 
another subscription on
+        // the same scalable topic.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        int n = 10;
+        Set<String> produced = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "m-" + i;
+            producer.newMessage().value(v).send();
+            produced.add(v);
+        }
+
+        @Cleanup
+        QueueConsumer<String> subA = subscribe(client, topic, "sub-a");
+        @Cleanup
+        QueueConsumer<String> subB = subscribe(client, topic, "sub-b");
+
+        Transaction txn = client.newTransaction();
+        for (int i = 0; i < n; i++) {
+            Message<String> m = subA.receive(Duration.ofSeconds(5));
+            assertNotNull(m, "sub-a delivery #" + i);
+            subA.acknowledge(m.id(), txn);
+        }
+        txn.commit();
+        assertNull(subA.receive(Duration.ofSeconds(2)), "sub-a committed acks 
must stick");
+
+        // sub-b is independent and must still see everything.
+        assertEquals(receiveValues(subB, n, Duration.ofSeconds(5)), produced,
+                "the other subscription must be unaffected by sub-a's 
transactional acks");
+    }
+
+    @Test
+    public void testOpenTransactionPinsLaterNonTransactionalWrites() throws 
Exception {
+        // Read-committed visibility: while a transaction is open, the 
buffer's max-read position is
+        // pinned below its first write, so even a non-transactional message 
published afterwards
+        // stays invisible until the transaction resolves. After abort, the 
non-transactional message
+        // becomes visible and the aborted ones are filtered.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, "pin-sub");
+
+        Transaction txn = client.newTransaction();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().transaction(txn).value("txn-" + i).send();
+        }
+        assertNull(consumer.receive(Duration.ofSeconds(1)), "uncommitted 
writes are invisible");
+
+        producer.newMessage().value("sentinel").send();
+        assertNull(consumer.receive(Duration.ofSeconds(1)),
+                "a non-transactional write after an open txn is pinned until 
the txn resolves");
+
+        txn.abort();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "after abort the non-transactional write becomes 
visible");
+        assertEquals(msg.value(), "sentinel", "only the non-transactional 
write is delivered");
+        consumer.acknowledge(msg.id());
+        assertNull(consumer.receive(Duration.ofMillis(500)), "aborted txn 
messages must stay filtered");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
index 6cdc197709f..59057de8bf3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
@@ -191,13 +191,7 @@ public class V5TransactionTest extends V5ClientBaseTest {
         assertEquals(mb.value(), "b-1");
     }
 
-    // Disabled: documents a real broker gap in the per-segment 
TransactionBuffer model.
-    // After split, the parent segment is sealed; the txn coordinator's COMMIT 
END-TXN to
-    // the sealed parent never returns and the commit RPC times out (~30s). 
The fix
-    // requires moving transaction tracking up to the scalable-topic level (so 
layout
-    // changes don't strand in-flight transactions on now-sealed segments) — 
that's
-    // beyond the scope of this commit.
-    @Test(enabled = false)
+    @Test
     public void testCommitSpansSplit() throws Exception {
         // A single transaction whose lifetime spans a layout-changing split 
must commit
         // atomically: pre-split writes (on the now-sealed parent) and 
post-split writes
@@ -270,10 +264,7 @@ public class V5TransactionTest extends V5ClientBaseTest {
         assertEquals(received, sent, "all txn messages across the split must 
be delivered after commit");
     }
 
-    // Disabled: same broker gap as testCommitSpansSplit. After merge, both 
source
-    // segments are sealed and the COMMIT marker can't be delivered, so the 
END-TXN
-    // request times out.
-    @Test(enabled = false)
+    @Test
     public void testCommitSpansMerge() throws Exception {
         // A single transaction whose lifetime spans a layout-changing merge 
must commit
         // atomically: writes to the two pre-merge segments and writes to the 
post-merge
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
index 8af84ab229e..05ed5747f64 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
@@ -64,8 +64,16 @@ public abstract class TcMetadataDiscoveryTestBase extends 
PulsarTestSuite {
         BrokerContainer brokerContainer = 
pulsarCluster.getBrokers().iterator().next();
         brokerContainer.execCmd(
                 "/pulsar/bin/pulsar", 
"initialize-transaction-coordinator-metadata",
-                "-cs", ZKContainer.NAME,
+                "-cs", configurationStoreConnectionString(),
                 "-c", pulsarCluster.getClusterName(),
                 "--initial-num-transaction-coordinators", 
Integer.toString(TC_PARALLELISM));
     }
+
+    /**
+     * Configuration-store connection string used to initialize the 
transaction-coordinator metadata.
+     * Defaults to the ZooKeeper container; an Oxia-backed subclass overrides 
this with the Oxia URL.
+     */
+    protected String configurationStoreConnectionString() {
+        return ZKContainer.NAME;
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionOnOxiaTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionOnOxiaTest.java
new file mode 100644
index 00000000000..e28030b4a1d
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionOnOxiaTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import lombok.CustomLog;
+import org.apache.pulsar.tests.integration.oxia.OxiaContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+
+/**
+ * Runs the full {@link V5ScalableTopicTransactionTest} suite against a 
cluster whose metadata store
+ * is a containerized Oxia instead of ZooKeeper. Oxia is the backend PIP-473 
targets natively — its
+ * {@code scanByIndex} / {@code subscribeSequence} / partition-key / 
sequence-delta primitives back the
+ * {@code /txn} layout directly, rather than the scan-and-filter fallback 
ZooKeeper uses — so this is
+ * the closest the test suite gets to the production transaction path.
+ */
+@CustomLog
+public class V5ScalableTopicTransactionOnOxiaTest extends 
V5ScalableTopicTransactionTest {
+
+    @Override
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder 
specBuilder) {
+        specBuilder.enableOxia(true);
+        return specBuilder;
+    }
+
+    @Override
+    protected String configurationStoreConnectionString() {
+        return "oxia://" + OxiaContainer.NAME + ":" + OxiaContainer.OXIA_PORT;
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionTest.java
new file mode 100644
index 00000000000..0b5b62acd6c
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import lombok.CustomLog;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end integration coverage for V5 transactions producing and consuming 
data on scalable
+ * ({@code topic://}) topics across a multi-broker docker cluster: commit 
visibility, abort filtering,
+ * multi-topic atomic commit, consume-transform-produce, survival of a broker 
failover mid-transaction,
+ * and transactions whose lifetime spans a layout-changing split / merge.
+ */
+@CustomLog
+public class V5ScalableTopicTransactionTest extends 
TcMetadataDiscoveryTestBase {
+
+    private static final int OP_TIMEOUT_SECONDS = 30;
+
+    private PulsarAdmin newAdmin() throws Exception {
+        return 
PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+    }
+
+    private PulsarClient newTxnClient() throws Exception {
+        return PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(5)).build())
+                .build();
+    }
+
+    private String scalableTopicName() {
+        return "topic://public/default/scalable-" + randomName(8);
+    }
+
+    private QueueConsumer<String> subscribe(PulsarClient client, String topic, 
String sub) throws Exception {
+        return client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(sub)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+    }
+
+    private Set<String> drain(QueueConsumer<String> consumer, int count) 
throws Exception {
+        Set<String> values = new HashSet<>();
+        for (int i = 0; i < count; i++) {
+            Message<String> msg = 
consumer.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+            assertNotNull(msg, "missing message #" + i + " (received so far: " 
+ values.size() + ")");
+            values.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        return values;
+    }
+
+    private List<Long> activeSegments(PulsarAdmin admin, String topic) throws 
Exception {
+        ScalableTopicMetadata meta = admin.scalableTopics().getMetadata(topic);
+        List<Long> ids = new ArrayList<>();
+        for (ScalableTopicMetadata.SegmentInfo seg : 
meta.getSegments().values()) {
+            if (seg.isActive()) {
+                ids.add(seg.getSegmentId());
+            }
+        }
+        return ids;
+    }
+
+    @Test(timeOut = 300_000)
+    public void testProduceConsumeCommitAndAbort() throws Exception {
+        @Cleanup
+        PulsarAdmin admin = newAdmin();
+        @Cleanup
+        PulsarClient client = newTxnClient();
+        String topic = scalableTopicName();
+        admin.scalableTopics().createScalableTopic(topic, 2);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, "ce-sub");
+
+        // Commit path.
+        Transaction commit = client.newTransaction();
+        Set<String> committed = new HashSet<>();
+        for (int i = 0; i < 20; i++) {
+            String v = "commit-" + i;
+            producer.newMessage().key("k-" + 
i).transaction(commit).value(v).send();
+            committed.add(v);
+        }
+        assertNull(consumer.receive(Duration.ofSeconds(2)), "nothing visible 
before commit");
+        commit.commit();
+        assertEquals(drain(consumer, committed.size()), committed, "committed 
messages must all be delivered");
+
+        // Abort path: aborted messages never delivered; a sentinel proves the 
consumer is live.
+        Transaction abort = client.newTransaction();
+        for (int i = 0; i < 20; i++) {
+            producer.newMessage().key("k-" + 
i).transaction(abort).value("abort-" + i).send();
+        }
+        abort.abort();
+        producer.newMessage().value("sentinel").send();
+        Message<String> msg = 
consumer.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+        assertNotNull(msg, "sentinel must be delivered");
+        assertEquals(msg.value(), "sentinel", "aborted messages must not 
precede the sentinel");
+        consumer.acknowledge(msg.id());
+        assertNull(consumer.receive(Duration.ofSeconds(2)), "no aborted 
messages after the sentinel");
+    }
+
+    @Test(timeOut = 300_000)
+    public void testMultiTopicAtomicCommit() throws Exception {
+        @Cleanup
+        PulsarAdmin admin = newAdmin();
+        @Cleanup
+        PulsarClient client = newTxnClient();
+        String topicA = scalableTopicName();
+        String topicB = scalableTopicName();
+        admin.scalableTopics().createScalableTopic(topicA, 2);
+        admin.scalableTopics().createScalableTopic(topicB, 2);
+
+        @Cleanup
+        Producer<String> prodA = 
client.newProducer(Schema.string()).topic(topicA).create();
+        @Cleanup
+        Producer<String> prodB = 
client.newProducer(Schema.string()).topic(topicB).create();
+        @Cleanup
+        QueueConsumer<String> consA = subscribe(client, topicA, "multi-a");
+        @Cleanup
+        QueueConsumer<String> consB = subscribe(client, topicB, "multi-b");
+
+        Transaction txn = client.newTransaction();
+        prodA.newMessage().value("a-1").send();
+        prodB.newMessage().value("b-1").send();
+        // The above are non-transactional sends to confirm the topics work; 
now the transactional ones:
+        prodA.newMessage().transaction(txn).value("a-txn").send();
+        prodB.newMessage().transaction(txn).value("b-txn").send();
+
+        // The non-transactional a-1/b-1 are pinned behind the open 
transaction's first write only if
+        // they were published after it; here they were published before, so 
they are visible.
+        
assertEquals(consA.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(), 
"a-1");
+        
assertEquals(consB.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(), 
"b-1");
+        assertNull(consA.receive(Duration.ofSeconds(2)), "txn message on A 
invisible pre-commit");
+        assertNull(consB.receive(Duration.ofSeconds(2)), "txn message on B 
invisible pre-commit");
+
+        txn.commit();
+        
assertEquals(consA.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(), 
"a-txn");
+        
assertEquals(consB.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(), 
"b-txn");
+    }
+
+    @Test(timeOut = 300_000)
+    public void testConsumeTransformProduce() throws Exception {
+        @Cleanup
+        PulsarAdmin admin = newAdmin();
+        @Cleanup
+        PulsarClient client = newTxnClient();
+        String input = scalableTopicName();
+        String output = scalableTopicName();
+        admin.scalableTopics().createScalableTopic(input, 1);
+        admin.scalableTopics().createScalableTopic(output, 1);
+
+        @Cleanup
+        Producer<String> seed = 
client.newProducer(Schema.string()).topic(input).create();
+        seed.newMessage().value("hello").send();
+
+        @Cleanup
+        QueueConsumer<String> in = subscribe(client, input, "in-sub");
+        @Cleanup
+        Producer<String> out = 
client.newProducer(Schema.string()).topic(output).create();
+        @Cleanup
+        QueueConsumer<String> verify = subscribe(client, output, "verify-sub");
+
+        Message<String> seedMsg = 
in.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+        assertNotNull(seedMsg, "seed message must be delivered");
+
+        Transaction txn = client.newTransaction();
+        
out.newMessage().transaction(txn).value(seedMsg.value().toUpperCase()).send();
+        in.acknowledge(seedMsg.id(), txn);
+        assertNull(verify.receive(Duration.ofSeconds(2)), "output invisible 
before commit");
+
+        txn.commit();
+        Message<String> outMsg = 
verify.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+        assertNotNull(outMsg, "committed output must be delivered");
+        assertEquals(outMsg.value(), "HELLO");
+    }
+
+    @Test(timeOut = 360_000)
+    public void testTransactionSurvivesBrokerFailover() throws Exception {
+        @Cleanup
+        PulsarAdmin admin = newAdmin();
+        @Cleanup
+        PulsarClient client = newTxnClient();
+        String topic = scalableTopicName();
+        // Several segments so they spread across both brokers.
+        admin.scalableTopics().createScalableTopic(topic, 4);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, 
"failover-sub");
+
+        Transaction txn = client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            String v = "before-kill-" + i;
+            producer.newMessage().key("k-" + 
i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+
+        // Kill one broker mid-transaction; coordinator partitions and 
segments it led are reassigned.
+        BrokerContainer victim = pulsarCluster.getBrokers().iterator().next();
+        log.info().attr("broker", victim.getContainerName()).log("Stopping 
broker mid-transaction");
+        victim.stop();
+
+        // Continue producing in the same transaction; the producer reconnects 
to the survivor. Retry
+        // each send within a bounded window while reassignment/reconnection 
settles.
+        for (int i = 0; i < 10; i++) {
+            String v = "after-kill-" + i;
+            sendWithRetry(producer, txn, "k-after-" + i, v);
+            sent.add(v);
+        }
+
+        txn.commit();
+        assertEquals(drain(consumer, sent.size()), sent,
+                "every message in a transaction that spanned a broker failover 
must be delivered");
+    }
+
+    @Test(timeOut = 360_000)
+    public void testCommitSpansSplit() throws Exception {
+        @Cleanup
+        PulsarAdmin admin = newAdmin();
+        @Cleanup
+        PulsarClient client = newTxnClient();
+        String topic = scalableTopicName();
+        admin.scalableTopics().createScalableTopic(topic, 1);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, "split-sub");
+
+        Transaction txn = client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            String v = "before-split-" + i;
+            producer.newMessage().key("k-" + 
i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+        List<Long> active = activeSegments(admin, topic);
+        assertEquals(active.size(), 1, "expected one active segment before 
split");
+        admin.scalableTopics().splitSegment(topic, active.get(0));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(activeSegments(admin, topic).size(), 2));
+
+        for (int i = 0; i < 5; i++) {
+            String v = "after-split-" + i;
+            sendWithRetry(producer, txn, "k-after-" + i, v);
+            sent.add(v);
+        }
+        assertNull(consumer.receive(Duration.ofSeconds(2)), "nothing visible 
before commit");
+        txn.commit();
+        assertEquals(drain(consumer, sent.size()), sent, "all messages across 
the split must be delivered");
+    }
+
+    @Test(timeOut = 360_000)
+    public void testCommitSpansMerge() throws Exception {
+        @Cleanup
+        PulsarAdmin admin = newAdmin();
+        @Cleanup
+        PulsarClient client = newTxnClient();
+        String topic = scalableTopicName();
+        admin.scalableTopics().createScalableTopic(topic, 2);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = subscribe(client, topic, "merge-sub");
+
+        Transaction txn = client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            String v = "before-merge-" + i;
+            producer.newMessage().key("k-" + 
i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+        List<Long> active = activeSegments(admin, topic);
+        assertEquals(active.size(), 2, "expected two active segments before 
merge");
+        admin.scalableTopics().mergeSegments(topic, active.get(0), 
active.get(1));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(activeSegments(admin, topic).size(), 1));
+
+        for (int i = 0; i < 5; i++) {
+            String v = "after-merge-" + i;
+            sendWithRetry(producer, txn, "k-after-" + i, v);
+            sent.add(v);
+        }
+        assertNull(consumer.receive(Duration.ofSeconds(2)), "nothing visible 
before commit");
+        txn.commit();
+        assertEquals(drain(consumer, sent.size()), sent, "all messages across 
the merge must be delivered");
+    }
+
+    /** Send a transactional message, retrying within a bounded window while 
the cluster re-settles. */
+    private void sendWithRetry(Producer<String> producer, Transaction txn, 
String key, String value)
+            throws Exception {
+        long deadline = System.currentTimeMillis() + 60_000;
+        Exception last = null;
+        while (System.currentTimeMillis() < deadline) {
+            try {
+                
producer.newMessage().key(key).transaction(txn).value(value).send();
+                return;
+            } catch (Exception e) {
+                last = e;
+                Thread.sleep(1000);
+            }
+        }
+        assertTrue(false, "send did not succeed within the retry window: " + 
(last == null ? "?" : last));
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-transaction.xml 
b/tests/integration/src/test/resources/pulsar-transaction.xml
index 0c23b9e93ab..b69d5f31288 100644
--- a/tests/integration/src/test/resources/pulsar-transaction.xml
+++ b/tests/integration/src/test/resources/pulsar-transaction.xml
@@ -24,6 +24,8 @@
         <classes>
             <class 
name="org.apache.pulsar.tests.integration.transaction.TransactionTest" />
             <class 
name="org.apache.pulsar.tests.integration.transaction.TcMetadataDiscoveryTest" 
/>
+            <class 
name="org.apache.pulsar.tests.integration.transaction.V5ScalableTopicTransactionTest"
 />
+            <class 
name="org.apache.pulsar.tests.integration.transaction.V5ScalableTopicTransactionOnOxiaTest"
 />
         </classes>
     </test>
 </suite>

Reply via email to