This is an automated email from the ASF dual-hosted git repository.
shibd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6756d feat: add replicateSubscriptionState to ConsumerConfig (#482)
7f6756d is described below
commit 7f6756dd6a3484438193f8d056ce9a272752ac4c
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Apr 22 21:45:00 2026 +0800
feat: add replicateSubscriptionState to ConsumerConfig (#482)
Add support for the replicateSubscriptionState option in ConsumerConfig,
which enables geo-replication failover by synchronizing subscription
cursor state across clusters.
Fixes #478
---
index.d.ts | 1 +
src/ConsumerConfig.cc | 8 ++++++++
2 files changed, 9 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index 0e4f070..607842f 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -110,6 +110,7 @@ export interface ConsumerConfig {
deadLetterPolicy?: DeadLetterPolicy;
batchReceivePolicy?: ConsumerBatchReceivePolicy;
keySharedPolicy?: KeySharedPolicy;
+ replicateSubscriptionState?: boolean;
}
export class Consumer {
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index e7419c6..eff2024 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -62,6 +62,7 @@ static const std::string CFG_KEY_SHARED_POLICY_MODE =
"keyShareMode";
static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER =
"allowOutOfOrderDelivery";
static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";
static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader";
+static const std::string CFG_REPLICATE_SUBSCRIPTION_STATE =
"replicateSubscriptionState";
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
@@ -400,6 +401,13 @@ void
ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
}
this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy);
}
+
+ if (consumerConfig.Has(CFG_REPLICATE_SUBSCRIPTION_STATE) &&
+ consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).IsBoolean()) {
+ bool replicateSubscriptionState =
consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).ToBoolean();
+
this->cConsumerConfig.get()->consumerConfiguration.setReplicateSubscriptionStateEnabled(
+ replicateSubscriptionState);
+ }
}
ConsumerConfig::~ConsumerConfig() {