This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17ead1d691c [improve][broker] PIP-473 P5.3: metadata-store TC leader election + assignment watch (#25929)
17ead1d691c is described below
commit 17ead1d691cde5a02750a2fd0f62ee608203be6a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 17:41:35 2026 -0700
[improve][broker] PIP-473 P5.3: metadata-store TC leader election + assignment watch (#25929)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 16 ++
.../apache/pulsar/broker/service/ServerCnx.java | 94 ++++++-
.../coordinator/v5/TransactionCoordinatorV5.java | 229 ++++++++++++++---
.../broker/transaction/metadata/TcLeader.java | 39 +++
.../broker/transaction/metadata/TxnPaths.java | 22 ++
.../v5/TransactionCoordinatorV5Test.java | 112 +++++++-
.../client/impl/TransactionClientConnectTest.java | 12 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 68 +++++
.../pulsar/client/impl/ConnectionHandler.java | 29 ++-
.../apache/pulsar/client/impl/HandlerState.java | 7 +
.../client/impl/TransactionMetaStoreHandler.java | 65 ++++-
.../impl/transaction/AssignTopicTcDiscovery.java | 124 +++++++++
.../client/impl/transaction/TcDiscovery.java | 67 +++++
.../TransactionCoordinatorClientImpl.java | 138 +++++-----
.../transaction/WatchTcAssignmentsDiscovery.java | 285 +++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 66 +++++
.../pulsar/common/protocol/PulsarDecoder.java | 32 +++
pulsar-common/src/main/proto/PulsarApi.proto | 51 ++++
.../common/protocol/CommandsTcAssignmentsTest.java | 125 +++++++++
.../pulsar/proxy/server/ProxyConnection.java | 4 +-
.../transaction/TcMetadataDiscoveryTest.java | 152 +++++++++++
.../transaction/TcMetadataDiscoveryTestBase.java | 71 +++++
.../src/test/resources/pulsar-transaction.xml | 1 +
23 files changed, 1695 insertions(+), 114 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e5e69ea14ad..f10b2cbdc32 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3834,6 +3834,22 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int transactionCoordinatorScalableTopicsGcRetentionSeconds = 900;
+    // Checked at broker startup: TransactionCoordinatorV5 writes this value to /txn/tc/parallelism
+    // when that node is absent and compares each broker's configured value against the stored one.
+    // NOTE(review): the stored node is only written when absent, so the later reduction the doc
+    // text below allows would still be rejected by that startup check until the node is rewritten
+    // or removed — confirm the intended operational procedure.
+    @FieldContext(
+        category = CATEGORY_TRANSACTION,
+        minValue = 1,
+        doc = "Degree of parallelism for the scalable-topics transaction coordinator: how many"
+            + " independent coordinator instances run across the cluster. Each is"
+            + " leader-elected independently in the metadata store and coordinates the"
+            + " transactions whose id maps to it. Fixed at cluster bring-up — changing it"
+            + " later would strand the coordinator id encoded in existing transaction ids"
+            + " (and, because an aborted transaction's records are retained as long as its"
+            + " messages are, the value can only be reduced once all transactions created"
+            + " under the previous value have been fully cleaned up). All brokers must agree"
+            + " on this value; a mismatch is rejected at startup. Only relevant when"
+            + " transactionCoordinatorScalableTopicsEnabled = true."
+    )
+    private int transactionCoordinatorScalableTopicsParallelism = 16;
+
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Class name for transaction metadata store provider"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index af7e8930f36..00253bbb431 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -108,6 +108,7 @@ import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
+import
org.apache.pulsar.broker.transaction.coordinator.v5.TransactionCoordinatorV5;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -505,6 +506,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
scalableTopicsWatchers.clear();
+ // Same for transaction-coordinator assignment watchers.
+ tcAssignmentWatchers.values().forEach(this::closeQuietly);
+ tcAssignmentWatchers.clear();
+
// Notify the scalable-topic controller that this connection's
scalable consumers
// have dropped. The controller marks them disconnected and starts the
grace-period
// timer; if they reconnect in time, their assignment is preserved.
@@ -865,6 +870,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ScalableTopicsWatcherSession>
scalableTopicsWatchers = new ConcurrentHashMap<>();
+    // --- Transaction-coordinator assignment watchers ---
+    // watchId -> deregistration handle for the listener registered on TransactionCoordinatorV5.
+    // An entry is removed (and its handle closed) on WATCH_TC_ASSIGNMENTS_CLOSE; all entries are
+    // closed and cleared in the same connection-teardown path that clears scalableTopicsWatchers.
+    // Snapshot pushes check membership here first, so a removed watchId silences pending re-pushes.
+    private final ConcurrentHashMap<Long, AutoCloseable> tcAssignmentWatchers = new ConcurrentHashMap<>();
+    // Delay before re-pushing a TC-assignment snapshot that was incomplete (a partition mid-election)
+    // or that failed to build, so the client converges without waiting for an external trigger.
+    private static final long TC_ASSIGNMENTS_REPUSH_DELAY_MS = 1000L;
+
@Override
protected void handleCommandWatchScalableTopics(
CommandWatchScalableTopics cmd) {
@@ -965,6 +977,85 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
+ // --- Transaction-coordinator assignment watch ---
+
+    @Override
+    protected void handleCommandWatchTcAssignments(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignments cmd) {
+        checkArgument(state == State.Connected);
+        final long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received WatchTcAssignments");
+
+        // Refuse the watch up front when this broker cannot serve it.
+        final var pulsarService = service.getPulsar();
+        if (!pulsarService.getConfig().isTransactionCoordinatorScalableTopicsEnabled()) {
+            ctx.writeAndFlush(Commands.newWatchTcAssignmentsError(watchId, ServerError.NotAllowedError,
+                    "Scalable-topics transaction coordinator is disabled on this broker"));
+            return;
+        }
+        final TransactionCoordinatorV5 coordinator = pulsarService.getTransactionCoordinatorV5();
+        if (coordinator == null) {
+            ctx.writeAndFlush(Commands.newWatchTcAssignmentsError(watchId, ServerError.ServiceNotReady,
+                    "Transaction coordinator not ready"));
+            return;
+        }
+
+        // Authz: broker-internal coordination rather than a per-topic operation, so an authenticated
+        // connection is sufficient (same trust model as TC_CLIENT_CONNECT).
+        // Each leadership change hops onto this channel's executor and re-pushes the full snapshot.
+        final Runnable pushOnChange =
+                () -> ctx.executor().execute(() -> sendTcAssignmentsSnapshot(watchId, coordinator));
+        final AutoCloseable registration = coordinator.registerAssignmentChangeListener(pushOnChange);
+        // Re-using a watchId replaces the earlier registration; release its listener.
+        final AutoCloseable replaced = tcAssignmentWatchers.put(watchId, registration);
+        closeQuietly(replaced);
+        // Initial snapshot.
+        sendTcAssignmentsSnapshot(watchId, coordinator);
+    }
+
+    /**
+     * Builds the current TC-assignment snapshot and writes it to the client for {@code watchId}.
+     * Does nothing once the watch is closed (its id is no longer in {@code tcAssignmentWatchers}).
+     * This is checked both before building and again before writing. The write runs on
+     * {@code ctx.executor()}. If the snapshot is incomplete, or building it fails, another push is
+     * scheduled after {@code TC_ASSIGNMENTS_REPUSH_DELAY_MS}.
+     *
+     * <p>NOTE(review): every call that sees an incomplete snapshot schedules its own follow-up, and
+     * every listener notification starts a new call. Several re-push chains for the same watch can
+     * therefore run concurrently. A chain only stops once a snapshot is complete or the watch is
+     * closed. Consider coalescing with a per-watch "re-push pending" flag.
+     */
+    private void sendTcAssignmentsSnapshot(long watchId, TransactionCoordinatorV5 tc) {
+        if (!tcAssignmentWatchers.containsKey(watchId)) {
+            return;
+        }
+        tc.buildAssignmentsSnapshot().thenAccept(snapshot -> ctx.executor().execute(() -> {
+            // Re-check: the watch may have been closed while the snapshot was being built.
+            if (!tcAssignmentWatchers.containsKey(watchId)) {
+                return;
+            }
+            // Wire form: partition -> {brokerServiceUrl, brokerServiceUrlTls}.
+            java.util.Map<Integer, String[]> leaders = new java.util.HashMap<>();
+            snapshot.assignments().forEach((partition, leader) -> leaders.put(partition,
+                    new String[] {leader.brokerServiceUrl(), leader.brokerServiceUrlTls()}));
+            ctx.writeAndFlush(Commands.newWatchTcAssignmentsSnapshot(
+                    watchId, snapshot.partitionCount(), leaders));
+            // If some partition is still mid-election, the snapshot is incomplete. Schedule a single
+            // delayed re-push so the client doesn't stay parked on a missing partition waiting for a
+            // leadership change that may never come (the cache repopulating fires no TC listener).
+            if (!snapshot.isComplete()) {
+                ctx.executor().schedule(() -> sendTcAssignmentsSnapshot(watchId, tc),
+                        TC_ASSIGNMENTS_REPUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+            }
+        })).exceptionally(ex -> {
+            // Building the snapshot failed: log it and retry after the same delay.
+            log.warn().attr("watchId", watchId).exception(ex)
+                    .log("Failed to build TC-assignments snapshot; retrying shortly");
+            ctx.executor().schedule(() -> sendTcAssignmentsSnapshot(watchId, tc),
+                    TC_ASSIGNMENTS_REPUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+ @Override
+ protected void handleCommandWatchTcAssignmentsClose(
+ org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsClose
cmd) {
+ checkArgument(state == State.Connected);
+ long watchId = cmd.getWatchId();
+ log.debug().attr("watchId", watchId).log("Received
WatchTcAssignmentsClose");
+ closeQuietly(tcAssignmentWatchers.remove(watchId));
+ }
+
+    /**
+     * Closes a TC-assignment watch registration, logging (not propagating) any failure.
+     *
+     * @param handle the registration to close; {@code null} is ignored
+     */
+    private void closeQuietly(AutoCloseable handle) {
+        if (handle != null) {
+            try {
+                handle.close();
+            } catch (Exception e) {
+                log.warn().exceptionMessage(e).log("Error closing TC-assignment watcher");
+            }
+        }
+    }
+
@Override
protected void handleCommandScalableTopicClose(
CommandScalableTopicClose commandScalableTopicClose) {
@@ -1316,7 +1407,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
maybeScheduleAuthenticationCredentialsRefresh();
}
writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, enableTopicListWatcher,
- scalableTopicsEnabled));
+ scalableTopicsEnabled,
+
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
log.debug()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
index d4bfd8581a2..22ad4aee3ef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
@@ -24,9 +24,13 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -35,6 +39,7 @@ import java.util.function.Supplier;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
+import org.apache.pulsar.broker.transaction.metadata.TcLeader;
import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
@@ -50,6 +55,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
@@ -57,10 +64,20 @@ import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
/**
* Metadata-driven transaction coordinator for scalable topics — broker-side
service.
*
- * <p>Per-partition coordinator. A broker runs the TC for partition {@code N}
iff it owns
- * partition {@code N} of {@code
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
- * leader-election mechanism the legacy {@code
TransactionMetadataStoreService} uses; reusing
- * it keeps the client-side discovery surface unchanged.
+ * <p>Per-partition coordinator. Leadership rests on the metadata store
directly: each TC
+ * partition {@code N} has a {@link LeaderElection} at {@code
/txn/tc/leader/<N>}, and a broker
+ * runs the TC for partition {@code N} iff it currently leads that election.
This removes the
+ * dependency on the {@code transaction_coordinator_assign} topic and its
bundle ownership — TC
+ * coordination liveness no longer rides on the topic/namespace/load-balancer
machinery, only on
+ * the metadata store (which the TC already hard-depends on for every header
read/write).
+ *
+ * <p><b>Distribution.</b> Every broker calls {@code elect()} on every
partition (elect-all):
+ * the {@code LeaderElection} primitive only fails a leader over to a broker
that is already a
+ * candidate, so to keep every partition survivable every broker must be a
candidate for every
+ * partition. The N independent elections start concurrently, so on a co-start
leadership lands
+ * roughly balanced across brokers, and every partition has B−1 standby
candidates for instant
+ * failover. (After a strictly sequential scale-up an early broker can hold
more partitions until
+ * it restarts; TC load is light, so v1 does not actively rebalance.)
*
* <p>Wire commands handled (routed by {@code ServerCnx} when
* {@code transactionCoordinatorScalableTopicsEnabled} is on):
@@ -71,6 +88,8 @@ import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
* advertise themselves by writing {@code /txn/op} records, so the TC
doesn't need a
* pre-registration step.</li>
* <li>{@code END_TXN} → {@link #endTransaction}</li>
+ * <li>{@code WATCH_TC_ASSIGNMENTS} → {@link #buildAssignmentsSnapshot} +
push-on-change, the
+ * client's discovery surface (which broker leads which partition).</li>
* </ul>
*
* <p>{@code endTransaction} CAS-updates the header to the terminal state,
enumerates
@@ -79,18 +98,18 @@ import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
* {@code (segment, subscription)} pair. The fan-out is metadata-store writes
(not RPCs) and
* is bounded by the txn's participant count.
*
- * <p>Background sweeps: a single elected broker — the owner of partition 0 of
- * {@code transaction_coordinator_assign} — periodically (a) aborts timed-out
open transactions
- * ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions
whose retention has
- * elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still
safe — every state
- * transition is a header CAS — so the single-sweeper election is an
efficiency measure, not a
- * correctness one.
+ * <p>Background sweeps: the broker that leads partition 0 periodically (a)
aborts timed-out open
+ * transactions ({@link #sweepTimeouts}) and (b) garbage-collects finalized
transactions whose
+ * retention has elapsed ({@link #sweepGc}). Concurrent sweeps from a stale
leader are still safe
+ * — every state transition is a header CAS — so the single-sweeper election
is an efficiency
+ * measure, not a correctness one.
*/
@CustomLog
public class TransactionCoordinatorV5 {
private final PulsarService pulsar;
private final TxnMetadataStore txnStore;
+    /** Number of TC partitions, from {@code transactionCoordinatorScalableTopicsParallelism}. */
+    private final int partitionCount;
private final long timeoutSweepIntervalMs;
private final long gcSweepIntervalMs;
@@ -100,10 +119,18 @@ public class TransactionCoordinatorV5 {
private final AtomicBoolean timeoutSweepRunning = new AtomicBoolean(false);
private final AtomicBoolean gcSweepRunning = new AtomicBoolean(false);
+    /**
+     * Per-partition leader-election controllers, keyed by partition (0..partitionCount-1).
+     * Populated by {@code start()} and cleared by {@code close()}; a missing entry means the
+     * partition is not led here.
+     */
+    private final Map<Integer, LeaderElection<TcLeader>> elections = new ConcurrentHashMap<>();
+    /** The local broker's election value — what we propose for every partition. Set in {@code start()}. */
+    private volatile TcLeader localLeader;
+    /**
+     * Open assignment-watch listeners (one per watching client connection). A copy-on-write list,
+     * so {@code onElectionStateChange} iterates a stable snapshot while watches come and go.
+     */
+    private final List<Runnable> assignmentChangeListeners = new CopyOnWriteArrayList<>();
+
public TransactionCoordinatorV5(PulsarService pulsar) {
this.pulsar = pulsar;
this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
var config = pulsar.getConfiguration();
+ this.partitionCount =
config.getTransactionCoordinatorScalableTopicsParallelism();
this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
@@ -115,14 +142,32 @@ public class TransactionCoordinatorV5 {
// ---- Lifecycle --------------------------------------------------------
/**
- * Start the periodic timeout / GC sweeps on a dedicated single-thread
scheduler. Each tick is
- * gated by {@link #ifElectedSweeper} so only the partition-0 owner does
the scan. Idempotent —
- * a second call is ignored.
+ * Start the coordinator: create a per-partition {@link LeaderElection}
and {@code elect()} on
+ * every partition (elect-all), then start the periodic timeout / GC
sweeps on a dedicated
+ * single-thread scheduler. Sweep ticks are gated by {@link
#ifElectedSweeper} so only the
+ * partition-0 leader scans. Idempotent — a second call is ignored.
*/
public synchronized void start() {
if (closed || sweepExecutor != null) {
return;
}
+ verifyParallelismConsistency();
+ this.localLeader = new TcLeader(pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl(),
+ pulsar.getBrokerServiceUrlTls(),
pulsar.getSafeWebServiceAddress());
+ for (int partition = 0; partition < partitionCount; partition++) {
+ final int p = partition;
+ LeaderElection<TcLeader> election =
pulsar.getCoordinationService().getLeaderElection(
+ TcLeader.class, TxnPaths.tcLeaderPath(p), state ->
onElectionStateChange(p, state));
+ elections.put(p, election);
+ // elect-all: become a candidate for every partition so leadership
is balanced across
+ // brokers and every partition has standbys for failover. Errors
are logged; the
+ // LeaderElection retries internally.
+ election.elect(localLeader).exceptionally(ex -> {
+ log.warn().attr("partition", p).exception(ex).log("v5 TC
initial elect failed");
+ return null;
+ });
+ }
+
sweepExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-txn-v5-sweep"));
sweepExecutor.scheduleWithFixedDelay(
@@ -133,13 +178,87 @@ public class TransactionCoordinatorV5 {
gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
}
- /** Stop the sweeps. Idempotent. */
+    /**
+     * Persist this broker's configured parallelism cluster-wide on first start, and verify every
+     * subsequent broker agrees. A mismatch means brokers would run different election sets and the
+     * coordinator-count encoded in transaction ids would be ambiguous — fatal misconfiguration, so
+     * we fail fast rather than start in an inconsistent state.
+     *
+     * <p>Only the metadata-store I/O is best-effort. If the value cannot be read (or written and
+     * re-read), or the stored bytes are not a number, the check is skipped with a warning. The
+     * comparison itself runs outside every I/O {@code catch}, so a mismatch always propagates. That
+     * includes the case where this broker lost the first-start create race to a peer configured
+     * with a different value.
+     *
+     * @throws IllegalStateException if the cluster-wide value differs from this broker's config
+     */
+    private void verifyParallelismConsistency() {
+        var store = pulsar.getLocalMetadataStore();
+        java.util.Optional<GetResult> stored;
+        try {
+            stored = store.get(TxnPaths.TXN_TC_PARALLELISM_PATH).get();
+            if (stored.isEmpty()) {
+                // First broker to start writes the value. Expected version -1 makes this a
+                // create-only CAS, so a racing peer that got there first makes it fail
+                // (BadVersion). Either way, the re-read below returns whichever value won.
+                byte[] value = Integer.toString(partitionCount)
+                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                try {
+                    store.put(TxnPaths.TXN_TC_PARALLELISM_PATH, value, java.util.Optional.of(-1L)).get();
+                } catch (java.util.concurrent.ExecutionException e) {
+                    log.debug().exception(e).log("TC parallelism create did not succeed; re-reading");
+                }
+                stored = store.get(TxnPaths.TXN_TC_PARALLELISM_PATH).get();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn().exception(e).log("Could not verify TC parallelism consistency; proceeding");
+            return;
+        } catch (Exception e) {
+            log.warn().exception(e).log("Could not verify TC parallelism consistency; proceeding");
+            return;
+        }
+        if (stored.isEmpty()) {
+            // Nothing to compare against (e.g. the node vanished between the write and the read).
+            return;
+        }
+        try {
+            // Deliberately not inside the I/O try above: an IllegalStateException here must abort start().
+            checkParallelismMatches(stored.get().getValue());
+        } catch (NumberFormatException e) {
+            // A malformed stored value cannot be compared; treat it as unverifiable, not fatal.
+            log.warn().exception(e).log("Could not verify TC parallelism consistency; proceeding");
+        }
+    }
+
+ private void checkParallelismMatches(byte[] storedValue) {
+ int stored = Integer.parseInt(new String(storedValue,
java.nio.charset.StandardCharsets.UTF_8).trim());
+ if (stored != partitionCount) {
+ throw new IllegalStateException(
+ "transactionCoordinatorScalableTopicsParallelism mismatch:
this broker is configured"
+ + " with " + partitionCount + " but the cluster
was initialized with " + stored
+ + ". The value is fixed at cluster bring-up and
must be identical on every"
+ + " broker.");
+ }
+ }
+
+    /**
+     * Stop the sweeps and release every leader-election lease. Idempotent.
+     *
+     * <p>Sets {@code closed}, which also makes any later {@code start()} a no-op. It then:
+     * <ul>
+     *   <li>shuts down the sweep scheduler;</li>
+     *   <li>starts an asynchronous close of every per-partition election without waiting for it
+     *       (failures are only logged);</li>
+     *   <li>drops all assignment-watch listeners.</li>
+     * </ul>
+     * Once the election map is cleared, {@link #isLeaderFor} reports {@code false} for every
+     * partition.
+     */
     public synchronized void close() {
         closed = true;
         if (sweepExecutor != null) {
             sweepExecutor.shutdownNow();
             sweepExecutor = null;
         }
+        // Fire-and-forget: the asyncClose() futures are not awaited.
+        elections.values().forEach(e -> e.asyncClose().exceptionally(ex -> {
+            log.warn().exception(ex).log("v5 TC election close failed");
+            return null;
+        }));
+        elections.clear();
+        assignmentChangeListeners.clear();
+    }
+
+    /**
+     * Reports whether this broker currently holds leadership of TC partition {@code partition}.
+     * Gates client-connect acceptance and the background sweep. A partition without a local
+     * election (out of range, before {@code start()}, or after {@code close()}) is never led here.
+     *
+     * @param partition the TC partition (coordinator id)
+     * @return {@code true} iff the local election for that partition is in state {@code Leading}
+     */
+    public boolean isLeaderFor(int partition) {
+        final LeaderElection<TcLeader> election = elections.get(partition);
+        if (election == null) {
+            return false;
+        }
+        return election.getState() == LeaderElectionState.Leading;
+    }
+
+    /**
+     * Election-state callback for one TC partition. Any leadership move can change the
+     * {@code partition -> leader} map, so every registered assignment-watch listener is run. A
+     * listener that throws is logged and does not stop the others.
+     *
+     * @param partition the TC partition whose election changed state
+     * @param state the election's new state (only logged)
+     */
+    private void onElectionStateChange(int partition, LeaderElectionState state) {
+        log.debug().attr("partition", partition).attr("state", state).log("v5 TC election state changed");
+        assignmentChangeListeners.forEach(listener -> {
+            try {
+                listener.run();
+            } catch (Throwable t) {
+                log.warn().exception(t).log("v5 TC assignment listener failed");
+            }
+        });
     }
/**
@@ -174,17 +293,75 @@ public class TransactionCoordinatorV5 {
// ---- TC client connect ------------------------------------------------
/**
-     * Verify this broker is the leader for {@code tcId} (owns the corresponding partition of
-     * {@code transaction_coordinator_assign}). Mirrors the ownership check the legacy
-     * {@code TransactionMetadataStoreService.handleTcClientConnect} performs — the same
-     * topic-ownership mechanism serves as our leader-election surface.
+     * Verify this broker may coordinate {@code tcId}. A new client reaches us via the assignment
+     * watch, so we are the metadata-store election leader for that partition. An old client (no
+     * assignment-watch support) reaches us via an assign-topic lookup, so we own that partition's
+     * assign-topic bundle. Accept either: both are correctness-safe because every transaction
+     * state transition is a metadata-store CAS, so even a stale router can't corrupt state. We
+     * accept-if-leader first (cheap, in-memory) and fall back to the assign-topic ownership check
+     * only when we're not the election leader.
+     *
+     * @param tcId the coordinator id; {@code (int) tcId.getId()} is used as the TC partition index
+     * @return an already-completed future when this broker leads the partition; otherwise the
+     *     future returned by {@code checkTopicNsOwnership} for the matching assign-topic partition
      */
     public CompletableFuture<Void> handleClientConnect(TransactionCoordinatorID tcId) {
+        // Fast path: in-memory check of the local metadata-store election.
+        if (isLeaderFor((int) tcId.getId())) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // Legacy path for clients routed by an assign-topic lookup: defer to bundle ownership.
         String assignPartition = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
                 .getPartition((int) tcId.getId()).toString();
         return pulsar.getBrokerService().checkTopicNsOwnership(assignPartition);
     }
+ // ---- Assignment discovery (client watch) ------------------------------
+
+    /**
+     * Snapshots the current {@code partition -> leader} map from the local elections.
+     *
+     * <p>Each leader value is read with the async {@link LeaderElection#getLeaderValue()}, which
+     * loads from the metadata store on a cache miss. The cache-only
+     * {@code getLeaderValueIfPresent()} is not enough: right after this broker moves to
+     * {@code Following} for a partition, its local cache of the new leader's node may not be
+     * repopulated yet. A cache-only read would then silently omit that partition from a
+     * follower's snapshot.
+     *
+     * <p>A partition with no leader yet, or whose leader value failed to load, is absent from the
+     * snapshot. {@code ServerCnx} detects the gap through {@link TcAssignmentsSnapshot#isComplete()}
+     * and re-pushes shortly after. The snapshot is always the complete map: the watch protocol
+     * sends full snapshots, never diffs.
+     *
+     * @return a future of the snapshot; per-partition load failures never fail it
+     */
+    public CompletableFuture<TcAssignmentsSnapshot> buildAssignmentsSnapshot() {
+        // Written concurrently by the per-partition callbacks, hence a concurrent map.
+        final Map<Integer, TcLeader> knownLeaders = new ConcurrentSkipListMap<>();
+        final List<CompletableFuture<Void>> pendingLoads = new ArrayList<>(elections.size());
+        for (var entry : elections.entrySet()) {
+            final int p = entry.getKey();
+            final CompletableFuture<Void> recorded = entry.getValue().getLeaderValue()
+                    .thenAccept(maybeLeader -> maybeLeader.ifPresent(leader -> knownLeaders.put(p, leader)));
+            pendingLoads.add(recorded.exceptionally(ex -> {
+                // A load error just means "leader unknown for now"; the re-push will retry.
+                log.debug().attr("partition", p).exception(ex)
+                        .log("v5 TC leader-value load failed while building snapshot");
+                return null;
+            }));
+        }
+        return FutureUtil.waitForAll(pendingLoads)
+                .thenApply(ignored -> new TcAssignmentsSnapshot(partitionCount, new TreeMap<>(knownLeaders)));
+    }
+
+    /**
+     * Subscribes {@code listener} to "the assignment map may have changed" events, fired whenever
+     * any partition's election changes state. The listener runs synchronously inside the
+     * election-state callback. {@code ServerCnx} closes the returned handle when the client
+     * closes the watch or the connection goes away; {@code close()} of this coordinator also drops
+     * every listener.
+     *
+     * @param listener callback to run on each election-state change
+     * @return an {@link AutoCloseable} whose {@code close()} removes the listener
+     */
+    public AutoCloseable registerAssignmentChangeListener(Runnable listener) {
+        assignmentChangeListeners.add(listener);
+        final AutoCloseable unsubscribe = () -> assignmentChangeListeners.remove(listener);
+        return unsubscribe;
+    }
+
+    /**
+     * Immutable full assignment snapshot: partition count plus the currently known leaders.
+     *
+     * <p>The leader map is copied into a sorted, unmodifiable map on construction. This makes the
+     * record actually immutable: callers cannot mutate it, and later changes to the map passed in
+     * are not reflected. Iteration is in ascending partition order.
+     *
+     * @param partitionCount total number of TC partitions
+     * @param assignments partition -> currently known leader; partitions mid-election are absent
+     */
+    public record TcAssignmentsSnapshot(int partitionCount, Map<Integer, TcLeader> assignments) {
+        public TcAssignmentsSnapshot {
+            assignments = Collections.unmodifiableMap(new TreeMap<>(assignments));
+        }
+
+        /** @return true if every partition has a known leader (no mid-election gaps). */
+        public boolean isComplete() {
+            return assignments.size() == partitionCount;
+        }
+    }
+
// ---- newTransaction ---------------------------------------------------
/**
@@ -459,21 +636,15 @@ public class TransactionCoordinatorV5 {
}
/**
- * Run {@code action} only on the elected sweeper — the broker that owns
partition 0 of
- * {@code transaction_coordinator_assign}. Not owning it (or any error
checking ownership) means
- * "skip this cycle". Correctness doesn't depend on the election: every
transition is a header
- * CAS, so a stale owner sweeping concurrently is harmless.
+ * Run {@code action} only on the elected sweeper — the broker that leads
TC partition 0. Not
+ * leading it means "skip this cycle". Correctness doesn't depend on the
election: every
+ * transition is a header CAS, so a stale leader sweeping concurrently is
harmless.
*/
private CompletableFuture<Void>
ifElectedSweeper(Supplier<CompletableFuture<Void>> action) {
- if (closed) {
+ if (closed || !isLeaderFor(0)) {
return CompletableFuture.completedFuture(null);
}
- String assignPartition0 =
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
- .getPartition(0).toString();
- return
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition0)
- .handle((v, ex) -> ex == null)
- .thenCompose(owned -> (owned && !closed)
- ? action.get() :
CompletableFuture.completedFuture(null));
+ return action.get();
}
/** A {@code (segment, subscription)} ack participant; keys the ack
fan-out de-dup set. */
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcLeader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcLeader.java
new file mode 100644
index 00000000000..cefba83d7c9
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcLeader.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Value stored in a per-partition transaction-coordinator leader-election node
+ * ({@code /txn/tc/leader/<partition>}). Identifies the broker currently coordinating that TC
+ * partition and carries the connection URLs a client needs to reach it — so any broker can
+ * answer a client's assignment watch from the election value alone, without a further lookup.
+ * The assignment watch forwards only {@code brokerServiceUrl} and {@code brokerServiceUrlTls}.
+ *
+ * <p>Serialized as JSON via {@link org.apache.pulsar.common.util.ObjectMapperFactory} by the
+ * coordination service's {@code LeaderElection} serde.
+ *
+ * <p>NOTE(review): presumably the JSON property names come from the record component names. If
+ * so, renaming or removing a component changes the format that brokers of different versions read
+ * from the same election node. Confirm compatibility before changing the shape.
+ *
+ * @param brokerId the elected broker's id (matches the {@code /loadbalance/brokers} key)
+ * @param brokerServiceUrl the broker's binary service URL (non-TLS); may be {@code null} if the
+ *     broker only advertises a TLS endpoint
+ * @param brokerServiceUrlTls the broker's binary service URL (TLS); may be {@code null} if TLS is
+ *     disabled
+ * @param webServiceUrl the broker's HTTP service URL, for admin/CLI resolution
+ */
+public record TcLeader(String brokerId, String brokerServiceUrl, String brokerServiceUrlTls,
+        String webServiceUrl) {
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index 0c5bbcfd972..e94d0b17053 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -107,6 +107,28 @@ public final class TxnPaths {
return TXN_TC_SEQ_PREFIX + "/" + tcId;
}
+    /**
+     * Root of the per-partition transaction-coordinator leader-election nodes. TC partition
+     * {@code N} is elected under {@code /txn/tc/leader/<N>}, and the node's value is the
+     * {@link TcLeader} coordinating that partition. For the v5 TC this replaces the
+     * {@code transaction_coordinator_assign} topic as the election surface: leadership rests on the
+     * metadata store directly, not on topic/bundle ownership.
+     */
+    public static final String TXN_TC_LEADER_PREFIX = "/txn/tc/leader";
+
+    /**
+     * Builds the leader-election node path for one TC partition.
+     *
+     * @param partition the TC partition index
+     * @return {@code /txn/tc/leader/<partition>}
+     */
+    public static String tcLeaderPath(int partition) {
+        return String.join("/", TXN_TC_LEADER_PREFIX, Integer.toString(partition));
+    }
+
+    /**
+     * Cluster-wide record of the scalable-topics TC parallelism, written once by the first broker to
+     * start. Every broker verifies its configured value against this and refuses to start on a
+     * mismatch, so the coordinator-count encoded in transaction ids stays stable for the cluster's
+     * lifetime.
+     *
+     * <p>The value is the decimal parallelism encoded as UTF-8 text. It lives under
+     * {@code /txn/tc}, next to the {@link #TXN_TC_LEADER_PREFIX} election nodes.
+     */
+    public static final String TXN_TC_PARALLELISM_PATH = "/txn/tc/parallelism";
+
/** Width used when formatting long values into
lexicographically-orderable index keys. */
public static final int LONG_KEY_WIDTH = 20;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
index c5f261e85ae..9db63021dd9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -39,7 +39,9 @@ import org.apache.pulsar.broker.transaction.metadata.TxnState;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.awaitility.Awaitility;
@@ -60,6 +62,7 @@ public class TransactionCoordinatorV5Test {
private TxnMetadataStore txnStore;
private PulsarService pulsar;
private BrokerService brokerService;
+ private CoordinationService coordinationService;
private TransactionCoordinatorV5 tc;
@BeforeMethod
@@ -67,17 +70,30 @@ public class TransactionCoordinatorV5Test {
store = MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().fsyncEnable(false).build());
txnStore = new TxnMetadataStore(store);
+ coordinationService = new CoordinationServiceImpl(store);
pulsar = mock(PulsarService.class);
when(pulsar.getLocalMetadataStore()).thenReturn(store);
+ when(pulsar.getCoordinationService()).thenReturn(coordinationService);
+ when(pulsar.getBrokerId()).thenReturn("broker-test:8080");
+
when(pulsar.getBrokerServiceUrl()).thenReturn("pulsar://broker-test:6650");
+ when(pulsar.getBrokerServiceUrlTls()).thenReturn(null);
+
when(pulsar.getSafeWebServiceAddress()).thenReturn("http://broker-test:8080");
ServiceConfiguration cfg = new ServiceConfiguration();
// GC sweep tests assume retention has already elapsed.
cfg.setTransactionCoordinatorScalableTopicsGcRetentionSeconds(0);
+ // Keep the election small so start() converges quickly in unit tests.
+ cfg.setTransactionCoordinatorScalableTopicsParallelism(4);
when(pulsar.getConfiguration()).thenReturn(cfg);
brokerService = mock(BrokerService.class);
when(pulsar.getBrokerService()).thenReturn(brokerService);
- // Default: owned. Tests that want to assert the not-owned path can
override.
+ // Default: owned (assign-topic fallback path in handleClientConnect).
Tests that want to
+ // assert the not-owned path can override.
when(brokerService.checkTopicNsOwnership(any())).thenReturn(CompletableFuture.completedFuture(null));
tc = new TransactionCoordinatorV5(pulsar);
+ tc.start();
+ // As the only broker, we win every partition's election; wait until
partition 0 is led so
+ // the sweep-gating and client-connect paths behave deterministically.
+ Awaitility.await().until(() -> tc.isLeaderFor(0));
}
@AfterMethod(alwaysRun = true)
@@ -85,6 +101,9 @@ public class TransactionCoordinatorV5Test {
if (tc != null) {
tc.close();
}
+ if (coordinationService != null) {
+ coordinationService.close();
+ }
if (store != null) {
store.close();
}
@@ -320,16 +339,89 @@ public class TransactionCoordinatorV5Test {
}
@Test
- public void sweeps_skipWhenNotElected() throws Exception {
- // Override the owned-default with a failure → not the elected sweeper
→ action skipped.
- when(brokerService.checkTopicNsOwnership(any())).thenReturn(
- CompletableFuture.failedFuture(new RuntimeException("not
owner")));
-
+    public void sweeps_skipWhenNotLeader() throws Exception {
+        // Create an expired txn, then drop leadership (close releases the election leases).
+        // The field is cleared afterwards so the @AfterMethod teardown does not close the
+        // same coordinator a second time.
+        TxnID expired = tc.newTransaction(TC_ID, 1L, "owner").get();
+        tc.close();
+        tc = null;
+
+        // A second TC that never started (no elections) is not the leader for partition 0. The
+        // sweep is gated on isLeaderFor(0), so it must skip.
+        TransactionCoordinatorV5 notLeader = new TransactionCoordinatorV5(pulsar);
+        try {
+            notLeader.sweepTimeouts().get();
+            // Still OPEN — the sweep never ran because this TC leads no partition.
+            var header = txnStore.getHeader(TxnIds.toKey(expired)).get().orElseThrow();
+            assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+        } finally {
+            notLeader.close();
+        }
+    }
- // Still OPEN — the sweep never ran because we don't own
assign-partition 0.
- var header =
txnStore.getHeader(TxnIds.toKey(expired)).get().orElseThrow();
- assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+ // ---- Election + assignment discovery ----------------------------------
+
+    @Test
+    public void election_singleBrokerLeadsAllPartitions() {
+        // This TC is the only broker, so it must eventually win the election for every partition
+        // in [0, parallelism). setUp configures parallelism = 4.
+        int parallelism = 4;
+        for (int partition = 0; partition < parallelism; partition++) {
+            final int p = partition;
+            Awaitility.await().until(() -> tc.isLeaderFor(p));
+        }
+        // One past the last partition is out of range and is never led.
+        assertThat(tc.isLeaderFor(parallelism)).isFalse();
+    }
+
+    @Test
+    public void buildAssignmentsSnapshot_reportsAllLedPartitions() {
+        // setUp only waits for partition 0, so check the partition count up front and wait for
+        // the full assignment afterwards.
+        var initial = tc.buildAssignmentsSnapshot().join();
+        assertThat(initial.partitionCount()).isEqualTo(4);
+
+        Awaitility.await().untilAsserted(() -> {
+            var snapshot = tc.buildAssignmentsSnapshot().join();
+            assertThat(snapshot.assignments()).hasSize(4);
+            assertThat(snapshot.isComplete()).isTrue();
+            var first = snapshot.assignments().get(0);
+            assertThat(first.brokerServiceUrl()).isEqualTo("pulsar://broker-test:6650");
+            assertThat(first.brokerId()).isEqualTo("broker-test:8080");
+        });
+    }
+
+    @Test
+    public void registerAssignmentChangeListener_deregistersOnClose() throws Exception {
+        // Exercises only the registration/deregistration contract: registering returns a handle
+        // whose close() deregisters the listener. Firing on an election change needs a
+        // multi-broker setup and is covered at integration level.
+        Runnable noOpListener = () -> { };
+        AutoCloseable registration = tc.registerAssignmentChangeListener(noOpListener::run);
+        registration.close();
+    }
+
+    @Test
+    public void handleClientConnect_acceptsWhenLeader() throws Exception {
+        // setUp waits until this TC leads partition 0, so the connect is accepted without
+        // consulting assign-topic ownership.
+        var connectFuture = tc.handleClientConnect(TC_ID);
+        connectFuture.get();
+    }
+
+    @Test
+    public void start_failsOnParallelismMismatch() throws Exception {
+        // The tc started in setUp persisted parallelism=4. A second coordinator configured with
+        // a different value against the same metadata store must refuse to start.
+        PulsarService other = mock(PulsarService.class);
+        when(other.getBrokerId()).thenReturn("broker-other:8080");
+        when(other.getBrokerServiceUrl()).thenReturn("pulsar://broker-other:6650");
+        when(other.getSafeWebServiceAddress()).thenReturn("http://broker-other:8080");
+        when(other.getLocalMetadataStore()).thenReturn(store);
+        when(other.getCoordinationService()).thenReturn(coordinationService);
+        when(other.getBrokerService()).thenReturn(brokerService);
+
+        ServiceConfiguration otherCfg = new ServiceConfiguration();
+        otherCfg.setTransactionCoordinatorScalableTopicsGcRetentionSeconds(0);
+        otherCfg.setTransactionCoordinatorScalableTopicsParallelism(8);
+        when(other.getConfiguration()).thenReturn(otherCfg);
+
+        TransactionCoordinatorV5 mismatched = new TransactionCoordinatorV5(other);
+        try {
+            assertThatThrownBy(mismatched::start)
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("mismatch")
+                    .hasMessageContaining("8")
+                    .hasMessageContaining("4");
+        } finally {
+            mismatched.close();
+        }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index ecd0479d085..e63542cd060 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -138,10 +138,7 @@ public class TransactionClientConnectTest extends TransactionTestBase {
@Test
public void testPulsarClientCloseThenCloseTcClient() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient =
((PulsarClientImpl) pulsarClient).getTcClient();
- Field field =
TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
- field.setAccessible(true);
- TransactionMetaStoreHandler[] handlers =
- (TransactionMetaStoreHandler[])
field.get(transactionCoordinatorClient);
+ java.util.Collection<TransactionMetaStoreHandler> handlers =
transactionCoordinatorClient.getHandlers();
for (TransactionMetaStoreHandler handler : handlers) {
handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
@@ -168,11 +165,8 @@ public class TransactionClientConnectTest extends TransactionTestBase {
public void testHandlerStateChangeToReady() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient =
((PulsarClientImpl) pulsarClient).getTcClient();
- Field field =
TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
- field.setAccessible(true);
- TransactionMetaStoreHandler[] handlers =
- (TransactionMetaStoreHandler[])
field.get(transactionCoordinatorClient);
- TransactionMetaStoreHandler transactionMetaStoreHandler = handlers[0];
+        // Take the first handler exposed by the client's public accessor.
+        var handlers = transactionCoordinatorClient.getHandlers();
+        TransactionMetaStoreHandler transactionMetaStoreHandler = handlers.iterator().next();
Assert.assertEquals(transactionMetaStoreHandler.getConnectHandleState(),
HandlerState.State.Ready);
Assert.assertTrue(transactionMetaStoreHandler.changeToReadyState());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index ff12f01b40f..16d78585531 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -190,6 +190,12 @@ public class ClientCnx extends PulsarHandler {
.concurrencyLevel(1)
.build();
+    // Transaction-coordinator assignment watch sessions on this connection, keyed by watch id
+    // (populated by registerTcAssignmentsWatcher, drained by removeTcAssignmentsWatcher).
+    private final ConcurrentLongHashMap<TcAssignmentsWatcherSession> tcAssignmentsWatchers =
+            ConcurrentLongHashMap.<TcAssignmentsWatcherSession>newBuilder()
+                    .expectedItems(2)
+                    .concurrencyLevel(1)
+                    .build();
+
private final CompletableFuture<Void> connectionFuture = new
CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new
ConcurrentLinkedQueue<>();
@@ -238,6 +244,8 @@ public class ClientCnx extends PulsarHandler {
private boolean supportsTopicWatcherReconcile;
@Getter
private boolean supportsScalableTopics;
+ // Set when CONNECTED is handled: true only if the broker sent feature flags with
+ // supportsTcMetadataDiscovery set.
+ @Getter
+ private boolean supportsTcMetadataDiscovery;
/** Idle stat. **/
@Getter
@@ -393,6 +401,7 @@ public class ClientCnx extends PulsarHandler {
dagWatchSessions.forEach((__, session) -> session.connectionClosed());
scalableConsumerSessions.forEach((__, session) ->
session.connectionClosed());
scalableTopicsWatchers.forEach((__, session) ->
session.connectionClosed());
+ tcAssignmentsWatchers.forEach((__, session) ->
session.connectionClosed());
waitingLookupRequests.clear();
@@ -402,6 +411,7 @@ public class ClientCnx extends PulsarHandler {
dagWatchSessions.clear();
scalableConsumerSessions.clear();
scalableTopicsWatchers.clear();
+ tcAssignmentsWatchers.clear();
timeoutTask.cancel(true);
}
@@ -458,6 +468,8 @@ public class ClientCnx extends PulsarHandler {
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatcherReconcile();
supportsScalableTopics =
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsScalableTopics();
+ supportsTcMetadataDiscovery =
+ connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTcMetadataDiscovery();
// set remote protocol version to the correct version before we
complete the connection future
setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
@@ -1516,6 +1528,62 @@ public class ClientCnx extends PulsarHandler {
scalableTopicsWatchers.remove(watchId);
}
+ /**
+ * Client-side receiver for transaction-coordinator assignment snapshots delivered in
+ * {@code CommandWatchTcAssignmentsUpdate} frames on this connection.
+ */
+ public interface TcAssignmentsWatcherSession {
+ /**
+ * Called for each update that carries a snapshot.
+ *
+ * @param parallelism the snapshot's parallelism field
+ * @param leaders map from TC id to {@code {brokerServiceUrl, brokerServiceUrlTls}}; an
+ * element is {@code null} when the broker omitted that URL
+ */
+ void onSnapshot(int parallelism, java.util.Map<Long, String[]>
leaders);
+
+ /**
+ * Called when the broker reports an error for this watch. The session has already been
+ * removed from the connection, and {@code message} may be {@code null}.
+ */
+ void onError(org.apache.pulsar.common.api.proto.ServerError error,
String message);
+
+ /** Called when this connection closes; the session is dropped from the connection. */
+ void connectionClosed();
+ }
+
+ /** Routes TC-assignment updates carrying {@code watchId} on this connection to {@code watcher}. */
+ public void registerTcAssignmentsWatcher(long watchId,
TcAssignmentsWatcherSession watcher) {
+ tcAssignmentsWatchers.put(watchId, watcher);
+ }
+
+ /** Stops routing TC-assignment updates for {@code watchId}; a no-op if none is registered. */
+ public void removeTcAssignmentsWatcher(long watchId) {
+ tcAssignmentsWatchers.remove(watchId);
+ }
+
+    /**
+     * Dispatches a TC-assignment watch update to its registered session. An error update removes
+     * the session before notifying it; a snapshot update is flattened into a
+     * {@code tcId -> {brokerServiceUrl, brokerServiceUrlTls}} map. Updates for unknown watch ids,
+     * or updates without a snapshot, are logged and dropped.
+     */
+    @Override
+    protected void handleCommandWatchTcAssignmentsUpdate(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsUpdate cmd) {
+        checkArgument(state == State.Ready);
+        final long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received WatchTcAssignmentsUpdate");
+
+        if (cmd.hasError()) {
+            TcAssignmentsWatcherSession failed = tcAssignmentsWatchers.remove(watchId);
+            if (failed == null) {
+                log.warn().attr("watchId", watchId)
+                        .log("Received TC-assignments watch error for unknown watcher");
+                return;
+            }
+            String errorMessage = cmd.hasMessage() ? cmd.getMessage() : null;
+            failed.onError(cmd.getError(), errorMessage);
+            return;
+        }
+
+        TcAssignmentsWatcherSession session = tcAssignmentsWatchers.get(watchId);
+        if (session == null) {
+            log.warn().attr("watchId", watchId)
+                    .log("Received TC-assignments watch update for unknown watcher");
+            return;
+        }
+        if (!cmd.hasSnapshot()) {
+            log.warn().attr("watchId", watchId).log("TC-assignments update with no snapshot payload");
+            return;
+        }
+
+        var snapshot = cmd.getSnapshot();
+        java.util.Map<Long, String[]> leadersByTcId = new java.util.HashMap<>();
+        int assignmentCount = snapshot.getAssignmentsCount();
+        for (int idx = 0; idx < assignmentCount; idx++) {
+            var assignment = snapshot.getAssignmentAt(idx);
+            // Absent URLs are passed on as null entries.
+            String plainUrl = assignment.hasBrokerServiceUrl() ? assignment.getBrokerServiceUrl() : null;
+            String tlsUrl = assignment.hasBrokerServiceUrlTls() ? assignment.getBrokerServiceUrlTls() : null;
+            leadersByTcId.put(assignment.getTcId(), new String[] {plainUrl, tlsUrl});
+        }
+        session.onSnapshot(snapshot.getParallelism(), leadersByTcId);
+    }
+
/**
* check serverError and take appropriate action.
* <ul>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 4a704be7e47..e398b0583fa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -58,6 +58,10 @@ public class ConnectionHandler {
protected final int randomKeyForSelectConnection;
private volatile Boolean useProxy;
+ // The explicit target broker for connections that bypass topic lookup (v5 TC metadata-store
+ // discovery). Remembered so the error-retry path (reconnectLater) re-dials the same leader
+ // instead of falling back to the service URL. Null means "use the normal lookup path".
+ // Written by grabCnx(URI, boolean) and by connectionClosed(cnx, delay, hostUrl) whenever a
+ // hostUrl is supplied; no code in this diff resets it to null.
+ // NOTE(review): because connectionClosed records *any* hostUrl, a one-off redirect on a
+ // handler that normally uses lookup would also pin later reconnectLater() retries to that
+ // URI — confirm this is intended.
+ private volatile URI explicitHostURI;
interface Connection {
@@ -96,6 +100,18 @@ public class ConnectionHandler {
grabCnx(Optional.empty());
}
+    /**
+     * Connect to a specific broker {@code hostURI}, routing through the proxy when {@code useProxy}
+     * is true (logical = the broker, physical = the proxy) or directly otherwise. Used by the v5
+     * transaction coordinator's metadata-store discovery, where the elected leader's address is
+     * known but, behind a proxy, isn't directly reachable.
+     *
+     * <p>The target is remembered so a later reconnectLater() retry re-dials it instead of
+     * falling back to a lookup.
+     *
+     * @param hostURI  the broker to connect to; must not be {@code null}
+     * @param useProxy whether {@code hostURI} must be reached through the proxy
+     * @throws NullPointerException if {@code hostURI} is {@code null}; no state is changed then
+     */
+    protected void grabCnx(URI hostURI, boolean useProxy) {
+        // Validate before mutating state. Otherwise a null target would be recorded (and the proxy
+        // flag flipped) before Optional.of failed.
+        java.util.Objects.requireNonNull(hostURI, "hostURI");
+        this.useProxy = useProxy;
+        this.explicitHostURI = hostURI;
+        grabCnx(Optional.of(hostURI));
+    }
+
protected void grabCnx(Optional<URI> hostURI) {
if (!duringConnect.compareAndSet(false, true)) {
log.info().log("Skip grabbing the connection since there is a
pending connection");
@@ -189,7 +205,15 @@ public class ConnectionHandler {
if (state.changeToConnecting()) {
state.client.timer().newTimeout(timeout -> {
log.info("Reconnecting after connection was closed");
- grabCnx();
+ // Re-dial the explicit leader target (v5 TC discovery) if
set; otherwise the normal
+ // lookup path. Without this, a first-attempt failure during
failover would fall back
+ // to the service URL and never reach the partition's new
leader.
+ URI target = explicitHostURI;
+ if (target != null) {
+ grabCnx(Optional.of(target));
+ } else {
+ grabCnx();
+ }
}, delayMs, TimeUnit.MILLISECONDS);
} else {
log.info("Ignoring reconnection request");
@@ -203,6 +227,9 @@ public class ConnectionHandler {
public void connectionClosed(ClientCnx cnx, Optional<Long>
initialConnectionDelayMs, Optional<URI> hostUrl) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
duringConnect.set(false);
+ // Remember an explicit reconnect target so a later first-attempt
failure (reconnectLater)
+ // re-dials the same broker rather than falling back to the service
URL.
+ hostUrl.ifPresent(uri -> this.explicitHostURI = uri);
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!state.changeToConnecting()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e9f14a06768..ef604cd4d19 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -55,6 +55,13 @@ abstract class HandlerState {
protected void setRedirectedClusterURI(String serviceUrl, String
serviceUrlTls) throws URISyntaxException {
String url = client.conf.isUseTls() &&
StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl;
+        if (StringUtils.isBlank(url)) {
+            // No URL fits this client: either a non-TLS client was given only a TLS endpoint
+            // (serviceUrl blank), or both URLs are blank. A TLS client falls back to serviceUrl
+            // when serviceUrlTls is blank. Report this as a catchable URISyntaxException rather
+            // than letting new URI(null) throw an NPE.
+            String reason = "No usable service URL (useTls=" + client.conf.isUseTls()
+                    + ", serviceUrl=" + serviceUrl + ", serviceUrlTls=" + serviceUrlTls + ")";
+            throw new URISyntaxException(String.valueOf(url), reason);
+        }
this.redirectedClusterURI = new URI(url);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index f2731c6b74b..ae52cfd0550 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -27,8 +27,10 @@ import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.io.Closeable;
import java.io.IOException;
+import java.net.URI;
import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -90,11 +92,37 @@ public class TransactionMetaStoreHandler extends HandlerState
private final long lookupDeadline;
private final AtomicInteger previousExceptionCount = new AtomicInteger();
+ // Metadata-store discovery (watch mode): the elected leader broker for this coordinator and
+ // whether it must be reached through the proxy. A null leaderUri means assign-topic mode, in
+ // which start() resolves the coordinator through a topic lookup instead. Both fields are
+ // updated by retargetLeader().
+ private volatile URI leaderUri;
+ private volatile boolean useProxy;
+
+ /**
+ * Construct a handler in assign-topic mode: the coordinator is located by a lookup on
+ * {@code topic} (for example, a partition of the transaction-coordinator assign topic).
+ */
public TransactionMetaStoreHandler(long transactionCoordinatorId,
PulsarClientImpl pulsarClient, String topic,
CompletableFuture<Void> connectFuture) {
+ this(transactionCoordinatorId, pulsarClient, topic, null, false,
connectFuture);
+ }
+
+    /**
+     * Construct a handler that connects to a fixed leader broker (metadata-store discovery) rather
+     * than resolving a coordinator via an assign-topic lookup. The leader address is dialled
+     * through the proxy when {@code useProxy} is true (the broker URL isn't directly reachable
+     * behind a proxy) or directly otherwise. Use {@link #retargetLeader} when the elected leader
+     * changes.
+     *
+     * @param leaderUri the elected leader broker; must not be {@code null}. A null value would
+     *                  otherwise silently select assign-topic mode with no topic to look up.
+     * @param useProxy  whether {@code leaderUri} must be reached through the proxy
+     * @throws NullPointerException if {@code leaderUri} is {@code null}
+     */
+    public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient,
+                                       URI leaderUri, boolean useProxy,
+                                       CompletableFuture<Void> connectFuture) {
+        this(transactionCoordinatorId, pulsarClient, null,
+                java.util.Objects.requireNonNull(leaderUri, "leaderUri"), useProxy, connectFuture);
+    }
+
+ private TransactionMetaStoreHandler(long transactionCoordinatorId,
PulsarClientImpl pulsarClient, String topic,
+ URI leaderUri, boolean useProxy,
+ CompletableFuture<Void> connectFuture)
{
super(pulsarClient, topic);
+ this.leaderUri = leaderUri;
+ this.useProxy = useProxy;
this.transactionCoordinatorId = transactionCoordinatorId;
this.timeoutQueue = new ConcurrentLinkedQueue<>();
this.blockIfReachMaxPendingOps = true;
@@ -117,7 +145,13 @@ public class TransactionMetaStoreHandler extends HandlerState
}
public void start() {
- this.connectionHandler.grabCnx();
+        if (leaderUri == null) {
+            // Assign-topic discovery: resolve the coordinator via a topic lookup.
+            this.connectionHandler.grabCnx();
+            return;
+        }
+        // Metadata-store discovery: dial the elected leader, through the proxy when useProxy is set.
+        this.connectionHandler.grabCnx(leaderUri, useProxy);
}
@Override
@@ -799,8 +833,35 @@ public class TransactionMetaStoreHandler extends HandlerState
return this.connectionHandler.cnx();
}
+ /**
+ * Point this handler at a (possibly new) elected leader broker and reconnect. Called by the
+ * metadata-store discovery when an assignment snapshot moves this coordinator's leadership to a
+ * different broker. If the leader and proxy mode are unchanged and the handler is already
+ * connected, this is a no-op; otherwise it (re)connects to the new leader.
+ *
+ * @param newLeaderUri the elected leader broker; must be non-null ({@code equals} is invoked on it)
+ * @param newUseProxy whether {@code newLeaderUri} must be dialled through the proxy
+ */
+ public void retargetLeader(URI newLeaderUri, boolean newUseProxy) {
+ if (newLeaderUri.equals(this.leaderUri) && newUseProxy ==
this.useProxy && cnx() != null) {
+ return;
+ }
+ this.leaderUri = newLeaderUri;
+ this.useProxy = newUseProxy;
+ // cnx() is read again here. If a connection was established after the check above, it is
+ // closed below, so the reconnect targets the new leader.
+ ClientCnx current = cnx();
+ if (current != null) {
+ // Drop the current connection; connectionClosed re-grabs against the updated leaderUri.
+ // NOTE(review): that path calls ConnectionHandler.connectionClosed(cnx, empty, leaderUri),
+ // which does not carry useProxy. A change in newUseProxy alone therefore keeps the proxy
+ // mode from the last grabCnx(URI, boolean) call — confirm whether that matters.
+ current.channel().close();
+ } else {
+ // Not connected: dial the new leader. If a connect is already in flight, ConnectionHandler
+ // skips this request (its pending-connection guard), but the new target is still recorded
+ // for its retry path.
+ connectionHandler.grabCnx(newLeaderUri, newUseProxy);
+ }
+ }
+
void connectionClosed(ClientCnx cnx) {
- this.connectionHandler.connectionClosed(cnx);
+        if (leaderUri == null) {
+            // Assign-topic discovery: reconnect through the normal lookup path.
+            this.connectionHandler.connectionClosed(cnx);
+            return;
+        }
+        // Metadata-store discovery: reconnect to the elected leader, not the configured service
+        // URL. The proxy mode is whatever the ConnectionHandler last recorded.
+        this.connectionHandler.connectionClosed(cnx, Optional.empty(), Optional.of(leaderUri));
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/AssignTopicTcDiscovery.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/AssignTopicTcDiscovery.java
new file mode 100644
index 00000000000..442bd0b79a9
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/AssignTopicTcDiscovery.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.CustomLog;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+
+/**
+ * Coordinator discovery via the {@code transaction_coordinator_assign} partitioned topic — the
+ * original mechanism. Each coordinator is a partition of the assign topic; the handler connects to
+ * the broker that owns that partition's bundle (resolved by an ordinary topic lookup). Used against
+ * brokers that don't advertise {@code supports_tc_metadata_discovery}.
+ *
+ * <p>The handler array is fully populated before it is published through a {@code volatile}
+ * field, and readers take a single snapshot of that field. A concurrent {@link #close()} therefore
+ * never exposes a half-filled array or nulls it out mid-read.
+ */
+@CustomLog
+class AssignTopicTcDiscovery implements TcDiscovery {
+
+    private final PulsarClientImpl pulsarClient;
+    // One handler per assign-topic partition, indexed by partition. Published only once fully
+    // populated; null before start() has created the handlers and after close().
+    private volatile TransactionMetaStoreHandler[] handlers;
+    // Partition index (= tcId) -> handler, served by handlerForCoordinator(); cleared on close().
+    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
+            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+    // Round-robin cursor for nextHandler().
+    private final AtomicLong epoch = new AtomicLong(0);
+
+    AssignTopicTcDiscovery(PulsarClientImpl pulsarClient) {
+        this.pulsarClient = pulsarClient;
+    }
+
+    /**
+     * Looks up the partition count of the assign topic and starts one handler per partition.
+     *
+     * @return a future that completes once every handler has connected. It fails with a
+     *         {@link TransactionCoordinatorClientException} when the assign topic has no partitions.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        return pulsarClient.getPartitionedTopicMetadata(
+                        SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true, false)
+                .thenCompose(partitionMeta -> {
+                    log.debug().attr("partitions", partitionMeta.partitions)
+                            .log("Transaction meta store assign partition is.");
+                    if (partitionMeta.partitions <= 0) {
+                        return FutureUtil.failedFuture(new TransactionCoordinatorClientException(
+                                "The broker doesn't enable the transaction coordinator, "
+                                        + "or the transaction coordinator has not initialized"));
+                    }
+                    return startHandlers(partitionMeta.partitions);
+                });
+    }
+
+    /** Creates, registers and starts one handler per partition, then publishes the array. */
+    private CompletableFuture<Void> startHandlers(int partitions) {
+        TransactionMetaStoreHandler[] created = new TransactionMetaStoreHandler[partitions];
+        List<CompletableFuture<Void>> connectFutureList = new ArrayList<>(partitions);
+        for (int i = 0; i < partitions; i++) {
+            CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+            connectFutureList.add(connectFuture);
+            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                    i, pulsarClient, getTCAssignTopicName(i), connectFuture);
+            created[i] = handler;
+            // Registered immediately so close() can reach it even if a later iteration throws.
+            handlerMap.put(i, handler);
+            handler.start();
+        }
+        // Publish only after every slot is filled.
+        handlers = created;
+        return FutureUtil.waitForAll(connectFutureList);
+    }
+
+    private static String getTCAssignTopicName(int partition) {
+        return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+                + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+    }
+
+    @Override
+    public TransactionMetaStoreHandler handlerForCoordinator(long tcId) {
+        return handlerMap.get(tcId);
+    }
+
+    @Override
+    public TransactionMetaStoreHandler nextHandler() {
+        // Read the volatile field once so a concurrent close() can't null it between the checks
+        // and the index.
+        TransactionMetaStoreHandler[] current = handlers;
+        if (current == null || current.length == 0) {
+            return null;
+        }
+        int index = MathUtils.signSafeMod(epoch.incrementAndGet(), current.length);
+        return current[index];
+    }
+
+    @Override
+    public java.util.Collection<TransactionMetaStoreHandler> handlers() {
+        TransactionMetaStoreHandler[] snapshot = handlers;
+        return snapshot == null ? List.of() : List.of(snapshot);
+    }
+
+    /**
+     * Closes every registered handler, including those created by a start() that failed part-way,
+     * and forgets them so {@link #handlerForCoordinator} no longer returns closed handlers.
+     */
+    @Override
+    public void close() {
+        handlers = null;
+        handlerMap.forEach((ignoredTcId, handler) -> {
+            try {
+                handler.close();
+            } catch (IOException e) {
+                log.warn().exception(e).log("Close transaction meta store handler error");
+            }
+        });
+        handlerMap.clear();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TcDiscovery.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TcDiscovery.java
new file mode 100644
index 00000000000..476823b2868
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TcDiscovery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+
+/**
+ * Strategy for locating transaction-coordinator instances and giving the
+ * {@link TransactionCoordinatorClientImpl} a {@link TransactionMetaStoreHandler} per coordinator.
+ *
+ * <p>The discriminator is the per-connection {@code supports_tc_metadata_discovery} feature flag,
+ * read once at {@link TransactionCoordinatorClientImpl#startAsync()}:
+ * <ul>
+ * <li>{@link AssignTopicTcDiscovery} — the original mechanism: discover coordinators via a
+ * lookup on the {@code transaction_coordinator_assign} partitioned topic. Used against
+ * brokers that don't advertise the flag (legacy/v4 coordinator, or v5 coordinator
+ * disabled).</li>
+ * <li>{@link WatchTcAssignmentsDiscovery} — the metadata-store election mechanism: open one
+ * assignment watch and point each handler at its partition's elected leader broker. Used
+ * against brokers that advertise the flag.</li>
+ * </ul>
+ *
+ * <p>The handler-routing surface ({@code newTransaction} round-robin, {@code commit}/{@code abort}
+ * by {@code TxnID.mostSigBits}) is shared and lives in {@link TransactionCoordinatorClientImpl};
+ * only coordinator <em>location</em> differs between strategies.
+ *
+ * <p>{@code close()} is inherited from {@link AutoCloseable}; implementations may narrow its
+ * {@code throws} clause.
+ */
+interface TcDiscovery extends AutoCloseable {
+
+ /**
+ * Discover the coordinators and create their handlers. Completes when every handler has
+ * connected to its coordinator.
+ *
+ * @return a future that may complete exceptionally when discovery fails. For example,
+ * {@link AssignTopicTcDiscovery} fails it when the assign topic has no partitions.
+ */
+ CompletableFuture<Void> start();
+
+ /**
+ * @return the handler for coordinator {@code tcId} (= {@code TxnID.mostSigBits}), or
+ * {@code null} if no such coordinator exists in the current assignment.
+ */
+ TransactionMetaStoreHandler handlerForCoordinator(long tcId);
+
+ /**
+ * @return the handler for the next transaction, chosen round-robin across coordinators, or
+ * {@code null} if no coordinators are available.
+ */
+ TransactionMetaStoreHandler nextHandler();
+
+ /** @return all current coordinator handlers (never {@code null}). Visible for testing. */
+ Collection<TransactionMetaStoreHandler> handlers();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 91a0fa5c6ec..6fe938d2d62 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -18,13 +18,12 @@
*/
package org.apache.pulsar.client.impl.transaction;
-import java.io.IOException;
-import java.util.ArrayList;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.CustomLog;
import org.apache.pulsar.client.api.PulsarClient;
@@ -34,28 +33,23 @@ import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
-import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
/**
- * Transaction coordinator client based topic assigned.
+ * Transaction coordinator client. Coordinator <em>location</em> is delegated
to a {@link TcDiscovery}
+ * strategy chosen from the broker's {@code supports_tc_metadata_discovery}
feature flag at
+ * {@link #startAsync()}: {@link WatchTcAssignmentsDiscovery} when the broker
advertises the
+ * metadata-store election, else {@link AssignTopicTcDiscovery} (the
assign-topic mechanism). The
+ * routing surface here ({@code newTransaction} round-robin, {@code
commit}/{@code abort} by
+ * {@code TxnID.mostSigBits}) is the same for both.
*/
@CustomLog
public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorClient {
private final PulsarClientImpl pulsarClient;
- private TransactionMetaStoreHandler[] handlers;
- private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
- ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
- .expectedItems(16)
- .concurrencyLevel(1)
- .build();
- private final AtomicLong epoch = new AtomicLong(0);
+ private volatile TcDiscovery discovery;
private static final
AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State>
STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(TransactionCoordinatorClientImpl.class,
State.class, "state");
@@ -77,42 +71,50 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
@Override
public CompletableFuture<Void> startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
- return pulsarClient.getPartitionedTopicMetadata(
-
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(),
true, false)
- .thenCompose(partitionMeta -> {
- List<CompletableFuture<Void>> connectFutureList = new
ArrayList<>();
- log.debug().attr("partitions",
partitionMeta.partitions)
- .log("Transaction meta store assign partition
is.");
- if (partitionMeta.partitions > 0) {
- handlers = new
TransactionMetaStoreHandler[partitionMeta.partitions];
- for (int i = 0; i < partitionMeta.partitions; i++) {
- CompletableFuture<Void> connectFuture = new
CompletableFuture<>();
- connectFutureList.add(connectFuture);
- TransactionMetaStoreHandler handler = new
TransactionMetaStoreHandler(
- i, pulsarClient, getTCAssignTopicName(i),
connectFuture);
- handlers[i] = handler;
- handlerMap.put(i, handler);
- handler.start();
- }
- } else {
- return FutureUtil.failedFuture(new
TransactionCoordinatorClientException(
- "The broker doesn't enable the transaction
coordinator, "
- + "or the transaction coordinator has
not initialized"));
- }
-
- STATE_UPDATER.set(TransactionCoordinatorClientImpl.this,
State.READY);
-
- return FutureUtil.waitForAll(connectFutureList);
- });
+ // Select the discovery strategy (this may probe a broker connection), publish it in the
+ // volatile `discovery` field, then start it. State moves to READY only after start()
+ // succeeds; nothing here resets it on failure, so a failed start leaves it at STARTING.
+ // NOTE(review): since the CAS above requires NONE, startAsync() then can't be retried —
+ // confirm that is intended.
+ return selectDiscovery()
+ .thenCompose(selected -> {
+ this.discovery = selected;
+ log.info().attr("discovery",
selected.getClass().getSimpleName())
+ .log("Transaction coordinator discovery
selected");
+ return selected.start();
+ })
+ .thenRun(() -> STATE_UPDATER.set(this, State.READY));
} else {
return FutureUtil.failedFuture(
new CoordinatorClientStateException("Can not start while
current state is " + state));
}
}
- private String getTCAssignTopicName(int partition) {
- return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
- + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+    /**
+     * Decide how transaction coordinators will be located.
+     * <ul>
+     *   <li>Non-binary (e.g. {@code http://}) service URL: always the assign-topic flow, because the
+     *   metadata-store assignment watch needs a binary-protocol connection, while the assign-topic
+     *   flow resolves coordinators through the HTTP-capable partitioned-metadata lookup.</li>
+     *   <li>Binary ({@code pulsar://}) service URL: probe a broker connection and use the assignment
+     *   watch only if that broker advertises {@code supports_tc_metadata_discovery} (old brokers, or
+     *   brokers with the scalable-topics TC disabled, don't).</li>
+     * </ul>
+     * A failed probe also falls back to the assign-topic flow: its lookup retries across hosts and
+     * the assign topic still exists on v5 brokers during the deprecation window, so the fallback is
+     * always safe.
+     */
+    private CompletableFuture<TcDiscovery> selectDiscovery() {
+        boolean binaryLookup = pulsarClient.getLookup().isBinaryProtoLookupService();
+        if (binaryLookup) {
+            // Probe via getAnyBrokerProxyConnection(), not getConnectionToServiceUrl(): behind a proxy
+            // the latter yields the proxy's own CONNECTED, whose feature flags describe the proxy
+            // lookup handshake rather than a broker. This pairs to an actual broker (directly or
+            // proxied) — the same kind of connection the watch itself uses.
+            return pulsarClient.getAnyBrokerProxyConnection()
+                    .<TcDiscovery>thenApply(probe -> {
+                        if (probe.isSupportsTcMetadataDiscovery()) {
+                            return new WatchTcAssignmentsDiscovery(pulsarClient);
+                        }
+                        return new AssignTopicTcDiscovery(pulsarClient);
+                    })
+                    .exceptionally(failure -> {
+                        log.info().exception(failure)
+                                .log("TC discovery feature probe failed; using assign-topic discovery");
+                        return new AssignTopicTcDiscovery(pulsarClient);
+                    });
+        }
+        return CompletableFuture.completedFuture(new AssignTopicTcDiscovery(pulsarClient));
}
@Override
@@ -131,16 +133,14 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
log.warn("The transaction meta store is closing or closed, doing
nothing.");
result.complete(null);
} else {
- if (handlers != null) {
- for (TransactionMetaStoreHandler handler : handlers) {
- try {
- handler.close();
- } catch (IOException e) {
- log.warn().exception(e).log("Close transaction meta
store handler error");
- }
+ if (discovery != null) {
+ try {
+ discovery.close();
+ } catch (Exception e) {
+ log.warn().exception(e).log("Close transaction coordinator
discovery error");
}
+ discovery = null;
}
- this.handlers = null;
result.complete(null);
}
return result;
@@ -171,7 +171,12 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
@Override
public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit
unit) {
- return nextHandler().newTransactionAsync(timeout, unit);
+        // Read the volatile field once: it is unset before startAsync() publishes a strategy and is
+        // nulled by closeAsync(). Report that as a failed future rather than throwing an NPE.
+        TcDiscovery current = discovery;
+        if (current == null) {
+            return FutureUtil.failedFuture(new CoordinatorClientStateException(
+                    "Can not create new transaction while current state is " + state));
+        }
+        TransactionMetaStoreHandler handler = current.nextHandler();
+        if (handler == null) {
+            return FutureUtil.failedFuture(new TransactionCoordinatorClientException(
+                    "No transaction coordinator is currently available"));
+        }
+        return handler.newTransactionAsync(timeout, unit);
}
@Override
@@ -186,7 +191,7 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
@Override
public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID,
List<String> partitions) {
- TransactionMetaStoreHandler handler =
handlerMap.get(txnID.getMostSigBits());
+ TransactionMetaStoreHandler handler =
discovery.handlerForCoordinator(txnID.getMostSigBits());
if (handler == null) {
return FutureUtil.failedFuture(
new
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -207,7 +212,7 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
@Override
public CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID,
String topic, String subscription) {
- TransactionMetaStoreHandler handler =
handlerMap.get(txnID.getMostSigBits());
+ TransactionMetaStoreHandler handler =
discovery.handlerForCoordinator(txnID.getMostSigBits());
if (handler == null) {
return FutureUtil.failedFuture(
new
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -230,7 +235,7 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
@Override
public CompletableFuture<Void> commitAsync(TxnID txnID) {
- TransactionMetaStoreHandler handler =
handlerMap.get(txnID.getMostSigBits());
+ TransactionMetaStoreHandler handler =
discovery.handlerForCoordinator(txnID.getMostSigBits());
if (handler == null) {
return FutureUtil.failedFuture(
new
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -250,7 +255,7 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
@Override
public CompletableFuture<Void> abortAsync(TxnID txnID) {
- TransactionMetaStoreHandler handler =
handlerMap.get(txnID.getMostSigBits());
+ TransactionMetaStoreHandler handler =
discovery.handlerForCoordinator(txnID.getMostSigBits());
if (handler == null) {
return FutureUtil.failedFuture(
new
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -264,8 +269,19 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
return state;
}
- private TransactionMetaStoreHandler nextHandler() {
- int index = MathUtils.signSafeMod(epoch.incrementAndGet(),
handlers.length);
- return handlers[index];
+ /** @return the current coordinator handlers. Visible for testing. */
+ @VisibleForTesting
+ public Collection<TransactionMetaStoreHandler> getHandlers() {
+ return discovery == null ? List.of() : discovery.handlers();
+ }
+
+ /**
+ * @return {@code true} if coordinator discovery uses the metadata-store
assignment watch (rather
+ * than the assign-topic fallback). Visible for testing so integration
tests can assert the
+ * watch path was actually exercised.
+ */
+ @VisibleForTesting
+ public boolean isUsingMetadataDiscovery() {
+ return discovery instanceof WatchTcAssignmentsDiscovery;
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/WatchTcAssignmentsDiscovery.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/WatchTcAssignmentsDiscovery.java
new file mode 100644
index 00000000000..e1d8609edf1
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/WatchTcAssignmentsDiscovery.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.CustomLog;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Coordinator discovery via the metadata-store leader election. Opens a single
+ * {@code CommandWatchTcAssignments} watch on a broker connection obtained from
+ * {@code PulsarClientImpl.getAnyBrokerProxyConnection()}; the broker replies with the full
+ * {@code partition -> leader} snapshot and re-pushes it on every leadership change. Each
+ * coordinator gets one {@link TransactionMetaStoreHandler} pointed at its elected leader broker —
+ * dialled through the proxy when the watch connection is proxied, directly otherwise (no
+ * per-coordinator lookup); when a snapshot moves a coordinator's leader, the handler is retargeted.
+ * Used against brokers that advertise {@code supports_tc_metadata_discovery}.
+ *
+ * <p>{@link #start()} completes once the first snapshot has been applied; it does not wait for the
+ * per-coordinator handlers to connect. While that first snapshot is pending, transient failures are
+ * retried until the client's lookup timeout; afterwards, losing the watch triggers a backed-off
+ * reconnect. {@link #close()} fails a {@link #start()} that is still pending.
+ */
+@CustomLog
+class WatchTcAssignmentsDiscovery implements TcDiscovery, ClientCnx.TcAssignmentsWatcherSession {
+
+    private static final AtomicLong WATCH_ID_GENERATOR = new AtomicLong(0);
+
+    private final PulsarClientImpl pulsarClient;
+    private final long watchId = WATCH_ID_GENERATOR.incrementAndGet();
+    private final Backoff reconnectBackoff;
+
+    // tcId (= TxnID.mostSigBits) -> handler pointed at that coordinator's last-known leader.
+    private final Map<Long, TransactionMetaStoreHandler> handlers = new ConcurrentHashMap<>();
+    // Round-robin cursor used by nextHandler().
+    private final AtomicLong epoch = new AtomicLong(0);
+    // Coordinator count from the latest snapshot; 0 until the first snapshot is applied.
+    private volatile int parallelism;
+
+    // Returned by start(): completed by the first applied snapshot, or exceptionally when the
+    // initial open gives up or the discovery is closed first.
+    private final CompletableFuture<Void> initialSnapshotFuture = new CompletableFuture<>();
+    private volatile ClientCnx cnx;
+    private volatile boolean closed;
+    // Wall-clock time (System.currentTimeMillis()) after which initial-open failures stop retrying.
+    private volatile long initialOpenDeadline;
+    private volatile boolean useProxy;
+
+    WatchTcAssignmentsDiscovery(PulsarClientImpl pulsarClient) {
+        this.pulsarClient = pulsarClient;
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
+    }
+
+    @Override
+    public CompletableFuture<Void> start() {
+        // Bound initial-open retries by the client's lookup timeout (fall back to operation timeout
+        // when unset, mirroring TransactionMetaStoreHandler). A transient failure on the watch broker
+        // (e.g. ServiceNotReady while its TC initializes) then retries rather than hard-failing
+        // transaction-client startup.
+        long lookupTimeoutMs = pulsarClient.getConfiguration().getLookupTimeoutMs();
+        if (lookupTimeoutMs < 0) {
+            lookupTimeoutMs = pulsarClient.getConfiguration().getOperationTimeoutMs();
+        }
+        this.initialOpenDeadline = System.currentTimeMillis() + lookupTimeoutMs;
+        openWatch();
+        return initialSnapshotFuture;
+    }
+
+    /** Asynchronously obtains a broker connection and attaches the watch to it. */
+    private void openWatch() {
+        if (closed) {
+            return;
+        }
+        pulsarClient.getAnyBrokerProxyConnection()
+                .thenAccept(this::attach)
+                .exceptionally(ex -> {
+                    onAttachFailure(ex);
+                    return null;
+                });
+    }
+
+    /** Registers this session on {@code newCnx} and sends the watch request over it. */
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsTcMetadataDiscovery()) {
+            // The broker we landed on doesn't support the watch (an old broker, config drift, or a
+            // different broker than the one the strategy probe hit). onAttachFailure decides what
+            // to do: a deadline-bounded retry of retriable failures while the first snapshot is
+            // pending, a backed-off reconnect afterwards — never freezing on the last snapshot.
+            onAttachFailure(new PulsarClientException(
+                    "Broker does not support metadata-store TC discovery"));
+            return;
+        }
+        this.cnx = newCnx;
+        // Behind a proxy the leader's advertised broker address isn't directly reachable; handlers
+        // must dial it through the proxy. The watch connection tells us which mode we're in.
+        this.useProxy = newCnx.isProxied();
+        newCnx.registerTcAssignmentsWatcher(watchId, this);
+        newCnx.ctx().writeAndFlush(Commands.newWatchTcAssignments(watchId))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        newCnx.removeTcAssignmentsWatcher(watchId);
+                        onAttachFailure(writeFuture.cause());
+                    }
+                });
+    }
+
+    /**
+     * Handles a failure to open or keep the watch. While the first snapshot is pending, retriable
+     * failures are retried until {@code initialOpenDeadline} and anything else fails
+     * {@link #start()}; after that, every failure schedules a backed-off reconnect.
+     */
+    private void onAttachFailure(Throwable ex) {
+        if (closed) {
+            return;
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            // During initial open, retry retryable failures until the lookup deadline rather than
+            // failing transaction-client startup outright — the probe and the watch can land on
+            // different brokers, and the watch broker may be briefly not-ready.
+            Throwable cause = ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null
+                    ? ex.getCause() : ex;
+            boolean retryable = !(cause instanceof PulsarClientException)
+                    || PulsarClientException.isRetriableError((PulsarClientException) cause);
+            if (retryable && System.currentTimeMillis() < initialOpenDeadline) {
+                log.warn().exception(cause).log("TC-assignments watch open failed; retrying");
+                scheduleReconnect();
+                return;
+            }
+            initialSnapshotFuture.completeExceptionally(
+                    PulsarClientException.wrap(ex, "Failed to open TC-assignments watch"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    @Override
+    public void onSnapshot(int newParallelism, Map<Long, String[]> leaders) {
+        if (closed) {
+            return;
+        }
+        reconnectBackoff.reset();
+        this.parallelism = newParallelism;
+        // Apply the full snapshot: create handlers for newly-seen coordinators, retarget existing
+        // ones whose leader moved. A coordinator absent from the snapshot is mid-election; leave its
+        // handler in place to retry against its last-known leader until the next snapshot.
+        boolean proxy = this.useProxy;
+        for (Map.Entry<Long, String[]> e : leaders.entrySet()) {
+            long tcId = e.getKey();
+            try {
+                // Resolve the URL inside the try: selectLeaderUri() throws for an unusable or
+                // malformed leader URL, and that must skip only this partition — not abort the rest
+                // of the snapshot or leave start() waiting forever.
+                String[] urls = e.getValue();
+                URI leaderUri = selectLeaderUri(urls[0], urls[1]);
+                handlers.compute(tcId, (id, existing) -> {
+                    if (existing == null) {
+                        TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                                id, pulsarClient, leaderUri, proxy, new CompletableFuture<>());
+                        handler.start();
+                        return handler;
+                    }
+                    existing.retargetLeader(leaderUri, proxy);
+                    return existing;
+                });
+            } catch (RuntimeException ex) {
+                // A bad/unusable leader URL for one partition (e.g. a TLS-only leader for a non-TLS
+                // client) must not abort applying the rest of the snapshot or tear down the watch.
+                log.warn().attr("tcId", tcId).exception(ex)
+                        .log("Skipping TC assignment with unusable leader URL");
+            }
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            initialSnapshotFuture.complete(null);
+        }
+    }
+
+    @Override
+    public void onError(ServerError error, String message) {
+        log.warn().attr("error", error).attr("message", message).log("WatchTcAssignments rejected");
+        // ClientCnx has already removed this session, so no connectionClosed() will fire to drive
+        // recovery; it is up to us.
+        cnx = null;
+        if (closed) {
+            return;
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            // ServiceNotReady is transient (e.g. the broker's TC is still initializing): retry it
+            // within the initial-open deadline, as start() promises. Anything else fails startup.
+            if (error == ServerError.ServiceNotReady
+                    && System.currentTimeMillis() < initialOpenDeadline) {
+                scheduleReconnect();
+                return;
+            }
+            initialSnapshotFuture.completeExceptionally(new PulsarClientException(
+                    "WatchTcAssignments failed: " + error + (message != null ? " - " + message : "")));
+            return;
+        }
+        // Post-initial: reconnect so a transient rejection (ServiceNotReady while a broker
+        // re-initializes, or transient config drift) can't freeze the watch forever.
+        scheduleReconnect();
+    }
+
+    @Override
+    public void connectionClosed() {
+        cnx = null;
+        if (closed) {
+            return;
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            // A dropped connection during the initial open is as transient as a failed connect:
+            // route it through onAttachFailure's deadline-bounded retry instead of failing start().
+            onAttachFailure(new PulsarClientException.ConnectException(
+                    "Connection closed before initial TC-assignments snapshot arrived"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    /** Re-runs {@link #openWatch()} after the next backoff delay, unless closed. */
+    private void scheduleReconnect() {
+        if (closed) {
+            return;
+        }
+        long delayMs = reconnectBackoff.next().toMillis();
+        pulsarClient.timer().newTimeout(timeout -> openWatch(), delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Pick the leader URL matching the client's TLS setting and parse it. Throws if no usable URL is
+     * present (e.g. a non-TLS client and a TLS-only leader) — the caller skips that partition rather
+     * than tearing down the watch.
+     */
+    private URI selectLeaderUri(String url, String urlTls) {
+        boolean tls = pulsarClient.getConfiguration().isUseTls();
+        String chosen = tls && urlTls != null && !urlTls.isBlank() ? urlTls : url;
+        if (chosen == null || chosen.isBlank()) {
+            throw new IllegalArgumentException("No usable leader URL (useTls=" + tls
+                    + ", url=" + url + ", urlTls=" + urlTls + ")");
+        }
+        return URI.create(chosen);
+    }
+
+    @Override
+    public TransactionMetaStoreHandler handlerForCoordinator(long tcId) {
+        return handlers.get(tcId);
+    }
+
+    @Override
+    public TransactionMetaStoreHandler nextHandler() {
+        int n = parallelism;
+        if (n <= 0) {
+            return null;
+        }
+        // Round-robin over coordinator ids 0..parallelism-1, skipping any mid-election gap.
+        for (int attempt = 0; attempt < n; attempt++) {
+            long tcId = MathUtils.signSafeMod(epoch.incrementAndGet(), n);
+            TransactionMetaStoreHandler handler = handlers.get(tcId);
+            if (handler != null) {
+                return handler;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public java.util.Collection<TransactionMetaStoreHandler> handlers() {
+        return new java.util.ArrayList<>(handlers.values());
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        // Once closed, no callback completes initialSnapshotFuture, so a start() still waiting for
+        // the first snapshot would otherwise hang forever. No-op if it is already done.
+        initialSnapshotFuture.completeExceptionally(new PulsarClientException.AlreadyClosedException(
+                "TC-assignments watch closed before the initial snapshot arrived"));
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.removeTcAssignmentsWatcher(watchId);
+            c.ctx().writeAndFlush(Commands.newWatchTcAssignmentsClose(watchId));
+        }
+        handlers.values().forEach(handler -> {
+            try {
+                handler.close();
+            } catch (Exception e) {
+                log.warn().exception(e).log("Close transaction meta store handler error");
+            }
+        });
+        handlers.clear();
+    }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e6cb2605315..3b8307dcfcb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -313,6 +313,13 @@ public class Commands {
public static BaseCommand newConnectedCommand(int clientProtocolVersion,
int maxMessageSize,
boolean
supportsTopicWatchers, boolean supportsScalableTopics) {
+ // Pre-existing 4-arg overload: delegates with supportsTcMetadataDiscovery = false, so callers
+ // that don't pass the new flag never advertise metadata-store TC discovery.
+ return newConnectedCommand(clientProtocolVersion, maxMessageSize,
supportsTopicWatchers,
+ supportsScalableTopics, false);
+ }
+
+ public static BaseCommand newConnectedCommand(int clientProtocolVersion,
int maxMessageSize,
+ boolean
supportsTopicWatchers, boolean supportsScalableTopics,
+ boolean
supportsTcMetadataDiscovery) {
BaseCommand cmd = localCmd(Type.CONNECTED);
CommandConnected connected = cmd.setConnected()
.setServerVersion("Pulsar Server" +
PulsarVersion.getVersion());
@@ -333,6 +340,7 @@ public class Commands {
connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true);
connected.setFeatureFlags().setSupportsTopicWatcherReconcile(supportsTopicWatchers);
connected.setFeatureFlags().setSupportsScalableTopics(supportsScalableTopics);
+
connected.setFeatureFlags().setSupportsTcMetadataDiscovery(supportsTcMetadataDiscovery);
return cmd;
}
@@ -342,6 +350,12 @@ public class Commands {
supportsScalableTopics));
}
+    /**
+     * Serialized CONNECTED frame that also carries the {@code supports_tc_metadata_discovery}
+     * feature flag.
+     */
+    public static ByteBuf newConnected(int clientProtocolVersion, int maxMessageSize,
+                                       boolean supportsTopicWatchers, boolean supportsScalableTopics,
+                                       boolean supportsTcMetadataDiscovery) {
+        BaseCommand connected = newConnectedCommand(clientProtocolVersion, maxMessageSize,
+                supportsTopicWatchers, supportsScalableTopics, supportsTcMetadataDiscovery);
+        return serializeWithSize(connected);
+    }
+
public static ByteBuf newAuthChallenge(String authMethod, AuthData
brokerData, int clientProtocolVersion) {
BaseCommand cmd = localCmd(Type.AUTH_CHALLENGE);
CommandAuthChallenge challenge = cmd.setAuthChallenge();
@@ -1869,6 +1883,58 @@ public class Commands {
return serializeWithSize(cmd);
}
+ // --- Transaction-coordinator assignment watch ---
+
+    /** Client -> Broker: open the TC-assignment watch identified by {@code watchId}. */
+    public static ByteBuf newWatchTcAssignments(long watchId) {
+        BaseCommand command = localCmd(Type.WATCH_TC_ASSIGNMENTS);
+        command.setWatchTcAssignments()
+                .setWatchId(watchId);
+        return serializeWithSize(command);
+    }
+
+    /** Client -> Broker: close the TC-assignment watch identified by {@code watchId}. */
+    public static ByteBuf newWatchTcAssignmentsClose(long watchId) {
+        BaseCommand command = localCmd(Type.WATCH_TC_ASSIGNMENTS_CLOSE);
+        command.setWatchTcAssignmentsClose()
+                .setWatchId(watchId);
+        return serializeWithSize(command);
+    }
+
+    /**
+     * Broker -> Client: emit the full {@code partition -> leader} snapshot. Sent on the initial watch
+     * and again, in full, on every leadership change. A partition currently mid-election is simply
+     * absent from {@code leaders}. Either URL in a leader entry may be {@code null} (the broker
+     * advertises only one of plaintext / TLS); a null URL leaves the corresponding optional field
+     * unset.
+     *
+     * @param watchId     the client-assigned watch id being answered
+     * @param parallelism the number of TC partitions
+     * @param leaders     partition -> {brokerServiceUrl, brokerServiceUrlTls}
+     */
+    public static ByteBuf newWatchTcAssignmentsSnapshot(long watchId, int parallelism,
+                                                        java.util.Map<Integer, String[]> leaders) {
+        BaseCommand cmd = new BaseCommand().setType(Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+        var snapshot = cmd.setWatchTcAssignmentsUpdate()
+                .setWatchId(watchId)
+                .setSnapshot()
+                .setParallelism(parallelism);
+        leaders.forEach((tcId, urls) -> {
+            var assignment = snapshot.addAssignment().setTcId(tcId);
+            String plainUrl = urls[0];
+            if (plainUrl != null) {
+                assignment.setBrokerServiceUrl(plainUrl);
+            }
+            String tlsUrl = urls[1];
+            if (tlsUrl != null) {
+                assignment.setBrokerServiceUrlTls(tlsUrl);
+            }
+        });
+        return serializeWithSize(cmd);
+    }
+
+ public static ByteBuf newWatchTcAssignmentsError(long watchId, ServerError
error, String message) {
+ BaseCommand cmd = new
BaseCommand().setType(Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+ cmd.setWatchTcAssignmentsUpdate()
+ .setWatchId(watchId)
+ .setError(error)
+ .setMessage(message);
+ return serializeWithSize(cmd);
+ }
+
+ /** Serializes {@code cmd}, passing its {@code getSerializedSize()} to {@code serializeWithPrecalculatedSerializedSize}. */
public static ByteBuf serializeWithSize(BaseCommand cmd) {
return serializeWithPrecalculatedSerializedSize(cmd,
cmd.getSerializedSize());
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index bca87683f27..9a412c1def1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -529,6 +529,21 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose());
break;
+ case WATCH_TC_ASSIGNMENTS:
+ checkArgument(cmd.hasWatchTcAssignments());
+ handleCommandWatchTcAssignments(cmd.getWatchTcAssignments());
+ break;
+
+ case WATCH_TC_ASSIGNMENTS_UPDATE:
+ checkArgument(cmd.hasWatchTcAssignmentsUpdate());
+
handleCommandWatchTcAssignmentsUpdate(cmd.getWatchTcAssignmentsUpdate());
+ break;
+
+ case WATCH_TC_ASSIGNMENTS_CLOSE:
+ checkArgument(cmd.hasWatchTcAssignmentsClose());
+
handleCommandWatchTcAssignmentsClose(cmd.getWatchTcAssignmentsClose());
+ break;
+
default:
break;
}
@@ -839,6 +854,23 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+    /**
+     * Client -> Broker request to open a TC-assignment watch. Unsupported unless a subclass
+     * overrides it.
+     */
+    protected void handleCommandWatchTcAssignments(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignments commandWatchTcAssignments) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Broker -> Client snapshot or error for a TC-assignment watch. Unsupported unless a subclass
+     * overrides it.
+     */
+    protected void handleCommandWatchTcAssignmentsUpdate(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsUpdate commandWatchTcAssignmentsUpdate) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Client -> Broker request to close a TC-assignment watch. Unsupported unless a subclass
+     * overrides it.
+     */
+    protected void handleCommandWatchTcAssignmentsClose(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsClose commandWatchTcAssignmentsClose) {
+        throw new UnsupportedOperationException();
+    }
+
private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 28501c8bc66..76cd8d382ab 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -317,6 +317,7 @@ message FeatureFlags {
optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
optional bool supports_topic_watcher_reconcile = 7 [default = false];
optional bool supports_scalable_topics = 8 [default = false];
+ optional bool supports_tc_metadata_discovery = 9 [default = false];
}
message CommandConnected {
@@ -1013,6 +1014,48 @@ message CommandWatchScalableTopicsClose {
required uint64 watch_id = 1;
}
+// --- Transaction-coordinator assignment watch ---
+// The scalable-topics transaction coordinator's discovery surface. The client opens one watch
+// and the broker replies with the full (partition -> leader) map, then pushes a fresh full
+// snapshot whenever any partition's leader changes. There is no point lookup and no diff: the
+// map is bounded (parallelism, ~16) and changes rarely, so always sending the whole snapshot is
+// simpler and removes a class of apply-ordering / drift bugs. Gated by the
+// supports_tc_metadata_discovery feature flag.
+
+// Client -> Broker: open the assignment watch.
+message CommandWatchTcAssignments {
+ required uint64 watch_id = 1; // client-assigned
+}
+
+// One (partition -> leader) entry. A partition currently mid-election is simply absent from the
+// snapshot; the client parks any transaction routed there until a later snapshot fills it in.
+// NOTE(review): the Java client (WatchTcAssignmentsDiscovery) keeps an absent partition's existing
+// handler retrying against its last-known leader, rather than parking — confirm which is intended.
+message TcAssignment {
+ required uint64 tc_id = 1; // TC partition = TxnID.mostSigBits
+ optional string broker_service_url = 2;
+ optional string broker_service_url_tls = 3;
+}
+
+// Full map. Sent on initial watch and again, in full, on every change. The client replaces its
+// local map wholesale — no merge, no ordering rules. parallelism lets the client size its handler
+// array without a separate metadata read.
+// NOTE(review): the Java client applies each snapshot by creating/retargeting handlers for listed
+// partitions and never drops unlisted ones, and uses parallelism as its round-robin bound.
+message TcAssignmentsSnapshot {
+ required uint32 parallelism = 1;
+ repeated TcAssignment assignments = 2;
+}
+
+// Broker -> Client: the current full snapshot, or (on initial-watch failure) an error.
+// NOTE(review): the Java client also handles an error after the initial snapshot, by reconnecting
+// the watch.
+message CommandWatchTcAssignmentsUpdate {
+ required uint64 watch_id = 1;
+ optional TcAssignmentsSnapshot snapshot = 2;
+ optional ServerError error = 3;
+ optional string message = 4;
+}
+
+// Client -> Broker: close the watch.
+message CommandWatchTcAssignmentsClose {
+ required uint64 watch_id = 1;
+}
+
message CommandGetSchema {
required uint64 request_id = 1;
required string topic = 2;
@@ -1262,6 +1305,10 @@ message BaseCommand {
WATCH_SCALABLE_TOPICS = 76;
WATCH_SCALABLE_TOPICS_UPDATE = 77;
WATCH_SCALABLE_TOPICS_CLOSE = 78;
+
+ WATCH_TC_ASSIGNMENTS = 79;
+ WATCH_TC_ASSIGNMENTS_UPDATE = 80;
+ WATCH_TC_ASSIGNMENTS_CLOSE = 81;
}
@@ -1357,4 +1404,8 @@ message BaseCommand {
optional CommandWatchScalableTopics watchScalableTopics
= 76;
optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate
= 77;
optional CommandWatchScalableTopicsClose watchScalableTopicsClose
= 78;
+
+ optional CommandWatchTcAssignments watchTcAssignments
= 79;
+ optional CommandWatchTcAssignmentsUpdate watchTcAssignmentsUpdate
= 80;
+ optional CommandWatchTcAssignmentsClose watchTcAssignmentsClose
= 81;
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsTcAssignmentsTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsTcAssignmentsTest.java
new file mode 100644
index 00000000000..3a3071431a9
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsTcAssignmentsTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trip tests for the {@code Commands.newWatchTcAssignments*} factories: each test builds a
+ * wire frame, decodes it back into a {@link BaseCommand}, and checks that the fields survived.
+ */
+public class CommandsTcAssignmentsTest {
+
+    /** Watch id used by every factory call in this class. */
+    private static final long WATCH_ID = 7L;
+
+    /**
+     * Decodes a serialized command frame, always releasing the buffer.
+     *
+     * <p>Frame layout consumed here: a 4-byte total size (skipped), a 4-byte command size, then
+     * the command bytes.
+     */
+    private static BaseCommand parseFrame(ByteBuf frame) {
+        BaseCommand decoded = new BaseCommand();
+        try {
+            frame.skipBytes(4); // total size
+            int commandSize = (int) frame.readUnsignedInt();
+            decoded.parseFrom(frame, commandSize);
+            decoded.materialize();
+        } finally {
+            frame.release();
+        }
+        return decoded;
+    }
+
+    @Test
+    public void testNewWatchTcAssignments() {
+        BaseCommand watch = parseFrame(Commands.newWatchTcAssignments(WATCH_ID));
+        assertEquals(watch.getType(), BaseCommand.Type.WATCH_TC_ASSIGNMENTS);
+        assertTrue(watch.hasWatchTcAssignments());
+        assertEquals(watch.getWatchTcAssignments().getWatchId(), WATCH_ID);
+    }
+
+    @Test
+    public void testNewWatchTcAssignmentsClose() {
+        BaseCommand close = parseFrame(Commands.newWatchTcAssignmentsClose(WATCH_ID));
+        assertEquals(close.getType(), BaseCommand.Type.WATCH_TC_ASSIGNMENTS_CLOSE);
+        assertTrue(close.hasWatchTcAssignmentsClose());
+        assertEquals(close.getWatchTcAssignmentsClose().getWatchId(), WATCH_ID);
+    }
+
+    @Test
+    public void testNewWatchTcAssignmentsSnapshot() {
+        Map<Integer, String[]> leaders = new HashMap<>();
+        leaders.put(0, new String[] {"pulsar://b0:6650", "pulsar+ssl://b0:6651"});
+        // Partition 1 is deliberately left out, as if it were mid-election.
+        leaders.put(2, new String[] {"pulsar://b2:6650", null});
+
+        BaseCommand cmd = parseFrame(Commands.newWatchTcAssignmentsSnapshot(WATCH_ID, 3, leaders));
+        assertEquals(cmd.getType(), BaseCommand.Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+        assertTrue(cmd.hasWatchTcAssignmentsUpdate());
+
+        var update = cmd.getWatchTcAssignmentsUpdate();
+        assertEquals(update.getWatchId(), WATCH_ID);
+        assertFalse(update.hasError());
+        assertTrue(update.hasSnapshot());
+
+        var snapshot = update.getSnapshot();
+        assertEquals(snapshot.getParallelism(), 3);
+        int count = snapshot.getAssignmentsCount();
+        assertEquals(count, 2);
+
+        // Index the entries by TC id so the checks below do not depend on wire order.
+        Map<Long, String[]> urlsByTcId = new HashMap<>();
+        for (int idx = 0; idx < count; idx++) {
+            var assignment = snapshot.getAssignmentAt(idx);
+            String plain = assignment.hasBrokerServiceUrl()
+                    ? assignment.getBrokerServiceUrl() : null;
+            String tls = assignment.hasBrokerServiceUrlTls()
+                    ? assignment.getBrokerServiceUrlTls() : null;
+            urlsByTcId.put(assignment.getTcId(), new String[] {plain, tls});
+        }
+
+        String[] tc0 = urlsByTcId.get(0L);
+        assertEquals(tc0[0], "pulsar://b0:6650");
+        assertEquals(tc0[1], "pulsar+ssl://b0:6651");
+        String[] tc2 = urlsByTcId.get(2L);
+        assertEquals(tc2[0], "pulsar://b2:6650");
+        assertNull(tc2[1]);
+        assertNull(urlsByTcId.get(1L)); // mid-election partition omitted
+    }
+
+    @Test
+    public void testNewWatchTcAssignmentsError() {
+        ByteBuf frame =
+                Commands.newWatchTcAssignmentsError(WATCH_ID, ServerError.NotAllowedError, "disabled");
+        BaseCommand cmd = parseFrame(frame);
+        assertEquals(cmd.getType(), BaseCommand.Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+
+        var update = cmd.getWatchTcAssignmentsUpdate();
+        assertEquals(update.getWatchId(), WATCH_ID);
+        assertTrue(update.hasError());
+        assertEquals(update.getError(), ServerError.NotAllowedError);
+        assertEquals(update.getMessage(), "disabled");
+        assertFalse(update.hasSnapshot());
+    }
+
+    @Test
+    public void testConnectedAdvertisesTcMetadataDiscoveryFlag() {
+        BaseCommand withFlag = parseFrame(Commands.newConnected(
+                21 /* clientProtocolVersion */, 1024 /* maxMessageSize */,
+                true /* supportsTopicWatchers */, true /* supportsScalableTopics */,
+                true /* supportsTcMetadataDiscovery */));
+        assertTrue(withFlag.getConnected().getFeatureFlags().isSupportsTcMetadataDiscovery());
+
+        // The four-argument overload must leave the new flag unset.
+        BaseCommand withoutFlag = parseFrame(Commands.newConnected(21, 1024, true, true));
+        assertFalse(withoutFlag.getConnected().getFeatureFlags().isSupportsTcMetadataDiscovery());
+    }
+}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 53bd36f704d..70797e2671e 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -520,7 +520,9 @@ public class ProxyConnection extends PulsarHandler {
connected.hasMaxMessageSize() ?
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
final ByteBuf msg =
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers(),
- connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsScalableTopics());
+ connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsScalableTopics(),
+ connected.hasFeatureFlags()
+ &&
connected.getFeatureFlags().isSupportsTcMetadataDiscovery());
writeAndFlush(msg);
// Start auth refresh task only if we are not forwarding
authorization credentials
if
(!service.getConfiguration().isForwardAuthorizationCredentials()) {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
new file mode 100644
index 00000000000..d9a2997f240
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.CustomLog;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the metadata-store transaction-coordinator discovery path (PIP-473 P5.3).
+ *
+ * <p>Verifies, across a real multi-broker docker cluster, that a client discovers coordinators via
+ * the {@code CommandWatchTcAssignments} stream (not the assign-topic lookup) and can drive the
+ * transaction lifecycle, including after the broker leading a coordinator partition is killed.
+ *
+ * @see TcMetadataDiscoveryTestBase for the scope note (lifecycle, not data-in-txn).
+ */
+@CustomLog
+public class TcMetadataDiscoveryTest extends TcMetadataDiscoveryTestBase {
+
+    /**
+     * With the scalable-topics TC enabled, a client opens the assignment watch and can open and
+     * commit / abort transactions across all coordinator partitions. Running many transactions
+     * exercises the round-robin spread across the watch-discovered per-leader connections.
+     */
+    @Test
+    public void transactionLifecycleOverMetadataDiscovery() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .enableTransaction(true)
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        // Guard against a silent fallback: assert the client actually selected the metadata-store
+        // assignment-watch path. Otherwise a regression that breaks the watch entirely would still
+        // pass, since the assign topic is initialized with the same partition count.
+        org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl tcClient =
+                ((org.apache.pulsar.client.impl.PulsarClientImpl) client).getTcClient();
+        assertTrue(tcClient.isUsingMetadataDiscovery(),
+                "client should use metadata-store TC discovery, not the assign-topic fallback");
+
+        // Run transactions (commit and abort alternately) until every coordinator partition has
+        // completed at least one — proving the client discovered and connected to each partition's
+        // elected leader. ignoreExceptions() is what lets the loop tolerate the brief startup window
+        // where a partition is still mid-election or its handler is still connecting: without it,
+        // the first transient failure would abort the await instead of being retried.
+        Set<Long> coordinatorsExercised = new HashSet<>();
+        final int[] attempts = {0};
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .until(() -> {
+                    Transaction txn = client.newTransaction()
+                            .withTransactionTimeout(1, TimeUnit.MINUTES)
+                            .build().get();
+                    TxnID txnId = txn.getTxnID();
+                    assertNotNull(txnId);
+                    if (attempts[0]++ % 2 == 0) {
+                        txn.commit().get();
+                    } else {
+                        txn.abort().get();
+                    }
+                    // mostSigBits is the coordinator (TC partition) that minted the txn. Record it
+                    // only after commit/abort succeeded, so the set proves a full lifecycle.
+                    coordinatorsExercised.add(txnId.getMostSigBits());
+                    return coordinatorsExercised.size() == TC_PARALLELISM;
+                });
+        assertEquals(coordinatorsExercised.size(), TC_PARALLELISM,
+                "expected transactions to be coordinated by every TC partition; got "
+                        + coordinatorsExercised);
+    }
+
+    /**
+     * Kill one broker and confirm the client keeps working: the coordinator partitions that broker
+     * was leading are re-elected to the survivor, the client's assignment watch receives the new
+     * snapshot, retargets its handlers, and subsequent transactions across all partitions still
+     * succeed.
+     */
+    @Test
+    public void transactionsSurviveLeaderBrokerFailure() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .enableTransaction(true)
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .operationTimeout(30, TimeUnit.SECONDS)
+                .build();
+
+        // Warm up: confirm every coordinator is reachable before the failure.
+        runTxnOnEveryCoordinator(client);
+
+        // Kill one broker so the coordinator partitions it was leading lose their leader.
+        BrokerContainer victim = pulsarCluster.getBrokers().iterator().next();
+        log.info().attr("broker", victim.getContainerName()).log("Stopping broker to force TC failover");
+        victim.stop();
+
+        // After re-election + assignment-watch refresh, transactions across all partitions succeed
+        // again. runTxnOnEveryCoordinator already retries within a bounded wait while leadership and
+        // the client's handlers converge on the new leaders.
+        runTxnOnEveryCoordinator(client);
+    }
+
+    /**
+     * Opens and commits transactions until every coordinator partition has committed at least one,
+     * within a bounded wait. A coordinator's handler connects asynchronously (and, after a failover,
+     * may be briefly mid-reconnect), so a transaction routed to a not-yet-ready coordinator throws
+     * {@code MetaStoreHandlerNotReadyException} / times out — those are retried rather than failing
+     * the run. The assertion is "every coordinator becomes reachable", not "reachable on the first
+     * attempt". A coordinator is counted only after its transaction committed, so a partition that
+     * can mint but not commit does not satisfy the check.
+     */
+    private void runTxnOnEveryCoordinator(PulsarClient client) {
+        Set<Long> coordinators = new HashSet<>();
+        Awaitility.await()
+                .atMost(90, TimeUnit.SECONDS)
+                .pollInterval(2, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .until(() -> {
+                    Transaction txn = client.newTransaction()
+                            .withTransactionTimeout(1, TimeUnit.MINUTES)
+                            .build().get();
+                    long coordinatorId = txn.getTxnID().getMostSigBits();
+                    txn.commit().get();
+                    coordinators.add(coordinatorId);
+                    return coordinators.size() == TC_PARALLELISM;
+                });
+        assertEquals(coordinators.size(), TC_PARALLELISM,
+                "expected all " + TC_PARALLELISM + " coordinators reachable; got " + coordinators);
+    }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
new file mode 100644
index 00000000000..8af84ab229e
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import lombok.CustomLog;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+
+/**
+ * Shared setup for the metadata-store transaction-coordinator discovery tests.
+ *
+ * <p>Starts a multi-broker cluster with the scalable-topics transaction coordinator switched on, so
+ * leadership of each TC partition comes from the metadata-store election and clients learn where
+ * coordinators live from the {@code CommandWatchTcAssignments} stream instead of the assign-topic
+ * lookup.
+ *
+ * <p>Scope note: only the transaction-coordinator <em>control plane</em> is enabled here. Producing
+ * / acking data inside a transaction additionally requires the scalable-topic transaction buffer
+ * and pending-ack providers (and {@code segment://} topics), which land with the default flip. These
+ * tests therefore exercise the transaction <em>lifecycle</em> (newTransaction / commit / abort) over
+ * the discovered connections — which is exactly the surface the new client discovery path drives.
+ */
+@CustomLog
+public abstract class TcMetadataDiscoveryTestBase extends PulsarTestSuite {
+
+    /** Number of TC partitions; small so leadership spreads predictably across the brokers. */
+    protected static final int TC_PARALLELISM = 4;
+
+    @Override
+    protected void beforeStartCluster() throws Exception {
+        super.beforeStartCluster();
+        String parallelism = Integer.toString(TC_PARALLELISM);
+        // transactionCoordinatorEnabled already exists in broker.conf, so the bare env var name
+        // overrides it. The two scalable-topics keys do NOT exist there, so they need the
+        // PULSAR_PREFIX_ prefix for apply-config-from-env.py to append them as new keys — a bare
+        // name would be silently ignored.
+        pulsarCluster.getBrokers().forEach(broker -> {
+            broker.withEnv("transactionCoordinatorEnabled", "true");
+            broker.withEnv("PULSAR_PREFIX_transactionCoordinatorScalableTopicsEnabled", "true");
+            broker.withEnv("PULSAR_PREFIX_transactionCoordinatorScalableTopicsParallelism",
+                    parallelism);
+        });
+    }
+
+    @Override
+    public void setupCluster() throws Exception {
+        super.setupCluster();
+        // Still create the assign-topic partitioned metadata, so the legacy ownership-based
+        // fallback in handleClientConnect stays valid during the deprecation window.
+        BrokerContainer firstBroker = pulsarCluster.getBrokers().iterator().next();
+        String coordinatorCount = Integer.toString(TC_PARALLELISM);
+        firstBroker.execCmd(
+                "/pulsar/bin/pulsar", "initialize-transaction-coordinator-metadata",
+                "-cs", ZKContainer.NAME,
+                "-c", pulsarCluster.getClusterName(),
+                "--initial-num-transaction-coordinators", coordinatorCount);
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-transaction.xml
b/tests/integration/src/test/resources/pulsar-transaction.xml
index 72c375d000d..0c23b9e93ab 100644
--- a/tests/integration/src/test/resources/pulsar-transaction.xml
+++ b/tests/integration/src/test/resources/pulsar-transaction.xml
@@ -23,6 +23,7 @@
<test name="pulsar-transaction-test-suite" preserve-order="true" >
<classes>
<class
name="org.apache.pulsar.tests.integration.transaction.TransactionTest" />
+ <class
name="org.apache.pulsar.tests.integration.transaction.TcMetadataDiscoveryTest"
/>
</classes>
</test>
</suite>