This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 13647c6c38a Report table's server instances based on ideal state 
instead of tags (#16964)
13647c6c38a is described below

commit 13647c6c38a19e16834a08a00607d8f61636a8f8
Author: Jhow <[email protected]>
AuthorDate: Tue Oct 7 09:54:19 2025 -0700

    Report table's server instances based on ideal state instead of tags 
(#16964)
---
 .../helix/core/PinotHelixResourceManager.java      | 23 +++++++++++-----------
 ...PartialUpsertTableRebalanceIntegrationTest.java | 16 ++++-----------
 2 files changed, 15 insertions(+), 24 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8f73155a3fa..a5cb1c5377e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3559,21 +3559,20 @@ public class PinotHelixResourceManager {
     return tableConfigs;
   }
 
+  /** Get all server instances that host at least a segment for a given table 
based on the ideal state.
+   *
+   * @param tableName Table name with or without type suffix
+   * @param tableType Table type
+   * @return List of server instances
+   */
   public List<String> getServerInstancesForTable(String tableName, TableType 
tableType) {
-    TableConfig tableConfig = getTableConfig(tableName, tableType);
-    Preconditions.checkNotNull(tableConfig);
-    TenantConfig tenantConfig = tableConfig.getTenantConfig();
     Set<String> serverInstances = new HashSet<>();
-    List<InstanceConfig> instanceConfigs = 
HelixHelper.getInstanceConfigs(_helixZkManager);
-    if (tableType == TableType.OFFLINE) {
-      serverInstances.addAll(
-          HelixHelper.getInstancesWithTag(instanceConfigs, 
TagNameUtils.extractOfflineServerTag(tenantConfig)));
-    } else if (TableType.REALTIME.equals(tableType)) {
-      serverInstances.addAll(
-          HelixHelper.getInstancesWithTag(instanceConfigs, 
TagNameUtils.extractConsumingServerTag(tenantConfig)));
-      serverInstances.addAll(
-          HelixHelper.getInstancesWithTag(instanceConfigs, 
TagNameUtils.extractCompletedServerTag(tenantConfig)));
+    IdealState idealState = 
getTableIdealState(TableNameBuilder.forType(tableType).tableNameWithType(tableName));
+    ZNRecord record = idealState.getRecord();
+    if (record == null) {
+      return new ArrayList<>();
     }
+    record.getMapFields().forEach((key, value) -> 
serverInstances.addAll(value.keySet()));
     return new ArrayList<>(serverInstances);
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 87cb5b2abc0..80ab7359ad9 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -131,16 +131,12 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
     BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 1);
     rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
 
-    // Check the number of servers after rebalancing
-    finalServers = _resourceManager.getServerInstancesForTable(getTableName(), 
TableType.REALTIME).size();
-
-    // Check that a server has been added
-    assertEquals(finalServers, NUM_SERVERS + 2, "Rebalancing didn't correctly 
add the new server");
-
     waitForRebalanceToComplete(rebalanceResult, 600_000L);
     waitForAllDocsLoaded(600_000L);
 
     // number of instances assigned can't be more than number of partitions 
for rf = 1
+    finalServers = _resourceManager.getServerInstancesForTable(getTableName(), 
TableType.REALTIME).size();
+    assertEquals(finalServers, getNumKafkaPartitions(), "Rebalancing didn't 
correctly add the new server");
     verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5, 
getNumKafkaPartitions());
 
     _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "", 
false);
@@ -199,16 +195,12 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
     BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 1);
     rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
 
-    // Check the number of servers after rebalancing
-    finalServers = _resourceManager.getServerInstancesForTable(getTableName(), 
TableType.REALTIME).size();
-
-    // Check that a server has been added
-    assertEquals(finalServers, NUM_SERVERS + 2, "Rebalancing didn't correctly 
add the new server");
-
     waitForRebalanceToComplete(rebalanceResult, 600_000L);
     waitForAllDocsLoaded(600_000L);
 
     // number of instances assigned can't be more than number of partitions 
for rf = 1
+    finalServers = _resourceManager.getServerInstancesForTable(getTableName(), 
TableType.REALTIME).size();
+    assertEquals(finalServers, getNumKafkaPartitions(), "Rebalancing didn't 
correctly add the new server");
     verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5, 
getNumKafkaPartitions());
 
     _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "", 
false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to