This is an automated email from the ASF dual-hosted git repository.

shauryachats pushed a commit to branch pr_18172
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 4c93076647e74e46b3a64aae6a0accef4464da90
Author: shauryachats <[email protected]>
AuthorDate: Sat Apr 11 00:14:43 2026 +0000

    Added initial code for multi-cluster broker routing debug API
---
 .../broker/api/resources/PinotBrokerDebug.java     |  98 ++++++++++++++++--
 .../broker/broker/BrokerAdminApiApplication.java   |   9 +-
 .../broker/MultiClusterRoutingContextProvider.java |  40 ++++++++
 .../broker/broker/helix/BaseBrokerStarter.java     |  18 ++--
 .../broker/api/resources/PinotBrokerDebugTest.java | 110 ++++++++++++++++++---
 .../multicluster/MultiClusterIntegrationTest.java  |  34 +++++++
 6 files changed, 281 insertions(+), 28 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 69248672bb5..71c30c9a52a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -46,14 +46,18 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.broker.MultiClusterRoutingContextProvider;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.ManualAuthorization;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
+import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.SegmentsToQuery;
 import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
@@ -64,8 +68,11 @@ import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
@@ -83,6 +90,7 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 @Path("/")
 // TODO: Add APIs to return the RoutingTable (with unavailable segments)
 public class PinotBrokerDebug {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotBrokerDebug.class);
 
   // Request ID is passed to the RoutingManager to rotate the selected 
replica-group.
   private final static AtomicLong REQUEST_ID_GENERATOR = new AtomicLong();
@@ -90,6 +98,12 @@ public class PinotBrokerDebug {
   @Inject
   private BrokerRoutingManager _routingManager;
 
+  @Inject
+  private MultiClusterRoutingContextProvider 
_multiClusterRoutingContextProvider;
+
+  @Inject
+  private TableCache _tableCache;
+
   @Inject
   private ServerRoutingStatsManager _serverRoutingStatsManager;
 
@@ -138,11 +152,19 @@ public class PinotBrokerDebug {
   })
   public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
+      @ApiParam(value = "Use multi-cluster routing manager instead of local")
+      @QueryParam("useMultiClusterRouting") boolean useMultiClusterRouting,
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
+    RoutingManager routingManager = 
resolveRoutingManager(useMultiClusterRouting, tableName);
     Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>();
-    getRoutingTable(tableName, (tableNameWithType, routingTable) -> 
result.put(tableNameWithType,
-        
removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap())));
+    if (useMultiClusterRouting) {
+      getPhysicalRoutingTablesForLogical(routingManager, tableName, 
(tableNameWithType, routingTable) -> result.put(
+          tableNameWithType, 
removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap())));
+    } else {
+      getRoutingTable(routingManager, tableName, (tableNameWithType, 
routingTable) -> result.put(tableNameWithType,
+          
removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap())));
+    }
     if (!result.isEmpty()) {
       return result;
     } else {
@@ -162,11 +184,20 @@ public class PinotBrokerDebug {
   })
   public Map<String, Map<ServerInstance, SegmentsToQuery>> 
getRoutingTableWithOptionalSegments(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
+      @ApiParam(value = "Use multi-cluster routing manager instead of local")
+      @QueryParam("useMultiClusterRouting") boolean useMultiClusterRouting,
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
+    RoutingManager routingManager = 
resolveRoutingManager(useMultiClusterRouting, tableName);
     Map<String, Map<ServerInstance, SegmentsToQuery>> result = new TreeMap<>();
-    getRoutingTable(tableName, (tableNameWithType, routingTable) -> 
result.put(tableNameWithType,
-        routingTable.getServerInstanceToSegmentsMap()));
+    if (useMultiClusterRouting) {
+      getPhysicalRoutingTablesForLogical(routingManager, tableName,
+          (tableNameWithType, routingTable) -> result.put(tableNameWithType,
+              routingTable.getServerInstanceToSegmentsMap()));
+    } else {
+      getRoutingTable(routingManager, tableName, (tableNameWithType, 
routingTable) -> result.put(tableNameWithType,
+          routingTable.getServerInstanceToSegmentsMap()));
+    }
     if (!result.isEmpty()) {
       return result;
     } else {
@@ -174,14 +205,39 @@ public class PinotBrokerDebug {
     }
   }
 
-  private void getRoutingTable(String tableName, BiConsumer<String, 
RoutingTable> consumer) {
+  /**
+   * For a logical table with multi-cluster routing, iterates over every 
physical table in the logical table config
+   * and invokes the consumer with the per-physical-table routing result. This 
is needed because the underlying
+   * {@link RoutingManager} works with physical table names; passing a logical 
table name to it returns nothing.
+   */
+  private void getPhysicalRoutingTablesForLogical(RoutingManager 
routingManager, String logicalTableName,
+      BiConsumer<String, RoutingTable> consumer) {
+    String rawTableName = 
TableNameBuilder.extractRawTableName(logicalTableName);
+    LogicalTableConfig config = 
_tableCache.getLogicalTableConfig(rawTableName);
+    if (config == null) {
+      throw new WebApplicationException("Logical table config not found for: " 
+ rawTableName,
+          Response.Status.NOT_FOUND);
+    }
+    for (String physicalTableWithType : 
config.getPhysicalTableConfigMap().keySet()) {
+      RoutingTable routingTable = routingManager.getRoutingTable(
+          CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
physicalTableWithType), getRequestId());
+      if (routingTable != null) {
+        consumer.accept(physicalTableWithType, routingTable);
+      } else {
+        LOGGER.warn("No routing found in multi-cluster manager for physical 
table: {}", physicalTableWithType);
+      }
+    }
+  }
+
+  private void getRoutingTable(RoutingManager routingManager, String tableName,
+      BiConsumer<String, RoutingTable> consumer) {
     // Use a single requestId for both OFFLINE and REALTIME routing so that 
replica-group selection rotates properly
     // for raw table names (no suffix) and stays consistent for hybrid tables.
     long requestId = getRequestId();
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType != TableType.REALTIME) {
       String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-      RoutingTable routingTable = _routingManager.getRoutingTable(
+      RoutingTable routingTable = routingManager.getRoutingTable(
           CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
offlineTableName), requestId);
       if (routingTable != null) {
         consumer.accept(offlineTableName, routingTable);
@@ -189,7 +245,7 @@ public class PinotBrokerDebug {
     }
     if (tableType != TableType.OFFLINE) {
       String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
-      RoutingTable routingTable = _routingManager.getRoutingTable(
+      RoutingTable routingTable = routingManager.getRoutingTable(
           CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
realtimeTableName), requestId);
       if (routingTable != null) {
         consumer.accept(realtimeTableName, routingTable);
@@ -197,6 +253,34 @@ public class PinotBrokerDebug {
     }
   }
 
+  /**
+   * Returns the local routing manager unless multi-cluster routing is requested. When it is requested, returns the
+   * multi-cluster routing manager, or throws a 400 error if multi-cluster routing is not configured on this broker
+   * or if the given table is not a logical table.
+   */
+  private RoutingManager resolveRoutingManager(boolean useMultiClusterRouting, 
String tableName) {
+    if (!useMultiClusterRouting) {
+      return _routingManager;
+    }
+    MultiClusterRoutingContext context = 
_multiClusterRoutingContextProvider.get();
+    if (context == null) {
+      throw new WebApplicationException("Multi-cluster routing is not 
configured on this broker",
+          Response.Status.BAD_REQUEST);
+    }
+    RoutingManager multiClusterRoutingManager = 
context.getMultiClusterRoutingManager();
+    if (multiClusterRoutingManager == null) {
+      throw new WebApplicationException("Multi-cluster routing is not 
configured on this broker",
+          Response.Status.BAD_REQUEST);
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    if (!_tableCache.isLogicalTable(rawTableName)) {
+      throw new WebApplicationException(
+          "Multi-cluster routing is only supported for logical tables, but '" 
+ rawTableName + "' is not a logical "
+              + "table", Response.Status.BAD_REQUEST);
+    }
+    return multiClusterRoutingManager;
+  }
+
   private static Map<ServerInstance, List<String>> removeOptionalSegments(
       Map<ServerInstance, SegmentsToQuery> serverInstanceToSegmentsMap) {
     Map<ServerInstance, List<String>> ret = new HashMap<>();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index 6381e76098c..4d5492ddbc1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.hc.core5.http.io.SocketConfig;
@@ -36,6 +37,7 @@ import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
 import org.apache.pinot.common.audit.AuditLogFilter;
+import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.cursors.AbstractResponseStore;
 import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -46,6 +48,7 @@ import org.apache.pinot.common.utils.log.LocalLogFileServer;
 import org.apache.pinot.common.utils.log.LogFileServer;
 import org.apache.pinot.core.api.ServiceAutoDiscoveryFeature;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.transport.ListenerConfig;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.core.util.ListenerConfigUtil;
@@ -79,7 +82,8 @@ public class BrokerAdminApiApplication extends ResourceConfig 
{
       BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, 
SqlQueryExecutor sqlQueryExecutor,
       ServerRoutingStatsManager serverRoutingStatsManager, 
AccessControlFactory accessFactory,
       HelixManager helixManager, QueryQuotaManager queryQuotaManager, 
ThreadAccountant threadAccountant,
-      AbstractResponseStore responseStore) {
+      AbstractResponseStore responseStore, @Nullable 
MultiClusterRoutingContext multiClusterRoutingContext,
+      TableCache tableCache) {
     _brokerResourcePackages = 
brokerConf.getProperty(CommonConstants.Broker.BROKER_RESOURCE_PACKAGES,
         CommonConstants.Broker.DEFAULT_BROKER_RESOURCE_PACKAGES);
     String[] pkgs = _brokerResourcePackages.split(",");
@@ -123,6 +127,9 @@ public class BrokerAdminApiApplication extends 
ResourceConfig {
         bind(threadAccountant).to(ThreadAccountant.class);
         bind(responseStore).to(AbstractResponseStore.class);
         bind(brokerConf).to(PinotConfiguration.class);
+        bind(new 
MultiClusterRoutingContextProvider(multiClusterRoutingContext))
+            .to(MultiClusterRoutingContextProvider.class);
+        bind(tableCache).to(TableCache.class);
       }
     });
     boolean enableBoundedJerseyThreadPoolExecutor =
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/MultiClusterRoutingContextProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/MultiClusterRoutingContextProvider.java
new file mode 100644
index 00000000000..1a8f1d4d0bd
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/MultiClusterRoutingContextProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
+
+
+/**
+ * Wraps a nullable {@link MultiClusterRoutingContext} for Jersey/HK2 
injection (always bound; value may be null).
+ */
+public final class MultiClusterRoutingContextProvider {
+  @Nullable
+  private final MultiClusterRoutingContext _context;
+
+  public MultiClusterRoutingContextProvider(@Nullable 
MultiClusterRoutingContext context) {
+    _context = context;
+  }
+
+  @Nullable
+  public MultiClusterRoutingContext get() {
+    return _context;
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index c53501a6a8b..ec317fd3720 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -174,6 +174,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected PinotMetricsRegistry _metricsRegistry;
   protected BrokerMetrics _brokerMetrics;
   protected BrokerRoutingManager _routingManager;
+  protected MultiClusterRoutingContext _multiClusterRoutingContext;
   protected AccessControlFactory _accessControlFactory;
   protected BrokerRequestHandler _brokerRequestHandler;
   protected SqlQueryExecutor _sqlQueryExecutor;
@@ -467,8 +468,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       
_clusterConfigChangeHandler.registerClusterConfigChangeListener(threadAccountantListener);
     }
 
-    // TODO: Hook multiClusterRoutingContext into request handlers 
subsequently.
-    MultiClusterRoutingContext multiClusterRoutingContext = 
getMultiClusterRoutingContext();
+    _multiClusterRoutingContext = getMultiClusterRoutingContext();
 
     // Create Broker request handler.
     String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, 
getDefaultBrokerId());
@@ -480,7 +480,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       singleStageBrokerRequestHandler =
           createGrpcBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
_failureDetector, _threadAccountant,
-              multiClusterRoutingContext);
+              _multiClusterRoutingContext);
     } else {
       // Default request handler type, i.e. netty
       NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, 
Broker.BROKER_NETTY_PREFIX);
@@ -505,7 +505,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       singleStageBrokerRequestHandler =
           createSingleStageBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
nettyDefaults, tlsDefaults,
-              _serverRoutingStatsManager, _failureDetector, _threadAccountant, 
multiClusterRoutingContext);
+              _serverRoutingStatsManager, _failureDetector, _threadAccountant, 
_multiClusterRoutingContext);
     }
     MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
     if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, 
Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
@@ -519,16 +519,16 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       WorkerManager workerManager =
           createWorkerManager(brokerId, queryRunnerHostname, queryRunnerPort, 
_routingManager);
       WorkerManager multiClusterWorkerManager;
-      if (multiClusterRoutingContext != null) {
+      if (_multiClusterRoutingContext != null) {
         multiClusterWorkerManager = createWorkerManager(brokerId, 
queryRunnerHostname, queryRunnerPort,
-            multiClusterRoutingContext.getMultiClusterRoutingManager());
+            _multiClusterRoutingContext.getMultiClusterRoutingManager());
       } else {
         multiClusterWorkerManager = workerManager;
       }
       multiStageBrokerRequestHandler =
           createMultiStageBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
_multiStageQueryThrottler, _failureDetector,
-              _threadAccountant, multiClusterRoutingContext, workerManager, 
multiClusterWorkerManager);
+              _threadAccountant, _multiClusterRoutingContext, workerManager, 
multiClusterWorkerManager);
       MultiStageBrokerRequestHandler finalHandler = 
multiStageBrokerRequestHandler;
       _routingManager.setServerReenableCallback(
           serverInstance -> 
finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
@@ -538,7 +538,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       timeSeriesRequestHandler =
           new TimeSeriesRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
_threadAccountant,
-              multiClusterRoutingContext);
+              _multiClusterRoutingContext);
     }
 
     LOGGER.info("Initializing PinotFSFactory");
@@ -983,7 +983,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     BrokerAdminApiApplication brokerAdminApiApplication =
         new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, 
_brokerMetrics, _brokerConf,
             _sqlQueryExecutor, _serverRoutingStatsManager, 
_accessControlFactory, _spectatorHelixManager,
-            _queryQuotaManager, _threadAccountant, _responseStore);
+            _queryQuotaManager, _threadAccountant, _responseStore, 
_multiClusterRoutingContext, _tableCache);
     brokerAdminApiApplication.register(
         new AuditServiceBinder(_clusterConfigChangeHandler, getServiceRole(), 
_brokerMetrics));
     registerExtraComponents(brokerAdminApiApplication);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java
index f1f25d1173a..42cc043e3f5 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java
@@ -21,10 +21,17 @@ package org.apache.pinot.broker.api.resources;
 import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
+import org.apache.pinot.broker.broker.MultiClusterRoutingContextProvider;
 import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
@@ -36,10 +43,27 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 
 
 public class PinotBrokerDebugTest {
 
+  private PinotBrokerDebug createBrokerDebug(BrokerRoutingManager 
routingManager)
+      throws Exception {
+    PinotBrokerDebug brokerDebug = new PinotBrokerDebug();
+    setField(brokerDebug, "_routingManager", routingManager);
+    setField(brokerDebug, "_multiClusterRoutingContextProvider", new 
MultiClusterRoutingContextProvider(null));
+    setField(brokerDebug, "_tableCache", mock(TableCache.class));
+    return brokerDebug;
+  }
+
+  private static void setField(Object target, String fieldName, Object value)
+      throws Exception {
+    Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+
   @Test
   public void testGetRoutingTableUsesSameRequestIdForOfflineAndRealtime()
       throws Exception {
@@ -47,12 +71,9 @@ public class PinotBrokerDebugTest {
     when(routingManager.getRoutingTable(any(BrokerRequest.class), anyLong()))
         .thenReturn(new RoutingTable(Collections.emptyMap(), 
Collections.emptyList(), 0));
 
-    PinotBrokerDebug brokerDebug = new PinotBrokerDebug();
-    Field routingManagerField = 
PinotBrokerDebug.class.getDeclaredField("_routingManager");
-    routingManagerField.setAccessible(true);
-    routingManagerField.set(brokerDebug, routingManager);
+    PinotBrokerDebug brokerDebug = createBrokerDebug(routingManager);
 
-    brokerDebug.getRoutingTable("testTable", (HttpHeaders) null);
+    brokerDebug.getRoutingTable("testTable", false, (HttpHeaders) null);
 
     ArgumentCaptor<Long> requestIdCaptor = ArgumentCaptor.forClass(Long.class);
     verify(routingManager, times(2)).getRoutingTable(any(BrokerRequest.class), 
requestIdCaptor.capture());
@@ -74,13 +95,10 @@ public class PinotBrokerDebugTest {
       return null;
     });
 
-    PinotBrokerDebug brokerDebug = new PinotBrokerDebug();
-    Field routingManagerField = 
PinotBrokerDebug.class.getDeclaredField("_routingManager");
-    routingManagerField.setAccessible(true);
-    routingManagerField.set(brokerDebug, routingManager);
+    PinotBrokerDebug brokerDebug = createBrokerDebug(routingManager);
 
-    brokerDebug.getRoutingTable("testTable", (HttpHeaders) null);
-    brokerDebug.getRoutingTable("testTable", (HttpHeaders) null);
+    brokerDebug.getRoutingTable("testTable", false, (HttpHeaders) null);
+    brokerDebug.getRoutingTable("testTable", false, (HttpHeaders) null);
 
     ArgumentCaptor<BrokerRequest> brokerRequestCaptor = 
ArgumentCaptor.forClass(BrokerRequest.class);
     ArgumentCaptor<Long> requestIdCaptor = ArgumentCaptor.forClass(Long.class);
@@ -107,4 +125,74 @@ public class PinotBrokerDebugTest {
     assertTrue(secondRealtimeRequestId != null);
     assertEquals((long) secondRealtimeRequestId, firstRealtimeRequestId + 1);
   }
+
+  @Test
+  public void testMultiClusterRoutingRejectsWhenNotConfigured()
+      throws Exception {
+    BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
+    PinotBrokerDebug brokerDebug = createBrokerDebug(routingManager);
+
+    WebApplicationException ex = expectThrows(WebApplicationException.class,
+        () -> brokerDebug.getRoutingTable("testTable", true, (HttpHeaders) 
null));
+    assertEquals(ex.getResponse().getStatus(), 400);
+    assertTrue(ex.getMessage().contains("Multi-cluster routing is not 
configured"));
+  }
+
+  @Test
+  public void testMultiClusterRoutingRejectsNonLogicalTable()
+      throws Exception {
+    BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
+    BrokerRoutingManager multiClusterManager = 
mock(BrokerRoutingManager.class);
+    MultiClusterRoutingContext context =
+        new MultiClusterRoutingContext(Collections.emptyMap(), routingManager, 
multiClusterManager,
+            Collections.emptySet());
+
+    TableCache tableCache = mock(TableCache.class);
+    when(tableCache.isLogicalTable("physicalTable")).thenReturn(false);
+
+    PinotBrokerDebug brokerDebug = new PinotBrokerDebug();
+    setField(brokerDebug, "_routingManager", routingManager);
+    setField(brokerDebug, "_multiClusterRoutingContextProvider", new 
MultiClusterRoutingContextProvider(context));
+    setField(brokerDebug, "_tableCache", tableCache);
+
+    WebApplicationException ex = expectThrows(WebApplicationException.class,
+        () -> brokerDebug.getRoutingTable("physicalTable", true, (HttpHeaders) 
null));
+    assertEquals(ex.getResponse().getStatus(), 400);
+    assertTrue(ex.getMessage().contains("not a logical table"));
+  }
+
+  @Test
+  public void testMultiClusterRoutingSucceedsForLogicalTable()
+      throws Exception {
+    BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
+    BrokerRoutingManager multiClusterManager = 
mock(BrokerRoutingManager.class);
+    when(multiClusterManager.getRoutingTable(any(BrokerRequest.class), 
anyLong()))
+        .thenReturn(new RoutingTable(Collections.emptyMap(), 
Collections.emptyList(), 0));
+
+    MultiClusterRoutingContext context =
+        new MultiClusterRoutingContext(Collections.emptyMap(), routingManager, 
multiClusterManager,
+            Collections.emptySet());
+
+    // Logical table with two physical tables (one OFFLINE each) in the config 
map
+    LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
+    logicalTableConfig.setTableName("logicalTable");
+    logicalTableConfig.setPhysicalTableConfigMap(Map.of(
+        "physicalTable1_OFFLINE", new PhysicalTableConfig(false),
+        "physicalTable2_OFFLINE", new PhysicalTableConfig(true)));
+
+    TableCache tableCache = mock(TableCache.class);
+    when(tableCache.isLogicalTable("logicalTable")).thenReturn(true);
+    
when(tableCache.getLogicalTableConfig("logicalTable")).thenReturn(logicalTableConfig);
+
+    PinotBrokerDebug brokerDebug = new PinotBrokerDebug();
+    setField(brokerDebug, "_routingManager", routingManager);
+    setField(brokerDebug, "_multiClusterRoutingContextProvider", new 
MultiClusterRoutingContextProvider(context));
+    setField(brokerDebug, "_tableCache", tableCache);
+
+    brokerDebug.getRoutingTable("logicalTable", true, (HttpHeaders) null);
+
+    // One call per physical table in the config; the local routingManager is 
never touched
+    verify(multiClusterManager, 
times(2)).getRoutingTable(any(BrokerRequest.class), anyLong());
+    verify(routingManager, times(0)).getRoutingTable(any(BrokerRequest.class), 
anyLong());
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index 452e3d1b4b6..f07dabaf408 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,6 +102,39 @@ public class MultiClusterIntegrationTest extends 
BaseMultiClusterIntegrationTest
     LOGGER.info("Multi-cluster broker test passed: both clusters started and 
queryable");
   }
 
+  @Test(groups = "query")
+  public void testMultiClusterRoutingTableDebugEndpoint() throws Exception {
+    String logicalTable = getLogicalTableName();
+    String brokerBase = "http://localhost:" + _cluster1._brokerPort;
+
+    // Local routing for a physical table should work without the 
multi-cluster flag
+    String physicalTable = getPhysicalTable1InCluster1();
+    String localRouting = ControllerTest.sendGetRequest(
+        brokerBase + "/debug/routingTable/" + physicalTable);
+    JsonMapper mapper = JsonMapper.builder().build();
+    JsonNode localJson = mapper.readTree(localRouting);
+    String physicalOfflineKey = physicalTable + "_OFFLINE";
+    assertTrue(localJson.has(physicalOfflineKey),
+        "Local routing should include physical offline table: " + 
localRouting);
+
+    // Multi-cluster routing for a logical table
+    String multiRouting = ControllerTest.sendGetRequest(
+        brokerBase + "/debug/routingTable/" + logicalTable + 
"?useMultiClusterRouting=true");
+    JsonNode multiJson = mapper.readTree(multiRouting);
+
+    // The logical table expands to its physical tables; verify both physical 
tables are represented.
+    String phys1OfflineKey = getPhysicalTable1InCluster1() + "_OFFLINE";
+    String phys2OfflineKey = getPhysicalTable1InCluster2() + "_OFFLINE";
+    assertTrue(multiJson.has(phys1OfflineKey),
+        "Multi-cluster routing should include physical table from cluster 1: " 
+ multiRouting);
+    assertTrue(multiJson.has(phys2OfflineKey),
+        "Multi-cluster routing should include physical table from cluster 2: " 
+ multiRouting);
+    assertTrue(multiJson.get(phys1OfflineKey).size() >= 1,
+        "Cluster 1 physical table should have at least one server");
+    assertTrue(multiJson.get(phys2OfflineKey).size() >= 1,
+        "Cluster 2 physical table should have at least one server");
+  }
+
   @BeforeGroups("query")
   public void setupTablesForQueryTests() throws Exception {
     dropLogicalTableIfExists(getLogicalTableName(), 
_cluster1._controllerBaseApiUrl);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to