This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0075ad7d53 Add Tenants Info to Rebalance API summary (#15284)
0075ad7d53 is described below

commit 0075ad7d5346bb3c2b74aeca896f47007a9f87fc
Author: Jhow <44998515+j-howhu...@users.noreply.github.com>
AuthorDate: Fri Mar 28 14:57:24 2025 -0700

    Add Tenants Info to Rebalance API summary (#15284)
---
 .../core/rebalance/RebalanceSummaryResult.java     |  68 ++++++-
 .../helix/core/rebalance/TableRebalancer.java      |  70 ++++++-
 .../RebalanceServerRebalanceSummaryResponse.tsx    |   4 +
 .../TableRebalancerClusterStatelessTest.java       | 202 +++++++++++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |  14 ++
 5 files changed, 354 insertions(+), 4 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index 3169c9b367..753d3d5dd4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -39,6 +39,8 @@ public class RebalanceSummaryResult {
   private final ServerInfo _serverInfo;
   @JsonInclude(JsonInclude.Include.NON_NULL)
   private final SegmentInfo _segmentInfo;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final List<TagInfo> _tagsInfo;
 
   /**
    * Constructor for RebalanceSummaryResult
@@ -47,9 +49,11 @@ public class RebalanceSummaryResult {
    */
   @JsonCreator
   public RebalanceSummaryResult(@JsonProperty("serverInfo") @Nullable 
ServerInfo serverInfo,
-      @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo) {
+      @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo,
+      @JsonProperty("tagsInfo") @Nullable List<TagInfo> tagsInfo) {
     _serverInfo = serverInfo;
     _segmentInfo = segmentInfo;
+    _tagsInfo = tagsInfo;
   }
 
   @JsonProperty
@@ -62,6 +66,11 @@ public class RebalanceSummaryResult {
     return _segmentInfo;
   }
 
+  @JsonProperty
+  public List<TagInfo> getTagsInfo() {
+    return _tagsInfo;
+  }
+
   public static class ServerSegmentChangeInfo {
     private final ServerStatus _serverStatus;
     private final int _totalSegmentsAfterRebalance;
@@ -161,6 +170,63 @@ public class RebalanceSummaryResult {
     }
   }
 
+  public static class TagInfo {
+    public static final String TAG_FOR_OUTDATED_SERVERS = "OUTDATED_SERVERS";
+    private final String _tagName;
+    private int _numSegmentsUnchanged;
+    private int _numSegmentsToDownload;
+    private int _numServerParticipants;
+
+    @JsonCreator
+    public TagInfo(
+        @JsonProperty("tagName") String tagName,
+        @JsonProperty("numSegmentsToDownload") int numSegmentsToDownload,
+        @JsonProperty("numSegmentsUnchanged") int numSegmentsUnchanged,
+        @JsonProperty("numServerParticipants") int numServerParticipants
+    ) {
+      _tagName = tagName;
+      _numSegmentsUnchanged = numSegmentsUnchanged;
+      _numSegmentsToDownload = numSegmentsToDownload;
+      _numServerParticipants = numServerParticipants;
+    }
+
+    public TagInfo(String tagName) {
+      this(tagName, 0, 0, 0);
+    }
+
+    @JsonProperty
+    public String getTagName() {
+      return _tagName;
+    }
+
+    @JsonProperty
+    public int getNumSegmentsUnchanged() {
+      return _numSegmentsUnchanged;
+    }
+
+    @JsonProperty
+    public int getNumSegmentsToDownload() {
+      return _numSegmentsToDownload;
+    }
+
+    @JsonProperty
+    public int getNumServerParticipants() {
+      return _numServerParticipants;
+    }
+
+    public void increaseNumSegmentsUnchanged(int numSegments) {
+      _numSegmentsUnchanged += numSegments;
+    }
+
+    public void increaseNumSegmentsToDownload(int numSegments) {
+      _numSegmentsToDownload += numSegments;
+    }
+
+    public void increaseNumServerParticipants(int numServers) {
+      _numServerParticipants += numServers;
+    }
+  }
+
   public static class ServerInfo {
     private final int _numServersGettingNewSegments;
     @JsonInclude(JsonInclude.Include.NON_NULL)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d91e2db103..3e7dc3eab3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -56,6 +56,7 @@ import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
 import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -66,6 +67,7 @@ import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -334,7 +336,7 @@ public class TableRebalancer {
     // is expected or not based on the summary results
     RebalanceSummaryResult summaryResult =
         calculateDryRunSummary(currentAssignment, targetAssignment, 
tableNameWithType, rebalanceJobId,
-            tableSubTypeSizeDetails);
+            tableSubTypeSizeDetails, tableConfig);
 
     if (segmentAssignmentUnchanged) {
       LOGGER.info("Table: {} is already balanced", tableNameWithType);
@@ -622,7 +624,7 @@ public class TableRebalancer {
 
   private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId,
-      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, 
TableConfig tableConfig) {
     LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
         tableNameWithType, rebalanceJobId);
     int existingReplicationFactor = 0;
@@ -662,6 +664,38 @@ public class TableRebalancer {
     Set<String> serversRemoved = new HashSet<>();
     Set<String> serversUnchanged = new HashSet<>();
     Set<String> serversGettingNewSegments = new HashSet<>();
+    Map<String, RebalanceSummaryResult.TagInfo> tagsInfoMap = new HashMap<>();
+    String serverTenantName = tableConfig.getTenantConfig().getServer();
+    if (serverTenantName != null) {
+      String serverTenantTag =
+          TagNameUtils.getServerTagForTenant(serverTenantName, 
tableConfig.getTableType());
+      tagsInfoMap.put(serverTenantTag,
+          new RebalanceSummaryResult.TagInfo(serverTenantTag));
+    }
+    TagOverrideConfig tagOverrideConfig = 
tableConfig.getTenantConfig().getTagOverrideConfig();
+    if (tagOverrideConfig != null) {
+      String completedTag = tagOverrideConfig.getRealtimeCompleted();
+      String consumingTag = tagOverrideConfig.getRealtimeConsuming();
+      if (completedTag != null) {
+        tagsInfoMap.put(completedTag, new 
RebalanceSummaryResult.TagInfo(completedTag));
+      }
+      if (consumingTag != null) {
+        tagsInfoMap.put(consumingTag, new 
RebalanceSummaryResult.TagInfo(consumingTag));
+      }
+    }
+    if (tableConfig.getInstanceAssignmentConfigMap() != null) {
+      // For simplicity, include the tags of all segment types present in 
instanceAssignmentConfigMap
+      
tableConfig.getInstanceAssignmentConfigMap().values().forEach(instanceAssignmentConfig
 -> {
+        String tag = instanceAssignmentConfig.getTagPoolConfig().getTag();
+        tagsInfoMap.put(tag, new RebalanceSummaryResult.TagInfo(tag));
+      });
+    }
+    if (tableConfig.getTierConfigsList() != null) {
+      tableConfig.getTierConfigsList().forEach(tierConfig -> {
+        String tierTag = tierConfig.getServerTag();
+        tagsInfoMap.put(tierTag, new RebalanceSummaryResult.TagInfo(tierTag));
+      });
+    }
     Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap = new HashMap<>();
     int segmentsNotMoved = 0;
     int maxSegmentsAddedToServer = 0;
@@ -699,6 +733,30 @@ public class TableRebalancer {
       serverSegmentChangeInfoMap.put(server, new 
RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus,
           totalNewSegments, totalExistingSegments, segmentsAdded, 
segmentsDeleted, segmentsUnchanged,
           instanceToTagsMap.getOrDefault(server, null)));
+      List<String> serverTags = getServerTag(server);
+      Set<String> relevantTags = new HashSet<>(serverTags);
+      relevantTags.retainAll(tagsInfoMap.keySet());
+      // Segments that remain unchanged or need to be downloaded are counted 
toward every relevant tag of this
+      // server instance
+      if (relevantTags.isEmpty()) {
+        // This can happen when the server's tags changed but 
reassignInstances=false in the rebalance config
+        LOGGER.warn("Server: {} was assigned to table: {} but does not have 
any relevant tags", server,
+            tableNameWithType);
+
+        RebalanceSummaryResult.TagInfo tagsInfo =
+            
tagsInfoMap.computeIfAbsent(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS,
+                RebalanceSummaryResult.TagInfo::new);
+        tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
+        tagsInfo.increaseNumSegmentsToDownload(segmentsAdded);
+        tagsInfo.increaseNumServerParticipants(1);
+      } else {
+        for (String tag : relevantTags) {
+          RebalanceSummaryResult.TagInfo tagsInfo = tagsInfoMap.get(tag);
+          tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
+          tagsInfo.increaseNumSegmentsToDownload(segmentsAdded);
+          tagsInfo.increaseNumServerParticipants(1);
+        }
+      }
     }
 
     for (Map.Entry<String, Set<String>> entry : 
existingServersToSegmentMap.entrySet()) {
@@ -739,7 +797,13 @@ public class TableRebalancer {
 
     LOGGER.info("Calculated rebalance summary for table: {} with 
rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
-    return new RebalanceSummaryResult(serverInfo, segmentInfo);
+    return new RebalanceSummaryResult(serverInfo, segmentInfo, new 
ArrayList<>(tagsInfoMap.values()));
+  }
+
+  private List<String> getServerTag(String serverName) {
+    InstanceConfig instanceConfig =
+        
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverName));
+    return instanceConfig.getTags();
   }
 
   private void onReturnFailure(String errorMsg, Exception e) {
diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
index ba79b18167..7744058237 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
@@ -32,6 +32,10 @@ export const RebalanceServerRebalanceSummaryResponse = ({ 
response }) => {
         {
             name: 'II. Segment Information',
             key: 'segmentInfo'
+        },
+        {
+            name: 'III. Server Tags Information',
+            key: 'tagsInfo'
         }
     ];
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index e86c8263f0..1a8c221899 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
@@ -153,6 +154,13 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(null));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -202,6 +210,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(null));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 14);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS - 14);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers + numServersToAdd);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -371,6 +388,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 11);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(null));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 11);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS - 11);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers + numServersToAdd);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -451,6 +477,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(null));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers + numServersToAdd);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -476,6 +511,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(null));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers + numServersToAdd);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -522,6 +566,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(null));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 15);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS - 15);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -826,6 +879,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -858,6 +920,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -892,6 +963,27 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 9);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 6);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 3);
+    Map<String, RebalanceSummaryResult.TagInfo> tenantInfoMap = 
rebalanceSummaryResult.getTagsInfo()
+        .stream()
+        .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, 
info -> info));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)));
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumSegmentsToDownload(),
 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumSegmentsUnchanged(),
+        5 * NUM_REPLICAS);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumServerParticipants(),
+        numServers);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumSegmentsToDownload(),
+        NUM_REPLICAS);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumSegmentsUnchanged(),
 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumServerParticipants(),
 3);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumSegmentsToDownload(),
+        4 * NUM_REPLICAS);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumSegmentsUnchanged(),
 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumServerParticipants(),
 3);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getTierInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -969,6 +1061,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant("replicaAssignment" + 
NO_TIER_NAME));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -997,6 +1098,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant("replicaAssignment" + 
NO_TIER_NAME));
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+        numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -1025,6 +1135,28 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 6);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 2);
+    Map<String, RebalanceSummaryResult.TagInfo> tenantInfoMap = 
rebalanceSummaryResult.getTagsInfo()
+        .stream()
+        .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, 
info -> info));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME)));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME)));
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+        .getNumSegmentsToDownload(), 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+            .getNumSegmentsUnchanged(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+            .getNumServerParticipants(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+            .getNumSegmentsToDownload(),
+        numSegments * NUM_REPLICAS);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+        .getNumSegmentsUnchanged(), 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+        .getNumServerParticipants(), 6);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getTierInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1064,6 +1196,28 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 13);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 2);
+    tenantInfoMap = rebalanceSummaryResult.getTagsInfo()
+        .stream()
+        .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, 
info -> info));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME)));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME)));
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+        .getNumSegmentsToDownload(), 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+            .getNumSegmentsUnchanged(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+            .getNumServerParticipants(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+            .getNumSegmentsToDownload(),
+        13);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+        .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 13);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+        .getNumServerParticipants(), 6);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getTierInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1115,6 +1269,54 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
     assertEquals(numSegmentsOnServer0, numSegments / 2);
 
+    _helixResourceManager.deleteOfflineServerTenantFor("replicaAssignment" + 
TIER_A_NAME);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(false);
+
+    // the tier's server tenant was deleted above; with reassignInstances=false the 
assigned servers no longer have relevant tags
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    assertNotNull(rebalanceSummaryResult.getTagsInfo());
+    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 3);
+    tenantInfoMap = rebalanceSummaryResult.getTagsInfo()
+        .stream()
+        .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName, 
info -> info));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME)));
+    
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME)));
+    
assertTrue(tenantInfoMap.containsKey(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS));
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+        .getNumSegmentsToDownload(), 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+            .getNumSegmentsUnchanged(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
+            .getNumServerParticipants(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+            .getNumSegmentsToDownload(),
+        0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+        .getNumSegmentsUnchanged(), 0);
+    
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
+        .getNumServerParticipants(), 0);
+    assertEquals(
+        
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(),
+        0);
+    assertEquals(
+        
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(),
+        numSegments * NUM_REPLICAS);
+    assertEquals(
+        
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(),
+        6);
+
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index f4eb559f68..eb575799e3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -65,6 +65,7 @@ import 
org.apache.pinot.common.response.server.TableIndexMetadataResponse;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.http.HttpClient;
 import 
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -4294,6 +4295,19 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         "Existing number of servers don't match");
     
assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 newNumServers,
         "New number of servers don't match");
+    // In this cluster integration test, servers are tagged with DefaultTenant 
only
+    assertEquals(summaryResult.getTagsInfo().size(), 1);
+    assertEquals(summaryResult.getTagsInfo().get(0).getTagName(),
+        TagNameUtils.getOfflineTagForTenant(getServerTenant()));
+    
assertEquals(summaryResult.getTagsInfo().get(0).getNumServerParticipants(), 
newNumServers);
+    assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
+        summaryResult.getTagsInfo().get(0).getNumSegmentsToDownload());
+    // For this single tenant, the number of unchanged segments and the number 
of received segments should add up to
+    // the expected total number of segments across all replicas after rebalance
+    
assertEquals(summaryResult.getSegmentInfo().getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance(),
+        summaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged() + 
summaryResult.getTagsInfo()
+            .get(0)
+            .getNumSegmentsToDownload());
     if (_tableSize > 0) {
       
assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes()
 > 0L,
           "Avg segment size expected to be > 0 but found to be 0");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to