This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 9abc9a8  Add auto cluster failover support (#487)
9abc9a8 is described below

commit 9abc9a8ec0333be20d572d64db66fd7a78827366
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Jun 2 20:11:55 2026 +0800

    Add auto cluster failover support (#487)
---
 index.d.ts                         |  22 +++-
 src/Client.cc                      | 257 ++++++++++++++++++++++++++++++++++--
 src/Client.h                       |   3 +-
 tests/client.test.js               |  15 ++-
 tests/failover_client_test.test.js | 259 +++++++++++++++++++++++++++++++++++++
 5 files changed, 544 insertions(+), 12 deletions(-)

diff --git a/index.d.ts b/index.d.ts
index 8e6fb85..61ade63 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -18,8 +18,12 @@
  */
 /// <reference types="node" />
 
-export interface ClientConfig {
-  serviceUrl: string;
+export type ClientConfig = ClientConfigBase & (
+  { serviceUrl: string; serviceUrlProvider?: never } |
+  { serviceUrl?: never; serviceUrlProvider: AutoClusterFailoverConfig }
+);
+
+export interface ClientConfigBase {
   authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
   operationTimeoutSeconds?: number;
   ioThreads?: number;
@@ -37,6 +41,20 @@ export interface ClientConfig {
   connectionTimeoutMs?: number;
 }
 
+export type ServiceInfo = string | {
+  serviceUrl: string;
+  authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
+  tlsTrustCertsFilePath?: string;
+};
+
+export interface AutoClusterFailoverConfig {
+  primary: ServiceInfo;
+  secondary: ServiceInfo[];
+  checkIntervalMs?: number;
+  failoverThreshold?: number;
+  switchBackThreshold?: number;
+}
+
 export class Client {
   static setLogHandler(logHandler: (level: LogLevel, file: string, line: number, message: string) => void): void;
   constructor(config: ClientConfig);
diff --git a/src/Client.cc b/src/Client.cc
index 6d09cd7..581b62c 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -23,12 +23,28 @@
 #include "Producer.h"
 #include "Reader.h"
 #include "ThreadSafeDeferred.h"
+#include <pulsar/AutoClusterFailover.h>
+#include <pulsar/Client.h>
+#include <pulsar/ServiceInfo.h>
+#include <pulsar/c/authentication.h>
 #include <pulsar/c/client.h>
 #include <pulsar/c/client_configuration.h>
 #include <pulsar/c/result.h>
 #include "pulsar/ClientConfiguration.h"
+#include <chrono>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
 
 static const std::string CFG_SERVICE_URL = "serviceUrl";
+static const std::string CFG_SERVICE_URL_PROVIDER = "serviceUrlProvider";
+static const std::string CFG_PRIMARY = "primary";
+static const std::string CFG_SECONDARY = "secondary";
+static const std::string CFG_CHECK_INTERVAL_MS = "checkIntervalMs";
+static const std::string CFG_FAILOVER_THRESHOLD = "failoverThreshold";
+static const std::string CFG_SWITCH_BACK_THRESHOLD = "switchBackThreshold";
 static const std::string CFG_AUTH = "authentication";
 static const std::string CFG_AUTH_PROP = "binding";
 static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
@@ -52,6 +68,194 @@ struct _pulsar_client_configuration {
   pulsar::ClientConfiguration conf;
 };
 
+struct _pulsar_client {
+  std::unique_ptr<pulsar::Client> client;
+};
+
+struct _pulsar_authentication {
+  pulsar::AuthenticationPtr auth;
+};
+
+static bool IsPresent(const Napi::Value &value) { return !value.IsUndefined() && !value.IsNull(); }
+
+static std::optional<pulsar::AuthenticationPtr> BuildAuthenticationPtr(
+    const Napi::Object &authObject, std::vector<Napi::ObjectReference> &authRefs) {
+  Napi::Env env = authObject.Env();
+
+  if (!authObject.Has(CFG_AUTH_PROP) || !authObject.Get(CFG_AUTH_PROP).IsObject()) {
+    Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
+        .ThrowAsJavaScriptException();
+    return std::nullopt;
+  }
+
+  Napi::Object binding = authObject.Get(CFG_AUTH_PROP).As<Napi::Object>();
+  authRefs.emplace_back(Napi::Persistent(binding));
+  Authentication *auth = Authentication::Unwrap(authRefs.back().Value());
+
+  if (auth == nullptr || auth->GetCAuthentication() == nullptr) {
+    Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
+        .ThrowAsJavaScriptException();
+    return std::nullopt;
+  }
+
+  return auth->GetCAuthentication()->auth;
+}
+
+static std::optional<pulsar::ServiceInfo> BuildServiceInfo(const Napi::Value &value,
+                                                           const std::string &fieldName,
+                                                           std::vector<Napi::ObjectReference> &authRefs,
+                                                           const pulsar::AuthenticationPtr &defaultAuth,
+                                                           const std::optional<std::string> &defaultTls) {
+  Napi::Env env = value.Env();
+
+  if (value.IsString()) {
+    std::string serviceUrl = value.ToString().Utf8Value();
+    if (serviceUrl.empty()) {
+      Napi::Error::New(env, fieldName + " service URL must be a non-empty string")
+          .ThrowAsJavaScriptException();
+      return std::nullopt;
+    }
+    return pulsar::ServiceInfo(serviceUrl, defaultAuth, defaultTls);
+  }
+
+  if (!value.IsObject()) {
+    Napi::Error::New(env, fieldName + " must be a service URL string or service info object")
+        .ThrowAsJavaScriptException();
+    return std::nullopt;
+  }
+
+  Napi::Object serviceInfo = value.As<Napi::Object>();
+  if (!serviceInfo.Has(CFG_SERVICE_URL) || !serviceInfo.Get(CFG_SERVICE_URL).IsString() ||
+      serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
+    Napi::Error::New(env, fieldName + ".serviceUrl is required and must be a non-empty string")
+        .ThrowAsJavaScriptException();
+    return std::nullopt;
+  }
+
+  std::string serviceUrl = serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value();
+  pulsar::AuthenticationPtr authentication = defaultAuth;
+  std::optional<std::string> tlsTrustCertsFilePath = defaultTls;
+
+  if (serviceInfo.Has(CFG_AUTH) && IsPresent(serviceInfo.Get(CFG_AUTH))) {
+    if (!serviceInfo.Get(CFG_AUTH).IsObject()) {
+      Napi::Error::New(env, fieldName + ".authentication must be a Pulsar authentication object")
+          .ThrowAsJavaScriptException();
+      return std::nullopt;
+    }
+
+    auto auth = BuildAuthenticationPtr(serviceInfo.Get(CFG_AUTH).As<Napi::Object>(), authRefs);
+    if (!auth.has_value()) {
+      return std::nullopt;
+    }
+    authentication = auth.value();
+  }
+
+  if (serviceInfo.Has(CFG_TLS_TRUST_CERT) && IsPresent(serviceInfo.Get(CFG_TLS_TRUST_CERT))) {
+    if (!serviceInfo.Get(CFG_TLS_TRUST_CERT).IsString()) {
+      Napi::Error::New(env, fieldName + ".tlsTrustCertsFilePath must be a string")
+          .ThrowAsJavaScriptException();
+      return std::nullopt;
+    }
+    tlsTrustCertsFilePath = serviceInfo.Get(CFG_TLS_TRUST_CERT).ToString().Utf8Value();
+  }
+
+  return pulsar::ServiceInfo(serviceUrl, authentication, tlsTrustCertsFilePath);
+}
+
+static bool SetPositiveUint32(const Napi::Object &config, const std::string &fieldName, uint32_t &target) {
+  if (!config.Has(fieldName) || !IsPresent(config.Get(fieldName))) {
+    return true;
+  }
+
+  Napi::Env env = config.Env();
+  if (!config.Get(fieldName).IsNumber()) {
+    Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
+        .ThrowAsJavaScriptException();
+    return false;
+  }
+
+  int64_t value = config.Get(fieldName).ToNumber().Int64Value();
+  if (value <= 0 || value > UINT32_MAX) {
+    Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
+        .ThrowAsJavaScriptException();
+    return false;
+  }
+
+  target = static_cast<uint32_t>(value);
+  return true;
+}
+
+static std::unique_ptr<pulsar::ServiceInfoProvider> BuildServiceInfoProvider(
+    const Napi::Object &clientConfig, std::vector<Napi::ObjectReference> &authRefs,
+    const pulsar::AuthenticationPtr &defaultAuth, const std::optional<std::string> &defaultTls) {
+  Napi::Value providerValue = clientConfig.Get(CFG_SERVICE_URL_PROVIDER);
+  Napi::Env env = clientConfig.Env();
+
+  if (!providerValue.IsObject()) {
+    Napi::Error::New(env, "serviceUrlProvider must be an object").ThrowAsJavaScriptException();
+    return nullptr;
+  }
+
+  Napi::Object providerConfig = providerValue.As<Napi::Object>();
+  if (!providerConfig.Has(CFG_PRIMARY) || !IsPresent(providerConfig.Get(CFG_PRIMARY))) {
+    Napi::Error::New(env, "serviceUrlProvider.primary is required").ThrowAsJavaScriptException();
+    return nullptr;
+  }
+
+  auto primary = BuildServiceInfo(providerConfig.Get(CFG_PRIMARY), "serviceUrlProvider.primary", authRefs,
+                                  defaultAuth, defaultTls);
+  if (!primary.has_value()) {
+    return nullptr;
+  }
+
+  if (!providerConfig.Has(CFG_SECONDARY) || !providerConfig.Get(CFG_SECONDARY).IsArray()) {
+    Napi::Error::New(env, "serviceUrlProvider.secondary is required and must be an array")
+        .ThrowAsJavaScriptException();
+    return nullptr;
+  }
+
+  Napi::Array secondaryConfig = providerConfig.Get(CFG_SECONDARY).As<Napi::Array>();
+  if (secondaryConfig.Length() == 0) {
+    Napi::Error::New(env, "serviceUrlProvider.secondary must contain at least one service")
+        .ThrowAsJavaScriptException();
+    return nullptr;
+  }
+
+  std::vector<pulsar::ServiceInfo> secondary;
+  secondary.reserve(secondaryConfig.Length());
+  for (uint32_t i = 0; i < secondaryConfig.Length(); i++) {
+    auto serviceInfo =
+        BuildServiceInfo(secondaryConfig.Get(i), "serviceUrlProvider.secondary[" + std::to_string(i) + "]",
+                         authRefs, defaultAuth, defaultTls);
+    if (!serviceInfo.has_value()) {
+      return nullptr;
+    }
+    secondary.emplace_back(std::move(serviceInfo.value()));
+  }
+
+  pulsar::AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary.value()),
+                                                                std::move(secondary));
+
+  uint32_t checkIntervalMs = static_cast<uint32_t>(autoClusterFailoverConfig.checkInterval.count());
+  if (!SetPositiveUint32(providerConfig, CFG_CHECK_INTERVAL_MS, checkIntervalMs)) {
+    return nullptr;
+  }
+  autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);
+
+  if (!SetPositiveUint32(providerConfig, CFG_FAILOVER_THRESHOLD,
+                         autoClusterFailoverConfig.failoverThreshold)) {
+    return nullptr;
+  }
+
+  if (!SetPositiveUint32(providerConfig, CFG_SWITCH_BACK_THRESHOLD,
+                         autoClusterFailoverConfig.switchBackThreshold)) {
+    return nullptr;
+  }
+
+  return std::unique_ptr<pulsar::ServiceInfoProvider>(
+      new pulsar::AutoClusterFailover(std::move(autoClusterFailoverConfig)));
+}
+
 void Client::SetLogHandler(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
   Napi::HandleScope scope(env);
@@ -103,16 +307,34 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
   Napi::HandleScope scope(env);
   Napi::Object clientConfig = info[0].As<Napi::Object>();
 
-  if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString() ||
-      clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
+  bool hasServiceUrlProvider =
+      clientConfig.Has(CFG_SERVICE_URL_PROVIDER) && IsPresent(clientConfig.Get(CFG_SERVICE_URL_PROVIDER));
+  bool hasServiceUrl = clientConfig.Has(CFG_SERVICE_URL) && IsPresent(clientConfig.Get(CFG_SERVICE_URL));
+  if (hasServiceUrlProvider && hasServiceUrl) {
+    Napi::Error::New(env, "Only one of serviceUrl or serviceUrlProvider can be configured")
+        .ThrowAsJavaScriptException();
+    return;
+  }
+
+  if (!hasServiceUrlProvider && !hasServiceUrl) {
+    Napi::Error::New(env,
+                     "Service URL is required and must be specified as a string unless serviceUrlProvider "
+                     "is configured")
+        .ThrowAsJavaScriptException();
+    return;
+  }
+
+  if (hasServiceUrl && (!clientConfig.Get(CFG_SERVICE_URL).IsString() ||
+                        clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty())) {
     Napi::Error::New(env, "Service URL is required and must be specified as a string")
         .ThrowAsJavaScriptException();
     return;
   }
-  Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
 
   this->cClientConfig = std::shared_ptr<pulsar_client_configuration_t>(pulsar_client_configuration_create(),
                                                                        pulsar_client_configuration_free);
+  pulsar::AuthenticationPtr defaultAuthentication = pulsar::AuthFactory::Disabled();
+  std::optional<std::string> defaultTlsTrustCertsFilePath = std::nullopt;
 
   // The logger can only be set once per process, so we will take control of it
   if (clientConfig.Has(CFG_LOG_LEVEL) && clientConfig.Get(CFG_LOG_LEVEL).IsNumber()) {
@@ -145,9 +367,12 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
   if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
     Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
     if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
-      this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As<Napi::Object>());
-      Authentication *auth = Authentication::Unwrap(this->authRef_.Value());
-      pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication());
+      auto auth = BuildAuthenticationPtr(obj, this->authRefs_);
+      if (!auth.has_value()) {
+        return;
+      }
+      cClientConfig.get()->conf.setAuth(auth.value());
+      defaultAuthentication = auth.value();
     }
   }
 
@@ -190,6 +415,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
     Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
     pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig.get(),
                                                               tlsTrustCertsFilePath.Utf8Value().c_str());
+    defaultTlsTrustCertsFilePath = tlsTrustCertsFilePath.Utf8Value();
   }
 
   if (clientConfig.Has(CFG_TLS_CERT_FILE) && clientConfig.Get(CFG_TLS_CERT_FILE).IsString()) {
@@ -227,8 +453,23 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
   }
 
   try {
-    this->cClient = std::shared_ptr<pulsar_client_t>(
-        pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
+    if (hasServiceUrlProvider) {
+      std::unique_ptr<pulsar::ServiceInfoProvider> serviceInfoProvider = BuildServiceInfoProvider(
+          clientConfig, this->authRefs_, defaultAuthentication, defaultTlsTrustCertsFilePath);
+      if (serviceInfoProvider == nullptr) {
+        return;
+      }
+
+      std::unique_ptr<pulsar_client_t> rawClient(new pulsar_client_t);
+      rawClient->client.reset(new pulsar::Client(
+          pulsar::Client::create(std::move(serviceInfoProvider), cClientConfig.get()->conf)));
+      this->cClient = std::shared_ptr<pulsar_client_t>(rawClient.release(),
+                                                       [](pulsar_client_t *client) { delete client; });
+    } else {
+      Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
+      this->cClient = std::shared_ptr<pulsar_client_t>(
+          pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
+    }
   } catch (const std::exception &e) {
     Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
   }
diff --git a/src/Client.h b/src/Client.h
index b761d36..6ea9a15 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -22,6 +22,7 @@
 
 #include <napi.h>
 #include <pulsar/c/client.h>
+#include <vector>
 
 struct LogMessage {
   pulsar_logger_level_t level;
@@ -54,7 +55,7 @@ class Client : public Napi::ObjectWrap<Client> {
   std::shared_ptr<pulsar_client_t> cClient;
   std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
   pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO;
-  Napi::ObjectReference authRef_;
+  std::vector<Napi::ObjectReference> authRefs_;
 
   Napi::Value CreateProducer(const Napi::CallbackInfo &info);
   Napi::Value Subscribe(const Napi::CallbackInfo &info);
diff --git a/tests/client.test.js b/tests/client.test.js
index d97763e..f871b0d 100644
--- a/tests/client.test.js
+++ b/tests/client.test.js
@@ -26,9 +26,11 @@ const baseUrl = 'http://localhost:8080';
   describe('Client', () => {
     describe('CreateFailedByUrlSetIncorrect', () => {
       test('No Set Url', async () => {
+        const expectedError = 'Service URL is required and must be specified as a string '
+          + 'unless serviceUrlProvider is configured';
         await expect(() => new Pulsar.Client({
           operationTimeoutSeconds: 30,
-        })).toThrow('Service URL is required and must be specified as a string');
+        })).toThrow(expectedError);
       });
 
       test('Set empty url', async () => {
@@ -51,6 +53,17 @@ const baseUrl = 'http://localhost:8080';
           operationTimeoutSeconds: 30,
         })).toThrow('Service URL is required and must be specified as a string');
       });
+
+      test('Set both service url and service url provider', async () => {
+        await expect(() => new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          serviceUrlProvider: {
+            primary: 'pulsar://localhost:6650',
+            secondary: ['pulsar://localhost:6651'],
+          },
+          operationTimeoutSeconds: 30,
+        })).toThrow('Only one of serviceUrl or serviceUrlProvider can be configured');
+      });
     });
     describe('test getPartitionsForTopic', () => {
       test('GetPartitions for empty topic', async () => {
diff --git a/tests/failover_client_test.test.js b/tests/failover_client_test.test.js
new file mode 100644
index 0000000..c834b5b
--- /dev/null
+++ b/tests/failover_client_test.test.js
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const childProcess = require('child_process');
+const fs = require('fs');
+const http = require('http');
+const os = require('os');
+const path = require('path');
+const { promisify } = require('util');
+const Pulsar = require('../index');
+
+const execFile = promisify(childProcess.execFile);
+const dockerImage = process.env.PULSAR_TEST_IMAGE || 'apachepulsar/pulsar:latest';
+const testRunId = `${Date.now()}-${process.pid}`;
+const tempRoot = path.join(os.tmpdir(), `pulsar-node-failover-client-test-${testRunId}`);
+const startedContainers = [];
+
+jest.setTimeout(180000);
+
+const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+
+const docker = async (args, options = {}) => {
+  const { stdout } = await execFile('docker', args, {
+    maxBuffer: 1024 * 1024,
+    ...options,
+  });
+  return stdout.trim();
+};
+
+const waitForHttpOk = async (url, timeoutMs = 60000) => {
+  const deadline = Date.now() + timeoutMs;
+
+  while (Date.now() < deadline) {
+    try {
+      await new Promise((resolve, reject) => {
+        const request = http.get(url, (response) => {
+          response.resume();
+          if (response.statusCode >= 200 && response.statusCode < 300) {
+            resolve();
+          } else {
+            reject(new Error(`Unexpected status code ${response.statusCode}`));
+          }
+        });
+        request.on('error', reject);
+        request.setTimeout(2000, () => {
+          request.destroy(new Error('Timed out waiting for Pulsar HTTP endpoint'));
+        });
+      });
+      return;
+    } catch (e) {
+      await delay(1000);
+    }
+  }
+
+  throw new Error(`Timed out waiting for ${url}`);
+};
+
+const writeStandaloneConfig = (clusterName, webPort, brokerPort) => {
+  const clusterDir = path.join(tempRoot, clusterName);
+  fs.mkdirSync(clusterDir, { recursive: true });
+  fs.chmodSync(tempRoot, 0o755);
+  fs.chmodSync(clusterDir, 0o755);
+
+  const sourceConfig = fs.readFileSync(path.join(__dirname, 'conf', 'standalone.conf'), 'utf8');
+  const config = sourceConfig
+    .replace(/^brokerServicePort=.*$/m, `brokerServicePort=${brokerPort}`)
+    .replace(/^brokerServicePortTls=.*$/m, `brokerServicePortTls=${brokerPort + 100}`)
+    .replace(/^webServicePort=.*$/m, `webServicePort=${webPort}`)
+    .replace(/^webServicePortTls=.*$/m, `webServicePortTls=${webPort + 100}`)
+    .replace(/^advertisedListeners=.*$/m, 'advertisedListeners=');
+
+  const configPath = path.join(clusterDir, 'standalone.conf');
+  fs.writeFileSync(configPath, config);
+  fs.chmodSync(configPath, 0o644);
+
+  ['server.crt', 'server.key'].forEach((fileName) => {
+    const targetPath = path.join(clusterDir, fileName);
+    fs.copyFileSync(path.join(__dirname, 'certificate', fileName), targetPath);
+    fs.chmodSync(targetPath, 0o644);
+  });
+
+  return clusterDir;
+};
+
+const getStandaloneLogs = async (containerId) => {
+  const { stdout, stderr } = await execFile('docker', [
+    'exec',
+    containerId,
+    'bash',
+    '-lc',
+    'cat logs/pulsar-standalone-*.out logs/pulsar-standalone-*.log 2>/dev/null || true',
+  ], { maxBuffer: 1024 * 1024 }).catch((e) => e);
+
+  return `${stdout || ''}${stderr || ''}`.trim();
+};
+
+const startStandaloneCluster = async ({ clusterName, webPort, brokerPort }) => {
+  const confDir = writeStandaloneConfig(clusterName, webPort, brokerPort);
+  const containerId = await docker([
+    'run',
+    '-i',
+    '-p',
+    `${webPort}:${webPort}`,
+    '-p',
+    `${brokerPort}:${brokerPort}`,
+    '--rm',
+    '--detach',
+    dockerImage,
+    'sleep',
+    '3600',
+  ]);
+  startedContainers.push(containerId);
+
+  await docker(['cp', confDir, `${containerId}:/pulsar/test-conf`]);
+  await docker([
+    'exec',
+    '-i',
+    containerId,
+    'env',
+    'PULSAR_STANDALONE_CONF=test-conf/standalone.conf',
+    'PULSAR_STANDALONE_USE_ZOOKEEPER=1',
+    'bin/pulsar-daemon',
+    'start',
+    'standalone',
+    '--no-functions-worker',
+    '--no-stream-storage',
+    '--bookkeeper-dir',
+    `data/bookkeeper-${clusterName}`,
+  ]);
+  try {
+    await waitForHttpOk(`http://localhost:${webPort}/admin/v2/clusters`);
+  } catch (e) {
+    const logs = await getStandaloneLogs(containerId);
+    throw new Error(`${e.message}\nStandalone logs for ${clusterName}:\n${logs}`);
+  }
+
+  return { containerId, serviceUrl: `pulsar://localhost:${brokerPort}` };
+};
+
+const stopContainer = async (containerId) => {
+  await docker(['kill', containerId]).catch(() => {});
+};
+
+const sendAndReceive = async (client, topic, payload) => {
+  let consumer;
+  let producer;
+
+  try {
+    producer = await client.createProducer({ topic });
+    await producer.send({ data: Buffer.from(payload) });
+
+    consumer = await client.subscribe({
+      topic,
+      subscription: `sub-${testRunId}-${Math.random().toString(36).slice(2)}`,
+      subscriptionInitialPosition: 'Earliest',
+    });
+
+    const message = await consumer.receive(10000);
+    expect(message.getData().toString()).toBe(payload);
+    await consumer.acknowledge(message);
+  } finally {
+    if (producer) {
+      await producer.close().catch(() => {});
+    }
+    if (consumer) {
+      await consumer.close().catch(() => {});
+    }
+  }
+};
+
+const retryUntil = async (operation, timeoutMs = 60000) => {
+  const deadline = Date.now() + timeoutMs;
+  let lastError;
+
+  while (Date.now() < deadline) {
+    try {
+      return await operation();
+    } catch (e) {
+      lastError = e;
+      await delay(1000);
+    }
+  }
+
+  throw lastError;
+};
+
+describe('failoverClientTest', () => {
+  let primaryCluster;
+  let secondaryCluster;
+  let client;
+
+  beforeAll(async () => {
+    primaryCluster = await startStandaloneCluster({
+      clusterName: 'primary',
+      webPort: 18080,
+      brokerPort: 16650,
+    });
+    secondaryCluster = await startStandaloneCluster({
+      clusterName: 'secondary',
+      webPort: 18081,
+      brokerPort: 16651,
+    });
+
+    client = new Pulsar.Client({
+      serviceUrlProvider: {
+        primary: primaryCluster.serviceUrl,
+        secondary: [secondaryCluster.serviceUrl],
+        checkIntervalMs: 1000,
+        failoverThreshold: 1,
+        switchBackThreshold: 1,
+      },
+      operationTimeoutSeconds: 30,
+      connectionTimeoutMs: 1000,
+    });
+  });
+
+  afterAll(async () => {
+    if (client) {
+      await client.close().catch(() => {});
+    }
+
+    await Promise.all(startedContainers.map(stopContainer));
+    fs.rmSync(tempRoot, { force: true, recursive: true });
+  });
+
+  test('continues producing and consuming after the primary cluster stops', async () => {
+    expect(client).toBeDefined();
+
+    await retryUntil(() => sendAndReceive(
+      client,
+      `persistent://public/default/failover-primary-${testRunId}`,
+      'message-before-failover',
+    ));
+
+    await stopContainer(primaryCluster.containerId);
+
+    await retryUntil(() => sendAndReceive(
+      client,
+      `persistent://public/default/failover-secondary-${testRunId}`,
+      'message-after-failover',
+    ));
+  });
+});

Reply via email to