This is an automated email from the ASF dual-hosted git repository.

shibd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6756d  feat: add replicateSubscriptionState to ConsumerConfig (#482)
7f6756d is described below

commit 7f6756dd6a3484438193f8d056ce9a272752ac4c
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Apr 22 21:45:00 2026 +0800

    feat: add replicateSubscriptionState to ConsumerConfig (#482)
    
    Add support for replicateSubscriptionState option in ConsumerConfig,
    which enables geo-replication failover by synchronizing subscription
    cursor state across clusters.
    
    Fixes #478
---
 index.d.ts            | 1 +
 src/ConsumerConfig.cc | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index 0e4f070..607842f 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -110,6 +110,7 @@ export interface ConsumerConfig {
   deadLetterPolicy?: DeadLetterPolicy;
   batchReceivePolicy?: ConsumerBatchReceivePolicy;
   keySharedPolicy?: KeySharedPolicy;
+  replicateSubscriptionState?: boolean;
 }
 
 export class Consumer {
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index e7419c6..eff2024 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -62,6 +62,7 @@ static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode";
 static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery";
 static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";
 static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader";
+static const std::string CFG_REPLICATE_SUBSCRIPTION_STATE = "replicateSubscriptionState";
 
 static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
     {"Exclusive", pulsar_ConsumerExclusive},
@@ -400,6 +401,13 @@ void ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
     }
     this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy);
   }
+
+  if (consumerConfig.Has(CFG_REPLICATE_SUBSCRIPTION_STATE) &&
+      consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).IsBoolean()) {
+    bool replicateSubscriptionState = consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).ToBoolean();
+    this->cConsumerConfig.get()->consumerConfiguration.setReplicateSubscriptionStateEnabled(
+        replicateSubscriptionState);
+  }
 }
 
 ConsumerConfig::~ConsumerConfig() {

Reply via email to