This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c6705719338 [improve][broker] PIP-473 P5.4: v4/v5 transaction
coordinator coexistence + enable v5 by default (#25945)
c6705719338 is described below
commit c67057193380577930bee5b016adce19f9ad4ea0
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jun 7 06:35:47 2026 -0700
[improve][broker] PIP-473 P5.4: v4/v5 transaction coordinator coexistence +
enable v5 by default (#25945)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 34 ++--
.../apache/pulsar/broker/service/ServerCnx.java | 72 +++++--
.../buffer/impl/MetadataTransactionBuffer.java | 23 ++-
.../pendingack/impl/MetadataPendingAckStore.java | 17 +-
.../pendingack/PendingAckPersistentTest.java | 7 +-
.../client/impl/v5/PulsarClientBuilderV5.java | 4 +
.../client/impl/TransactionMetaStoreHandler.java | 26 ++-
.../client/impl/conf/ClientConfigurationData.java | 7 +
.../TransactionCoordinatorClientImpl.java | 41 ++--
.../apache/pulsar/common/protocol/Commands.java | 37 +++-
pulsar-common/src/main/proto/PulsarApi.proto | 12 ++
.../protocol/CommandsScalableTxnFlagTest.java | 92 +++++++++
.../apache/pulsar/testclient/PerfClientUtils.java | 26 +++
.../pulsar/testclient/PerformanceConsumer.java | 33 ++--
.../pulsar/testclient/PerformanceProducer.java | 3 +-
.../pulsar/testclient/PerformanceTransaction.java | 31 ++-
.../Oauth2PerformanceTransactionTest.java | 84 ++++----
.../testclient/PerformanceTransactionTest.java | 209 ++++++++++----------
tests/integration/build.gradle.kts | 2 +
.../transaction/TcMetadataDiscoveryTest.java | 213 +++++++++++----------
20 files changed, 631 insertions(+), 342 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f10b2cbdc32..7771b43059f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3798,13 +3798,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Enable the metadata-driven transaction coordinator used by
scalable topics."
- + " When true, wire commands (NEW_TXN / END_TXN / etc.)
are served by the"
- + " metadata-store-backed coordinator instead of the
legacy"
- + " TransactionMetadataStoreService. Requires
transactionCoordinatorEnabled"
- + " = true, and must be enabled together with the
scalable-topic transaction"
- + " buffer and pending-ack store providers."
+ + " When true, transaction wire commands flagged as
scalable (sent by v5 SDK"
+ + " clients) are served by the metadata-store-backed
coordinator, while legacy"
+ + " (v4) clients continue to be served by
TransactionMetadataStoreService — the"
+ + " two coexist on the same cluster. Requires
transactionCoordinatorEnabled"
+ + " = true. Enabled by default together with the
dispatching transaction buffer"
+ + " and pending-ack store providers."
)
- private boolean transactionCoordinatorScalableTopicsEnabled = false;
+ private boolean transactionCoordinatorScalableTopicsEnabled = true;
@FieldContext(
category = CATEGORY_TRANSACTION,
@@ -3859,21 +3860,26 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_TRANSACTION,
- doc = "Class name for transaction buffer provider. Default routes
segment:// topics to the"
- + " legacy TopicTransactionBuffer. Set this to"
- + "
org.apache.pulsar.broker.transaction.buffer.impl.DispatchingTransactionBufferProvider"
- + " once the v5 transaction coordinator (PIP-473 P5) is
enabled to opt segment topics"
- + " into MetadataTransactionBuffer."
+ doc = "Class name for transaction buffer provider. The default
DispatchingTransactionBufferProvider"
+ + " routes segment:// topics to the metadata-driven
MetadataTransactionBuffer (PIP-473)"
+ + " and persistent:// / topic:// topics to the legacy
TopicTransactionBuffer. Set this to"
+ + "
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider
to"
+ + " force the legacy buffer for all topics."
)
private String transactionBufferProviderClassName =
-
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";
+
"org.apache.pulsar.broker.transaction.buffer.impl.DispatchingTransactionBufferProvider";
@FieldContext(
category = CATEGORY_TRANSACTION,
- doc = "Class name for transaction pending ack store provider"
+ doc = "Class name for transaction pending ack store provider. The
default"
+ + " DispatchingTransactionPendingAckStoreProvider routes
subscriptions on segment:// topics"
+ + " to the metadata-driven MetadataPendingAckStore
(PIP-473) and others to the legacy"
+ + " MLPendingAckStore. Set this to"
+ + "
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider
to force"
+ + " the legacy store for all subscriptions."
)
private String transactionPendingAckStoreProviderClassName =
-
"org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider";
+
"org.apache.pulsar.broker.transaction.pendingack.impl.DispatchingTransactionPendingAckStoreProvider";
@FieldContext(
category = CATEGORY_TRANSACTION,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 00253bbb431..8c2dfe8d781 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3446,7 +3446,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ if (command.isScalable()) {
+ if (!isScalableTcAvailable()) {
+ commandSender.sendTcClientConnectResponse(requestId,
ServerError.NotAllowedError,
+ "Scalable-topics transaction coordinator is not
enabled on this broker");
+ return;
+ }
service.pulsar().getTransactionCoordinatorV5().handleClientConnect(tcId)
.whenComplete((__, e) -> {
if (e == null) {
@@ -3492,6 +3497,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return true;
}
}
+ /**
+ * @return true if the scalable-topics (PIP-473) transaction coordinator
is enabled in the broker config and has been instantiated on
+ * this broker. Transaction commands carrying {@code scalable=true}
route to it; commands
+ * without the flag always go to the legacy coordinator, so v4 and v5
clients coexist.
+ */
+ private boolean isScalableTcAvailable() {
+ return
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
+ && service.getPulsar().getTransactionCoordinatorV5() != null;
+ }
+
private Throwable handleTxnException(Throwable ex, String op, long
requestId) {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof
CoordinatorException.CoordinatorNotFoundException) {
@@ -3527,7 +3542,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ if (command.isScalable()) {
+ if (!isScalableTcAvailable()) {
+ commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
ServerError.NotAllowedError,
+ "Scalable-topics transaction coordinator is not
enabled on this broker");
+ return;
+ }
final String v5Owner = getPrincipal();
service.pulsar().getTransactionCoordinatorV5()
.newTransaction(tcId, command.getTxnTtlSeconds() * 1000L,
v5Owner)
@@ -3594,11 +3614,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ if (command.isScalable()) {
+ if (!isScalableTcAvailable()) {
+ writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(),
+ txnID.getMostSigBits(), ServerError.NotAllowedError,
+ "Scalable-topics transaction coordinator is not
enabled on this broker"));
+ return;
+ }
// v5: TC doesn't need pre-registration — participants advertise
themselves by writing
// /txn/op records when they actually apply ops. Still verify
ownership before acking,
// matching the legacy authorization surface.
- verifyTxnOwnership(txnID)
+ verifyTxnOwnership(txnID, true)
.thenCompose(isOwner -> isOwner ?
CompletableFuture.<Void>completedFuture(null)
: failedFutureTxnNotOwned(txnID))
.whenComplete((v, ex) -> {
@@ -3618,7 +3644,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- verifyTxnOwnership(txnID)
+ verifyTxnOwnership(txnID, false)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
@@ -3676,8 +3702,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
- verifyTxnOwnership(txnID)
+ if (command.isScalable()) {
+ if (!isScalableTcAvailable()) {
+ commandSender.sendEndTxnErrorResponse(requestId, txnID,
ServerError.NotAllowedError,
+ "Scalable-topics transaction coordinator is not
enabled on this broker");
+ return;
+ }
+ verifyTxnOwnership(txnID, true)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
@@ -3700,7 +3731,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- verifyTxnOwnership(txnID)
+ verifyTxnOwnership(txnID, false)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
@@ -3739,14 +3770,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
- private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
+ private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, boolean
scalable) {
assert ctx.executor().inEventLoop();
- CompletableFuture<Boolean> ownerCheck =
-
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
- ? service.pulsar().getTransactionCoordinatorV5()
- .verifyTxnOwnership(txnID, getPrincipal())
- : service.pulsar().getTransactionMetadataStoreService()
- .verifyTxnOwnership(txnID, getPrincipal());
+ CompletableFuture<Boolean> ownerCheck = scalable
+ ? service.pulsar().getTransactionCoordinatorV5()
+ .verifyTxnOwnership(txnID, getPrincipal())
+ : service.pulsar().getTransactionMetadataStoreService()
+ .verifyTxnOwnership(txnID, getPrincipal());
return ownerCheck
.thenComposeAsync(isOwner -> {
if (isOwner) {
@@ -4016,11 +4046,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ if (command.isScalable()) {
+ if (!isScalableTcAvailable()) {
+
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getLeastSigBits(),
+ txnID.getMostSigBits(), ServerError.NotAllowedError,
+ "Scalable-topics transaction coordinator is not
enabled on this broker"));
+ return;
+ }
// v5: TC doesn't need pre-registration — participants advertise
themselves by writing
// /txn/op records when they actually apply ops. Still verify
ownership before acking,
// matching the legacy authorization surface.
- verifyTxnOwnership(txnID)
+ verifyTxnOwnership(txnID, true)
.thenCompose(isOwner -> isOwner ?
CompletableFuture.<Void>completedFuture(null)
: failedFutureTxnNotOwned(txnID))
.whenComplete((v, ex) -> {
@@ -4041,7 +4077,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- verifyTxnOwnership(txnID)
+ verifyTxnOwnership(txnID, false)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 845c1c0328a..4ac6b89ae51 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -119,9 +119,6 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
/** Version of the durable watermark record; -1 if it doesn't exist yet. */
private long watermarkVersion = -1L;
- /** Latest dispatched position from non-txn publishes — the natural
ceiling when no opens pin. */
- private Position lastDispatchable;
-
/** Current maxReadPosition; never moves above the watermark while
recovery-discovered opens exist. */
private Position maxReadPosition;
@@ -141,7 +138,6 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
this.segmentName = topic.getName();
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.maxReadPosition = ledger.getLastConfirmedEntry();
- this.lastDispatchable = this.maxReadPosition;
recover();
}
@@ -544,18 +540,28 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
next = watermarkPos != null ? watermarkPos : maxReadPosition;
} else {
Position min = null;
+ boolean anyOpen = false;
for (TxnEntry e : txns.values()) {
- if (e.state == TxnState.OPEN && e.firstPosition != null) {
- if (min == null || e.firstPosition.compareTo(min) < 0) {
+ if (e.state == TxnState.OPEN) {
+ anyOpen = true;
+ if (e.firstPosition != null
+ && (min == null || e.firstPosition.compareTo(min)
< 0)) {
min = e.firstPosition;
}
}
}
if (min != null) {
+ // Pin just below the lowest open txn's first write.
next = ledger.getPreviousPosition(min);
+ } else if (anyOpen) {
+ // Open txn(s) whose first write isn't tracked yet (an append
is in flight between
+ // the /txn/op record and the ledger entry): hold the current
ceiling rather than
+ // risk exposing the in-flight entry.
+ next = maxReadPosition;
} else {
- // No open txns pinning anything: free to advance to
last-dispatched.
- next = lastDispatchable;
+ // No open txns: every written entry is resolved — committed
data is visible and
+ // aborted data is filtered by isTxnAborted — so advance to
the last written entry.
+ next = ledger.getLastConfirmedEntry();
}
}
Position prev = maxReadPosition;
@@ -593,7 +599,6 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
}
topic.updateLastDispatchablePosition(position);
synchronized (lock) {
- lastDispatchable = position;
recomputeMaxReadPositionLocked();
// Persist the new watermark if it advanced as a result of the
non-txn append.
stateTail = stateTail.thenCompose(__ ->
persistWatermarkIfAdvanced())
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
index fba4ec44361..439b75ecab2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
@@ -165,7 +166,21 @@ public class MetadataPendingAckStore implements
PendingAckStore {
return;
}
recoveryFuture.complete(null);
- pendingAckHandle.changeToReadyState();
+ // Mirror the legacy MLPendingAckStore completion: flip the handle
to Ready, complete the handle future,
+ // then drain any ack requests queued during recovery. Completing the
handle future is required:
+ // PersistentSubscription.addConsumer blocks on it, so skipping it would
hang every subscribe on a
+ // segment topic. Run on the pinned executor so the state-machine
transition and the cache drain
+ // stay single-threaded.
+ executorService.execute(() -> {
+ if (pendingAckHandle.changeToReadyState()) {
+ pendingAckHandle.completeHandleFuture();
+ } else {
+ pendingAckHandle.exceptionHandleFuture(
+ new
BrokerServiceException.ServiceUnitNotReadyException(
+ "Failed to change PendingAckHandle state
to Ready"));
+ }
+ pendingAckHandle.handleCacheRequest();
+ });
// Drain any events that fired during recovery.
triggerReconcile();
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index ffd3a40a814..5c66c6d0e7c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -166,8 +166,11 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
when(mockProvider.newPendingAckStore(any()))
// First, the method newPendingAckStore will fail with a
retryable exception.
.thenReturn(FutureUtil.failedFuture(new
ManagedLedgerException("mock fail new store")))
- // Then, the method will be executed successfully.
- .thenCallRealMethod();
+ // Then, the method will be executed successfully. Delegate to
the real provider
+ // rather than thenCallRealMethod(): the configured provider
is now the dispatching
+ // provider, and a Mockito mock of it has null delegate
fields, so calling its real
+ // method would NPE. The original real provider behaves
identically for this topic.
+ .thenAnswer(invocation ->
pendingAckStoreProvider.newPendingAckStore(invocation.getArgument(0)));
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0),
mockProvider);
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.subscriptionName("subName3")
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
index 8104c4bbc36..05dba76c00d 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
@@ -43,6 +43,10 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
PulsarClientBuilderV5() {
conf.setStatsIntervalSeconds(0);
+ // v5 SDK transactions use the metadata-store (PIP-473) coordinator.
This internal flag
+ // routes the underlying v4 TC client to it, keeping v5 transactions
independent from any
+ // v4 SDK clients (which use the legacy coordinator) on the same
cluster.
+ conf.setScalableTransactions(true);
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index ae52cfd0550..64dd7ef4344 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -96,6 +96,7 @@ public class TransactionMetaStoreHandler extends HandlerState
// whether it must be reached through the proxy. Null leaderUri means
assign-topic mode.
private volatile URI leaderUri;
private volatile boolean useProxy;
+ private final boolean scalable;
@@ -123,6 +124,10 @@ public class TransactionMetaStoreHandler extends
HandlerState
super(pulsarClient, topic);
this.leaderUri = leaderUri;
this.useProxy = useProxy;
+ // A handler built with a fixed leader URI is a v5 (metadata-store
discovery) handler; one
+ // built with a topic name is a legacy v4 handler. The flag routes
each command to the
+ // matching coordinator on the broker so v4 and v5 clients coexist.
+ this.scalable = leaderUri != null;
this.transactionCoordinatorId = transactionCoordinatorId;
this.timeoutQueue = new ConcurrentLinkedQueue<>();
this.blockIfReachMaxPendingOps = true;
@@ -199,7 +204,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
// if broker protocol version < 19, don't send
TcClientConnectRequest to broker.
if (cnx.getRemoteEndpointProtocolVersion() >
ProtocolVersion.v18.getValue()) {
long requestId = client.newRequestId();
- ByteBuf request =
Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);
+ ByteBuf request =
+
Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId,
scalable);
cnx.sendRequestWithId(request, requestId).thenRun(() -> {
internalPinnedExecutor.execute(() -> {
@@ -274,7 +280,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
return callback;
}
long requestId = client.newRequestId();
- ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId,
unit.toMillis(timeout));
+ ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId,
unit.toMillis(timeout), scalable);
String description = String.format("Create new transaction %s",
transactionCoordinatorId);
OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback,
client, description, cnx());
internalPinnedExecutor.execute(() -> {
@@ -352,7 +358,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newAddPartitionToTxn(
- requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
partitions);
+ requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
partitions, scalable);
String description = String.format("Add partition %s to TXN %s",
String.valueOf(partitions),
String.valueOf(txnID));
OpForVoidCallBack op = OpForVoidCallBack
@@ -435,7 +441,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newAddSubscriptionToTxn(
- requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
subscriptionList);
+ requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
subscriptionList, scalable);
String description = String.format("Add subscription %s to TXN %s",
toStringSubscriptionList(subscriptionList),
String.valueOf(txnID));
OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client,
description, cnx());
@@ -526,7 +532,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
return callback;
}
long requestId = client.newRequestId();
- BaseCommand cmd = Commands.newEndTxn(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
+ BaseCommand cmd = Commands.newEndTxn(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits(),
+ action, scalable);
ByteBuf buf = Commands.serializeWithSize(cmd);
String description = String.format("End [%s] TXN %s",
String.valueOf(action), String.valueOf(txnID));
OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client,
description, cnx());
@@ -756,6 +763,14 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
return true;
case Connecting:
+ case Uninitialized:
+ // Not connected yet, but the handler is (or will be)
establishing the connection. For
+ // the metadata-store coordinator the partition leader is
still being resolved via the
+ // assignment watch, so the handler can sit in Uninitialized
briefly after the client is
+ // built. Leave the op queued in pendingRequests; it is
retried from connectionOpened
+ // once the handler is Ready, and the operation-timeout sweep
fails it if the connection
+ // never comes. Failing fast here would make a freshly-built
client's first request
+ // race the asynchronous connect.
return true;
case Closing:
case Closed:
@@ -767,7 +782,6 @@ public class TransactionMetaStoreHandler extends
HandlerState
onResponse(op);
return false;
case Failed:
- case Uninitialized:
op.callback.completeExceptionally(
new
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
"Transaction meta store handler for tcId "
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 1c282198eb0..e1c747d18aa 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -85,6 +85,13 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
)
private long serviceUrlQuarantineMaxDurationMs = TimeUnit.DAYS.toMillis(1);
+ // Internal: set by the v5 SDK (PulsarClientBuilderV5), not exposed on the
public ClientBuilder.
+ // When true, transactions use the metadata-driven (PIP-473) coordinator;
when false (v4 SDK),
+ // they use the legacy coordinator. The coordinator is therefore chosen
by client/SDK kind rather
+ // than by broker capability, so a v4 client keeps using the legacy TC even
on a v5-enabled cluster.
+ @JsonIgnore
+ private boolean scalableTransactions = false;
+
@Schema(
name = "authentication",
description = "Authentication settings of the client."
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 6fe938d2d62..13e8a5ca2ec 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.CustomLog;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorClientStateException;
@@ -86,35 +87,25 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
}
/**
- * Choose the discovery strategy. The metadata-store assignment watch
needs a binary-protocol
- * connection, so it's only usable when the client is configured with a
{@code pulsar://}
- * service URL; with an {@code http://} service URL we always use the
assign-topic flow (which
- * resolves coordinators via the admin/HTTP-capable partitioned-metadata
lookup). When binary
- * lookup is available, probe the broker's {@code
supports_tc_metadata_discovery} feature flag to
- * decide; if the broker doesn't advertise it (old broker, or
scalable-topics TC disabled), fall
- * back to the assign-topic flow.
+ * Choose the discovery strategy by client/SDK kind, not by broker
capability. A v5 SDK client
+ * sets the internal {@code scalableTransactions} config flag and uses the
metadata-store
+ * coordinator (assignment watch); a v4 SDK client leaves it unset and
uses the legacy
+ * assign-topic coordinator. This keeps v4 and v5 transactions independent
on the same cluster:
+ * flipping the broker default to enable the v5 TC must not silently
re-route v4 clients to it,
+ * since the v5 TC notifies participants via metadata-store events that
the legacy transaction
+ * buffer / pending-ack store don't consume.
*/
private CompletableFuture<TcDiscovery> selectDiscovery() {
- if (!pulsarClient.getLookup().isBinaryProtoLookupService()) {
+ if (!pulsarClient.getConfiguration().isScalableTransactions()) {
return CompletableFuture.completedFuture(new
AssignTopicTcDiscovery(pulsarClient));
}
- // Probe a broker connection to read the feature flag. Use
getAnyBrokerProxyConnection() (not
- // getConnectionToServiceUrl()): when connecting through a proxy, the
latter yields the proxy's
- // own CONNECTED, which carries the proxy lookup handshake's flags
rather than a broker's;
- // getAnyBrokerProxyConnection() pairs to an actual broker (directly
or proxied) so the
- // forwarded feature flags reflect the broker — the same connection
the watch itself uses.
- // If the probe fails, fall back to the assign-topic flow, whose
lookup retries across hosts
- // and still works against v5 brokers (the assign topic exists during
the deprecation window),
- // so falling back is always safe.
- return pulsarClient.getAnyBrokerProxyConnection()
- .thenApply(cnx -> cnx.isSupportsTcMetadataDiscovery()
- ? (TcDiscovery) new
WatchTcAssignmentsDiscovery(pulsarClient)
- : new AssignTopicTcDiscovery(pulsarClient))
- .exceptionally(ex -> {
- log.info().exception(ex)
- .log("TC discovery feature probe failed; using
assign-topic discovery");
- return new AssignTopicTcDiscovery(pulsarClient);
- });
+ // The metadata-store assignment watch needs a binary connection. A v5
client on an
+ // http:// service URL is a misconfiguration — fail clearly rather
than silently downgrade.
+ if (!pulsarClient.getLookup().isBinaryProtoLookupService()) {
+ return FutureUtil.failedFuture(new
PulsarClientException.InvalidServiceURL(
+ "Scalable-topics transactions require a pulsar:// service
URL", null));
+ }
+ return CompletableFuture.completedFuture(new
WatchTcAssignmentsDiscovery(pulsarClient));
}
@Override
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 3b8307dcfcb..80b10f47890 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -724,8 +724,12 @@ public class Commands {
}
public static ByteBuf newTcClientConnectRequest(long tcId, long requestId)
{
+ return newTcClientConnectRequest(tcId, requestId, false);
+ }
+
+ public static ByteBuf newTcClientConnectRequest(long tcId, long requestId,
boolean scalable) {
BaseCommand cmd = localCmd(Type.TC_CLIENT_CONNECT_REQUEST);
- cmd.setTcClientConnectRequest().setTcId(tcId).setRequestId(requestId);
+
cmd.setTcClientConnectRequest().setTcId(tcId).setRequestId(requestId).setScalable(scalable);
return serializeWithSize(cmd);
}
@@ -1432,11 +1436,16 @@ public class Commands {
// ---- transaction related ----
public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {
+ return newTxn(tcId, requestId, ttlSeconds, false);
+ }
+
+ public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds,
boolean scalable) {
BaseCommand cmd = localCmd(Type.NEW_TXN);
cmd.setNewTxn()
.setTcId(tcId)
.setRequestId(requestId)
- .setTxnTtlSeconds(ttlSeconds);
+ .setTxnTtlSeconds(ttlSeconds)
+ .setScalable(scalable);
return serializeWithSize(cmd);
}
@@ -1463,11 +1472,17 @@ public class Commands {
public static ByteBuf newAddPartitionToTxn(long requestId, long
txnIdLeastBits, long txnIdMostBits,
List<String> partitions) {
+ return newAddPartitionToTxn(requestId, txnIdLeastBits, txnIdMostBits,
partitions, false);
+ }
+
+ public static ByteBuf newAddPartitionToTxn(long requestId, long
txnIdLeastBits, long txnIdMostBits,
+ List<String> partitions,
boolean scalable) {
BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN);
CommandAddPartitionToTxn req = cmd.setAddPartitionToTxn()
.setRequestId(requestId)
.setTxnidLeastBits(txnIdLeastBits)
- .setTxnidMostBits(txnIdMostBits);
+ .setTxnidMostBits(txnIdMostBits)
+ .setScalable(scalable);
if (partitions != null) {
partitions.forEach(req::addPartition);
}
@@ -1503,11 +1518,17 @@ public class Commands {
public static ByteBuf newAddSubscriptionToTxn(long requestId, long
txnIdLeastBits, long txnIdMostBits,
List<Subscription> subscriptions) {
+ return newAddSubscriptionToTxn(requestId, txnIdLeastBits,
txnIdMostBits, subscriptions, false);
+ }
+
+ public static ByteBuf newAddSubscriptionToTxn(long requestId, long
txnIdLeastBits, long txnIdMostBits,
+ List<Subscription> subscriptions, boolean scalable) {
BaseCommand cmd = localCmd(Type.ADD_SUBSCRIPTION_TO_TXN);
CommandAddSubscriptionToTxn add = cmd.setAddSubscriptionToTxn()
.setRequestId(requestId)
.setTxnidLeastBits(txnIdLeastBits)
- .setTxnidMostBits(txnIdMostBits);
+ .setTxnidMostBits(txnIdMostBits)
+ .setScalable(scalable);
subscriptions.forEach(s -> add.addSubscription().copyFrom(s));
return serializeWithSize(cmd);
}
@@ -1536,11 +1557,17 @@ public class Commands {
}
public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits,
long txnIdMostBits, TxnAction txnAction) {
+ return newEndTxn(requestId, txnIdLeastBits, txnIdMostBits, txnAction,
false);
+ }
+
+ public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits,
long txnIdMostBits, TxnAction txnAction,
+ boolean scalable) {
BaseCommand cmd = localCmd(Type.END_TXN);
cmd.setEndTxn()
.setRequestId(requestId)
.setTxnidLeastBits(txnIdLeastBits).setTxnidMostBits(txnIdMostBits)
- .setTxnAction(txnAction);
+ .setTxnAction(txnAction)
+ .setScalable(scalable);
return cmd;
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 76cd8d382ab..f769512660c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -1097,6 +1097,8 @@ enum TxnAction {
message CommandTcClientConnectRequest {
required uint64 request_id = 1;
required uint64 tc_id = 2 [default = 0];
+ // Route to the scalable-topics (PIP-473) coordinator. See
CommandNewTxn.scalable.
+ optional bool scalable = 3 [default = false];
}
message CommandTcClientConnectResponse {
@@ -1109,6 +1111,10 @@ message CommandNewTxn {
required uint64 request_id = 1;
optional uint64 txn_ttl_seconds = 2 [default = 0];
optional uint64 tc_id = 3 [default = 0];
+ // When true, route to the metadata-driven (scalable-topics, PIP-473)
transaction coordinator
+ // instead of the legacy one. Set by v5 clients; absent for v4 clients.
Lets both coordinators
+ // serve their own clients on the same cluster.
+ optional bool scalable = 4 [default = false];
}
message CommandNewTxnResponse {
@@ -1124,6 +1130,8 @@ message CommandAddPartitionToTxn {
optional uint64 txnid_least_bits = 2 [default = 0];
optional uint64 txnid_most_bits = 3 [default = 0];
repeated string partitions = 4;
+ // Route to the scalable-topics (PIP-473) coordinator. See
CommandNewTxn.scalable.
+ optional bool scalable = 5 [default = false];
}
message CommandAddPartitionToTxnResponse {
@@ -1143,6 +1151,8 @@ message CommandAddSubscriptionToTxn {
optional uint64 txnid_least_bits = 2 [default = 0];
optional uint64 txnid_most_bits = 3 [default = 0];
repeated Subscription subscription = 4;
+ // Route to the scalable-topics (PIP-473) coordinator. See
CommandNewTxn.scalable.
+ optional bool scalable = 5 [default = false];
}
message CommandAddSubscriptionToTxnResponse {
@@ -1158,6 +1168,8 @@ message CommandEndTxn {
optional uint64 txnid_least_bits = 2 [default = 0];
optional uint64 txnid_most_bits = 3 [default = 0];
optional TxnAction txn_action = 4;
+ // Route to the scalable-topics (PIP-473) coordinator. See
CommandNewTxn.scalable.
+ optional bool scalable = 5 [default = false];
}
message CommandEndTxnResponse {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTxnFlagTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTxnFlagTest.java
new file mode 100644
index 00000000000..fa650992c13
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTxnFlagTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.testng.annotations.Test;
+
+/**
+ * Roundtrip tests for the {@code scalable} routing flag on the transaction
commands (PIP-473
+ * coexistence): a v5 client sets it so the broker routes the command to the
metadata-store
+ * coordinator; a v4 client omits it and routes to the legacy coordinator.
Each command is encoded,
+ * the serialized wire frame is reparsed, and the flag is checked in both
states.
+ */
+public class CommandsScalableTxnFlagTest {
+
+ private static BaseCommand parseFrame(ByteBuf frame) {
+ try {
+ frame.skipBytes(4); // total size
+ int cmdSize = (int) frame.readUnsignedInt();
+ BaseCommand cmd = new BaseCommand();
+ cmd.parseFrom(frame, cmdSize);
+ cmd.materialize();
+ return cmd;
+ } finally {
+ frame.release();
+ }
+ }
+
+ @Test
+ public void tcClientConnectCarriesScalable() {
+ assertTrue(parseFrame(Commands.newTcClientConnectRequest(1L, 2L, true))
+ .getTcClientConnectRequest().isScalable());
+ assertFalse(parseFrame(Commands.newTcClientConnectRequest(1L, 2L))
+ .getTcClientConnectRequest().isScalable());
+ }
+
+ @Test
+ public void newTxnCarriesScalable() {
+ assertTrue(parseFrame(Commands.newTxn(0L, 1L, 60_000L,
true)).getNewTxn().isScalable());
+ assertFalse(parseFrame(Commands.newTxn(0L, 1L,
60_000L)).getNewTxn().isScalable());
+ }
+
+ @Test
+ public void endTxnCarriesScalable() {
+ assertTrue(Commands.newEndTxn(1L, 2L, 0L, TxnAction.COMMIT,
true).getEndTxn().isScalable());
+ assertFalse(Commands.newEndTxn(1L, 2L, 0L,
TxnAction.COMMIT).getEndTxn().isScalable());
+ }
+
+ @Test
+ public void addPartitionCarriesScalable() {
+ assertTrue(parseFrame(Commands.newAddPartitionToTxn(1L, 2L, 0L,
List.of("t"), true))
+ .getAddPartitionToTxn().isScalable());
+ assertFalse(parseFrame(Commands.newAddPartitionToTxn(1L, 2L, 0L,
List.of("t")))
+ .getAddPartitionToTxn().isScalable());
+ }
+
+ @Test
+ public void addSubscriptionCarriesScalable() {
+ assertTrue(parseFrame(Commands.newAddSubscriptionToTxn(1L, 2L, 0L,
List.of(), true))
+ .getAddSubscriptionToTxn().isScalable());
+ assertFalse(parseFrame(Commands.newAddSubscriptionToTxn(1L, 2L, 0L,
List.of()))
+ .getAddSubscriptionToTxn().isScalable());
+ }
+
+ @Test
+ public void defaultIsFalse() {
+ // A command with no scalable field set (legacy/v4 client) must read
false.
+ assertEquals(parseFrame(Commands.newTxn(0L, 1L,
60_000L)).getNewTxn().isScalable(), false);
+ }
+}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index d8528c3e593..633b7c895b4 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -291,6 +291,32 @@ public class PerfClientUtils {
}
}
+ /**
+ * Open a transaction on the V5 client, retrying briefly while the
transaction-coordinator handler
+ * finishes its asynchronous connect. The first {@code newTransaction()}
right after the client is
+ * built can race that connect and fail with {@code
MetaStoreHandlerNotReadyException}; the perf
+ * tools open their initial transaction before building
producers/consumers, so they hit this
+ * window (whereas a tool that builds participants first gives the handler
time to connect).
+ *
+ * @param client the V5 client to open the transaction on
+ * @return a new transaction once the coordinator is ready
+ */
+ public static org.apache.pulsar.client.api.v5.Transaction
newTransactionWithRetry(
+ org.apache.pulsar.client.api.v5.PulsarClient client)
+ throws org.apache.pulsar.client.api.v5.PulsarClientException,
InterruptedException {
+ long deadline = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(30);
+ while (true) {
+ try {
+ return client.newTransaction();
+ } catch (org.apache.pulsar.client.api.v5.PulsarClientException e) {
+ if (System.currentTimeMillis() > deadline ||
hasInterruptedException(e)) {
+ throw e;
+ }
+ Thread.sleep(200);
+ }
+ }
+ }
+
/**
* Check if the throwable or any of its causes is an InterruptedException.
*
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index cc4f2106d4c..6c0f101edd8 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -288,7 +288,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
AtomicReference<Transaction> atomicReference;
if (this.isEnableTransaction) {
- atomicReference = new
AtomicReference<>(pulsarClient.newTransaction());
+ atomicReference = new
AtomicReference<>(PerfClientUtils.newTransactionWithRetry(pulsarClient));
} else {
atomicReference = new AtomicReference<>(null);
}
@@ -475,6 +475,24 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
Thread mainThread,
PulsarClient pulsarClient) {
while (!Thread.currentThread().isInterrupted()) {
+ // Termination conditions that don't depend on having just
received a message. With
+ // asynchronous transaction commits the final commit can land
after the last available
+ // message is consumed, so the transaction count must be
re-checked on idle receives too;
+ // otherwise the consumer waits forever for a message that will
never arrive.
+ if (this.testTime > 0 && System.nanoTime() > testEndTime) {
+ log.info("------------------- DONE -----------------------");
+ PerfClientUtils.exit(0);
+ mainThread.interrupt();
+ return;
+ }
+ if (this.totalNumTxn > 0
+ && totalEndTxnOpFailNum.sum() +
totalEndTxnOpSuccessNum.sum() >= this.totalNumTxn) {
+ log.info("------------------- DONE -----------------------");
+ PerfClientUtils.exit(0);
+ mainThread.interrupt();
+ return;
+ }
+
Message<byte[]> msg;
try {
msg = consumer.receive(Duration.ofSeconds(1));
@@ -490,19 +508,6 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
continue;
}
- if (this.testTime > 0 && System.nanoTime() > testEndTime) {
- log.info("------------------- DONE -----------------------");
- PerfClientUtils.exit(0);
- mainThread.interrupt();
- return;
- }
- if (this.totalNumTxn > 0
- && totalEndTxnOpFailNum.sum() +
totalEndTxnOpSuccessNum.sum() >= this.totalNumTxn) {
- log.info("------------------- DONE -----------------------");
- PerfClientUtils.exit(0);
- mainThread.interrupt();
- return;
- }
messagesReceived.increment();
bytesReceived.add(msg.size());
totalMessagesReceived.increment();
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 2978456b8b2..05b82a7e156 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -545,7 +545,8 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
AtomicReference<Transaction> transactionAtomicReference;
if (this.isEnableTransaction) {
- transactionAtomicReference = new
AtomicReference<>(client.newTransaction());
+ transactionAtomicReference = new AtomicReference<>(
+ PerfClientUtils.newTransactionWithRetry(client));
} else {
transactionAtomicReference = new AtomicReference<>(null);
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 73c1f79497c..7a84f7894a3 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -121,6 +121,15 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
+ "not trying to create a topic")
public Integer partitions = null;
+ @Option(names = {"--scalable"}, description = "Create the
producer/consumer topics as scalable"
+ + " topics (PIP-473) with --scalable-segments initial segments.
Required for transactions"
+ + " against the scalable-topics (v5) coordinator. Mutually
exclusive with --partitions.")
+ public boolean scalable = false;
+
+ @Option(names = {"--scalable-segments"}, description = "Number of initial
segments for scalable"
+ + " topics created via --scalable.")
+ public int scalableSegments = 1;
+
@Option(names = {"-time",
"--test-duration"}, description = "Test duration (in second). 0
means keeping publishing")
public long testTime = 0;
@@ -208,7 +217,27 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
for (int i = 0; i < payloadBytes.length; ++i) {
payloadBytes[i] = (byte) (random.nextInt(26) + 65);
}
- if (this.partitions != null) {
+ if (this.scalable) {
+ // Scalable topics (PIP-473) must be pre-created via the admin API
— they don't
+ // auto-create on produce. Create both the produce and consume
topics so a
+ // transaction against the scalable-topics coordinator has segment
participants.
+ final PulsarAdminBuilder adminBuilder = PerfClientUtils
+ .createAdminBuilderFromArguments(this, this.adminURL);
+ try (PulsarAdmin adminClient = adminBuilder.build()) {
+ List<String> allTopics = new ArrayList<>(this.producerTopic);
+ allTopics.addAll(this.consumerTopic);
+ for (String topic : allTopics) {
+ try {
+
adminClient.scalableTopics().createScalableTopic(topic, this.scalableSegments);
+ log.info().attr("topic", topic).attr("segments",
this.scalableSegments)
+ .log("Created scalable topic");
+ } catch (PulsarAdminException.ConflictException
alreadyExists) {
+ log.debug().attr("topic", topic).attr("exists",
alreadyExists)
+ .log("Scalable topic already exists");
+ }
+ }
+ }
+ } else if (this.partitions != null) {
final PulsarAdminBuilder adminBuilder = PerfClientUtils
.createAdminBuilderFromArguments(this, this.adminURL);
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
index e6a5e195e4c..e8867fb5ddd 100644
---
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
@@ -25,25 +25,24 @@ import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -60,13 +59,17 @@ public class Oauth2PerformanceTransactionTest extends
ProducerConsumerBase {
private final String testTenant = "pulsar";
private final String testNamespace = "perf";
private final String myNamespace = testTenant + "/" + testNamespace;
- private final String testTopic = "persistent://" + myNamespace + "/test-";
+ // v5 transactions are scalable-topic-only; scalable topics use the
topic:// domain.
+ private final String testTopic = "topic://" + myNamespace + "/test-";
// Credentials File, which contains "client_id" and "client_secret"
private static final String CREDENTIALS_FILE =
"./src/test/resources/authentication/token/credentials_file.json";
private final String authenticationPlugin =
"org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2";
+ // v5 SDK client for the verification produce/consume (v4 SDK can't use
scalable topics).
+ private PulsarClient v5Client;
+
private MockOIDCIdentityProvider server;
private String authenticationParameters;
@@ -118,6 +121,10 @@ public class Oauth2PerformanceTransactionTest extends
ProducerConsumerBase {
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
+ if (v5Client != null) {
+ v5Client.close();
+ v5Client = null;
+ }
super.internalCleanup();
server.stop();
}
@@ -146,15 +153,19 @@ public class Oauth2PerformanceTransactionTest extends
ProducerConsumerBase {
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(1));
- replacePulsarClient(PulsarClient.builder().serviceUrl(new
URI(pulsar.getBrokerServiceUrl()).toString())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .authentication(authenticationPlugin,
authenticationParameters));
+ // v5 SDK verification client: v5 transactions are
scalable-topic-only, and the v4 SDK
+ // can't produce/consume on scalable (topic://) topics.
transactionPolicy(...) opts the
+ // client into transactions and routes it to the scalable-topics (v5)
coordinator.
+ v5Client = PulsarClient.builder()
+ .serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+ .authentication(authenticationPlugin, authenticationParameters)
+
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(5)).build())
+ .build();
}
@Test
public void testTransactionPerf() throws Exception {
- String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u
%s -ss %s -np 1 -au %s"
+ String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u
%s -ss %s --scalable -au %s"
+ " --auth-plugin %s --auth-params %s";
String testConsumeTopic = testTopic + UUID.randomUUID();
String testProduceTopic = testTopic + UUID.randomUUID();
@@ -163,27 +174,23 @@ public class Oauth2PerformanceTransactionTest extends
ProducerConsumerBase {
pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()),
authenticationPlugin, authenticationParameters);
- Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
- .producerName("perf-transaction-producer")
- .sendTimeout(0, TimeUnit.SECONDS)
+ // v5 transactions are scalable-topic-only; scalable topics must be
pre-created (they don't
+ // auto-create on produce). Create the consume topic so the warm-up
producer below can write.
+ admin.scalableTopics().createScalableTopic(testConsumeTopic, 1);
+ admin.scalableTopics().createScalableTopic(testProduceTopic, 1);
+
+ Producer<byte[]> produceToConsumeTopic =
v5Client.newProducer(Schema.bytes())
.topic(testConsumeTopic)
.create();
- pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-consumeVerify")
+ v5Client.newQueueConsumer(Schema.bytes())
.topic(testConsumeTopic)
- .subscriptionType(SubscriptionType.Shared)
.subscriptionName(testSub + "pre")
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- CountDownLatch countDownLatch = new CountDownLatch(50);
- for (int i = 0; i < 50
- ; i++) {
- produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(
- countDownLatch::countDown);
+ for (int i = 0; i < 50; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).send();
}
- countDownLatch.await();
-
Thread thread = new Thread(() -> {
try {
new PerformanceTransaction().run(args.split(" "));
@@ -193,27 +200,24 @@ public class Oauth2PerformanceTransactionTest extends
ProducerConsumerBase {
});
thread.start();
thread.join();
- Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-consumeVerify")
+ QueueConsumer<byte[]> consumeFromConsumeTopic =
v5Client.newQueueConsumer(Schema.bytes())
.topic(testConsumeTopic)
- .subscriptionType(SubscriptionType.Shared)
.subscriptionName(testSub)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-produceVerify")
+ QueueConsumer<byte[]> consumeFromProduceTopic =
v5Client.newQueueConsumer(Schema.bytes())
.topic(testProduceTopic)
.subscriptionName(testSub)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
for (int i = 0; i < 50; i++) {
- Message<byte[]> message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
+ Message<byte[]> message =
consumeFromProduceTopic.receive(Duration.ofSeconds(5));
Assert.assertNotNull(message);
- consumeFromProduceTopic.acknowledge(message);
+ consumeFromProduceTopic.acknowledge(message.id());
}
- Message<byte[]> message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
+ Message<byte[]> message =
consumeFromConsumeTopic.receive(Duration.ofSeconds(2));
Assert.assertNull(message);
- message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
+ message = consumeFromProduceTopic.receive(Duration.ofSeconds(2));
Assert.assertNull(message);
}
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index a2f75b7873d..4070d95d72d 100644
---
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -21,41 +21,49 @@ package org.apache.pulsar.testclient;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.net.URL;
+import java.time.Duration;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.CustomLog;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+/**
+ * The perf transaction tools target the scalable-topics (v5) coordinator,
which is transaction-aware
+ * only for scalable {@code topic://} topics (PIP-473). These tests therefore
pre-create scalable
+ * topics and verify with a v5 SDK client (the v4 SDK can't produce/consume on
scalable topics). The
+ * tools themselves are unchanged — they just receive {@code topic://} names
of pre-created topics.
+ */
@CustomLog
public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
private final String testTenant = "pulsar";
private final String testNamespace = "perf";
private final String myNamespace = testTenant + "/" + testNamespace;
- private final String testTopic = "persistent://" + myNamespace + "/test-";
+ // v5 transactions are scalable-topic-only; scalable topics use the
topic:// domain.
+ private final String testTopic = "topic://" + myNamespace + "/test-";
private final AtomicInteger lastExitCode = new AtomicInteger(0);
+ // v5 SDK verification client: v5 transactions are scalable-topic-only,
and the v4 SDK can't
+ // produce/consume on scalable (topic://) topics.
+ private PulsarClient v5Client;
+
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -78,11 +86,22 @@ public class PerformanceTransactionTest extends
MockedPulsarServiceBaseTest {
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(1));
+
+ // transactionPolicy(...) opts the verification client into
transactions and routes it to the
+ // scalable-topics (v5) coordinator.
+ v5Client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(5)).build())
+ .build();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
+ if (v5Client != null) {
+ v5Client.close();
+ v5Client = null;
+ }
super.internalCleanup();
int exitCode = lastExitCode.get();
if (exitCode != 0) {
@@ -90,47 +109,31 @@ public class PerformanceTransactionTest extends
MockedPulsarServiceBaseTest {
}
}
- @SuppressWarnings("deprecation")
@Test
public void testTxnPerf() throws Exception {
- String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u
%s -ss %s -rs -np 1 -au %s";
+ String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u
%s -ss %s --scalable -au %s";
String testConsumeTopic = testTopic + UUID.randomUUID();
String testProduceTopic = testTopic + UUID.randomUUID();
String testSub = "testSub";
- admin.topics().createPartitionedTopic(testConsumeTopic, 1);
String args = String.format(argString, testConsumeTopic,
testProduceTopic,
pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder()
- .enableTransaction(true)
- .serviceUrl(pulsar.getBrokerServiceUrl())
- .connectionsPerBroker(100)
- .statsInterval(0, TimeUnit.SECONDS)
- .build();
- @Cleanup
- Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
- .producerName("perf-transaction-producer")
- .sendTimeout(0, TimeUnit.SECONDS)
+ // Scalable topics must be pre-created (they don't auto-create on
produce).
+ admin.scalableTopics().createScalableTopic(testConsumeTopic, 1);
+ admin.scalableTopics().createScalableTopic(testProduceTopic, 1);
+
+ Producer<byte[]> produceToConsumeTopic =
v5Client.newProducer(Schema.bytes())
.topic(testConsumeTopic)
.create();
- @Cleanup
- final Consumer<byte[]> consumer =
pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-consumeVerify")
+ v5Client.newQueueConsumer(Schema.bytes())
.topic(testConsumeTopic)
- .subscriptionType(SubscriptionType.Shared)
.subscriptionName(testSub + "pre")
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- CountDownLatch countDownLatch = new CountDownLatch(50);
- for (int i = 0; i < 50
- ; i++) {
- produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(
- countDownLatch::countDown);
+ for (int i = 0; i < 50; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).send();
}
- countDownLatch.await();
-
Thread thread = new Thread(() -> {
try {
new PerformanceTransaction().run(args.split(" "));
@@ -141,54 +144,41 @@ public class PerformanceTransactionTest extends
MockedPulsarServiceBaseTest {
thread.start();
thread.join();
- // Wait for all async transaction commits to complete before verifying
messages
- Awaitility.await().untilAsserted(() -> {
- admin.transactions().getCoordinatorStats().forEach((integer,
transactionCoordinatorStats) -> {
-
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
- });
- });
-
- Assert.assertTrue(admin.topics().getPartitionedStats(testConsumeTopic,
false)
- .getSubscriptions().get(testSub).isReplicated());
- @Cleanup
- Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-consumeVerify")
+ QueueConsumer<byte[]> consumeFromConsumeTopic =
v5Client.newQueueConsumer(Schema.bytes())
.topic(testConsumeTopic)
- .subscriptionType(SubscriptionType.Shared)
.subscriptionName(testSub)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- @Cleanup
- Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-produceVerify")
+ QueueConsumer<byte[]> consumeFromProduceTopic =
v5Client.newQueueConsumer(Schema.bytes())
.topic(testProduceTopic)
.subscriptionName(testSub)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
for (int i = 0; i < 50; i++) {
- Message<byte[]> message = consumeFromProduceTopic.receive(10,
TimeUnit.SECONDS);
+ Message<byte[]> message =
consumeFromProduceTopic.receive(Duration.ofSeconds(10));
Assert.assertNotNull(message);
- consumeFromProduceTopic.acknowledge(message);
+ consumeFromProduceTopic.acknowledge(message.id());
}
- Message<byte[]> message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
+ Message<byte[]> message =
consumeFromConsumeTopic.receive(Duration.ofSeconds(2));
Assert.assertNull(message);
- message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
+ message = consumeFromProduceTopic.receive(Duration.ofSeconds(2));
Assert.assertNull(message);
-
}
-
@Test
- public void testProduceTxnMessage() throws InterruptedException,
PulsarClientException {
+ public void testProduceTxnMessage() throws Exception {
String argString = "%s -r 50 -u %s -m %d -txn";
String topic = testTopic + UUID.randomUUID();
int totalMessage = 100;
String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), totalMessage);
+
+ admin.scalableTopics().createScalableTopic(topic, 1);
+
@Cleanup
- final Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName("subName" + "pre").topic(topic)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionType(SubscriptionType.Exclusive)
- .enableBatchIndexAcknowledgment(false)
+ QueueConsumer<byte[]> subscribe =
v5Client.newQueueConsumer(Schema.bytes())
+ .subscriptionName("subName" + "pre")
+ .topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
Thread thread = new Thread(() -> {
try {
@@ -200,51 +190,60 @@ public class PerformanceTransactionTest extends
MockedPulsarServiceBaseTest {
thread.start();
thread.join();
- Awaitility.await().untilAsserted(() -> {
- admin.transactions().getCoordinatorStats().forEach((integer,
transactionCoordinatorStats) -> {
-
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
- });
- });
-
@Cleanup
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionType(SubscriptionType.Exclusive)
- .enableBatchIndexAcknowledgment(false)
+ QueueConsumer<byte[]> consumer =
v5Client.newQueueConsumer(Schema.bytes())
+ .subscriptionName("subName")
+ .topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- for (int i = 0; i < totalMessage; i++) {
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertNotNull(message);
- consumer.acknowledge(message);
+ // PerformanceProducer commits its transactions asynchronously and
run() returns without
+ // awaiting them, so committed messages may still be becoming visible
after the join. Drain
+ // up to a deadline rather than assuming all are immediately readable.
+ int received = 0;
+ long deadline = System.currentTimeMillis() + 30_000;
+ while (received < totalMessage && System.currentTimeMillis() <
deadline) {
+ Message<byte[]> message = consumer.receive(Duration.ofSeconds(2));
+ if (message == null) {
+ continue;
+ }
+ consumer.acknowledge(message.id());
+ received++;
}
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertEquals(received, totalMessage, "all committed produced
messages must be delivered");
+ Message<byte[]> message = consumer.receive(Duration.ofSeconds(2));
Assert.assertNull(message);
}
@Test
public void testConsumeTxnMessage() throws Exception {
- String argString = "%s -r 50 -u %s -txn -ss %s -st %s -sp %s -ntxn %d
-tto 5";
+ // A long transaction timeout (-tto) so none of the consumer's
transactions time out and abort
+ // on the slower scalable-topic path: an aborted txn would release its
pending-acked messages
+ // for redelivery and inflate what the verifier sees below.
+ String argString = "%s -r 50 -u %s -txn -ss %s -st %s -sp %s -ntxn %d
-tto 60";
String subName = "sub";
String topic = testTopic + UUID.randomUUID();
+ // -st is PerformanceConsumer's own SubscriptionType enum (Exclusive);
-sp is the v5
+ // SubscriptionInitialPosition enum (EARLIEST).
String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), subName,
- SubscriptionType.Exclusive,
SubscriptionInitialPosition.Earliest, 10);
+ "Exclusive", "EARLIEST", 10);
+
+ admin.scalableTopics().createScalableTopic(topic, 1);
+
@Cleanup
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS)
- .create();
+ Producer<byte[]> producer =
v5Client.newProducer(Schema.bytes()).topic(topic).create();
@Cleanup
- final Consumer<byte[]> subscribe =
pulsarClient.newConsumer(Schema.BYTES)
- .consumerName("perf-transaction-consumeVerify")
+ QueueConsumer<byte[]> subscribe =
v5Client.newQueueConsumer(Schema.bytes())
.topic(topic)
- .subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName + "pre")
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- for (int i = 0; i < 505; i++) {
+ // Exactly numMessagesPerTransaction (50) * -ntxn (10) messages, so
the perf consumer commits
+ // all of them across 10 transactions and leaves the subscription
empty.
+ for (int i = 0; i < 500; i++) {
producer.newMessage().value("messages for test transaction
consumer".getBytes()).send();
}
Thread thread = new Thread(() -> {
try {
- log.info("");
new PerformanceConsumer().run(args.split(" "));
} catch (Exception e) {
e.printStackTrace();
@@ -253,26 +252,16 @@ public class PerformanceTransactionTest extends
MockedPulsarServiceBaseTest {
thread.start();
thread.join();
- Awaitility.await().untilAsserted(() -> {
- admin.transactions().getCoordinatorStats().forEach((integer,
transactionCoordinatorStats) -> {
-
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
- });
- });
-
+ // The perf consumer committed all 10 txns * 50 msgs = 500
transactional acks, so every message
+ // is permanently acknowledged and a fresh consumer on the same
subscription sees nothing.
@Cleanup
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionType(SubscriptionType.Exclusive)
- .enableBatchIndexAcknowledgment(false)
+ QueueConsumer<byte[]> consumer =
v5Client.newQueueConsumer(Schema.bytes())
+ .subscriptionName(subName)
+ .topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.subscribe();
- for (int i = 0; i < 5; i++) {
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertNotNull(message);
- consumer.acknowledge(message);
- }
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertNull(message);
-
+ Message<byte[]> message = consumer.receive(Duration.ofSeconds(2));
+ Assert.assertNull(message, "all transactionally-acked messages must
stay acknowledged");
}
}
diff --git a/tests/integration/build.gradle.kts
b/tests/integration/build.gradle.kts
index 693423e5f06..7fb9809a469 100644
--- a/tests/integration/build.gradle.kts
+++ b/tests/integration/build.gradle.kts
@@ -30,6 +30,8 @@ dependencies {
testImplementation(project(path = ":pulsar-broker-common", configuration =
"testJar"))
testImplementation(project(":pulsar-common"))
testImplementation(project(":pulsar-client-original"))
+ testImplementation(project(":pulsar-client-api-v5"))
+ testImplementation(project(":pulsar-client-v5"))
testImplementation(project(":pulsar-client-admin-original"))
testImplementation(project(":pulsar-proxy"))
testImplementation(project(":managed-ledger"))
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
index d9a2997f240..b3a7c9cceb8 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
@@ -18,135 +18,156 @@
*/
package org.apache.pulsar.tests.integration.transaction;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import java.util.HashSet;
-import java.util.Set;
+import static org.testng.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.CustomLog;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
-import org.awaitility.Awaitility;
import org.testng.annotations.Test;
/**
- * Integration test for the metadata-store transaction-coordinator discovery
path (PIP-473 P5.3).
+ * Integration test for transaction-coordinator coexistence (PIP-473): on a
cluster with the
+ * scalable-topics (v5) coordinator enabled, a v5 SDK client routes its
transactions to the
+ * metadata-store coordinator while a v4 SDK client keeps using the legacy
coordinator — both on the
+ * same cluster. Routing is decided by client/SDK kind (the v5 SDK sets an
internal flag), not by
+ * broker capability, so flipping the broker default to enable v5 must not
break v4 transactions.
*
- * <p>Verifies, across a real multi-broker docker cluster, that a client
discovers coordinators via
- * the {@code CommandWatchTcAssignments} stream (not the assign-topic lookup)
and can drive the
- * transaction lifecycle, including after the broker leading a coordinator
partition is killed.
- *
- * @see TcMetadataDiscoveryTestBase for the scope note (lifecycle, not
data-in-txn).
+ * <p>Scope: transaction-coordinator routing + the v5 lifecycle/failover. Full
v5 data-in-transaction
+ * (produce/ack on segment topics) is exercised separately.
*/
@CustomLog
public class TcMetadataDiscoveryTest extends TcMetadataDiscoveryTestBase {
/**
- * With the scalable-topics TC enabled, a client opens the assignment
watch and can open and
- * commit / abort transactions across all coordinator partitions. Running
many transactions
- * exercises the round-robin spread across the watch-discovered per-leader
connections.
+ * A v5 SDK client runs many transactions (commit and abort) against the
v5-enabled cluster. This
+ * only succeeds if the client routed to the running metadata-store
coordinator — a regression
+ * that broke the watch path or mis-routed v5 to the legacy TC would fail
here.
*/
@Test
- public void transactionLifecycleOverMetadataDiscovery() throws Exception {
+ public void v5SdkTransactionsUseMetadataCoordinator() throws Exception {
@Cleanup
- PulsarClient client = PulsarClient.builder()
- .enableTransaction(true)
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
-
- // Guard against a silent fallback: assert the client actually
selected the metadata-store
- // assignment-watch path. Otherwise a regression that breaks the watch
entirely would still
- // pass, since the assign topic is initialized with the same partition
count.
-
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl
tcClient =
- ((org.apache.pulsar.client.impl.PulsarClientImpl)
client).getTcClient();
- assertTrue(tcClient.isUsingMetadataDiscovery(),
- "client should use metadata-store TC discovery, not the
assign-topic fallback");
-
- // Run transactions (commit and abort alternately) until every
coordinator partition has
- // minted at least one — proving the client discovered and connected
to each partition's
- // elected leader. An await loop tolerates the brief startup window
where a partition is
- // still mid-election and absent from the assignment snapshot.
- Set<Long> coordinatorsExercised = new HashSet<>();
- final int[] i = {0};
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .pollInterval(1, TimeUnit.SECONDS)
- .until(() -> {
- Transaction txn = client.newTransaction()
- .withTransactionTimeout(1, TimeUnit.MINUTES)
- .build().get();
- TxnID txnId = txn.getTxnID();
- assertNotNull(txnId);
- // mostSigBits is the coordinator (TC partition) that
minted the txn.
- coordinatorsExercised.add(txnId.getMostSigBits());
- if (i[0]++ % 2 == 0) {
- txn.commit().get();
- } else {
- txn.abort().get();
- }
- return coordinatorsExercised.size() == TC_PARALLELISM;
- });
- assertEquals(coordinatorsExercised.size(), TC_PARALLELISM,
- "expected transactions to be coordinated by every TC
partition; got "
- + coordinatorsExercised);
+ org.apache.pulsar.client.api.v5.PulsarClient client =
+ org.apache.pulsar.client.api.v5.PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+
.transactionPolicy(org.apache.pulsar.client.api.v5.config.TransactionPolicy.builder()
+
.timeout(java.time.Duration.ofMinutes(1)).build())
+ .build();
+
+ // Enough transactions that the client's round-robin visits every
coordinator partition; if
+ // any were mis-routed to the legacy TC (which doesn't coordinate
scalable transactions) or
+ // the watch path were broken, these would fail.
+ runV5Transactions(client, TC_PARALLELISM * 4);
}
/**
- * Kill one broker and confirm the client keeps working: the coordinator
partitions that broker
- * was leading are re-elected to the survivor, the client's assignment
watch receives the new
- * snapshot, retargets its handlers, and subsequent transactions across
all partitions still
- * succeed.
+ * Kill one broker and confirm the v5 SDK client keeps working:
coordinator partitions led by the
+ * dead broker are re-elected to the survivor, the client's assignment
watch retargets, and
+ * subsequent transactions still succeed.
*/
@Test
- public void transactionsSurviveLeaderBrokerFailure() throws Exception {
+ public void v5SdkTransactionsSurviveLeaderBrokerFailure() throws Exception
{
@Cleanup
- PulsarClient client = PulsarClient.builder()
- .enableTransaction(true)
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .operationTimeout(30, TimeUnit.SECONDS)
- .build();
+ org.apache.pulsar.client.api.v5.PulsarClient client =
+ org.apache.pulsar.client.api.v5.PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+
.transactionPolicy(org.apache.pulsar.client.api.v5.config.TransactionPolicy.builder()
+
.timeout(java.time.Duration.ofMinutes(1)).build())
+ .build();
- // Warm up: confirm every coordinator is reachable before the failure.
- runTxnOnEveryCoordinator(client);
+ runV5Transactions(client, TC_PARALLELISM * 4);
- // Kill one broker — about half the coordinator partitions lose their
leader.
BrokerContainer victim = pulsarCluster.getBrokers().iterator().next();
log.info().attr("broker", victim.getContainerName()).log("Stopping
broker to force TC failover");
victim.stop();
- // After re-election + assignment-watch refresh, transactions across
all partitions succeed
- // again. runTxnOnEveryCoordinator already retries within a bounded
wait while leadership and
- // the client's handlers converge on the new leaders.
- runTxnOnEveryCoordinator(client);
+ // After re-election + assignment-watch refresh, transactions succeed
again. runV5Transactions
+ // retries within a bounded wait while leadership and the client's
handlers converge.
+ runV5Transactions(client, TC_PARALLELISM * 4);
}
/**
- * Open + commit one transaction on each coordinator partition; asserts
all are covered within a
- * bounded wait. A coordinator's handler connects asynchronously (and,
after a failover, may be
- * briefly mid-reconnect), so a transaction routed to a not-yet-ready
coordinator throws
- * {@code MetaStoreHandlerNotReadyException} / times out — those are
retried rather than failing
- * the run. The assertion is "every coordinator becomes reachable", not
"reachable on the first
- * attempt".
+ * Run {@code count} v5 transactions (commit) back to back. Each
transaction that fails with a
+ * transient error (a coordinator still connecting, or mid-reconnect after
a failover) is retried
+ * up to a deadline rather than spacing one transaction per poll interval
— driving them in a
+ * tight loop keeps total wall-clock bounded by transaction latency, not
by the retry cadence.
*/
- private void runTxnOnEveryCoordinator(PulsarClient client) {
- Set<Long> coordinators = new HashSet<>();
- Awaitility.await()
- .atMost(90, TimeUnit.SECONDS)
- .pollInterval(2, TimeUnit.SECONDS)
- .ignoreExceptions()
- .until(() -> {
- Transaction txn = client.newTransaction()
- .withTransactionTimeout(1, TimeUnit.MINUTES)
- .build().get();
- coordinators.add(txn.getTxnID().getMostSigBits());
- txn.commit().get();
- return coordinators.size() == TC_PARALLELISM;
- });
- assertTrue(coordinators.size() == TC_PARALLELISM,
- "expected all " + TC_PARALLELISM + " coordinators reachable;
got " + coordinators);
+ private void
runV5Transactions(org.apache.pulsar.client.api.v5.PulsarClient client, int
count)
+ throws Exception {
+ long deadline = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(3);
+ for (int i = 0; i < count; i++) {
+ while (true) {
+ try {
+ org.apache.pulsar.client.api.v5.Transaction txn =
client.newTransaction();
+ txn.commit();
+ break;
+ } catch (Exception e) {
+ if (System.currentTimeMillis() > deadline) {
+ throw e;
+ }
+ Thread.sleep(500);
+ }
+ }
+ }
+ }
+
+ /**
+ * Coexistence: with the v5 coordinator enabled on the cluster, a v4 SDK
client running a
+ * transaction on a {@code persistent://} topic must still use the legacy
coordinator and work end
+ * to end. Routing by client kind means the v4 client's commands carry no
{@code scalable} flag,
+ * so the broker sends them to the legacy TC. This is the regression guard
for the P5.4 default
+ * flip.
+ */
+ @Test
+ public void v4SdkTransactionStillUsesLegacyCoordinator() throws Exception {
+ @Cleanup
+ org.apache.pulsar.client.api.PulsarClient client =
+ org.apache.pulsar.client.api.PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ // Routing assertion: a v4 SDK client must NOT use metadata-store
discovery even though the
+ // cluster has the v5 coordinator enabled — it stays on the legacy
assign-topic coordinator.
+ TransactionCoordinatorClientImpl tcClient = ((PulsarClientImpl)
client).getTcClient();
+ assertFalse(tcClient.isUsingMetadataDiscovery(),
+ "v4 SDK client must use the legacy coordinator, not
metadata-store discovery");
+
+ String topic = "persistent://public/default/v4-coexist-" +
randomName(6);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.STRING).topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("coexist-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ // Non-transactional produce, then a transactional ack — exercises the
legacy TC end to end
+ // (newTxn -> addSubscription -> endTxn) on a persistent topic.
+ producer.send("m1");
+ Message<String> msg = consumer.receive(15, TimeUnit.SECONDS);
+ assertNotNull(msg, "should receive the produced message");
+
+ Transaction txn = client.newTransaction()
+ .withTransactionTimeout(1, TimeUnit.MINUTES)
+ .build().get();
+ consumer.acknowledgeAsync(msg.getMessageId(), txn).get();
+ txn.commit().get();
+
+ // After commit the message is acknowledged: redelivery on reconnect
must not return it.
+ consumer.redeliverUnacknowledgedMessages();
+ assertNull(consumer.receive(5, TimeUnit.SECONDS),
+ "committed transactional ack should have consumed the
message");
}
}