This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 13647c6c38a Report table's server instances based on ideal state
instead of tags (#16964)
13647c6c38a is described below
commit 13647c6c38a19e16834a08a00607d8f61636a8f8
Author: Jhow <[email protected]>
AuthorDate: Tue Oct 7 09:54:19 2025 -0700
Report table's server instances based on ideal state instead of tags
(#16964)
---
.../helix/core/PinotHelixResourceManager.java | 23 +++++++++++-----------
...PartialUpsertTableRebalanceIntegrationTest.java | 16 ++++-----------
2 files changed, 15 insertions(+), 24 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8f73155a3fa..a5cb1c5377e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3559,21 +3559,20 @@ public class PinotHelixResourceManager {
return tableConfigs;
}
+ /** Get all server instances that host at least a segment for a given table
based on the ideal state.
+ *
+ * @param tableName Table name with or without type suffix
+ * @param tableType Table type
+ * @return List of server instances
+ */
public List<String> getServerInstancesForTable(String tableName, TableType
tableType) {
- TableConfig tableConfig = getTableConfig(tableName, tableType);
- Preconditions.checkNotNull(tableConfig);
- TenantConfig tenantConfig = tableConfig.getTenantConfig();
Set<String> serverInstances = new HashSet<>();
- List<InstanceConfig> instanceConfigs =
HelixHelper.getInstanceConfigs(_helixZkManager);
- if (tableType == TableType.OFFLINE) {
- serverInstances.addAll(
- HelixHelper.getInstancesWithTag(instanceConfigs,
TagNameUtils.extractOfflineServerTag(tenantConfig)));
- } else if (TableType.REALTIME.equals(tableType)) {
- serverInstances.addAll(
- HelixHelper.getInstancesWithTag(instanceConfigs,
TagNameUtils.extractConsumingServerTag(tenantConfig)));
- serverInstances.addAll(
- HelixHelper.getInstancesWithTag(instanceConfigs,
TagNameUtils.extractCompletedServerTag(tenantConfig)));
+ IdealState idealState =
getTableIdealState(TableNameBuilder.forType(tableType).tableNameWithType(tableName));
+ ZNRecord record = idealState.getRecord();
+ if (record == null) {
+ return new ArrayList<>();
}
+ record.getMapFields().forEach((key, value) ->
serverInstances.addAll(value.keySet()));
return new ArrayList<>(serverInstances);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 87cb5b2abc0..80ab7359ad9 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -131,16 +131,12 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 1);
rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
- // Check the number of servers after rebalancing
- finalServers = _resourceManager.getServerInstancesForTable(getTableName(),
TableType.REALTIME).size();
-
- // Check that a server has been added
- assertEquals(finalServers, NUM_SERVERS + 2, "Rebalancing didn't correctly
add the new server");
-
waitForRebalanceToComplete(rebalanceResult, 600_000L);
waitForAllDocsLoaded(600_000L);
// number of instances assigned can't be more than number of partitions
for rf = 1
+ finalServers = _resourceManager.getServerInstancesForTable(getTableName(),
TableType.REALTIME).size();
+ assertEquals(finalServers, getNumKafkaPartitions(), "Rebalancing didn't
correctly add the new server");
verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5,
getNumKafkaPartitions());
_resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "",
false);
@@ -199,16 +195,12 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 1);
rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
- // Check the number of servers after rebalancing
- finalServers = _resourceManager.getServerInstancesForTable(getTableName(),
TableType.REALTIME).size();
-
- // Check that a server has been added
- assertEquals(finalServers, NUM_SERVERS + 2, "Rebalancing didn't correctly
add the new server");
-
waitForRebalanceToComplete(rebalanceResult, 600_000L);
waitForAllDocsLoaded(600_000L);
// number of instances assigned can't be more than number of partitions
for rf = 1
+ finalServers = _resourceManager.getServerInstancesForTable(getTableName(),
TableType.REALTIME).size();
+ assertEquals(finalServers, getNumKafkaPartitions(), "Rebalancing didn't
correctly add the new server");
verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5,
getNumKafkaPartitions());
_resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "",
false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]