This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b80705f89 Fix bug in colocated tenant creation (#10098)
3b80705f89 is described below

commit 3b80705f8963f9e957ab28ed3a26bdc0eb14230d
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jan 11 11:21:46 2023 -0800

    Fix bug in colocated tenant creation (#10098)
---
 .../helix/core/PinotHelixResourceManager.java      | 74 ++++++++++------------
 .../PinotHelixResourceManagerStatelessTest.java    | 41 ++++++++++++
 2 files changed, 76 insertions(+), 39 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index efb209795e..bca97f48c1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -990,8 +990,16 @@ public class PinotHelixResourceManager {
   }
 
   private void retagInstance(String instanceName, String oldTag, String newTag) {
-    _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, oldTag);
-    _helixAdmin.addInstanceTag(_helixClusterName, instanceName, newTag);
+    PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(instanceName);
+    InstanceConfig instanceConfig = _helixDataAccessor.getProperty(instanceConfigKey);
+    if (instanceConfig == null) {
+      throw new NotFoundException("Failed to find instance config for instance: " + instanceName);
+    }
+    instanceConfig.removeTag(oldTag);
+    instanceConfig.addTag(newTag);
+    if (!_helixDataAccessor.setProperty(instanceConfigKey, instanceConfig)) {
+      throw new RuntimeException("Failed to set instance config for instance: " + instanceName);
+    }
   }
 
   public PinotResourceManagerResponse updateServerTenant(Tenant serverTenant) {
@@ -1162,51 +1170,39 @@ public class PinotHelixResourceManager {
   }
 
   public PinotResourceManagerResponse createServerTenant(Tenant serverTenant) {
-    int numberOfInstances = serverTenant.getNumberOfInstances();
-    List<String> unTaggedInstanceList = getOnlineUnTaggedServerInstanceList();
-    if (unTaggedInstanceList.size() < numberOfInstances) {
+    int numInstances = serverTenant.getNumberOfInstances();
+    int numOfflineInstances = serverTenant.getOfflineInstances();
+    int numRealtimeInstances = serverTenant.getRealtimeInstances();
+    if (numInstances < numOfflineInstances || numInstances < numRealtimeInstances) {
+      throw new BadRequestException(
+          String.format("Cannot request more offline instances: %d or realtime instances: %d than total instances: %d",
+              numOfflineInstances, numRealtimeInstances, numInstances));
+    }
+    // TODO: Consider throwing BadRequestException
+    List<String> untaggedInstances = getOnlineUnTaggedServerInstanceList();
+    if (untaggedInstances.size() < numInstances) {
       String message = "Failed to allocate server instances to Tag : " + serverTenant.getTenantName()
-          + ", Current number of untagged server instances : " + unTaggedInstanceList.size()
+          + ", Current number of untagged server instances : " + untaggedInstances.size()
           + ", Request asked number is : " + serverTenant.getNumberOfInstances();
       LOGGER.error(message);
       return PinotResourceManagerResponse.failure(message);
-    } else {
-      if (serverTenant.isCoLocated()) {
-        assignColocatedServerTenant(serverTenant, numberOfInstances, unTaggedInstanceList);
-      } else {
-        assignIndependentServerTenant(serverTenant, numberOfInstances, unTaggedInstanceList);
-      }
     }
-    return PinotResourceManagerResponse.SUCCESS;
-  }
-
-  private void assignIndependentServerTenant(Tenant serverTenant, int numberOfInstances,
-      List<String> unTaggedInstanceList) {
-    String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
-    for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(i), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
-    }
-    String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
-    for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(i + serverTenant.getOfflineInstances()), Helix.UNTAGGED_SERVER_INSTANCE,
-          realtimeServerTag);
-    }
-  }
-
-  private void assignColocatedServerTenant(Tenant serverTenant, int numberOfInstances,
-      List<String> unTaggedInstanceList) {
-    int cnt = 0;
-    String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
-    for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(cnt++), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+    int index = 0;
+    if (numOfflineInstances > 0) {
+      String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
+      for (int i = 0; i < numOfflineInstances; i++) {
+        retagInstance(untaggedInstances.get(index), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+        index = (index + 1) % numInstances;
+      }
     }
-    String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
-    for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(cnt++), Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
-      if (cnt == numberOfInstances) {
-        cnt = 0;
+    if (numRealtimeInstances > 0) {
+      String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
+      for (int i = 0; i < numRealtimeInstances; i++) {
+        retagInstance(untaggedInstances.get(index), Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+        index = (index + 1) % numInstances;
       }
     }
+    return PinotResourceManagerResponse.SUCCESS;
   }
 
   public PinotResourceManagerResponse createBrokerTenant(Tenant brokerTenant) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 6569a6a4be..3e66b5c6ac 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -588,6 +588,47 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     resetServerTags();
   }
 
+  @Test
+  public void testCreateColocatedTenant() {
+    untagServers();
+    Tenant serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_SERVER_INSTANCES, NUM_SERVER_INSTANCES,
+        NUM_SERVER_INSTANCES);
+    assertTrue(_helixResourceManager.createServerTenant(serverTenant).isSuccessful());
+    assertEquals(
+        _helixResourceManager.getInstancesWithTag(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME)).size(),
+        NUM_SERVER_INSTANCES);
+    assertEquals(
+        _helixResourceManager.getInstancesWithTag(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME)).size(),
+        NUM_SERVER_INSTANCES);
+    assertTrue(_helixResourceManager.getOnlineUnTaggedServerInstanceList().isEmpty());
+
+    untagServers();
+    serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_SERVER_INSTANCES, NUM_SERVER_INSTANCES - 1,
+        NUM_SERVER_INSTANCES - 1);
+    assertTrue(_helixResourceManager.createServerTenant(serverTenant).isSuccessful());
+    assertEquals(
+        _helixResourceManager.getInstancesWithTag(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME)).size(),
+        NUM_SERVER_INSTANCES - 1);
+    assertEquals(
+        _helixResourceManager.getInstancesWithTag(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME)).size(),
+        NUM_SERVER_INSTANCES - 1);
+    assertTrue(_helixResourceManager.getOnlineUnTaggedServerInstanceList().isEmpty());
+
+    untagServers();
+    serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_SERVER_INSTANCES - 1, NUM_SERVER_INSTANCES - 1,
+        NUM_SERVER_INSTANCES - 1);
+    assertTrue(_helixResourceManager.createServerTenant(serverTenant).isSuccessful());
+    assertEquals(
+        _helixResourceManager.getInstancesWithTag(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME)).size(),
+        NUM_SERVER_INSTANCES - 1);
+    assertEquals(
+        _helixResourceManager.getInstancesWithTag(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME)).size(),
+        NUM_SERVER_INSTANCES - 1);
+    assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 1);
+
+    resetServerTags();
+  }
+
   @Test
   public void testLeadControllerResource() {
     IdealState leadControllerResourceIdealState =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to