This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0075ad7d53 Add Tenants Info to Rebalance API summary (#15284) 0075ad7d53 is described below commit 0075ad7d5346bb3c2b74aeca896f47007a9f87fc Author: Jhow <44998515+j-howhu...@users.noreply.github.com> AuthorDate: Fri Mar 28 14:57:24 2025 -0700 Add Tenants Info to Rebalance API summary (#15284) --- .../core/rebalance/RebalanceSummaryResult.java | 68 ++++++- .../helix/core/rebalance/TableRebalancer.java | 70 ++++++- .../RebalanceServerRebalanceSummaryResponse.tsx | 4 + .../TableRebalancerClusterStatelessTest.java | 202 +++++++++++++++++++++ .../tests/OfflineClusterIntegrationTest.java | 14 ++ 5 files changed, 354 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java index 3169c9b367..753d3d5dd4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java @@ -39,6 +39,8 @@ public class RebalanceSummaryResult { private final ServerInfo _serverInfo; @JsonInclude(JsonInclude.Include.NON_NULL) private final SegmentInfo _segmentInfo; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final List<TagInfo> _tagsInfo; /** * Constructor for RebalanceSummaryResult @@ -47,9 +49,11 @@ public class RebalanceSummaryResult { */ @JsonCreator public RebalanceSummaryResult(@JsonProperty("serverInfo") @Nullable ServerInfo serverInfo, - @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo) { + @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo, + @JsonProperty("tagsInfo") @Nullable List<TagInfo> tagsInfo) { _serverInfo = serverInfo; _segmentInfo = segmentInfo; + _tagsInfo = tagsInfo; } @JsonProperty @@ -62,6 +66,11 @@ public class RebalanceSummaryResult { return _segmentInfo; } + @JsonProperty + public List<TagInfo> getTagsInfo() { + return _tagsInfo; + } + public static class ServerSegmentChangeInfo { private final ServerStatus _serverStatus; private final int _totalSegmentsAfterRebalance; @@ -161,6 +170,63 @@ public class RebalanceSummaryResult { } } + public static class TagInfo { + public static final String TAG_FOR_OUTDATED_SERVERS = "OUTDATED_SERVERS"; + private final String _tagName; + private int _numSegmentsUnchanged; + private int _numSegmentsToDownload; + private int _numServerParticipants; + + @JsonCreator + public TagInfo( + @JsonProperty("tagName") String tagName, + @JsonProperty("numSegmentsToDownload") int numSegmentsToDownload, + @JsonProperty("numSegmentsUnchanged") int numSegmentsUnchanged, + @JsonProperty("numServerParticipants") int numServerParticipants + ) { + _tagName = tagName; + _numSegmentsUnchanged = numSegmentsUnchanged; + _numSegmentsToDownload = numSegmentsToDownload; + _numServerParticipants = numServerParticipants; + } + + public TagInfo(String tagName) { + this(tagName, 0, 0, 0); + } + + @JsonProperty + public String getTagName() { + return _tagName; + } + + @JsonProperty + public int getNumSegmentsUnchanged() { + return _numSegmentsUnchanged; + } + + @JsonProperty + public int getNumSegmentsToDownload() { + return _numSegmentsToDownload; + } + + @JsonProperty + public int getNumServerParticipants() { + return _numServerParticipants; + } + + public void increaseNumSegmentsUnchanged(int numSegments) { + _numSegmentsUnchanged += numSegments; + } + + public void increaseNumSegmentsToDownload(int numSegments) { + _numSegmentsToDownload += numSegments; + } + + public void increaseNumServerParticipants(int numServers) { + _numServerParticipants += numServers; + } + } + public static class ServerInfo { private final int _numServersGettingNewSegments; @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index d91e2db103..3e7dc3eab3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -56,6 +56,7 @@ import org.apache.pinot.common.tier.PinotServerTierStorage; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.config.TierConfigUtils; import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; @@ -66,6 +67,7 @@ import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TagOverrideConfig; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; @@ -334,7 +336,7 @@ public class TableRebalancer { // is expected or not based on the summary results RebalanceSummaryResult summaryResult = calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, rebalanceJobId, - tableSubTypeSizeDetails); + tableSubTypeSizeDetails, tableConfig); if (segmentAssignmentUnchanged) { LOGGER.info("Table: {} is already balanced", tableNameWithType); @@ -622,7 +624,7 @@ public class TableRebalancer { private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, String tableNameWithType, String rebalanceJobId, - TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) { + TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig) { LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); int existingReplicationFactor = 0; @@ -662,6 +664,38 @@ public class TableRebalancer { Set<String> serversRemoved = new HashSet<>(); Set<String> serversUnchanged = new HashSet<>(); Set<String> serversGettingNewSegments = new HashSet<>(); + Map<String, RebalanceSummaryResult.TagInfo> tagsInfoMap = new HashMap<>(); + String serverTenantName = tableConfig.getTenantConfig().getServer(); + if (serverTenantName != null) { + String serverTenantTag = + TagNameUtils.getServerTagForTenant(serverTenantName, tableConfig.getTableType()); + tagsInfoMap.put(serverTenantTag, + new RebalanceSummaryResult.TagInfo(serverTenantTag)); + } + TagOverrideConfig tagOverrideConfig = tableConfig.getTenantConfig().getTagOverrideConfig(); + if (tagOverrideConfig != null) { + String completedTag = tagOverrideConfig.getRealtimeCompleted(); + String consumingTag = tagOverrideConfig.getRealtimeConsuming(); + if (completedTag != null) { + tagsInfoMap.put(completedTag, new RebalanceSummaryResult.TagInfo(completedTag)); + } + if (consumingTag != null) { + tagsInfoMap.put(consumingTag, new RebalanceSummaryResult.TagInfo(consumingTag)); + } + } + if (tableConfig.getInstanceAssignmentConfigMap() != null) { + // for simplicity, including all segment types present in instanceAssignmentConfigMap + tableConfig.getInstanceAssignmentConfigMap().values().forEach(instanceAssignmentConfig -> { + String tag = instanceAssignmentConfig.getTagPoolConfig().getTag(); + tagsInfoMap.put(tag, new RebalanceSummaryResult.TagInfo(tag)); + }); + } + if (tableConfig.getTierConfigsList() != null) { + tableConfig.getTierConfigsList().forEach(tierConfig -> { + String tierTag = tierConfig.getServerTag(); + tagsInfoMap.put(tierTag, new RebalanceSummaryResult.TagInfo(tierTag)); + }); + } Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>(); int segmentsNotMoved = 0; int maxSegmentsAddedToServer = 0; @@ -699,6 +733,30 @@ public class TableRebalancer { serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus, totalNewSegments, totalExistingSegments, segmentsAdded, segmentsDeleted, segmentsUnchanged, instanceToTagsMap.getOrDefault(server, null))); + List<String> serverTags = getServerTag(server); + Set<String> relevantTags = new HashSet<>(serverTags); + relevantTags.retainAll(tagsInfoMap.keySet()); + // The segments remain unchanged or need to download will be accounted to every tag associated with this + // server instance + if (relevantTags.isEmpty()) { + // this could happen when server's tags changed but reassignInstance=false in the rebalance config + LOGGER.warn("Server: {} was assigned to table: {} but does not have any relevant tags", server, + tableNameWithType); + + RebalanceSummaryResult.TagInfo tagsInfo = + tagsInfoMap.computeIfAbsent(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS, + RebalanceSummaryResult.TagInfo::new); + tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged); + tagsInfo.increaseNumSegmentsToDownload(segmentsAdded); + tagsInfo.increaseNumServerParticipants(1); + } else { + for (String tag : relevantTags) { + RebalanceSummaryResult.TagInfo tagsInfo = tagsInfoMap.get(tag); + tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged); + tagsInfo.increaseNumSegmentsToDownload(segmentsAdded); + tagsInfo.increaseNumServerParticipants(1); + } + } } for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) { @@ -739,7 +797,13 @@ public class TableRebalancer { LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); - return new RebalanceSummaryResult(serverInfo, segmentInfo); + return new RebalanceSummaryResult(serverInfo, segmentInfo, new ArrayList<>(tagsInfoMap.values())); + } + + private List<String> getServerTag(String serverName) { + InstanceConfig instanceConfig = + _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverName)); + return instanceConfig.getTags(); } private void onReturnFailure(String errorMsg, Exception e) { diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx index ba79b18167..7744058237 100644 --- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx +++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx @@ -32,6 +32,10 @@ export const RebalanceServerRebalanceSummaryResponse = ({ response }) => { { name: 'II. Segment Information', key: 'segmentInfo' + }, + { + name: 'III. Server Tags Information', + key: 'tagsInfo' } ]; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index e86c8263f0..1a8c221899 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.restlet.resources.DiskUsageInfo; @@ -153,6 +154,13 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -202,6 +210,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 14); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS - 14); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -371,6 +388,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 11); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 11); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS - 11); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -451,6 +477,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -476,6 +511,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -522,6 +566,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 15); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS - 15); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -826,6 +879,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -858,6 +920,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -892,6 +963,27 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 9); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 6); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 3); + Map<String, RebalanceSummaryResult.TagInfo> tenantInfoMap = rebalanceSummaryResult.getTagsInfo() + .stream() + .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, info -> info)); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME))); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME))); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME))); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumSegmentsToDownload(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumSegmentsUnchanged(), + 5 * NUM_REPLICAS); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumServerParticipants(), + numServers); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumSegmentsToDownload(), + NUM_REPLICAS); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumSegmentsUnchanged(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumServerParticipants(), 3); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumSegmentsToDownload(), + 4 * NUM_REPLICAS); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumSegmentsUnchanged(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumServerParticipants(), 3); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getTierInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -969,6 +1061,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -997,6 +1098,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1025,6 +1135,28 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 6); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 2); + Map<String, RebalanceSummaryResult.TagInfo> tenantInfoMap = rebalanceSummaryResult.getTagsInfo() + .stream() + .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, info -> info)); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME))); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME))); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumSegmentsToDownload(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumSegmentsUnchanged(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumServerParticipants(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumSegmentsToDownload(), + numSegments * NUM_REPLICAS); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumSegmentsUnchanged(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumServerParticipants(), 6); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getTierInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1064,6 +1196,28 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 13); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 2); + tenantInfoMap = rebalanceSummaryResult.getTagsInfo() + .stream() + .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, info -> info)); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME))); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME))); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumSegmentsToDownload(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumSegmentsUnchanged(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumServerParticipants(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumSegmentsToDownload(), + 13); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 13); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumServerParticipants(), 6); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getTierInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1115,6 +1269,54 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } assertEquals(numSegmentsOnServer0, numSegments / 2); + _helixResourceManager.deleteOfflineServerTenantFor("replicaAssignment" + TIER_A_NAME); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(false); + + // if rebalance with reassignInstances=false, servers assigned would not have relevant tags + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 3); + tenantInfoMap = rebalanceSummaryResult.getTagsInfo() + .stream() + .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, info -> info)); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME))); + assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME))); + assertTrue(tenantInfoMap.containsKey(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS)); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumSegmentsToDownload(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumSegmentsUnchanged(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) + .getNumServerParticipants(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumSegmentsToDownload(), + 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumSegmentsUnchanged(), 0); + assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) + .getNumServerParticipants(), 0); + assertEquals( + tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(), + 0); + assertEquals( + tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals( + tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(), + 6); + _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index f4eb559f68..eb575799e3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -65,6 +65,7 @@ import org.apache.pinot.common.response.server.TableIndexMetadataResponse; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -4294,6 +4295,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "Existing number of servers don't match"); assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), newNumServers, "New number of servers don't match"); + // In this cluster integration test, servers are tagged with DefaultTenant only + assertEquals(summaryResult.getTagsInfo().size(), 1); + assertEquals(summaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(getServerTenant())); + assertEquals(summaryResult.getTagsInfo().get(0).getNumServerParticipants(), newNumServers); + assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), + summaryResult.getTagsInfo().get(0).getNumSegmentsToDownload()); + // For this single tenant, the number of unchanged segments and the number of received segments should add up to + // the total present segment + assertEquals(summaryResult.getSegmentInfo().getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance(), + summaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged() + summaryResult.getTagsInfo() + .get(0) + .getNumSegmentsToDownload()); if (_tableSize > 0) { assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes() > 0L, "Avg segment size expected to be > 0 but found to be 0"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org