This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5af90157801 [federation] Introduced MultiClusterHelixBrokerStarter to
start broker in federated mode (#17421)
5af90157801 is described below
commit 5af901578014b2e2f928dce70d46f0352bbe2717
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Sun Dec 28 21:05:18 2025 -0800
[federation] Introduced MultiClusterHelixBrokerStarter to start broker in
federated mode (#17421)
* [federation] Introduced MultiClusterHelixBrokerStarter to initialize
cross-cluster federated broker
* Fixed starter logic
* Added TODO
* Added multi cluster broker startup failure metrics
---------
Co-authored-by: shauryachats <[email protected]>
---
.../broker/broker/helix/BaseBrokerStarter.java | 115 +++--
.../helix/MultiClusterHelixBrokerStarter.java | 369 ++++++++++++++++
.../apache/pinot/common/metrics/BrokerMeter.java | 3 +
.../core/routing/MultiClusterRoutingContext.java | 55 +++
.../multicluster/MultiClusterIntegrationTest.java | 490 +++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
6 files changed, 990 insertions(+), 45 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index d4c2b2c8754..6bb740339f7 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -81,6 +81,7 @@ import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.transport.ListenerConfig;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
@@ -150,6 +151,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected HelixAdmin _helixAdmin;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected HelixDataAccessor _helixDataAccessor;
+ protected TableCache _tableCache;
protected PinotMetricsRegistry _metricsRegistry;
protected BrokerMetrics _brokerMetrics;
protected BrokerRoutingManager _routingManager;
@@ -310,12 +312,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
Utils.logVersions();
LOGGER.info("Connecting spectator Helix manager");
- _spectatorHelixManager =
- HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId,
InstanceType.SPECTATOR, _zkServers);
- _spectatorHelixManager.connect();
- _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
- _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
- _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
+ initSpectatorHelixManager();
LOGGER.info("Setting up broker request handler");
// Set up metric registry and broker metrics
@@ -336,8 +333,8 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
// Set up request handling classes
_serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf,
_brokerMetrics);
_serverRoutingStatsManager.init();
- _routingManager = new BrokerRoutingManager(_brokerMetrics,
_serverRoutingStatsManager, _brokerConf);
- _routingManager.init(_spectatorHelixManager);
+ initRoutingManager();
+
final PinotConfiguration factoryConf =
_brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
// Adding cluster name to the config so that it can be used by the
AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME,
_brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
@@ -353,7 +350,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
FunctionRegistry.init();
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY,
Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
- TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
+ _tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
LOGGER.info("Initializing Broker Event Listener Factory");
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
@@ -382,6 +379,9 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
_threadAccountant.startWatcherTask();
+ // TODO: Hook multiClusterRoutingContext into request handlers
subsequently.
+ MultiClusterRoutingContext multiClusterRoutingContext =
getMultiClusterRoutingContext();
+
// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID,
getDefaultBrokerId());
BrokerRequestIdGenerator requestIdGenerator = new
BrokerRequestIdGenerator();
@@ -391,7 +391,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
if
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
{
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
- _accessControlFactory, _queryQuotaManager, tableCache,
_failureDetector, _threadAccountant);
+ _accessControlFactory, _queryQuotaManager, _tableCache,
_failureDetector, _threadAccountant);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf,
Broker.BROKER_NETTY_PREFIX);
@@ -402,7 +402,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
- _accessControlFactory, _queryQuotaManager, tableCache,
nettyDefaults, tlsDefaults,
+ _accessControlFactory, _queryQuotaManager, _tableCache,
nettyDefaults, tlsDefaults,
_serverRoutingStatsManager, _failureDetector, _threadAccountant);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
@@ -416,7 +416,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
- _accessControlFactory, _queryQuotaManager, tableCache,
_multiStageQueryThrottler, _failureDetector,
+ _accessControlFactory, _queryQuotaManager, _tableCache,
_multiStageQueryThrottler, _failureDetector,
_threadAccountant);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
@@ -424,7 +424,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be
enabled to use time-series engine");
timeSeriesRequestHandler =
new TimeSeriesRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
- _accessControlFactory, _queryQuotaManager, tableCache,
queryDispatcher, _threadAccountant);
+ _accessControlFactory, _queryQuotaManager, _tableCache,
queryDispatcher, _threadAccountant);
}
LOGGER.info("Initializing PinotFSFactory");
@@ -471,6 +471,45 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
}
LOGGER.info("Initializing cluster change mediator");
+ initClusterChangeMediator();
+
+ LOGGER.info("Connecting participant Helix manager");
+ _participantHelixManager =
+ HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId,
InstanceType.PARTICIPANT, _zkServers);
+ // Register state model factory
+ _participantHelixManager.getStateMachineEngine()
+
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
+ new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore,
_helixDataAccessor, _routingManager,
+ _queryQuotaManager));
+ // Register user-define message handler factory
+ _participantHelixManager.getMessagingService()
+
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+ new BrokerUserDefinedMessageHandlerFactory(_routingManager,
_queryQuotaManager));
+ _participantHelixManager.connect();
+ updateInstanceConfigAndBrokerResourceIfNeeded();
+ _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
+ () -> _participantHelixManager.isConnected() ? 1L : 0L);
+ _participantHelixManager.addPreConnectCallback(
+ () ->
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
+
+ // Initializing Groovy execution security
+ GroovyFunctionEvaluator.configureGroovySecurity(
+
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
+
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));
+
+ // Register the service status handler
+ registerServiceStatusHandler();
+
+ _isStarting = false;
+ _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
+ System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+
+
_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+
+ LOGGER.info("Finish starting Pinot broker");
+ }
+
+ protected void initClusterChangeMediator() throws Exception {
for (ClusterChangeHandler clusterConfigChangeHandler :
_clusterConfigChangeHandlers) {
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
@@ -515,41 +554,27 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
if (!_liveInstanceChangeHandlers.isEmpty()) {
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
}
+ }
- LOGGER.info("Connecting participant Helix manager");
- _participantHelixManager =
- HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId,
InstanceType.PARTICIPANT, _zkServers);
- // Register state model factory
- _participantHelixManager.getStateMachineEngine()
-
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
- new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore,
_helixDataAccessor, _routingManager,
- _queryQuotaManager));
- // Register user-define message handler factory
- _participantHelixManager.getMessagingService()
-
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
- new BrokerUserDefinedMessageHandlerFactory(_routingManager,
_queryQuotaManager));
- _participantHelixManager.connect();
- updateInstanceConfigAndBrokerResourceIfNeeded();
- _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
- () -> _participantHelixManager.isConnected() ? 1L : 0L);
- _participantHelixManager.addPreConnectCallback(
- () ->
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
-
- // Initializing Groovy execution security
- GroovyFunctionEvaluator.configureGroovySecurity(
-
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
-
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));
-
- // Register the service status handler
- registerServiceStatusHandler();
-
- _isStarting = false;
- _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
- System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+ /**
+ * Creates the routing manager for the local cluster and initializes it with the spectator Helix manager.
+ * Protected so that subclasses can add their own routing setup around it.
+ */
+ protected void initRoutingManager() throws Exception {
+ _routingManager = new BrokerRoutingManager(_brokerMetrics,
_serverRoutingStatsManager, _brokerConf);
+ _routingManager.init(_spectatorHelixManager);
+ }
-
_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+ /**
+ * Connects the spectator Helix manager for the local cluster and caches the Helix admin, property store and
+ * data accessor obtained from it. Protected so that subclasses can connect additional managers.
+ */
+ protected void initSpectatorHelixManager() throws Exception {
+ _spectatorHelixManager =
+ HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId,
InstanceType.SPECTATOR, _zkServers);
+ _spectatorHelixManager.connect();
+ _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
+ _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
+ _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
+ }
- LOGGER.info("Finish starting Pinot broker");
+ /**
+ * Returns the MultiClusterRoutingContext to use, or null when multi-cluster routing is not available.
+ * The default implementation returns null; a multi-cluster starter such as MultiClusterHelixBrokerStarter
+ * overrides this to supply its own context.
+ */
+ protected MultiClusterRoutingContext getMultiClusterRoutingContext() {
+ return null;
}
/**
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java
new file mode 100644
index 00000000000..8c40f3a0499
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.broker.routing.manager.MultiClusterRoutingManager;
+import
org.apache.pinot.broker.routing.manager.RemoteClusterBrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Multi-cluster broker starter that extends the base Helix broker functionality
+ * to support federation across multiple Pinot clusters.
+ *
+ * This class handles:
+ * - Connection to remote clusters via separate ZooKeeper instances
+ * - Federated routing across primary and remote clusters
+ * - Cross-cluster query federation
+ * - Cluster change monitoring for remote clusters
+ *
+ * Failures while setting up remote clusters are logged and counted in
+ * {@link BrokerMeter#MULTI_CLUSTER_BROKER_STARTUP_FAILURE}; the remote init hooks do not rethrow them, so the
+ * startup sequence of the primary cluster can continue.
+ */
+@SuppressWarnings("unused")
+public class MultiClusterHelixBrokerStarter extends BaseBrokerStarter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterHelixBrokerStarter.class);
+
+  // Remote cluster configuration
+  protected List<String> _remoteClusterNames;
+  // Maps remote cluster name to its ZooKeeper connection string
+  protected Map<String, String> _remoteZkServers;
+  // NOTE(review): assigned in init() but not read anywhere in this class; the remote spectator managers are
+  // created with _instanceId. Confirm which instance id is intended for remote clusters.
+  protected String _remoteInstanceId;
+
+  // Remote cluster Helix managers and routing, all keyed by remote cluster name
+  protected Map<String, HelixManager> _remoteSpectatorHelixManager;
+  protected Map<String, RemoteClusterBrokerRoutingManager> _remoteRoutingManagers;
+  protected MultiClusterRoutingManager _multiClusterRoutingManager;
+  protected MultiClusterRoutingContext _multiClusterRoutingContext;
+  protected Map<String, ClusterChangeMediator> _remoteClusterChangeMediator;
+
+  // Number of startup failures observed while _brokerMetrics was still null; reported once metrics exist
+  private long _pendingStartupFailures;
+
+  public MultiClusterHelixBrokerStarter() {
+  }
+
+  /**
+   * Initializes the base broker and then parses the remote cluster names and their ZooKeeper servers from the
+   * broker configuration.
+   *
+   * @param brokerConf broker configuration
+   * @throws Exception if the base initialization fails
+   */
+  @Override
+  public void init(PinotConfiguration brokerConf)
+      throws Exception {
+    super.init(brokerConf);
+    _remoteInstanceId = _instanceId + "_remote";
+    initRemoteClusterNamesAndZk(brokerConf);
+  }
+
+  /**
+   * Starts the base broker, then builds the routing tables of the remote clusters.
+   *
+   * @throws Exception if the base broker fails to start
+   */
+  @Override
+  public void start()
+      throws Exception {
+    LOGGER.info("[multi-cluster] Starting multi-cluster broker");
+    super.start();
+    // build routing tables for remote clusters
+    initRemoteClusterRouting();
+    // Report failures that were recorded before broker metrics were available
+    flushStartupFailures();
+    LOGGER.info("[multi-cluster] Multi-cluster broker started successfully");
+  }
+
+  /**
+   * Connects the local spectator Helix manager, then the remote ones. A remote failure is logged and counted but
+   * not rethrown.
+   */
+  @Override
+  protected void initSpectatorHelixManager() throws Exception {
+    super.initSpectatorHelixManager();
+    try {
+      initRemoteClusterSpectatorHelixManagers();
+    } catch (Exception e) {
+      LOGGER.error("[multi-cluster] Failed to initialize remote cluster spectator Helix managers", e);
+      recordStartupFailure();
+    }
+  }
+
+  /**
+   * Initializes the local routing manager, then the remote and federated ones. A remote failure is logged and
+   * counted but not rethrown.
+   */
+  @Override
+  protected void initRoutingManager() throws Exception {
+    super.initRoutingManager();
+    try {
+      initRemoteClusterFederatedRoutingManager();
+    } catch (Exception e) {
+      LOGGER.error("[multi-cluster] Failed to initialize remote cluster federated routing manager", e);
+      recordStartupFailure();
+    }
+  }
+
+  /**
+   * Initializes the local cluster change mediator, then one mediator per remote cluster. A remote failure is
+   * logged and counted but not rethrown.
+   */
+  @Override
+  protected void initClusterChangeMediator() throws Exception {
+    super.initClusterChangeMediator();
+    try {
+      initRemoteClusterChangeMediator();
+    } catch (Exception e) {
+      LOGGER.error("[multi-cluster] Failed to initialize remote cluster change mediator", e);
+      recordStartupFailure();
+    }
+  }
+
+  /**
+   * Counts one multi-cluster startup failure.
+   *
+   * _brokerMetrics may still be null when this is called from init() or from the spectator manager hook (the
+   * base start() sets up broker metrics after connecting the spectator manager), so the failure is buffered and
+   * reported as soon as metrics are available instead of dereferencing a null field.
+   */
+  private void recordStartupFailure() {
+    _pendingStartupFailures++;
+    flushStartupFailures();
+  }
+
+  /**
+   * Reports all buffered startup failures to the broker metrics, if metrics are available.
+   */
+  private void flushStartupFailures() {
+    if (_brokerMetrics != null && _pendingStartupFailures > 0) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE,
+          _pendingStartupFailures);
+      _pendingStartupFailures = 0;
+    }
+  }
+
+  /**
+   * Creates and connects one spectator Helix manager per configured remote cluster. Clusters that fail to
+   * connect are skipped; the remaining clusters are still connected.
+   */
+  private void initRemoteClusterSpectatorHelixManagers() throws Exception {
+    if (_remoteZkServers == null || _remoteZkServers.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote ZK servers configured - skipping spectator Helix manager init");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing spectator Helix managers for {} remote clusters",
+        _remoteZkServers.size());
+    _remoteSpectatorHelixManager = new HashMap<>();
+
+    for (Map.Entry<String, String> entry : _remoteZkServers.entrySet()) {
+      String clusterName = entry.getKey();
+      String zkServers = entry.getValue();
+      try {
+        HelixManager helixManager = HelixManagerFactory.getZKHelixManager(
+            clusterName, _instanceId, InstanceType.SPECTATOR, zkServers);
+        helixManager.connect();
+        _remoteSpectatorHelixManager.put(clusterName, helixManager);
+        LOGGER.info("[multi-cluster] Connected to remote cluster '{}' at ZK: {}", clusterName, zkServers);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to connect to cluster '{}' at ZK: {}", clusterName, zkServers, e);
+        recordStartupFailure();
+      }
+    }
+
+    if (_remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.warn("[multi-cluster] Failed to connect to any remote clusters - "
+          + "multi-cluster will not be functional");
+    } else {
+      LOGGER.info("[multi-cluster] Connected to {}/{} remote clusters: {}", _remoteSpectatorHelixManager.size(),
+          _remoteZkServers.size(), _remoteSpectatorHelixManager.keySet());
+    }
+  }
+
+  /**
+   * Stops the remote change mediators, shuts down the remote routing managers and disconnects the remote
+   * spectator Helix managers. Each component is stopped independently so that one failure does not leave the
+   * remaining components running.
+   */
+  protected void stopRemoteClusterComponents() {
+    if (_remoteClusterChangeMediator != null) {
+      _remoteClusterChangeMediator.forEach((clusterName, mediator) -> {
+        try {
+          mediator.stop();
+        } catch (Exception e) {
+          LOGGER.warn("[multi-cluster] Failed to stop cluster change mediator for cluster '{}'", clusterName, e);
+        }
+      });
+    }
+    if (_remoteRoutingManagers != null) {
+      _remoteRoutingManagers.forEach((clusterName, routingManager) -> {
+        try {
+          routingManager.shutdown();
+        } catch (Exception e) {
+          LOGGER.warn("[multi-cluster] Failed to shut down routing manager for cluster '{}'", clusterName, e);
+        }
+      });
+    }
+    if (_remoteSpectatorHelixManager != null) {
+      _remoteSpectatorHelixManager.forEach((clusterName, helixManager) -> {
+        try {
+          helixManager.disconnect();
+        } catch (Exception e) {
+          LOGGER.warn("[multi-cluster] Failed to disconnect spectator Helix manager for cluster '{}'",
+              clusterName, e);
+        }
+      });
+    }
+  }
+
+  /**
+   * Reads the comma-separated remote cluster names and, for each name, its ZooKeeper servers from the broker
+   * configuration. Clusters without a ZooKeeper configuration are skipped.
+   *
+   * @param brokerConf broker configuration
+   */
+  private void initRemoteClusterNamesAndZk(PinotConfiguration brokerConf) {
+    LOGGER.info("[multi-cluster] Initializing remote cluster configuration");
+    String remoteClusterNames = brokerConf.getProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES);
+
+    if (remoteClusterNames == null || remoteClusterNames.trim().isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote cluster configured - multi-cluster mode disabled");
+      return;
+    }
+
+    // Drop empty tokens produced by stray commas (e.g. "a,,b") so they are not treated as cluster names
+    String[] tokens = remoteClusterNames.replaceAll("\\s+", "").split(",");
+    List<String> parsedNames = new ArrayList<>(Arrays.asList(tokens));
+    parsedNames.removeIf(String::isEmpty);
+    _remoteClusterNames = parsedNames;
+    if (_remoteClusterNames.isEmpty()) {
+      LOGGER.warn("[multi-cluster] Remote cluster names list is empty after parsing");
+      return;
+    }
+    LOGGER.info("[multi-cluster] Configured remote cluster names: {}", _remoteClusterNames);
+
+    _remoteZkServers = new HashMap<>();
+    for (String name : _remoteClusterNames) {
+      String zkConfig = String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, name);
+      String zkServers = brokerConf.getProperty(zkConfig);
+
+      if (zkServers == null || zkServers.trim().isEmpty()) {
+        LOGGER.error("[multi-cluster] Missing ZooKeeper configuration for cluster '{}', expected: {}", name, zkConfig);
+        continue;
+      }
+      _remoteZkServers.put(name, zkServers.replaceAll("\\s+", ""));
+    }
+
+    if (_remoteZkServers.isEmpty()) {
+      LOGGER.error("[multi-cluster] No valid ZooKeeper configurations found - multi-cluster will not be functional");
+      _remoteClusterNames = null;
+      // Called from init(): goes through the null-safe helper because broker metrics may not exist yet
+      recordStartupFailure();
+    } else {
+      LOGGER.info("[multi-cluster] Initialized {} remote cluster(s): {}", _remoteZkServers.size(),
+          _remoteZkServers.keySet());
+    }
+  }
+
+  /**
+   * Creates one routing manager per connected remote cluster and, if at least one was created, the
+   * MultiClusterRoutingManager that combines the local routing manager with the remote ones.
+   */
+  private void initRemoteClusterFederatedRoutingManager() {
+    if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote spectator Helix managers - skipping federated routing manager init");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing federated routing manager for {} clusters",
+        _remoteSpectatorHelixManager.size());
+    _remoteRoutingManagers = new HashMap<>();
+
+    for (Map.Entry<String, HelixManager> entry : _remoteSpectatorHelixManager.entrySet()) {
+      String clusterName = entry.getKey();
+      try {
+        RemoteClusterBrokerRoutingManager routingManager =
+            new RemoteClusterBrokerRoutingManager(clusterName, _brokerMetrics, _serverRoutingStatsManager, _brokerConf);
+        routingManager.init(entry.getValue());
+        _remoteRoutingManagers.put(clusterName, routingManager);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to initialize routing manager for cluster '{}'", clusterName, e);
+        recordStartupFailure();
+      }
+    }
+
+    if (_remoteRoutingManagers.isEmpty()) {
+      LOGGER.error("[multi-cluster] Failed to initialize any routing managers - federated routing unavailable");
+    } else {
+      _multiClusterRoutingManager = new MultiClusterRoutingManager(_routingManager,
+          new ArrayList<>(_remoteRoutingManagers.values()));
+      LOGGER.info("[multi-cluster] Created federated routing manager with {}/{} remote clusters",
+          _remoteRoutingManagers.size(), _remoteSpectatorHelixManager.size());
+    }
+  }
+
+  /**
+   * Builds the MultiClusterRoutingContext from the primary table cache plus one table cache per connected remote
+   * cluster. Leaves the context null when no federated routing manager or no remote Helix manager exists.
+   *
+   * @param primaryTableCache table cache of the primary cluster
+   * @param caseInsensitive whether the remote table caches are case-insensitive
+   */
+  private void initRemoteClusterFederationProvider(TableCache primaryTableCache, boolean caseInsensitive) {
+    if (_multiClusterRoutingManager == null) {
+      LOGGER.info("[multi-cluster] Federation is not enabled - MultiClusterRoutingContext will be null");
+      _multiClusterRoutingContext = null;
+      return;
+    }
+
+    if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote spectator Helix managers - "
+          + "MultiClusterRoutingContext will be null");
+      _multiClusterRoutingContext = null;
+      return;
+    }
+
+    Map<String, TableCache> tableCacheMap = new HashMap<>();
+    tableCacheMap.put(_clusterName, primaryTableCache);
+
+    LOGGER.info("[multi-cluster] Initializing federation provider with {} remote clusters",
+        _remoteSpectatorHelixManager.size());
+
+    for (Map.Entry<String, HelixManager> entry : _remoteSpectatorHelixManager.entrySet()) {
+      String clusterName = entry.getKey();
+      try {
+        TableCache remoteCache = new ZkTableCache(entry.getValue().getHelixPropertyStore(), caseInsensitive);
+        tableCacheMap.put(clusterName, remoteCache);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to create table cache for cluster '{}'", clusterName, e);
+        recordStartupFailure();
+      }
+    }
+
+    _multiClusterRoutingContext = new MultiClusterRoutingContext(tableCacheMap, _routingManager,
+        _multiClusterRoutingManager);
+    LOGGER.info("[multi-cluster] Created federation provider with {}/{} clusters (1 primary + {} remote)",
+        tableCacheMap.size(), _remoteSpectatorHelixManager.size() + 1, tableCacheMap.size() - 1);
+  }
+
+  /**
+   * Asks every remote routing manager to determine the routing changes for its tables. A failure for one
+   * cluster is logged and counted; the other clusters are still processed.
+   */
+  private void initRemoteClusterRouting() {
+    if (_remoteRoutingManagers == null || _remoteRoutingManagers.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote routing managers - skipping routing table initialization");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing routing tables for {} remote clusters",
+        _remoteRoutingManagers.size());
+    int initialized = 0;
+
+    for (Map.Entry<String, RemoteClusterBrokerRoutingManager> entry : _remoteRoutingManagers.entrySet()) {
+      try {
+        entry.getValue().determineRoutingChangeForTables();
+        initialized++;
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to initialize routing tables for cluster '{}'", entry.getKey(), e);
+        recordStartupFailure();
+      }
+    }
+
+    LOGGER.info("[multi-cluster] Initialized routing tables for {}/{} remote clusters",
+        initialized, _remoteRoutingManagers.size());
+  }
+
+  /**
+   * Creates and starts one ClusterChangeMediator per remote cluster that has a routing manager, and registers it
+   * as a change listener on that cluster's spectator Helix manager.
+   */
+  private void initRemoteClusterChangeMediator() throws Exception {
+    if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) {
+      LOGGER.info("[multi-cluster] No remote spectator Helix managers - skipping cluster change mediator init");
+      return;
+    }
+
+    if (_remoteRoutingManagers == null || _remoteRoutingManagers.isEmpty()) {
+      LOGGER.error("[multi-cluster] Remote routing managers not initialized - "
+          + "cannot create cluster change mediators");
+      return;
+    }
+
+    LOGGER.info("[multi-cluster] Initializing cluster change mediators for {} remote clusters",
+        _remoteSpectatorHelixManager.size());
+    _remoteClusterChangeMediator = new HashMap<>();
+
+    for (String clusterName : _remoteSpectatorHelixManager.keySet()) {
+      RemoteClusterBrokerRoutingManager routingManager = _remoteRoutingManagers.get(clusterName);
+      if (routingManager == null) {
+        LOGGER.error("[multi-cluster] Routing manager not found for cluster '{}' - skipping mediator setup",
+            clusterName);
+        recordStartupFailure();
+        continue;
+      }
+
+      try {
+        // NOTE(review): handlers are registered for RESOURCE_CONFIG, but no resource-config listener is added to
+        // the Helix manager below, while a cluster-config listener is added with an empty handler list. Confirm
+        // this is intended.
+        Map<ChangeType, List<ClusterChangeHandler>> handlers = new HashMap<>();
+        handlers.put(ChangeType.CLUSTER_CONFIG, new ArrayList<>());
+        handlers.put(ChangeType.IDEAL_STATE, Collections.singletonList(routingManager));
+        handlers.put(ChangeType.EXTERNAL_VIEW, Collections.singletonList(routingManager));
+        handlers.put(ChangeType.INSTANCE_CONFIG, Collections.singletonList(routingManager));
+        handlers.put(ChangeType.RESOURCE_CONFIG, Collections.singletonList(routingManager));
+
+        ClusterChangeMediator mediator = new ClusterChangeMediator(handlers, _brokerMetrics);
+        mediator.start();
+        _remoteClusterChangeMediator.put(clusterName, mediator);
+
+        HelixManager helixManager = _remoteSpectatorHelixManager.get(clusterName);
+        helixManager.addIdealStateChangeListener(mediator);
+        helixManager.addExternalViewChangeListener(mediator);
+        helixManager.addInstanceConfigChangeListener(mediator);
+        helixManager.addClusterfigChangeListener(mediator);
+      } catch (Exception e) {
+        LOGGER.error("[multi-cluster] Failed to initialize cluster change mediator for cluster '{}'", clusterName, e);
+        recordStartupFailure();
+      }
+    }
+
+    LOGGER.info("[multi-cluster] Initialized {}/{} cluster change mediators", _remoteClusterChangeMediator.size(),
+        _remoteSpectatorHelixManager.size());
+  }
+
+  /**
+   * Builds the multi-cluster routing context from the broker's table cache and returns it.
+   *
+   * @return the routing context, or null when federation is not available
+   */
+  @Override
+  protected MultiClusterRoutingContext getMultiClusterRoutingContext() {
+    initRemoteClusterFederationProvider(_tableCache,
+        _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE));
+    return _multiClusterRoutingContext;
+  }
+
+  /**
+   * Stops the base broker and then the remote cluster components. The remote components are stopped even when
+   * stopping the base broker throws.
+   */
+  @Override
+  public void stop() {
+    LOGGER.info("[multi-cluster] Shutting down multi-cluster broker");
+    try {
+      super.stop();
+    } finally {
+      stopRemoteClusterComponents();
+    }
+    LOGGER.info("[multi-cluster] Multi-cluster broker shut down successfully");
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 01676557a20..c0e730a1ca6 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -178,6 +178,9 @@ public class BrokerMeter implements AbstractMetrics.Meter {
public static final BrokerMeter HELIX_ZOOKEEPER_RECONNECTS =
create("HELIX_ZOOKEEPER_RECONNECTS", "reconnects", true);
+ public static final BrokerMeter MULTI_CLUSTER_BROKER_STARTUP_FAILURE =
create(
+ "MULTI_CLUSTER_BROKER_STARTUP_FAILURE", "failureCount", true);
+
public static final BrokerMeter REQUEST_DROPPED_DUE_TO_ACCESS_ERROR = create(
"REQUEST_DROPPED_DUE_TO_ACCESS_ERROR", "requestsDropped", false);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
new file mode 100644
index 00000000000..1f139da56ac
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.routing;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.config.provider.TableCache;
+
+
+/**
+ * Holds the dependencies needed for multi-cluster (federated) routing: the per-cluster table caches, the local
+ * routing manager and the optional multi-cluster routing manager.
+ * This class currently only stores these references; it defines no accessors or selection logic.
+ */
+public class MultiClusterRoutingContext {
+ // Maps clusterName to TableCache. Includes the local and all remote clusters.
+ private final Map<String, TableCache> _tableCacheMap;
+
+ // Routing manager of the local cluster
+ private final RoutingManager _localRoutingManager;
+
+ // Federated routing manager (for federated queries, may be null if federation is not configured)
+ @Nullable
+ private final RoutingManager _multiClusterRoutingManager;
+
+ /**
+ * Creates a MultiClusterRoutingContext from the given table caches and routing managers.
+ * The arguments are stored as passed; the map is not copied.
+ *
+ * @param tableCacheMap Map of cluster name to TableCache
+ * @param localRoutingManager Local routing manager for non-federated queries
+ * @param multiClusterRoutingManager Multi cluster routing manager for cross-cluster queries (can be null)
+ */
+ public MultiClusterRoutingContext(Map<String, TableCache> tableCacheMap,
RoutingManager localRoutingManager,
+ @Nullable RoutingManager multiClusterRoutingManager) {
+ _tableCacheMap = tableCacheMap;
+ _localRoutingManager = localRoutingManager;
+ _multiClusterRoutingManager = multiClusterRoutingManager;
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
new file mode 100644
index 00000000000..e9ac332e2e9
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.multicluster;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
+import org.apache.pinot.broker.broker.helix.MultiClusterHelixBrokerStarter;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.BaseControllerStarter;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.ClusterTest;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class MultiClusterIntegrationTest extends ClusterTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
+
+ protected static final String SCHEMA_FILE =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema"; // classpath resource loaded by createSchema()
+ protected static final String TIME_COLUMN = "DaysSinceEpoch"; // time column set on every table config built in this test
+ // TODO: N clusters instead of 2 in future iterations.
+ protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1"; // used as the Helix cluster name of cluster 1
+ protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2"; // used as the Helix cluster name of cluster 2
+ protected static final ClusterConfig CLUSTER_1_CONFIG = new
ClusterConfig(CLUSTER_1_NAME, 30000); // 30000: base port where cluster 1's controller port search starts
+ protected static final ClusterConfig CLUSTER_2_CONFIG = new
ClusterConfig(CLUSTER_2_NAME, 40000); // 40000: base port where cluster 2's controller port search starts
+
+ protected ClusterComponents _cluster1; // created in setUp(), stopped in tearDown()
+ protected ClusterComponents _cluster2; // created in setUp(), stopped in tearDown()
+ protected List<File> _cluster1AvroFiles; // Avro input for cluster 1, assigned by the test method
+ protected List<File> _cluster2AvroFiles; // Avro input for cluster 2, assigned by the test method
+
+  /**
+   * Brings up two separate Pinot clusters, each with its own ZooKeeper, controller, broker and server.
+   *
+   * <p>Both ZooKeeper instances are started before any broker because {@code startBroker} reads the remote
+   * cluster's ZooKeeper URL when it builds the broker configuration.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    LOGGER.info("Setting up MultiClusterIntegrationTest");
+
+    _cluster1 = new ClusterComponents();
+    _cluster2 = new ClusterComponents();
+    setupDirectories();
+
+    // One ZooKeeper per cluster, cluster 1 first.
+    for (ClusterComponents cluster : List.of(_cluster1, _cluster2)) {
+      startZookeeper(cluster);
+    }
+
+    // One controller per cluster.
+    startControllerInit(_cluster1, CLUSTER_1_CONFIG);
+    startControllerInit(_cluster2, CLUSTER_2_CONFIG);
+
+    // Broker + server per cluster. Each broker is told about the other cluster as its remote cluster.
+    startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
+    startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
+
+    LOGGER.info("MultiClusterIntegrationTest setup complete");
+  }
+
+ // TODO: Add more tests for cross-cluster queries in subsequent iterations.
+ @Test
+ public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
+ LOGGER.info("Testing that multi-cluster broker starts successfully and is
queryable");
+
+ // Verify both clusters' brokers are running
(MultiClusterHelixBrokerStarter)
+ assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be
started");
+ assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be
started");
+ assertTrue(_cluster1._brokerStarter instanceof
MultiClusterHelixBrokerStarter,
+ "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
+ assertTrue(_cluster2._brokerStarter instanceof
MultiClusterHelixBrokerStarter,
+ "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
+
+ // Setup a test table on both clusters
+ String testTableName = "multicluster_test_table";
+ createSchemaAndTableOnBothClusters(testTableName);
+
+ // Create and load test data into both clusters
+ _cluster1AvroFiles = createAvroData(100, 1);
+ _cluster2AvroFiles = createAvroData(100, 2);
+
+ loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
+ loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
+
+ // Verify cluster 1 is queryable
+ String query = "SELECT COUNT(*) FROM " + testTableName;
+ String result1 = executeQuery(query, _cluster1);
+ assertNotNull(result1, "Query result from cluster 1 should not be null");
+ long count1 = parseCountResult(result1);
+ assertEquals(count1, 100, "Cluster 1 should have 100 records");
+
+ // Verify cluster 2 is queryable
+ String result2 = executeQuery(query, _cluster2);
+ assertNotNull(result2, "Query result from cluster 2 should not be null");
+ long count2 = parseCountResult(result2);
+ assertEquals(count2, 100, "Cluster 2 should have 100 records");
+
+ LOGGER.info("Multi-cluster broker test passed: both clusters started and
queryable");
+ }
+
+  /**
+   * Supplies the broker implementation under test: every broker in this test is a
+   * {@link MultiClusterHelixBrokerStarter}.
+   */
+  @Override
+  protected BaseBrokerStarter createBrokerStarter() {
+    BaseBrokerStarter multiClusterBroker = new MultiClusterHelixBrokerStarter();
+    return multiClusterBroker;
+  }
+
+ protected static class ClusterConfig { // static settings of one test cluster: its name and its base port
+ final String _name; // passed as the Helix cluster name to controller, broker and server
+ final int _basePort; // port from which the search for a free controller port starts
+
+ ClusterConfig(String name, int basePort) {
+ _name = name;
+ _basePort = basePort;
+ }
+ }
+
+ protected static class ClusterComponents { // mutable runtime state of one test cluster, filled in step by step during setUp()
+ ZkStarter.ZookeeperInstance _zkInstance; // local ZooKeeper started by startZookeeper()
+ BaseControllerStarter _controllerStarter; // assigned by startController()
+ BaseBrokerStarter _brokerStarter; // assigned by startBroker()
+ BaseServerStarter _serverStarter; // assigned by startServerWithMSE()
+ int _controllerPort; // chosen in startControllerInit()
+ int _brokerPort; // chosen in startCluster(), searched from _controllerPort + 1000
+ int _serverPort; // chosen in startCluster(), searched from _brokerPort + 1000
+ String _zkUrl; // taken from _zkInstance.getZkUrl()
+ String _controllerBaseApiUrl; // "http://localhost:" + _controllerPort, set after the controller has started
+ File _tempDir; // per-cluster root directory; also used as controller data dir and local temp dir
+ File _segmentDir; // segment output directory handed to buildSegmentsFromAvro
+ File _tarDir; // segment tar directory; every file in it is uploaded to the controller
+ }
+
+ protected void setupDirectories() throws Exception {
+ setupClusterDirectories(_cluster1, "cluster1");
+ setupClusterDirectories(_cluster2, "cluster2");
+ }
+
+  /**
+   * Assigns the temp, segment and tar directories of one cluster and makes sure all three exist and are empty.
+   * The root directory lives under the system temp directory and is named {@code <clusterPrefix>_<TestClass>}.
+   *
+   * @param cluster cluster whose directory fields are populated
+   * @param clusterPrefix prefix that keeps the two clusters' directories apart
+   */
+  private void setupClusterDirectories(ClusterComponents cluster, String clusterPrefix) throws Exception {
+    String rootDirName = clusterPrefix + "_" + getClass().getSimpleName();
+    File rootDir = new File(FileUtils.getTempDirectory(), rootDirName);
+    File segmentDir = new File(rootDir, "segmentDir");
+    File tarDir = new File(rootDir, "tarDir");
+
+    cluster._tempDir = rootDir;
+    cluster._segmentDir = segmentDir;
+    cluster._tarDir = tarDir;
+    TestUtils.ensureDirectoriesExistAndEmpty(rootDir, segmentDir, tarDir);
+  }
+
+  /**
+   * Starts a local ZooKeeper server for the given cluster and records both the instance and its URL.
+   *
+   * @param cluster cluster that receives the ZooKeeper instance
+   */
+  protected void startZookeeper(ClusterComponents cluster) throws Exception {
+    ZkStarter.ZookeeperInstance zookeeper = ZkStarter.startLocalZkServer();
+    cluster._zkInstance = zookeeper;
+    cluster._zkUrl = zookeeper.getZkUrl();
+  }
+
+  /**
+   * Picks a free controller port, searching from the cluster's base port, and then starts the controller.
+   *
+   * @param cluster cluster whose controller is started
+   * @param config static settings (name, base port) of that cluster
+   */
+  protected void startControllerInit(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    int controllerPort = findAvailablePort(config._basePort);
+    cluster._controllerPort = controllerPort;
+    startController(cluster, config);
+  }
+
+  /**
+   * Starts the broker and then the server of one cluster. The broker port search starts 1000 above the controller
+   * port, and the server port search starts 1000 above the broker port.
+   *
+   * @param cluster cluster whose broker and server are started
+   * @param remoteCluster the other cluster, announced to the broker as its remote cluster
+   * @param config static settings of {@code cluster}
+   */
+  protected void startCluster(ClusterComponents cluster, ClusterComponents remoteCluster, ClusterConfig config)
+      throws Exception {
+    // Broker first; the server's port range is derived from the broker port.
+    int brokerSearchStart = cluster._controllerPort + 1000;
+    cluster._brokerPort = findAvailablePort(brokerSearchStart);
+    startBroker(cluster, remoteCluster, config);
+
+    int serverSearchStart = cluster._brokerPort + 1000;
+    cluster._serverPort = findAvailablePort(serverSearchStart);
+    startServerWithMSE(cluster, config);
+  }
+
+  /**
+   * Builds the controller configuration for one cluster, starts the controller and records its base API URL.
+   * The cluster's temp directory doubles as controller data dir and local temp dir.
+   *
+   * @param cluster cluster that provides ZooKeeper URL, port and temp dir, and receives the started controller
+   * @param config static settings; {@code _name} becomes the Helix cluster name
+   */
+  protected void startController(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    String workDir = cluster._tempDir.getAbsolutePath();
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(ControllerConf.ZK_STR, cluster._zkUrl);
+    properties.put(ControllerConf.HELIX_CLUSTER_NAME, config._name);
+    properties.put(ControllerConf.CONTROLLER_HOST, ControllerTest.LOCAL_HOST);
+    properties.put(ControllerConf.CONTROLLER_PORT, cluster._controllerPort);
+    properties.put(ControllerConf.DATA_DIR, workDir);
+    properties.put(ControllerConf.LOCAL_TEMP_DIR, workDir);
+    properties.put(ControllerConf.DISABLE_GROOVY, false);
+    properties.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
+    properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+
+    BaseControllerStarter controller = createControllerStarter();
+    cluster._controllerStarter = controller;
+    controller.init(new PinotConfiguration(properties));
+    controller.start();
+
+    // Only published once the controller has started successfully.
+    cluster._controllerBaseApiUrl = "http://localhost:" + cluster._controllerPort;
+  }
+
+  /**
+   * Configures and starts the broker of {@code cluster}, registering the other cluster as its remote cluster.
+   *
+   * <p>The remote cluster name is derived from the two hard-coded cluster names: whichever one is not
+   * {@code config._name}. The remote ZooKeeper key is built by formatting
+   * {@code Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS} with that name.
+   *
+   * @param cluster cluster whose broker is started
+   * @param remoteCluster cluster whose ZooKeeper URL is configured as the remote ZooKeeper
+   * @param config static settings of {@code cluster}
+   */
+  protected void startBroker(ClusterComponents cluster, ClusterComponents remoteCluster, ClusterConfig config)
+      throws Exception {
+    String remoteClusterName;
+    if (CLUSTER_1_NAME.equalsIgnoreCase(config._name)) {
+      remoteClusterName = CLUSTER_2_NAME;
+    } else {
+      remoteClusterName = CLUSTER_1_NAME;
+    }
+    String remoteZkKey = String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, remoteClusterName);
+
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    // Local and remote cluster wiring.
+    brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
+    brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES, remoteClusterName);
+    brokerConfig.setProperty(remoteZkKey, remoteCluster._zkUrl);
+    brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
+    // Broker endpoint and behaviour.
+    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, ControllerTest.LOCAL_HOST);
+    brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, cluster._brokerPort);
+    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+    brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+    brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+
+    BaseBrokerStarter broker = createBrokerStarter();
+    cluster._brokerStarter = broker;
+    broker.init(brokerConfig);
+    broker.start();
+  }
+
+  /**
+   * Configures and starts the server of one cluster with the multi-stage engine enabled.
+   *
+   * <p>The admin API, Netty and gRPC ports are searched for one after another, each search starting just above
+   * the previously chosen port. Searching from the fixed offsets {@code _serverPort}, {@code _serverPort + 1}
+   * and {@code _serverPort + 2} could hand out the same port twice when one of those offsets is occupied,
+   * because nothing is bound yet while the ports are being picked. When all three offsets are free the result
+   * is the same as before.
+   *
+   * @param cluster cluster that provides ZooKeeper URL, temp dir and base server port, and receives the server
+   * @param config static settings; {@code _name} is used as the Helix cluster name
+   */
+  protected void startServerWithMSE(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    // Chain the searches so the three ports are guaranteed to be distinct.
+    int adminApiPort = findAvailablePort(cluster._serverPort);
+    int nettyPort = findAvailablePort(adminApiPort + 1);
+    int grpcPort = findAvailablePort(nettyPort + 1);
+
+    PinotConfiguration serverConfig = new PinotConfiguration();
+    serverConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
+    serverConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
+    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, ControllerTest.LOCAL_HOST);
+    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, cluster._tempDir + "/dataDir");
+    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, cluster._tempDir + "/segmentTar");
+    serverConfig.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
+    serverConfig.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, false);
+    serverConfig.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, adminApiPort);
+    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_PORT, nettyPort);
+    serverConfig.setProperty(Server.CONFIG_OF_GRPC_PORT, grpcPort);
+    serverConfig.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    serverConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+    serverConfig.setProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+
+    cluster._serverStarter = createServerStarter();
+    cluster._serverStarter.init(serverConfig);
+    cluster._serverStarter.start();
+  }
+
+  /**
+   * Delegates to {@code NetUtils.findOpenPort} and converts any failure into an unchecked exception so callers
+   * need no checked-exception handling.
+   *
+   * @param basePort port at which the search starts
+   * @return the port reported by {@code NetUtils.findOpenPort}
+   * @throws RuntimeException wrapping the original failure
+   */
+  protected int findAvailablePort(int basePort) {
+    int openPort;
+    try {
+      openPort = NetUtils.findOpenPort(basePort);
+    } catch (Exception cause) {
+      throw new RuntimeException("Failed to find available port starting from " + basePort, cause);
+    }
+    return openPort;
+  }
+
+ protected List<File> createAvroData(int dataSize, int clusterId) throws
Exception {
+ return createAvroDataMultipleSegments(dataSize, clusterId, 1);
+ }
+
+  /**
+   * Generates {@code totalDataSize} rows split over {@code numSegments} Avro files, one file per segment.
+   *
+   * <p>Each file gets {@code totalDataSize / numSegments} consecutive row indexes; the last file also takes the
+   * remainder. Files are written to cluster 1's temp directory when {@code clusterId == 1}, otherwise to
+   * cluster 2's.
+   *
+   * @param totalDataSize total number of rows over all files
+   * @param clusterId cluster the data is generated for
+   * @param numSegments number of Avro files to write
+   * @return the written Avro files in segment order
+   */
+  protected List<File> createAvroDataMultipleSegments(int totalDataSize, int clusterId, int numSegments)
+      throws Exception {
+    Schema pinotSchema = createSchema(SCHEMA_FILE);
+    org.apache.avro.Schema avroSchema = createAvroSchema(pinotSchema);
+
+    File outputDir;
+    if (clusterId == 1) {
+      outputDir = _cluster1._tempDir;
+    } else {
+      outputDir = _cluster2._tempDir;
+    }
+
+    List<File> generatedFiles = new ArrayList<>();
+    for (int segmentId = 0; segmentId < numSegments; segmentId++) {
+      // Computed inside the loop so that numSegments == 0 simply yields no files.
+      int rowsPerSegment = totalDataSize / numSegments;
+      int firstRow = segmentId * rowsPerSegment;
+      int endRow;
+      if (segmentId == numSegments - 1) {
+        endRow = totalDataSize;
+      } else {
+        endRow = (segmentId + 1) * rowsPerSegment;
+      }
+
+      File avroFile = new File(outputDir, "cluster" + clusterId + "_data_segment" + segmentId + ".avro");
+      try (DataFileWriter<GenericData.Record> avroWriter =
+          new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        avroWriter.create(avroSchema, avroFile);
+        for (int row = firstRow; row < endRow; row++) {
+          GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+          for (FieldSpec column : pinotSchema.getAllFieldSpecs()) {
+            String columnName = column.getName();
+            avroRecord.put(columnName, generateFieldValue(columnName, row, clusterId, column.getDataType()));
+          }
+          avroWriter.append(avroRecord);
+        }
+      }
+      generatedFiles.add(avroFile);
+    }
+    return generatedFiles;
+  }
+
+  /**
+   * Derives an Avro record schema named "myRecord" from a Pinot schema: one Avro field per Pinot field spec,
+   * typed via {@code getAvroType}.
+   *
+   * @param schema Pinot schema to mirror
+   * @return the Avro record schema
+   */
+  private org.apache.avro.Schema createAvroSchema(Schema schema) {
+    org.apache.avro.Schema recordSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+
+    List<org.apache.avro.Schema.Field> avroFields = new ArrayList<>();
+    for (FieldSpec column : schema.getAllFieldSpecs()) {
+      org.apache.avro.Schema.Type columnType = getAvroType(column.getDataType());
+      org.apache.avro.Schema columnSchema = org.apache.avro.Schema.create(columnType);
+      avroFields.add(new org.apache.avro.Schema.Field(column.getName(), columnSchema, null, null));
+    }
+
+    recordSchema.setFields(avroFields);
+    return recordSchema;
+  }
+
+  /**
+   * Maps a Pinot data type to an Avro primitive type. INT, LONG, FLOAT, DOUBLE and BOOLEAN map to their Avro
+   * namesakes; every other type is written as STRING.
+   *
+   * @param type Pinot data type of a field
+   * @return the Avro type used for that field
+   */
+  private org.apache.avro.Schema.Type getAvroType(FieldSpec.DataType type) {
+    org.apache.avro.Schema.Type avroType;
+    switch (type) {
+      case INT:
+        avroType = org.apache.avro.Schema.Type.INT;
+        break;
+      case LONG:
+        avroType = org.apache.avro.Schema.Type.LONG;
+        break;
+      case FLOAT:
+        avroType = org.apache.avro.Schema.Type.FLOAT;
+        break;
+      case DOUBLE:
+        avroType = org.apache.avro.Schema.Type.DOUBLE;
+        break;
+      case BOOLEAN:
+        avroType = org.apache.avro.Schema.Type.BOOLEAN;
+        break;
+      default:
+        avroType = org.apache.avro.Schema.Type.STRING;
+        break;
+    }
+    return avroType;
+  }
+
+  /**
+   * Produces a deterministic value for one field of one generated row.
+   *
+   * <p>LONG, FLOAT, DOUBLE and BOOLEAN values are derived from {@code index + clusterId * 10000}. INT values are
+   * {@code index + 10000} and the string fallback is {@code "cluster_<fieldName>_<index>"}; neither of those two
+   * depends on {@code clusterId}.
+   *
+   * @param fieldName name of the field, used only in the string fallback
+   * @param index row index
+   * @param clusterId cluster the row is generated for
+   * @param dataType Pinot data type of the field
+   * @return a boxed Integer, Long, Float, Double or Boolean, or a String for all other types
+   */
+  private Object generateFieldValue(String fieldName, int index, int clusterId, FieldSpec.DataType dataType) {
+    int clusterScopedValue = index + (clusterId * 10000);
+    Object value;
+    switch (dataType) {
+      case INT:
+        value = index + 10000;
+        break;
+      case LONG:
+        value = (long) clusterScopedValue;
+        break;
+      case FLOAT:
+        value = (float) (clusterScopedValue + 0.1);
+        break;
+      case DOUBLE:
+        value = (double) (clusterScopedValue + 0.1);
+        break;
+      case BOOLEAN:
+        value = (clusterScopedValue % 2) == 0;
+        break;
+      default:
+        value = "cluster_" + fieldName + "_" + index;
+        break;
+    }
+    return value;
+  }
+
+  /**
+   * Builds segments from the given Avro files and uploads them to the cluster's controller.
+   *
+   * <p>The cluster's segment and tar directories are cleaned first, so only freshly built segments are uploaded.
+   * The method ends with a fixed 2 second sleep.
+   *
+   * @param avroFiles Avro input files
+   * @param tableName table name, also used as the schema name
+   * @param cluster target cluster
+   */
+  protected void loadDataIntoCluster(List<File> avroFiles, String tableName, ClusterComponents cluster)
+      throws Exception {
+    cleanDirectories(cluster._segmentDir, cluster._tarDir);
+
+    Schema tableSchema = createSchema(SCHEMA_FILE);
+    tableSchema.setSchemaName(tableName);
+
+    TableConfigBuilder offlineTableBuilder = new TableConfigBuilder(TableType.OFFLINE);
+    offlineTableBuilder.setTableName(tableName);
+    offlineTableBuilder.setTimeColumnName(TIME_COLUMN);
+    TableConfig offlineTableConfig = offlineTableBuilder.build();
+
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, offlineTableConfig, tableSchema, 0,
+        cluster._segmentDir, cluster._tarDir);
+    uploadSegmentsToCluster(tableName, cluster._tarDir, cluster._controllerBaseApiUrl);
+
+    // Fixed pause after the upload, on top of the one inside uploadSegmentsToCluster.
+    Thread.sleep(2000);
+  }
+
+  /**
+   * Empties each of the given directories on a best-effort basis.
+   *
+   * <p>A failure to clean one directory does not stop the others from being cleaned and is not rethrown, but it
+   * is logged: files left behind in the tar directory would otherwise be uploaded again without any trace of why.
+   *
+   * @param dirs directories to empty
+   */
+  private void cleanDirectories(File... dirs) {
+    for (File dir : dirs) {
+      try {
+        FileUtils.cleanDirectory(dir);
+      } catch (IOException e) {
+        // Still best-effort, but leave a trace instead of swallowing the error.
+        LOGGER.warn("Failed to clean directory {}; stale files may remain", dir, e);
+      }
+    }
+  }
+
+  /**
+   * Uploads every file found in {@code tarDir} to the controller's {@code /segments} endpoint as a segment of the
+   * OFFLINE table and asserts an HTTP 200 for each upload. Ends with a fixed 3 second sleep.
+   *
+   * @param tableName table the segments belong to
+   * @param tarDir directory holding the segment tar files; must contain at least one file
+   * @param controllerBaseApiUrl base URL of the target controller
+   */
+  protected void uploadSegmentsToCluster(String tableName, File tarDir, String controllerBaseApiUrl)
+      throws Exception {
+    File[] tarFiles = tarDir.listFiles();
+    assertNotNull(tarFiles);
+    assertTrue(tarFiles.length > 0);
+
+    URI segmentsEndpoint = URI.create(controllerBaseApiUrl + "/segments");
+    try (FileUploadDownloadClient uploadClient = new FileUploadDownloadClient()) {
+      for (File tarFile : tarFiles) {
+        assertEquals(
+            uploadClient.uploadSegment(segmentsEndpoint, tarFile.getName(), tarFile, List.of(), tableName,
+                TableType.OFFLINE).getStatusCode(),
+            HttpStatus.SC_OK);
+      }
+    }
+
+    // Fixed pause after the last upload.
+    Thread.sleep(3000);
+  }
+
+
+  /**
+   * Registers the test schema under the given name and then creates an OFFLINE table of the same name on one
+   * controller.
+   *
+   * @param tableName name used for both the schema and the table
+   * @param controllerBaseApiUrl base URL of the target controller
+   */
+  protected void createSchemaAndTableForCluster(String tableName, String controllerBaseApiUrl)
+      throws IOException {
+    // Schema first, then the table.
+    Schema tableSchema = createSchema(SCHEMA_FILE);
+    tableSchema.setSchemaName(tableName);
+    addSchemaToCluster(tableSchema, controllerBaseApiUrl);
+
+    TableConfigBuilder offlineTableBuilder = new TableConfigBuilder(TableType.OFFLINE);
+    offlineTableBuilder.setTableName(tableName);
+    offlineTableBuilder.setTimeColumnName(TIME_COLUMN);
+    addTableConfigToCluster(offlineTableBuilder.build(), controllerBaseApiUrl);
+  }
+
+  /**
+   * Recreates the given table and its schema on both clusters: any existing table/schema is dropped on both
+   * controllers first, then schema and table are created on both.
+   *
+   * @param tableName name of the table (and schema) to recreate
+   */
+  protected void createSchemaAndTableOnBothClusters(String tableName) throws Exception {
+    String[] controllerUrls = {_cluster1._controllerBaseApiUrl, _cluster2._controllerBaseApiUrl};
+
+    for (String controllerUrl : controllerUrls) {
+      dropTableAndSchemaIfExists(tableName, controllerUrl);
+    }
+    for (String controllerUrl : controllerUrls) {
+      createSchemaAndTableForCluster(tableName, controllerUrl);
+    }
+  }
+
+  /**
+   * Sends delete requests for the table and then for the schema of the same name. Failures are ignored by
+   * {@code dropResource}, so this is safe to call when neither exists.
+   *
+   * @param tableName name of the table and schema to drop
+   * @param controllerBaseApiUrl base URL of the target controller
+   */
+  protected void dropTableAndSchemaIfExists(String tableName, String controllerBaseApiUrl) {
+    String tableUrl = controllerBaseApiUrl + "/tables/" + tableName;
+    String schemaUrl = controllerBaseApiUrl + "/schemas/" + tableName;
+    dropResource(tableUrl);
+    dropResource(schemaUrl);
+  }
+
+  /**
+   * Sends a DELETE request to the given URL and deliberately ignores any failure, so that dropping a resource
+   * that does not exist is not an error.
+   *
+   * @param url full URL of the resource to delete
+   */
+  private void dropResource(String url) {
+    try {
+      ControllerTest.sendDeleteRequest(url);
+    } catch (Exception ignored) {
+      // Intentionally ignored: the caller only wants the resource gone if it was there.
+    }
+  }
+
+  /**
+   * Posts the schema as pretty-printed JSON to the controller's {@code /schemas} endpoint and asserts that a
+   * response came back.
+   *
+   * @param schema schema to register
+   * @param controllerBaseApiUrl base URL of the target controller
+   */
+  protected void addSchemaToCluster(Schema schema, String controllerBaseApiUrl) throws IOException {
+    String schemasEndpoint = controllerBaseApiUrl + "/schemas";
+    assertNotNull(ControllerTest.sendPostRequest(schemasEndpoint, schema.toPrettyJsonString()));
+  }
+
+  /**
+   * Posts the table config as pretty-printed JSON to the controller's {@code /tables} endpoint and asserts that a
+   * response came back.
+   *
+   * @param tableConfig table config to create
+   * @param controllerBaseApiUrl base URL of the target controller
+   */
+  protected void addTableConfigToCluster(TableConfig tableConfig, String controllerBaseApiUrl)
+      throws IOException {
+    String tablesEndpoint = controllerBaseApiUrl + "/tables";
+    assertNotNull(ControllerTest.sendPostRequest(tablesEndpoint, JsonUtils.objectToPrettyString(tableConfig)));
+  }
+
+  /**
+   * Sends a SQL query to the given cluster's broker ({@code /query/sql} on localhost) and returns the raw
+   * response body.
+   *
+   * @param query SQL text, sent as the {@code "sql"} property of the JSON payload
+   * @param cluster cluster whose broker port is used
+   * @return the response body as returned by the broker
+   */
+  protected String executeQuery(String query, ClusterComponents cluster) throws Exception {
+    String brokerQueryUrl = "http://localhost:" + cluster._brokerPort + "/query/sql";
+    Map<String, Object> requestPayload = Map.of("sql", query);
+    String requestBody = JsonUtils.objectToPrettyString(requestPayload);
+    return ControllerTest.sendPostRequest(brokerQueryUrl, requestBody);
+  }
+
+  /**
+   * Extracts the first cell of the first row of {@code resultTable.rows} from a broker response and parses it as
+   * a long.
+   *
+   * <p>Returns 0 when the response cannot be parsed or has no such cell. In both cases the raw response is now
+   * logged, so that a failing count assertion can be traced back to the actual broker answer instead of showing
+   * only an unexplained 0.
+   *
+   * @param result raw JSON response from the broker
+   * @return the parsed count, or 0 if it could not be extracted
+   */
+  protected long parseCountResult(String result) {
+    try {
+      JsonNode rows = JsonMapper.builder().build().readTree(result).path("resultTable").path("rows");
+      if (rows.isArray() && rows.size() > 0) {
+        JsonNode firstRow = rows.get(0);
+        if (firstRow.isArray() && firstRow.size() > 0) {
+          return Long.parseLong(firstRow.get(0).asText());
+        }
+      }
+      LOGGER.warn("Broker response contains no count row, returning 0. Response: {}", result);
+    } catch (Exception e) {
+      // Keep the lenient contract (return 0) but make the cause visible.
+      LOGGER.warn("Failed to parse count from broker response, returning 0. Response: {}", result, e);
+    }
+    return 0;
+  }
+
+  /**
+   * Loads a Pinot schema from a classpath resource.
+   *
+   * <p>The resource stream is opened in a try-with-resources block so it is closed on every path, including when
+   * parsing fails, without relying on the parser to close it. A missing resource yields a null stream, which
+   * try-with-resources tolerates and the assertion reports.
+   *
+   * @param schemaFileName name of the schema file on the test classpath
+   * @return the parsed schema
+   * @throws IOException if the schema cannot be read or the stream cannot be closed
+   */
+  protected Schema createSchema(String schemaFileName) throws IOException {
+    try (InputStream schemaInputStream = getClass().getClassLoader().getResourceAsStream(schemaFileName)) {
+      assertNotNull(schemaInputStream, "Schema file not found: " + schemaFileName);
+      return Schema.fromInputStream(schemaInputStream);
+    }
+  }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ stopCluster(_cluster1);
+ stopCluster(_cluster2);
+ }
+
+  /**
+   * Stops server, broker, controller and ZooKeeper of one cluster, in that order, and deletes its temp directory.
+   *
+   * <p>Each component is stopped in its own try/catch. A failure while stopping one component is logged and does
+   * not prevent the remaining components from being stopped or the temp directory from being removed, so one
+   * failed shutdown can no longer leave the rest of the cluster running. Components that were never started
+   * (null) are skipped.
+   *
+   * @param cluster cluster to stop; may be null, in which case nothing happens
+   */
+  private void stopCluster(ClusterComponents cluster) {
+    if (cluster == null) {
+      return;
+    }
+    try {
+      if (cluster._serverStarter != null) {
+        cluster._serverStarter.stop();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping cluster server", e);
+    }
+    try {
+      if (cluster._brokerStarter != null) {
+        cluster._brokerStarter.stop();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping cluster broker", e);
+    }
+    try {
+      if (cluster._controllerStarter != null) {
+        cluster._controllerStarter.stop();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping cluster controller", e);
+    }
+    try {
+      if (cluster._zkInstance != null) {
+        ZkStarter.stopLocalZkServer(cluster._zkInstance);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping cluster ZooKeeper", e);
+    }
+    // deleteQuietly never throws and accepts null, so it needs no guard.
+    FileUtils.deleteQuietly(cluster._tempDir);
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b2fc58b11c2..3004cafdc73 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -251,6 +251,9 @@ public class CommonConstants {
@Deprecated(since = "1.5.0", forRemoval = true)
public static final String CONFIG_OF_ZOOKEEPR_SERVER = "pinot.zk.server";
+ public static final String CONFIG_OF_REMOTE_CLUSTER_NAMES =
"pinot.remote.cluster.names"; // remote cluster name(s) for a multi-cluster broker. NOTE(review): list separator not visible here - confirm
+ public static final String CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS =
"pinot.remote.zk.server.%s"; // key template: %s is a remote cluster name; the value is that cluster's ZooKeeper address
+
public static final String CONFIG_OF_PINOT_CONTROLLER_STARTABLE_CLASS =
"pinot.controller.startable.class";
public static final String CONFIG_OF_PINOT_BROKER_STARTABLE_CLASS =
"pinot.broker.startable.class";
public static final String CONFIG_OF_PINOT_SERVER_STARTABLE_CLASS =
"pinot.server.startable.class";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]