yashmayya commented on code in PR #15930:
URL: https://github.com/apache/pinot/pull/15930#discussion_r2123362083


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -764,6 +768,195 @@ public void testRebalanceStrictReplicaGroup()
     }
   }
 
+  @Test
+  public void 
testRebalanceWithImplicitRealtimeTablePartitionSelectorAndMinimizeDataMovement()
+      throws Exception {
+    int numServers = 6;
+    int numPartitions = 18;
+    int numReplicas = 2;
+
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+    }
+
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, 
true, null);
+    InstanceAssignmentConfig instanceAssignmentConfig =
+        new InstanceAssignmentConfig(
+            new 
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0, 
null), null,
+            replicaGroupPartitionConfig,
+            
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
 true);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            .setNumReplicas(numReplicas)
+            .setRoutingConfig(
+                new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+            .setStreamConfigs(
+                
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
+            
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), 
instanceAssignmentConfig))
+            .build();
+
+    // Create the table
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegmentsPerPartition = 4;
+    for (int i = 0; i < numPartitions; i++) {
+      for (int j = 0; j < numSegmentsPerPartition; j++) {
+        _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+            
SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME,
+                SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j), 
PARTITION_COLUMN, i), null);
+      }
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        
_helixResourceManager.getTableIdealState(REALTIME_TABLE_NAME).getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> entry : 
oldSegmentAssignment.entrySet()) {
+      assertEquals(entry.getValue().size(), numReplicas);
+    }
+
+    // Verify that segments are distributed equally across servers
+    Map<String, Integer> numSegmentsPerServer = 
getNumSegmentsPerServer(oldSegmentAssignment);
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+      assertTrue(numSegmentsPerServer.containsKey(instanceId));
+      // Total number of segments is numReplicas * numPartitions * 
(numSegmentsPerPartition + 1) because of
+      // CONSUMING segments
+      assertEquals(numSegmentsPerServer.get(instanceId),
+          (numReplicas * numPartitions * (numSegmentsPerPartition + 1)) / 
numServers);
+    }
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, null, null);
+    // Rebalance should return NO_OP status since there has been no change
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
+    // All servers should be assigned to the table
+    Map<InstancePartitionsType, InstancePartitions> instanceAssignment = 
rebalanceResult.getInstanceAssignment();
+    assertEquals(instanceAssignment.size(), 1);
+    InstancePartitions instancePartitions = 
instanceAssignment.get(InstancePartitionsType.CONSUMING);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Verify that replica partitions are distributed equally across servers
+    Map<String, Integer> numReplicaPartitionsPerServer = 
getNumReplicaPartitionsPerServer(instancePartitions);
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+      assertTrue(numReplicaPartitionsPerServer.containsKey(instanceId));
+      // Total number of partitions is numReplicas * numPartitions
+      assertEquals(numReplicaPartitionsPerServer.get(instanceId), (numReplicas 
* numPartitions) / numServers);
+    }
+
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // Add two new servers
+    int numServersToAdd = 2;
+    Set<String> newServers = new HashSet<>();
+    for (int i = 0; i < numServersToAdd; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+      newServers.add(instanceId);
+    }
+
+    // Rebalance with reassignInstances and minimizeDataMovement enabled
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setIncludeConsuming(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    instanceAssignment = rebalanceResult.getInstanceAssignment();
+    assertEquals(instanceAssignment.size(), 1);
+    instancePartitions = 
instanceAssignment.get(InstancePartitionsType.CONSUMING);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Get number of segments moved
+    int numSegmentsMoved = getNumSegmentsMoved(oldSegmentAssignment, 
rebalanceResult.getSegmentAssignment());
+    // This number is 130 when using the default partition selector in this 
same scenario since more segment partitions
+    // will be moved when the instance partitions don't match the segment 
partitions (we're setting numPartitions to
+    // the default value of 0 in the table's instance assignment config).
+    assertEquals(numSegmentsMoved, 30);

Review Comment:
   This assertion is the crux of the new test: it validates that the new 
instance assignment strategy does indeed minimize data movement in scenarios 
like https://github.com/apache/pinot/issues/14151.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to