This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5af90157801 [federation] Introduced MultiClusterHelixBrokerStarter to start broker in federated mode (#17421)
5af90157801 is described below

commit 5af901578014b2e2f928dce70d46f0352bbe2717
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Sun Dec 28 21:05:18 2025 -0800

    [federation] Introduced MultiClusterHelixBrokerStarter to start broker in federated mode (#17421)
    
    * [federation] Introduced MultiClusterHelixBrokerStarter to initialize cross-cluster federated broker
    
    * Fixed starter logic
    
    * Added TODO
    
    * Added multi cluster broker startup failure metrics
    
    ---------
    
    Co-authored-by: shauryachats <[email protected]>
---
 .../broker/broker/helix/BaseBrokerStarter.java     | 115 +++--
 .../helix/MultiClusterHelixBrokerStarter.java      | 369 ++++++++++++++++
 .../apache/pinot/common/metrics/BrokerMeter.java   |   3 +
 .../core/routing/MultiClusterRoutingContext.java   |  55 +++
 .../multicluster/MultiClusterIntegrationTest.java  | 490 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +
 6 files changed, 990 insertions(+), 45 deletions(-)
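
For context, a minimal sketch of wiring a broker into federated mode, condensed
from the integration test in this diff (the cluster names and ZK addresses are
illustrative, not part of the commit):

    PinotConfiguration brokerConf = new PinotConfiguration();
    // Primary cluster, configured as for any broker.
    brokerConf.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, "zk-primary:2181");
    brokerConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, "PrimaryCluster");
    // Remote clusters, via the new keys added to CommonConstants below.
    brokerConf.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES, "RemoteCluster1");
    brokerConf.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, "RemoteCluster1"),
        "zk-remote:2181");

    MultiClusterHelixBrokerStarter starter = new MultiClusterHelixBrokerStarter();
    starter.init(brokerConf);
    starter.start();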

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index d4c2b2c8754..6bb740339f7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -81,6 +81,7 @@ import org.apache.pinot.common.utils.tls.TlsUtils;
 import org.apache.pinot.common.version.PinotVersion;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.core.util.ListenerConfigUtil;
@@ -150,6 +151,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
   protected HelixAdmin _helixAdmin;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected HelixDataAccessor _helixDataAccessor;
+  protected TableCache _tableCache;
   protected PinotMetricsRegistry _metricsRegistry;
   protected BrokerMetrics _brokerMetrics;
   protected BrokerRoutingManager _routingManager;
@@ -310,12 +312,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     Utils.logVersions();
 
     LOGGER.info("Connecting spectator Helix manager");
-    _spectatorHelixManager =
-        HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
-    _spectatorHelixManager.connect();
-    _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
-    _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
-    _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
+    initSpectatorHelixManager();
 
     LOGGER.info("Setting up broker request handler");
     // Set up metric registry and broker metrics
@@ -336,8 +333,8 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     // Set up request handling classes
     _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
     _serverRoutingStatsManager.init();
-    _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
-    _routingManager.init(_spectatorHelixManager);
+    initRoutingManager();
+
     final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
     // Adding cluster name to the config so that it can be used by the AccessControlFactory
     factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
@@ -353,7 +350,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     FunctionRegistry.init();
     boolean caseInsensitive =
         _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
-    TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
+    _tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
 
     LOGGER.info("Initializing Broker Event Listener Factory");
     
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
@@ -382,6 +379,9 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
         org.apache.pinot.spi.config.instance.InstanceType.BROKER);
     _threadAccountant.startWatcherTask();
 
+    // TODO: Hook multiClusterRoutingContext into request handlers subsequently.
+    MultiClusterRoutingContext multiClusterRoutingContext = getMultiClusterRoutingContext();
+
     // Create Broker request handler.
     String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     BrokerRequestIdGenerator requestIdGenerator = new BrokerRequestIdGenerator();
@@ -391,7 +391,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
       singleStageBrokerRequestHandler =
           new GrpcBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
-              _accessControlFactory, _queryQuotaManager, tableCache, _failureDetector, _threadAccountant);
+              _accessControlFactory, _queryQuotaManager, _tableCache, _failureDetector, _threadAccountant);
     } else {
       // Default request handler type, i.e. netty
       NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
@@ -402,7 +402,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
       }
       singleStageBrokerRequestHandler =
           new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
-              _accessControlFactory, _queryQuotaManager, tableCache, nettyDefaults, tlsDefaults,
+              _accessControlFactory, _queryQuotaManager, _tableCache, nettyDefaults, tlsDefaults,
               _serverRoutingStatsManager, _failureDetector, _threadAccountant);
     }
     MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
@@ -416,7 +416,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
       queryDispatcher = createQueryDispatcher(_brokerConf);
       multiStageBrokerRequestHandler =
           new MultiStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
-              _accessControlFactory, _queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector,
+              _accessControlFactory, _queryQuotaManager, _tableCache, _multiStageQueryThrottler, _failureDetector,
               _threadAccountant);
     }
     TimeSeriesRequestHandler timeSeriesRequestHandler = null;
@@ -424,7 +424,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
       Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
       timeSeriesRequestHandler =
           new TimeSeriesRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
-              _accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher, _threadAccountant);
+              _accessControlFactory, _queryQuotaManager, _tableCache, queryDispatcher, _threadAccountant);
     }
 
     LOGGER.info("Initializing PinotFSFactory");
@@ -471,6 +471,45 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     }
 
     LOGGER.info("Initializing cluster change mediator");
+    initClusterChangeMediator();
+
+    LOGGER.info("Connecting participant Helix manager");
+    _participantHelixManager =
+      HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
+    // Register state model factory
+    _participantHelixManager.getStateMachineEngine()
+      .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
+        new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
+          _queryQuotaManager));
+    // Register user-define message handler factory
+    _participantHelixManager.getMessagingService()
+      .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+        new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
+    _participantHelixManager.connect();
+    updateInstanceConfigAndBrokerResourceIfNeeded();
+    _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
+      () -> _participantHelixManager.isConnected() ? 1L : 0L);
+    _participantHelixManager.addPreConnectCallback(
+      () -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
+
+    // Initializing Groovy execution security
+    GroovyFunctionEvaluator.configureGroovySecurity(
+      _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
+        _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));
+
+    // Register the service status handler
+    registerServiceStatusHandler();
+
+    _isStarting = false;
+    _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
+      System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+
+    _defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+
+    LOGGER.info("Finish starting Pinot broker");
+  }
+
+  protected void initClusterChangeMediator() throws Exception {
     for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) {
       clusterConfigChangeHandler.init(_spectatorHelixManager);
     }
@@ -515,41 +554,27 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     if (!_liveInstanceChangeHandlers.isEmpty()) {
       _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
     }
+  }
 
-    LOGGER.info("Connecting participant Helix manager");
-    _participantHelixManager =
-        HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
-    // Register state model factory
-    _participantHelixManager.getStateMachineEngine()
-        .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
-            new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
-                _queryQuotaManager));
-    // Register user-define message handler factory
-    _participantHelixManager.getMessagingService()
-        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-            new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
-    _participantHelixManager.connect();
-    updateInstanceConfigAndBrokerResourceIfNeeded();
-    _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
-        () -> _participantHelixManager.isConnected() ? 1L : 0L);
-    _participantHelixManager.addPreConnectCallback(
-        () -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
-
-    // Initializing Groovy execution security
-    GroovyFunctionEvaluator.configureGroovySecurity(
-        _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
-            _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));
-
-    // Register the service status handler
-    registerServiceStatusHandler();
-
-    _isStarting = false;
-    _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
-        System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+  protected void initRoutingManager() throws Exception {
+    _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
+    _routingManager.init(_spectatorHelixManager);
+  }
 
-    _defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+  protected void initSpectatorHelixManager() throws Exception {
+    _spectatorHelixManager =
+      HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
+    _spectatorHelixManager.connect();
+    _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
+    _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
+    _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
+  }
 
-    LOGGER.info("Finish starting Pinot broker");
+  /**
+   * Can be overridden to inject a custom MultiClusterRoutingContext from MultiClusterBrokerStarter.
+   */
+   */
+  protected MultiClusterRoutingContext getMultiClusterRoutingContext() {
+    return null;
   }
 
   /**
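
The BaseBrokerStarter changes are a template-method refactor: spectator setup,
routing-manager setup, and the cluster change mediator now go through protected
hooks, and getMultiClusterRoutingContext() defaults to null (no federation). A
hypothetical subclass (for illustration only; the real override is
MultiClusterHelixBrokerStarter in the next file) would look like:

    public class CustomBrokerStarter extends BaseBrokerStarter {
      @Override
      protected void initRoutingManager() throws Exception {
        super.initRoutingManager();  // keep the default BrokerRoutingManager
        // additional routing setup could go here
      }

      @Override
      protected MultiClusterRoutingContext getMultiClusterRoutingContext() {
        return null;  // same as the base class: federation disabled
      }
    }
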
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java
new file mode 100644
index 00000000000..8c40f3a0499
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.broker.routing.manager.MultiClusterRoutingManager;
+import org.apache.pinot.broker.routing.manager.RemoteClusterBrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Multi-cluster broker starter that extends the base Helix broker functionality
+ * to support federation across multiple Pinot clusters.
+ *
+ * This class handles:
+ * - Connection to remote clusters via separate ZooKeeper instances
+ * - Federated routing across primary and remote clusters
+ * - Cross-cluster query federation
+ * - Cluster change monitoring for remote clusters
+ */
+@SuppressWarnings("unused")
+public class MultiClusterHelixBrokerStarter extends BaseBrokerStarter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterHelixBrokerStarter.class);
+
+  // Remote cluster configuration
+  protected List<String> _remoteClusterNames;
+  protected Map<String, String> _remoteZkServers;
+  protected String _remoteInstanceId;
+
+  // Remote cluster Helix managers and routing
+  protected Map<String, HelixManager> _remoteSpectatorHelixManager;
+  protected Map<String, RemoteClusterBrokerRoutingManager> _remoteRoutingManagers;
+  protected MultiClusterRoutingManager _multiClusterRoutingManager;
+  protected MultiClusterRoutingContext _multiClusterRoutingContext;
+  protected Map<String, ClusterChangeMediator> _remoteClusterChangeMediator;
+
+  public MultiClusterHelixBrokerStarter() {
+  }
+
+  @Override
+  public void init(PinotConfiguration brokerConf)
+      throws Exception {
+    super.init(brokerConf);
+    _remoteInstanceId = _instanceId + "_remote";
+    initRemoteClusterNamesAndZk(brokerConf);
+  }
+
+  @Override
+  public void start()
+      throws Exception {
+    LOGGER.info("[multi-cluster] Starting multi-cluster broker");
+    super.start();
+    // build routing tables for remote clusters
+    initRemoteClusterRouting();
+    LOGGER.info("[multi-cluster] Multi-cluster broker started successfully");
+  }
+
+  @Override
+  protected void initSpectatorHelixManager() throws Exception {
+    super.initSpectatorHelixManager();
+    try {
+      initRemoteClusterSpectatorHelixManagers();
+    } catch (Exception e) {
+      LOGGER.error("[multi-cluster] Failed to initialize remote cluster spectator Helix managers", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+    }
+  }
+
+  @Override
+  protected void initRoutingManager() throws Exception {
+    super.initRoutingManager();
+    try {
+      initRemoteClusterFederatedRoutingManager();
+    } catch (Exception e) {
+      LOGGER.error("[multi-cluster] Failed to initialize remote cluster federated routing manager", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+    }
+  }
+
+  @Override
+  protected void initClusterChangeMediator() throws Exception {
+    super.initClusterChangeMediator();
+    try {
+      initRemoteClusterChangeMediator();
+    } catch (Exception e) {
+      LOGGER.error("[multi-cluster] Failed to initialize remote cluster change mediator", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+    }
+  }
+
+  private void initRemoteClusterSpectatorHelixManagers() throws Exception {
+    if (_remoteZkServers == null || _remoteZkServers.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote ZK servers configured - skipping spectator Helix manager init");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing spectator Helix managers for {} remote clusters",
+      _remoteZkServers.size());
+    _remoteSpectatorHelixManager = new HashMap<>();
+
+    for (Map.Entry<String, String> entry : _remoteZkServers.entrySet()) {
+      String clusterName = entry.getKey();
+      String zkServers = entry.getValue();
+      try {
+        HelixManager helixManager = HelixManagerFactory.getZKHelixManager(
+          clusterName, _instanceId, InstanceType.SPECTATOR, zkServers);
+        helixManager.connect();
+        _remoteSpectatorHelixManager.put(clusterName, helixManager);
+        LOGGER.info("[multi-cluster] Connected to remote cluster '{}' at ZK: {}", clusterName, zkServers);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to connect to cluster '{}' at ZK: {}", clusterName, zkServers, e);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+      }
+    }
+
+    if (_remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.warn("[multi-cluster] Failed to connect to any remote clusters - "
+        + "multi-cluster will not be functional");
+    } else {
+      LOGGER.info("[multi-cluster] Connected to {}/{} remote clusters: {}", _remoteSpectatorHelixManager.size(),
+        _remoteZkServers.size(), _remoteSpectatorHelixManager.keySet());
+    }
+  }
+
+  protected void stopRemoteClusterComponents() {
+    if (_remoteClusterChangeMediator != null) {
+      _remoteClusterChangeMediator.values().forEach(ClusterChangeMediator::stop);
+    }
+    if (_remoteRoutingManagers != null) {
+      for (RemoteClusterBrokerRoutingManager routingManager : _remoteRoutingManagers.values()) {
+        routingManager.shutdown();
+      }
+    }
+    if (_remoteSpectatorHelixManager != null) {
+      _remoteSpectatorHelixManager.values().forEach(HelixManager::disconnect);
+    }
+  }
+
+  private void initRemoteClusterNamesAndZk(PinotConfiguration brokerConf) {
+    LOGGER.info("[multi-cluster] Initializing remote cluster configuration");
+    String remoteClusterNames = brokerConf.getProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES);
+
+    if (remoteClusterNames == null || remoteClusterNames.trim().isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote cluster configured - multi-cluster mode disabled");
+      return;
+    }
+
+    _remoteClusterNames = Arrays.asList(remoteClusterNames.replaceAll("\\s+", "").split(","));
+    if (_remoteClusterNames.isEmpty()) {
+      LOGGER.warn("[multi-cluster] Remote cluster names list is empty after parsing");
+      return;
+    }
+    LOGGER.info("[multi-cluster] Configured remote cluster names: {}", _remoteClusterNames);
+
+    _remoteZkServers = new HashMap<>();
+    for (String name : _remoteClusterNames) {
+      String zkConfig = String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, name);
+      String zkServers = brokerConf.getProperty(zkConfig);
+
+      if (zkServers == null || zkServers.trim().isEmpty()) {
+        LOGGER.error("[multi-cluster] Missing ZooKeeper configuration for cluster '{}', expected: {}", name, zkConfig);
+        continue;
+      }
+      _remoteZkServers.put(name, zkServers.replaceAll("\\s+", ""));
+    }
+
+    if (_remoteZkServers.isEmpty()) {
+      LOGGER.error("[multi-cluster] No valid ZooKeeper configurations found - multi-cluster will not be functional");
+      _remoteClusterNames = null;
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+    } else {
+      LOGGER.info("[multi-cluster] Initialized {} remote cluster(s): {}", _remoteZkServers.size(),
+          _remoteZkServers.keySet());
+    }
+  }
+
+  private void initRemoteClusterFederatedRoutingManager() {
+    if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote spectator Helix managers - skipping federated routing manager init");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing federated routing manager for {} clusters",
+        _remoteSpectatorHelixManager.size());
+    _remoteRoutingManagers = new HashMap<>();
+
+    for (Map.Entry<String, HelixManager> entry : _remoteSpectatorHelixManager.entrySet()) {
+      String clusterName = entry.getKey();
+      try {
+        RemoteClusterBrokerRoutingManager routingManager =
+            new RemoteClusterBrokerRoutingManager(clusterName, _brokerMetrics, _serverRoutingStatsManager, _brokerConf);
+        routingManager.init(entry.getValue());
+        _remoteRoutingManagers.put(clusterName, routingManager);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to initialize routing manager for cluster '{}'", clusterName, e);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+      }
+    }
+
+    if (_remoteRoutingManagers.isEmpty()) {
+      LOGGER.error("[multi-cluster] Failed to initialize any routing managers - federated routing unavailable");
+    } else {
+      _multiClusterRoutingManager = new MultiClusterRoutingManager(_routingManager,
+          new ArrayList<>(_remoteRoutingManagers.values()));
+      LOGGER.info("[multi-cluster] Created federated routing manager with {}/{} remote clusters",
+          _remoteRoutingManagers.size(), _remoteSpectatorHelixManager.size());
+    }
+  }
+
+  private void initRemoteClusterFederationProvider(TableCache primaryTableCache, boolean caseInsensitive) {
+    if (_multiClusterRoutingManager == null) {
+      LOGGER.info("[multi-cluster] Federation is not enabled - FederationProvider will be null");
+      _multiClusterRoutingContext = null;
+      return;
+    }
+
+    Map<String, TableCache> tableCacheMap = new HashMap<>();
+    tableCacheMap.put(_clusterName, primaryTableCache);
+
+    if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote spectator Helix managers - "
+          + "creating provider with primary cluster only");
+      _multiClusterRoutingContext = null;
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing federation provider with {} remote clusters",
+        _remoteSpectatorHelixManager.size());
+
+    for (Map.Entry<String, HelixManager> entry : _remoteSpectatorHelixManager.entrySet()) {
+      String clusterName = entry.getKey();
+      try {
+        TableCache remoteCache = new ZkTableCache(entry.getValue().getHelixPropertyStore(), caseInsensitive);
+        tableCacheMap.put(clusterName, remoteCache);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to create table cache for cluster '{}'", clusterName, e);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+      }
+    }
+
+    _multiClusterRoutingContext = new MultiClusterRoutingContext(tableCacheMap, _routingManager,
+        _multiClusterRoutingManager);
+    LOGGER.info("[multi-cluster] Created federation provider with {}/{} clusters (1 primary + {} remote)",
+        tableCacheMap.size(), _remoteSpectatorHelixManager.size() + 1, tableCacheMap.size() - 1);
+  }
+
+  private void initRemoteClusterRouting() {
+    if (_remoteRoutingManagers == null || _remoteRoutingManagers.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote routing managers - skipping routing table initialization");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing routing tables for {} remote clusters",
+        _remoteRoutingManagers.size());
+    int initialized = 0;
+
+    for (Map.Entry<String, RemoteClusterBrokerRoutingManager> entry : _remoteRoutingManagers.entrySet()) {
+      try {
+        entry.getValue().determineRoutingChangeForTables();
+        initialized++;
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to initialize routing tables for cluster '{}'", entry.getKey(), e);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+      }
+    }
+
+    LOGGER.info("[multi-cluster] Initialized routing tables for {}/{} remote clusters",
+        initialized, _remoteRoutingManagers.size());
+  }
+
+  private void initRemoteClusterChangeMediator() throws Exception {
+    if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote spectator Helix managers - skipping cluster change mediator init");
+      return;
+    }
+
+    if (_remoteRoutingManagers == null || _remoteRoutingManagers.isEmpty()) {
+      LOGGER.error("[multi-cluster] Remote routing managers not initialized - "
+          + "cannot create cluster change mediators");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing cluster change mediators for {} remote clusters",
+        _remoteSpectatorHelixManager.size());
+    _remoteClusterChangeMediator = new HashMap<>();
+
+    for (String clusterName : _remoteSpectatorHelixManager.keySet()) {
+      RemoteClusterBrokerRoutingManager routingManager = _remoteRoutingManagers.get(clusterName);
+      if (routingManager == null) {
+        LOGGER.error("[multi-cluster] Routing manager not found for cluster '{}' - skipping mediator setup",
+            clusterName);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+        continue;
+      }
+
+      try {
+        Map<ChangeType, List<ClusterChangeHandler>> handlers = new HashMap<>();
+        handlers.put(ChangeType.CLUSTER_CONFIG, new ArrayList<>());
+        handlers.put(ChangeType.IDEAL_STATE, Collections.singletonList(routingManager));
+        handlers.put(ChangeType.EXTERNAL_VIEW, Collections.singletonList(routingManager));
+        handlers.put(ChangeType.INSTANCE_CONFIG, Collections.singletonList(routingManager));
+        handlers.put(ChangeType.RESOURCE_CONFIG, Collections.singletonList(routingManager));
+
+        ClusterChangeMediator mediator = new ClusterChangeMediator(handlers, _brokerMetrics);
+        mediator.start();
+        _remoteClusterChangeMediator.put(clusterName, mediator);
+
+        HelixManager helixManager = _remoteSpectatorHelixManager.get(clusterName);
+        helixManager.addIdealStateChangeListener(mediator);
+        helixManager.addExternalViewChangeListener(mediator);
+        helixManager.addInstanceConfigChangeListener(mediator);
+        helixManager.addClusterfigChangeListener(mediator);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to initialize cluster change mediator for cluster '{}'", clusterName, e);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
+      }
+    }
+
+    LOGGER.info("[multi-cluster] Initialized {}/{} cluster change mediators", _remoteClusterChangeMediator.size(),
+        _remoteSpectatorHelixManager.size());
+  }
+
+  @Override
+  protected MultiClusterRoutingContext getMultiClusterRoutingContext() {
+    initRemoteClusterFederationProvider(_tableCache,
+      _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE));
+    return _multiClusterRoutingContext;
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.info("[multi-cluster] Shutting down multi-cluster broker");
+    super.stop();
+    stopRemoteClusterComponents();
+    LOGGER.info("[multi-cluster] Multi-cluster broker shut down successfully");
+  }
+}
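
Note the recurring fail-open pattern in the class above: every remote-cluster
init step catches its own exceptions, logs them, and bumps the new
MULTI_CLUSTER_BROKER_STARTUP_FAILURE meter instead of failing startup, so the
broker still serves its primary cluster when remotes are unreachable.
Condensed (names as in the class above):

    try {
      initRemoteClusterSpectatorHelixManagers();  // likewise for routing managers and mediators
    } catch (Exception e) {
      LOGGER.error("[multi-cluster] Failed to initialize remote cluster spectator Helix managers", e);
      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1);
    }
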
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 01676557a20..c0e730a1ca6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -178,6 +178,9 @@ public class BrokerMeter implements AbstractMetrics.Meter {
 
   public static final BrokerMeter HELIX_ZOOKEEPER_RECONNECTS = create("HELIX_ZOOKEEPER_RECONNECTS", "reconnects", true);
 
+  public static final BrokerMeter MULTI_CLUSTER_BROKER_STARTUP_FAILURE = create(
+      "MULTI_CLUSTER_BROKER_STARTUP_FAILURE", "failureCount", true);
+
   public static final BrokerMeter REQUEST_DROPPED_DUE_TO_ACCESS_ERROR = create(
       "REQUEST_DROPPED_DUE_TO_ACCESS_ERROR", "requestsDropped", false);
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
new file mode 100644
index 00000000000..1f139da56ac
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.routing;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.config.provider.TableCache;
+
+
+/**
+ * A generic class which provides the dependencies for federation routing.
+ * This class is responsible for managing routing managers and providing the appropriate
+ * routing manager based on query options (e.g., whether federation is enabled).
+ */
+public class MultiClusterRoutingContext {
+  // Maps clusterName to TableCache. Includes the local and all remote clusters.
+  private final Map<String, TableCache> _tableCacheMap;
+
+  // Local
+  private final RoutingManager _localRoutingManager;
+
+  // Federated routing manager (for federated queries, may be null if federation is not configured)
+  @Nullable
+  private final RoutingManager _multiClusterRoutingManager;
+
+  /**
+   * Constructor for FederationProvider with routing managers.
+   *
+   * @param tableCacheMap Map of cluster name to TableCache
+   * @param localRoutingManager Local routing manager for non-federated queries
+   * @param multiClusterRoutingManager Multi cluster routing manager for cross-cluster queries (can be null)
+   */
+  public MultiClusterRoutingContext(Map<String, TableCache> tableCacheMap, RoutingManager localRoutingManager,
+      @Nullable RoutingManager multiClusterRoutingManager) {
+    _tableCacheMap = tableCacheMap;
+    _localRoutingManager = localRoutingManager;
+    _multiClusterRoutingManager = multiClusterRoutingManager;
+  }
+}
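
MultiClusterRoutingContext is currently a plain holder (no accessors yet; per
the TODO in BaseBrokerStarter it is not wired into request handlers).
Construction mirrors initRemoteClusterFederationProvider above; the cache and
manager variables in this sketch are placeholders:

    Map<String, TableCache> tableCacheMap = new HashMap<>();
    tableCacheMap.put("PrimaryCluster", primaryTableCache);   // local cluster
    tableCacheMap.put("RemoteCluster1", remoteTableCache);    // one entry per remote
    MultiClusterRoutingContext context = new MultiClusterRoutingContext(
        tableCacheMap, localRoutingManager, multiClusterRoutingManager);
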
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
new file mode 100644
index 00000000000..e9ac332e2e9
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.multicluster;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
+import org.apache.pinot.broker.broker.helix.MultiClusterHelixBrokerStarter;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.BaseControllerStarter;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.ClusterTest;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class MultiClusterIntegrationTest extends ClusterTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
+
+  protected static final String SCHEMA_FILE = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+  protected static final String TIME_COLUMN = "DaysSinceEpoch";
+  // TODO: N clusters instead of 2 in future iterations.
+  protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1";
+  protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
+  protected static final ClusterConfig CLUSTER_1_CONFIG = new ClusterConfig(CLUSTER_1_NAME, 30000);
+  protected static final ClusterConfig CLUSTER_2_CONFIG = new ClusterConfig(CLUSTER_2_NAME, 40000);
+
+  protected ClusterComponents _cluster1;
+  protected ClusterComponents _cluster2;
+  protected List<File> _cluster1AvroFiles;
+  protected List<File> _cluster2AvroFiles;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    LOGGER.info("Setting up MultiClusterIntegrationTest");
+
+    // Initialize cluster components
+    _cluster1 = new ClusterComponents();
+    _cluster2 = new ClusterComponents();
+
+    // Setup directories
+    setupDirectories();
+
+    // Start ZooKeeper instances for both clusters
+    startZookeeper(_cluster1);
+    startZookeeper(_cluster2);
+
+    // Start controllers for both clusters
+    startControllerInit(_cluster1, CLUSTER_1_CONFIG);
+    startControllerInit(_cluster2, CLUSTER_2_CONFIG);
+
+    // Start brokers and servers for both clusters
+    // Note: Each cluster's broker is configured to know about the other cluster as remote
+    startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
+    startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
+
+    LOGGER.info("MultiClusterIntegrationTest setup complete");
+  }
+
+  // TODO: Add more tests for cross-cluster queries in subsequent iterations.
+  @Test
+  public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
+    LOGGER.info("Testing that multi-cluster broker starts successfully and is queryable");
+
+    // Verify both clusters' brokers are running (MultiClusterHelixBrokerStarter)
+    assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be started");
+    assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be started");
+    assertTrue(_cluster1._brokerStarter instanceof MultiClusterHelixBrokerStarter,
+        "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
+    assertTrue(_cluster2._brokerStarter instanceof MultiClusterHelixBrokerStarter,
+        "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
+
+    // Setup a test table on both clusters
+    String testTableName = "multicluster_test_table";
+    createSchemaAndTableOnBothClusters(testTableName);
+
+    // Create and load test data into both clusters
+    _cluster1AvroFiles = createAvroData(100, 1);
+    _cluster2AvroFiles = createAvroData(100, 2);
+
+    loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
+    loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
+
+    // Verify cluster 1 is queryable
+    String query = "SELECT COUNT(*) FROM " + testTableName;
+    String result1 = executeQuery(query, _cluster1);
+    assertNotNull(result1, "Query result from cluster 1 should not be null");
+    long count1 = parseCountResult(result1);
+    assertEquals(count1, 100, "Cluster 1 should have 100 records");
+
+    // Verify cluster 2 is queryable
+    String result2 = executeQuery(query, _cluster2);
+    assertNotNull(result2, "Query result from cluster 2 should not be null");
+    long count2 = parseCountResult(result2);
+    assertEquals(count2, 100, "Cluster 2 should have 100 records");
+
+    LOGGER.info("Multi-cluster broker test passed: both clusters started and queryable");
+  }
+
+  @Override
+  protected BaseBrokerStarter createBrokerStarter() {
+    return new MultiClusterHelixBrokerStarter();
+  }
+
+  protected static class ClusterConfig {
+    final String _name;
+    final int _basePort;
+
+    ClusterConfig(String name, int basePort) {
+      _name = name;
+      _basePort = basePort;
+    }
+  }
+
+  protected static class ClusterComponents {
+    ZkStarter.ZookeeperInstance _zkInstance;
+    BaseControllerStarter _controllerStarter;
+    BaseBrokerStarter _brokerStarter;
+    BaseServerStarter _serverStarter;
+    int _controllerPort;
+    int _brokerPort;
+    int _serverPort;
+    String _zkUrl;
+    String _controllerBaseApiUrl;
+    File _tempDir;
+    File _segmentDir;
+    File _tarDir;
+  }
+
+  protected void setupDirectories() throws Exception {
+    setupClusterDirectories(_cluster1, "cluster1");
+    setupClusterDirectories(_cluster2, "cluster2");
+  }
+
+  private void setupClusterDirectories(ClusterComponents cluster, String clusterPrefix) throws Exception {
+    cluster._tempDir = new File(FileUtils.getTempDirectory(), clusterPrefix + "_" + getClass().getSimpleName());
+    cluster._segmentDir = new File(cluster._tempDir, "segmentDir");
+    cluster._tarDir = new File(cluster._tempDir, "tarDir");
+    TestUtils.ensureDirectoriesExistAndEmpty(cluster._tempDir, cluster._segmentDir, cluster._tarDir);
+  }
+
+  protected void startZookeeper(ClusterComponents cluster) throws Exception {
+    cluster._zkInstance = ZkStarter.startLocalZkServer();
+    cluster._zkUrl = cluster._zkInstance.getZkUrl();
+  }
+
+  protected void startControllerInit(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    cluster._controllerPort = findAvailablePort(config._basePort);
+    startController(cluster, config);
+  }
+
+  protected void startCluster(ClusterComponents cluster, ClusterComponents remoteCluster,
+      ClusterConfig config) throws Exception {
+    cluster._brokerPort = findAvailablePort(cluster._controllerPort + 1000);
+    startBroker(cluster, remoteCluster, config);
+    cluster._serverPort = findAvailablePort(cluster._brokerPort + 1000);
+    startServerWithMSE(cluster, config);
+  }
+
+  protected void startController(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    Map<String, Object> controllerConfig = new HashMap<>();
+    controllerConfig.put(ControllerConf.ZK_STR, cluster._zkUrl);
+    controllerConfig.put(ControllerConf.HELIX_CLUSTER_NAME, config._name);
+    controllerConfig.put(ControllerConf.CONTROLLER_HOST, ControllerTest.LOCAL_HOST);
+    controllerConfig.put(ControllerConf.CONTROLLER_PORT, cluster._controllerPort);
+    controllerConfig.put(ControllerConf.DATA_DIR, cluster._tempDir.getAbsolutePath());
+    controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, cluster._tempDir.getAbsolutePath());
+    controllerConfig.put(ControllerConf.DISABLE_GROOVY, false);
+    controllerConfig.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
+    controllerConfig.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+
+    cluster._controllerStarter = createControllerStarter();
+    cluster._controllerStarter.init(new PinotConfiguration(controllerConfig));
+    cluster._controllerStarter.start();
+    cluster._controllerBaseApiUrl = "http://localhost:" + cluster._controllerPort;
+  }
+
+  protected void startBroker(ClusterComponents cluster, ClusterComponents remoteCluster,
+      ClusterConfig config) throws Exception {
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
+    String remoteClusterName = CLUSTER_1_NAME.equalsIgnoreCase(config._name) ? CLUSTER_2_NAME : CLUSTER_1_NAME;
+    brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES, remoteClusterName);
+    brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, remoteClusterName),
+        remoteCluster._zkUrl);
+    brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
+    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, ControllerTest.LOCAL_HOST);
+    brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, cluster._brokerPort);
+    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+    brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+    brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+    cluster._brokerStarter = createBrokerStarter();
+    cluster._brokerStarter.init(brokerConfig);
+    cluster._brokerStarter.start();
+  }
+
+  protected void startServerWithMSE(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    PinotConfiguration serverConfig = new PinotConfiguration();
+    serverConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
+    serverConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
+    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, ControllerTest.LOCAL_HOST);
+    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, cluster._tempDir + "/dataDir");
+    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, cluster._tempDir + "/segmentTar");
+    serverConfig.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
+    serverConfig.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, false);
+    serverConfig.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, findAvailablePort(cluster._serverPort));
+    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_PORT, findAvailablePort(cluster._serverPort + 1));
+    serverConfig.setProperty(Server.CONFIG_OF_GRPC_PORT, findAvailablePort(cluster._serverPort + 2));
+    serverConfig.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    serverConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+    serverConfig.setProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+
+    cluster._serverStarter = createServerStarter();
+    cluster._serverStarter.init(serverConfig);
+    cluster._serverStarter.start();
+  }
+
+  protected int findAvailablePort(int basePort) {
+    try {
+      return NetUtils.findOpenPort(basePort);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to find available port starting from " + basePort, e);
+    }
+  }
+
+  protected List<File> createAvroData(int dataSize, int clusterId) throws Exception {
+    return createAvroDataMultipleSegments(dataSize, clusterId, 1);
+  }
+
+  protected List<File> createAvroDataMultipleSegments(int totalDataSize, int clusterId, int numSegments)
+      throws Exception {
+    Schema schema = createSchema(SCHEMA_FILE);
+    org.apache.avro.Schema avroSchema = createAvroSchema(schema);
+    File tempDir = (clusterId == 1) ? _cluster1._tempDir : _cluster2._tempDir;
+    List<File> avroFiles = new ArrayList<>();
+
+    for (int segment = 0; segment < numSegments; segment++) {
+      File avroFile = new File(tempDir, "cluster" + clusterId + "_data_segment" + segment + ".avro");
+      try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        writer.create(avroSchema, avroFile);
+        int start = segment * (totalDataSize / numSegments);
+        int end = (segment == numSegments - 1) ? totalDataSize : (segment + 1) * (totalDataSize / numSegments);
+        for (int i = start; i < end; i++) {
+          GenericData.Record record = new GenericData.Record(avroSchema);
+          for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+            record.put(fieldSpec.getName(), generateFieldValue(fieldSpec.getName(), i, clusterId,
+                fieldSpec.getDataType()));
+          }
+          writer.append(record);
+        }
+      }
+      avroFiles.add(avroFile);
+    }
+    return avroFiles;
+  }
+
+  private org.apache.avro.Schema createAvroSchema(Schema schema) {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    List<org.apache.avro.Schema.Field> fields = new ArrayList<>();
+
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      org.apache.avro.Schema.Type avroType = getAvroType(fieldSpec.getDataType());
+      fields.add(new org.apache.avro.Schema.Field(fieldSpec.getName(),
+          org.apache.avro.Schema.create(avroType), null, null));
+    }
+    avroSchema.setFields(fields);
+    return avroSchema;
+  }
+
+  private org.apache.avro.Schema.Type getAvroType(FieldSpec.DataType type) {
+    switch (type) {
+      case INT: return org.apache.avro.Schema.Type.INT;
+      case LONG: return org.apache.avro.Schema.Type.LONG;
+      case FLOAT: return org.apache.avro.Schema.Type.FLOAT;
+      case DOUBLE: return org.apache.avro.Schema.Type.DOUBLE;
+      case BOOLEAN: return org.apache.avro.Schema.Type.BOOLEAN;
+      default: return org.apache.avro.Schema.Type.STRING;
+    }
+  }
+
+  private Object generateFieldValue(String fieldName, int index, int clusterId, FieldSpec.DataType dataType) {
+    int baseValue = index + (clusterId * 10000);
+    switch (dataType) {
+      case INT: return index + 10000;
+      case LONG: return (long) baseValue;
+      case FLOAT: return (float) (baseValue + 0.1);
+      case DOUBLE: return (double) (baseValue + 0.1);
+      case BOOLEAN: return (baseValue % 2) == 0;
+      default: return "cluster_" + fieldName + "_" + index;
+    }
+  }
+
+  protected void loadDataIntoCluster(List<File> avroFiles, String tableName, ClusterComponents cluster)
+      throws Exception {
+    cleanDirectories(cluster._segmentDir, cluster._tarDir);
+    Schema schema = createSchema(SCHEMA_FILE);
+    schema.setSchemaName(tableName);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(tableName)
+        .setTimeColumnName(TIME_COLUMN)
+        .build();
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0,
+        cluster._segmentDir, cluster._tarDir);
+    uploadSegmentsToCluster(tableName, cluster._tarDir, cluster._controllerBaseApiUrl);
+    Thread.sleep(2000);
+  }
+
+  private void cleanDirectories(File... dirs) {
+    for (File dir : dirs) {
+      try {
+        FileUtils.cleanDirectory(dir);
+      } catch (IOException e) {
+        // Ignore cleanup errors
+      }
+    }
+  }
+
+  protected void uploadSegmentsToCluster(String tableName, File tarDir, String controllerBaseApiUrl) throws Exception {
+    File[] segmentTarFiles = tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    assertTrue(segmentTarFiles.length > 0);
+
+    URI uploadSegmentHttpURI = URI.create(controllerBaseApiUrl + "/segments");
+
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      for (File segmentTarFile : segmentTarFiles) {
+        int status = fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
+                segmentTarFile.getName(), segmentTarFile, List.of(), tableName, TableType.OFFLINE)
+            .getStatusCode();
+        assertEquals(status, HttpStatus.SC_OK);
+      }
+    }
+
+    Thread.sleep(3000);
+  }
+
+
+  protected void createSchemaAndTableForCluster(String tableName, String controllerBaseApiUrl) throws IOException {
+    Schema schema = createSchema(SCHEMA_FILE);
+    schema.setSchemaName(tableName);
+    addSchemaToCluster(schema, controllerBaseApiUrl);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(tableName)
+        .setTimeColumnName(TIME_COLUMN)
+        .build();
+    addTableConfigToCluster(tableConfig, controllerBaseApiUrl);
+  }
+
+  protected void createSchemaAndTableOnBothClusters(String tableName) throws Exception {
+    dropTableAndSchemaIfExists(tableName, _cluster1._controllerBaseApiUrl);
+    dropTableAndSchemaIfExists(tableName, _cluster2._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(tableName, _cluster1._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(tableName, _cluster2._controllerBaseApiUrl);
+  }
+
+  protected void dropTableAndSchemaIfExists(String tableName, String controllerBaseApiUrl) {
+    dropResource(controllerBaseApiUrl + "/tables/" + tableName);
+    dropResource(controllerBaseApiUrl + "/schemas/" + tableName);
+  }
+
+  private void dropResource(String url) {
+    try {
+      ControllerTest.sendDeleteRequest(url);
+    } catch (Exception e) {
+      // Ignore
+    }
+  }
+
+  protected void addSchemaToCluster(Schema schema, String controllerBaseApiUrl) throws IOException {
+    String url = controllerBaseApiUrl + "/schemas";
+    String schemaJson = schema.toPrettyJsonString();
+    String response = ControllerTest.sendPostRequest(url, schemaJson);
+    assertNotNull(response);
+  }
+
+  protected void addTableConfigToCluster(TableConfig tableConfig, String controllerBaseApiUrl) throws IOException {
+    String url = controllerBaseApiUrl + "/tables";
+    String tableConfigJson = JsonUtils.objectToPrettyString(tableConfig);
+    String response = ControllerTest.sendPostRequest(url, tableConfigJson);
+    assertNotNull(response);
+  }
+
+  protected String executeQuery(String query, ClusterComponents cluster) throws Exception {
+    Map<String, Object> payload = Map.of("sql", query);
+    String url = "http://localhost:" + cluster._brokerPort + "/query/sql";
+    return ControllerTest.sendPostRequest(url, JsonUtils.objectToPrettyString(payload));
+  }
+
+  protected long parseCountResult(String result) {
+    try {
+      JsonNode rows = JsonMapper.builder().build().readTree(result).path("resultTable").path("rows");
+      if (rows.isArray() && rows.size() > 0) {
+        JsonNode firstRow = rows.get(0);
+        if (firstRow.isArray() && firstRow.size() > 0) {
+          return Long.parseLong(firstRow.get(0).asText());
+        }
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return 0;
+  }
+
+  protected Schema createSchema(String schemaFileName) throws IOException {
+    InputStream schemaInputStream = getClass().getClassLoader().getResourceAsStream(schemaFileName);
+    assertNotNull(schemaInputStream, "Schema file not found: " + schemaFileName);
+    return Schema.fromInputStream(schemaInputStream);
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    stopCluster(_cluster1);
+    stopCluster(_cluster2);
+  }
+
+  private void stopCluster(ClusterComponents cluster) {
+    if (cluster == null) {
+      return;
+    }
+    try {
+      if (cluster._serverStarter != null) {
+        cluster._serverStarter.stop();
+      }
+      if (cluster._brokerStarter != null) {
+        cluster._brokerStarter.stop();
+      }
+      if (cluster._controllerStarter != null) {
+        cluster._controllerStarter.stop();
+      }
+      if (cluster._zkInstance != null) {
+        ZkStarter.stopLocalZkServer(cluster._zkInstance);
+      }
+      FileUtils.deleteQuietly(cluster._tempDir);
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping cluster", e);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b2fc58b11c2..3004cafdc73 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -251,6 +251,9 @@ public class CommonConstants {
     @Deprecated(since = "1.5.0", forRemoval = true)
     public static final String CONFIG_OF_ZOOKEEPR_SERVER = "pinot.zk.server";
 
+    public static final String CONFIG_OF_REMOTE_CLUSTER_NAMES = "pinot.remote.cluster.names";
+    public static final String CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS = "pinot.remote.zk.server.%s";
+
     public static final String CONFIG_OF_PINOT_CONTROLLER_STARTABLE_CLASS = "pinot.controller.startable.class";
     public static final String CONFIG_OF_PINOT_BROKER_STARTABLE_CLASS = "pinot.broker.startable.class";
     public static final String CONFIG_OF_PINOT_SERVER_STARTABLE_CLASS = "pinot.server.startable.class";

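The second new key is a format template keyed by cluster name; the starter
resolves one ZK address per configured remote cluster (see
initRemoteClusterNamesAndZk above). For a remote cluster named "RemoteCluster1"
(illustrative):

    String zkKey = String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, "RemoteCluster1");
    // zkKey == "pinot.remote.zk.server.RemoteCluster1"
    String remoteZk = brokerConf.getProperty(zkKey);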

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
