Ferix9288 commented on issue #11727:
URL: https://github.com/apache/pinot/issues/11727#issuecomment-1753657572

@jackjlli @Jackie-Jiang

I was able to replicate this with the unit test below:
   
```
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.helix.core.assignment.instance;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.text.StringSubstitutor;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InstanceReplicaGroupPartitionSelectorTest {

    private static final String instanceConfigTemplate =
        "{\n" +
        "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" +
        "  \"simpleFields\": {\n" +
        "    \"HELIX_ENABLED\": \"true\",\n" +
        "    \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n" +
        "    \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n" +
        "    \"HELIX_PORT\": \"8098\",\n" +
        "    \"adminPort\": \"8097\",\n" +
        "    \"grpcPort\": \"8090\",\n" +
        "    \"queryMailboxPort\": \"46347\",\n" +
        "    \"queryServerPort\": \"45031\",\n" +
        "    \"shutdownInProgress\": \"false\"\n" +
        "  },\n" +
        "  \"mapFields\": {\n" +
        "    \"SYSTEM_RESOURCE_INFO\": {\n" +
        "      \"numCores\": \"16\",\n" +
        "      \"totalMemoryMB\": \"126976\",\n" +
        "      \"maxHeapSizeMB\": \"65536\"\n" +
        "    },\n" +
        "    \"pool\": {\n" +
        "      \"DefaultTenant_OFFLINE\": \"${pool}\",\n" +
        "      \"${poolName}\": \"${pool}\",\n" +
        "      \"AllReplicationGroups\": \"1\"\n" +
        "    }\n" +
        "  },\n" +
        "  \"listFields\": {\n" +
        "    \"TAG_LIST\": [\n" +
        "      \"DefaultTenant_OFFLINE\",\n" +
        "      \"DefaultTenant_REALTIME\",\n" +
        "      \"${poolName}\",\n" +
        "      \"AllReplicationGroups\"\n" +
        "    ]\n" +
        "  }\n" +
        "}";

    @Test
    public void testSelectInstances() throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        String existingPartitionsJson =
            "    {\n" +
            "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" +
            "      \"partitionToInstancesMap\": {\n" +
            "        \"0_0\": [\n" +
            "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" +
            "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" +
            "        ]\n" +
            "      }\n" +
            "    }\n";
        InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class);
        InstanceReplicaGroupPartitionConfig config = new InstanceReplicaGroupPartitionConfig(
            true, 0, 2, 2, 1, 2, true, null
        );

        InstanceReplicaGroupPartitionSelector selector = new InstanceReplicaGroupPartitionSelector(
            config,
            "tableNameBlah",
            existing
        );

        String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
        String[] poolNumbers = {"0", "0", "1", "1"};
        String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups",
            "SecondHalfReplicationGroups", "SecondHalfReplicationGroups"};
        Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();

        for (int i = 0; i < serverNames.length; i++) {
            Map<String, String> valuesMap = new HashMap<>();
            valuesMap.put("serverName", serverNames[i]);
            valuesMap.put("pool", poolNumbers[i]);
            valuesMap.put("poolName", poolNames[i]);

            StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
            String resolvedString = substitutor.replace(instanceConfigTemplate);

            ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class);
            int poolNumber = Integer.parseInt(poolNumbers[i]);
            poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
        }
        InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
        selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);

        String expectedInstancePartitions =
            "    {\n" +
            "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" +
            "      \"partitionToInstancesMap\": {\n" +
            "        \"0_0\": [\n" +
            "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" +
            "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" +
            "        ],\n" +
            "        \"0_1\": [\n" +
            "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" +
            "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" +
            "        ]\n" +
            "      }\n" +
            "  }\n";
        InstancePartitions expectedPartitions = objectMapper.readValue(
            expectedInstancePartitions,
            InstancePartitions.class
        );
        assert assignedPartitions.equals(expectedPartitions);
    }

}
```
   
From my findings, I can confirm that the current code breaks as soon as the number of pools increases. Specifically:
   
```
    for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
      Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
      if (pool == null) {
        // Skip the replica group if it's no longer needed.
        continue;
      }
      Set<String> candidateInstances =
          poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
      List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
      instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));

      for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
        List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
        replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
            .addAll(existingInstances);
      }
    }
```
   
The above code only populates one of the two pools in `poolToCandidateInstancesMap`, which in turn causes the NPE below:
   
```
    // If the new number of replica groups is greater than the existing number of replica groups.
    for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) {
      int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
      LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
```
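
To make the failure mode concrete, here is a minimal standalone sketch of those two loops (my own simplified reproduction, not the actual selector code; the class name, the `min`-based `numCommonReplicaGroups`, and the map contents are assumptions that mirror the snippets above):

```
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

public class PoolCandidateNpeSketch {
    public static void main(String[] args) {
        // Scaling from 1 existing replica group (in pool 0) to 2 replica groups (pools 0 and 1).
        int existingNumReplicaGroups = 1;
        int numReplicaGroups = 2;
        // Assumption: the "common" replica groups are the ones present both before and after.
        int numCommonReplicaGroups = Math.min(existingNumReplicaGroups, numReplicaGroups);

        Map<Integer, Integer> replicaGroupIdToPoolMap = new HashMap<>();
        replicaGroupIdToPoolMap.put(0, 0);
        replicaGroupIdToPoolMap.put(1, 1);

        Map<Integer, List<String>> poolToInstancesMap = new HashMap<>();
        poolToInstancesMap.put(0, List.of("rg0-0", "rg0-1"));
        poolToInstancesMap.put(1, List.of("rg1-0", "rg1-1"));

        // First loop: iterates only over the *common* replica groups, so only pool 0 gets seeded.
        Map<Integer, LinkedHashSet<String>> poolToCandidateInstancesMap = new HashMap<>();
        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
            int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>())
                .addAll(poolToInstancesMap.get(pool));
        }
        System.out.println("Seeded pools: " + poolToCandidateInstancesMap.keySet()); // prints [0]

        // Second loop: the newly added replica group points at pool 1, which was never seeded.
        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) {
            int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
            // poolToCandidateInstancesMap.get(1) is null, so this constructor call throws an NPE.
            LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
            System.out.println("Candidates for replica group " + replicaGroupId + ": " + candidateInstances);
        }
    }
}
```

With one existing replica group in pool 0 and the new second replica group assigned to pool 1, the first loop only seeds pool 0, so `poolToCandidateInstancesMap.get(1)` returns `null` and the `LinkedHashSet` copy constructor throws the NPE, which is the same path the unit test above hits.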
   
I made a temporary fix here (purely to show the diff, not something I actually intend to merge):
https://github.com/apache/pinot/pull/11765
   
It's only a temporary fix because it still doesn't truly `minimizeDataMovement`: everything gets thrown off by the code below:
   
```
    for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
      // Pick one pool for each replica-group based on the table name hash
      int pool = pools.get((tableNameHash + replicaId) % numPools);
      poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
      replicaGroupIdToPoolMap.put(replicaId, pool);
    }
```
   
i.e. if a replica group is no longer mapped to the same pool as before, the partition selection gets shuffled (e.g. replicaGroupId 0, previously mapped to pool 1, now gets mapped to pool 2 instead).
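
For illustration, here is a small standalone sketch of that pool pick (the hash value, class, and helper method are hypothetical; only the modulo expression is taken from the snippet above):

```
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PoolAssignmentShuffleSketch {
    // Mirrors the pick above: pool = pools.get((tableNameHash + replicaId) % numPools).
    static Map<Integer, Integer> assignPools(int tableNameHash, List<Integer> pools, int numReplicaGroups) {
        Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
        int numPools = pools.size();
        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
            replicaGroupIdToPoolMap.put(replicaId, pools.get((tableNameHash + replicaId) % numPools));
        }
        return replicaGroupIdToPoolMap;
    }

    public static void main(String[] args) {
        int tableNameHash = 7;  // hypothetical hash; any odd value shows the effect with these pools

        // Before: one pool, one replica group -> replica group 0 lands on pool 0.
        System.out.println("1 pool,  1 RG:  " + assignPools(tableNameHash, List.of(0), 1));    // {0=0}
        // After: two pools, two replica groups -> (7 + 0) % 2 == 1, so replica group 0 now maps
        // to pool 1 even though its existing instances live in pool 0, forcing data movement.
        System.out.println("2 pools, 2 RGs: " + assignPools(tableNameHash, List.of(0, 1), 2)); // {0=1, 1=0}
    }
}
```

Because the pick depends on `numPools`, changing the pool count can silently reassign existing replica groups to different pools, which is exactly the reshuffling described above.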

