This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 450e9804707 [fix][txn] PIP-473: v5 transaction timeout applied 1000x
too long (#25959)
450e9804707 is described below
commit 450e9804707a18f957a013513f9110e067af3ea1
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 8 08:09:52 2026 -0700
[fix][txn] PIP-473: v5 transaction timeout applied 1000x too long (#25959)
---
.../apache/pulsar/broker/service/ServerCnx.java | 6 +-
.../client/api/v5/V5TransactionTimeoutTest.java | 129 +++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 8 +-
pulsar-common/src/main/proto/PulsarApi.proto | 5 +-
4 files changed, 141 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8c2dfe8d781..7dff4260d23 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3549,8 +3549,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
final String v5Owner = getPrincipal();
+ // txn_ttl_millis is already in milliseconds (the client sends unit.toMillis(...)); the v5
+ // coordinator's newTransaction takes milliseconds too, so pass it through unchanged.
service.pulsar().getTransactionCoordinatorV5()
- .newTransaction(tcId, command.getTxnTtlSeconds() * 1000L,
v5Owner)
+ .newTransaction(tcId, command.getTxnTtlMillis(), v5Owner)
.whenComplete((txnId, e) -> {
if (e == null) {
commandSender.sendNewTxnResponse(requestId, txnId,
tcId.getId());
@@ -3566,7 +3568,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
final String owner = getPrincipal();
- transactionMetadataStoreService.newTransaction(tcId,
command.getTxnTtlSeconds(), owner)
+ transactionMetadataStoreService.newTransaction(tcId,
command.getTxnTtlMillis(), owner)
.whenComplete(((txnID, ex) -> {
if (ex == null) {
log.debug()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTimeoutTest.java
new file mode 100644
index 00000000000..6651900cf3e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTimeoutTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Regression test for the v5 transaction timeout (#25959): a never-committed transaction must be
+ * aborted by the broker's timeout sweep shortly after its configured timeout, unpinning the buffer
+ * so that a later non-transactional message becomes visible.
+ */
+@CustomLog
+public class V5TransactionTimeoutTest extends MockedPulsarServiceBaseTest {
+
+    private final String myNamespace = "pulsar/txn-timeout";
+    private PulsarClient v5Client;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        ServiceConfiguration config = getDefaultConf();
+        config.setTransactionCoordinatorEnabled(true);
+        config.setTopicLevelPoliciesEnabled(false);
+        config.setTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds(1); // sweep every 1s
+        super.internalSetup(config);
+
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+
+        v5Client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofSeconds(3)).build())
+                .build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (v5Client != null) {
+            v5Client.close();
+            v5Client = null;
+        }
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testTimeoutSweepAbortsDanglingTransaction() throws Exception {
+        // Stage 1: confirm this broker actually leads TC partition 0 (the sweep only runs there).
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+                pulsar.getTransactionCoordinatorV5() != null
+                        && pulsar.getTransactionCoordinatorV5().isLeaderFor(0));
+        log.info().log("Stage 1 OK: broker leads TC partition 0");
+
+        String topic = "topic://" + myNamespace + "/scalable-" + UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(topic, 1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string()).topic(topic).create();
+        Transaction txn = v5Client.newTransaction();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().transaction(txn).value("dangling-" + i).send();
+        }
+        producer.newMessage().value("sentinel").send();
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("timeout-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        assertNull(consumer.receive(Duration.ofSeconds(1)), "Stage 2: everything pinned while txn open");
+        log.info().log("Stage 2 OK: pinned while txn open");
+
+        // Stage 3: the sweep (1s cadence) must abort the txn soon after its 3s timeout (pre-fix: 3000s).
+        Message<String> msg = null;
+        long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(40); // monotonic clock
+        while (msg == null && System.nanoTime() - deadlineNanos < 0) {
+            msg = consumer.receive(Duration.ofSeconds(2));
+        }
+        assertNotNull(msg, "Stage 3: sentinel must appear once the timeout sweep aborts the dangling txn");
+        org.testng.Assert.assertEquals(msg.value(), "sentinel");
+        consumer.acknowledge(msg.id());
+        log.info().log("Stage 3 OK: timeout sweep unpinned the buffer");
+    }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 80b10f47890..2403e85e31a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1435,16 +1435,16 @@ public class Commands {
// ---- transaction related ----
- public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {
- return newTxn(tcId, requestId, ttlSeconds, false);
+ public static ByteBuf newTxn(long tcId, long requestId, long ttlMillis) {
+ return newTxn(tcId, requestId, ttlMillis, false); // scalable=false: route to the legacy coordinator
}
- public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds,
boolean scalable) {
+ public static ByteBuf newTxn(long tcId, long requestId, long ttlMillis,
boolean scalable) { // true routes to the scalable-topics (PIP-473) coordinator
BaseCommand cmd = localCmd(Type.NEW_TXN);
cmd.setNewTxn()
.setTcId(tcId)
.setRequestId(requestId)
- .setTxnTtlSeconds(ttlSeconds)
+ .setTxnTtlMillis(ttlMillis) // CommandNewTxn field 2; the value is in milliseconds
.setScalable(scalable);
return serializeWithSize(cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index f769512660c..729359894d5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -1109,7 +1109,10 @@ message CommandTcClientConnectResponse {
message CommandNewTxn {
required uint64 request_id = 1;
- optional uint64 txn_ttl_seconds = 2 [default = 0];
+ // Transaction timeout in milliseconds. The field was formerly named txn_ttl_seconds, but the value has
+ // always been carried in milliseconds (the client sends unit.toMillis(...), and both coordinators interpret
+ // it as ms). It is renamed to avoid confusion; the field number (2) is unchanged, so the wire format is too.
+ optional uint64 txn_ttl_millis = 2 [default = 0];
optional uint64 tc_id = 3 [default = 0];
// When true, route to the metadata-driven (scalable-topics, PIP-473)
transaction coordinator
// instead of the legacy one. Set by v5 clients; absent for v4 clients.
Lets both coordinators