Jackie-Jiang commented on code in PR #13951:
URL: https://github.com/apache/pinot/pull/13951#discussion_r1958720448


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -1284,4 +1286,18 @@ private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpH
       validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders));
     }
   }
+
+  /*
+  For the given table config, calculates a target assignment, if possible. Otherwise, throws an exception
+   */
+  private void validateInstanceAssignment(TableConfig tableConfig) {
+    TableRebalancer tableRebalancer = new TableRebalancer(_pinotHelixResourceManager.getHelixZkManager());
+    try {
+      tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true);
+    } catch (Exception e) {
+      LOGGER.error("Exception calculating instance partitions for table: {}", tableConfig.getTableName());

Review Comment:
   Since we are throwing the exception out, no need to log the error here.
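
   A minimal sketch of what this could look like, written as a standalone class rather than against the Pinot code. The class name, the `validate` helper and the `Callable` parameter are hypothetical stand-ins for the instance-partition computation. Passing `e` as the cause goes beyond what this comment asks for; it is an assumption that the original stack trace should stay available to whoever handles the exception.

```java
import java.util.concurrent.Callable;

public class RethrowWithoutLogging {

  // Hypothetical stand-in for validateInstanceAssignment(); not a Pinot API.
  public static void validate(String tableName, Callable<Void> computation) {
    try {
      computation.call();
    } catch (Exception e) {
      // No LOGGER.error(...) here: the exception is rethrown, so the caller
      // that finally handles it can log it once.
      // Assumption: the original exception is attached as the cause.
      throw new RuntimeException("Exception calculating instance partitions for table: " + tableName, e);
    }
  }

  public static void main(String[] args) {
    try {
      validate("testTable_OFFLINE", () -> {
        throw new IllegalStateException("Not enough qualified instances");
      });
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
      System.out.println(e.getCause().getMessage());
    }
  }
}
```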



##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -137,7 +137,7 @@ public static InstancePartitions computeDefaultInstancePartitions(HelixManager h
         throw new IllegalStateException();
     }
     return computeDefaultInstancePartitionsForTag(helixManager, tableConfig.getTableName(),
-        instancePartitionsType.toString(), serverTag);
+        instancePartitionsType.toString(), serverTag, tableConfig.getReplication());

Review Comment:
   Let's directly pass in `tableConfig`
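
   A rough illustration of the idea, using a hypothetical `TableConfigSketch` class in place of Pinot's `TableConfig`: the callee takes the config object and reads the table name and replication from it, so callers do not unpack individual fields. None of the names below are Pinot APIs.

```java
public class PassConfigDirectly {

  // Hypothetical, simplified stand-in for a table config (illustration only).
  public static final class TableConfigSketch {
    final String tableName;
    final int replication;

    public TableConfigSketch(String tableName, int replication) {
      this.tableName = tableName;
      this.replication = replication;
    }
  }

  // The callee reads what it needs from the config object itself, so adding
  // another field later does not change the method signature.
  public static String describe(TableConfigSketch tableConfig) {
    return tableConfig.tableName + " (replication=" + tableConfig.replication + ")";
  }

  public static void main(String[] args) {
    System.out.println(describe(new TableConfigSketch("testTable_OFFLINE", 3)));
  }
}
```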



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -1284,4 +1286,18 @@ private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpH
       validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders));
     }
   }
+
+  /*
+  For the given table config, calculates a target assignment, if possible. Otherwise, throws an exception
+   */
+  private void validateInstanceAssignment(TableConfig tableConfig) {
+    TableRebalancer tableRebalancer = new TableRebalancer(_pinotHelixResourceManager.getHelixZkManager());
+    try {
+      tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true);
+    } catch (Exception e) {
+      LOGGER.error("Exception calculating instance partitions for table: {}", tableConfig.getTableName());
+      throw new RuntimeException("Exception calculating instance partitions for table:" + tableConfig.getTableName());
+    }
+  }
+

Review Comment:
   (nit) Remove empty line



##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -161,6 +161,24 @@ public static InstancePartitions computeDefaultInstancePartitionsForTag(HelixMan
     return instancePartitions;
   }
 
+  public static InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager,

Review Comment:
   We can replace the old method. I checked all the usages, and it is safe to use the new method in `TierConfigUtils`.
   
   @klsince `TierConfigUtils.getTieredInstancePartitionsForSegment()` shouldn't always use the default instance partitions; it should use the tier instance partitions if they exist. Can you please take a look?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -1284,4 +1286,18 @@ private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpH
       validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders));
     }
   }
+
+  /*
+  For the given table config, calculates a target assignment, if possible. Otherwise, throws an exception
+   */
+  private void validateInstanceAssignment(TableConfig tableConfig) {
+    TableRebalancer tableRebalancer = new TableRebalancer(_pinotHelixResourceManager.getHelixZkManager());
+    try {
+      tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true);
+    } catch (Exception e) {
+      LOGGER.error("Exception calculating instance partitions for table: {}", tableConfig.getTableName());
+      throw new RuntimeException("Exception calculating instance partitions for table:" + tableConfig.getTableName());

Review Comment:
   Add a space after `:`



##########
pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java:
##########
@@ -746,6 +752,218 @@ public void testUnrecognizedProperties()
         "unrecognizedProperties\":{\"/illegalKey1\":1," + "\"/illegalKey2/illegalKey3\":2}}"));
   }
 
+  /**
+   * Validates the behavior of the system when creating or updating tables with invalid replication factors.
+   * This method tests both REALTIME and OFFLINE table configurations.
+   *
+   * The method performs the following steps:
+   * 1. Attempts to create a REALTIME table with an invalid replication factor of 5, which exceeds the number of
+   *  available instances. The creation is expected to fail, and the test verifies that the exception message
+   *  contains the expected error.
+   * 2. Attempts to create an OFFLINE table with the same invalid replication factor. The creation is expected to
+   *  fail, and the test verifies that the exception message contains the expected error.
+   * 3. Creates REALTIME and OFFLINE tables with a valid replication factor of 1 to establish a baseline for further
+   * testing. These creations are expected to succeed.
+   * 4. Attempts to update the replication factor of the previously created REALTIME and OFFLINE tables to the
+   * invalid value of 5. These updates are expected to fail, and the test verifies that the appropriate error
+   * messages are returned.
+   * @throws Exception
+   */
+  @Test
+  public void validateInvalidTableReplication()
+      throws Exception {
+
+    String rawTableName = "testTable";
+
+    validateRealtimeTableCreationWithInvalidReplication(rawTableName);
+    validateOfflineTableCreationWithInvalidReplication(rawTableName);
+
+    createRealtimeTableWithValidReplication(rawTableName);
+    createOfflineTableWithValidReplication(rawTableName);
+
+    validateRealtimeTableUpdateReplicationToInvalidValue(rawTableName);
+    validateOfflineTableUpdateReplicationToInvalidValue(rawTableName);
+  }
+
+  /**
+   * Validates the behavior of the system when creating or updating tables with invalid replica group configurations.
+   * This method tests the REALTIME table configuration.
+   *
+   * The method performs the following steps:
+   * 1. Attempts to create a REALTIME table with an invalid replica group configuration. The creation is expected to
+   * fail, and the test verifies that the exception message contains the expected error.
+   * 2. Creates a new REALTIME table with a valid replica group configuration to establish a baseline for further
+   * testing. This creation is expected to succeed.
+   * 3. Attempts to update the replica group configuration of the previously created REALTIME table to an invalid
+   * value. The update is expected to fail, and the test verifies that the appropriate error message is returned.
+   *
+   * @throws Exception if any error occurs during the validation process
+   */
+  @Test
+  public void validateInvalidReplicaGroupConfig()
+      throws Exception {
+
+    // Create a REALTIME table with an invalid replication factor. Creation should fail.
+    String tableName = "testTable";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
+    instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
+        getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 2));
+
+    TableConfig realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap).setNumReplicas(10).build();
+
+    try {
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(
+          "Invalid table config for table testTable_REALTIME: java.lang.IllegalStateException: Not enough qualified "
+              + "instances, ask for: (numInstancesPerReplicaGroup: 2) * (numReplicaGroups: 4) = 8, having only 4\" "
+              + "while sending request"));
+    }
+
+    //Create a new valid table and update it with invalid replica group config
+    instanceAssignmentConfigMap.clear();
+    instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
+        getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 1));
+
+    realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap).setNumReplicas(10).build();
+
+    try {
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
+    } catch (Exception e) {
+      fail("Preconditions failure: Could not create a REALTIME table with a valid replica group config as a "
+          + "precondition to testing config updates");
+    }
+
+    //now, update it with an invalid RG config, the update should fail
+    instanceAssignmentConfigMap.clear();
+    instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
+        getInstanceAssignmentConfig("DefaultTenant_REALTIME", 1, 8));
+
+    try {
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(
+          "Invalid table config for table testTable_REALTIME: java.lang.IllegalStateException: Not enough qualified "
+              + "instances, ask for: (numInstancesPerReplicaGroup: 8) * (numReplicaGroups: 1) = 8, having only 4\" "
+              + "while sending request"));
+    }
+  }
+
+  private void validateOfflineTableUpdateReplicationToInvalidValue(String rawTableName) {
+    TableConfig offlineTableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setNumReplicas(5).build();
+
+    try {
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(
+          "Invalid table config for table testTable_OFFLINE: java.lang.IllegalStateException: Number of instances: 4"
+              + " with tag: DefaultTenant_OFFLINE < table replication factor: 5"));
+    }
+  }
+
+  private void validateRealtimeTableUpdateReplicationToInvalidValue(String rawTableName) {
+    TableConfig realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .setNumReplicas(5).build();
+
+    try {
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(
+          "Invalid table config for table testTable_REALTIME: java.lang.IllegalStateException: Number of instances: 4"
+              + " with tag: DefaultTenant_REALTIME < table replication factor: 5"));
+    }
+  }
+
+  private void createOfflineTableWithValidReplication(String rawTableName) {
+    TableConfig offlineTableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setNumReplicas(1).build();
+
+    try {
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+    } catch (Exception e) {
+      fail("Preconditions failure: Could not create a REALTIME table with valid replication factor of 1 as a "
+          + "precondition to testing config updates");
+    }
+  }
+
+  private void createRealtimeTableWithValidReplication(String rawTableName) {
+    TableConfig realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .setNumReplicas(1).build();
+
+    try {
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
+    } catch (Exception e) {
+      fail("Preconditions failure: Could not create a REALTIME table with valid replication factor of 1 as a "
+          + "precondition to testing config updates");
+    }
+  }
+
+  private void validateOfflineTableCreationWithInvalidReplication(String tableName) {
+    TableConfig offlineTableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setNumReplicas(5).build();
+
+    try {
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+    } catch (Exception e) {
+      assertTrue(e.getMessage()
+          .contains("Number of instances: 4 with tag: DefaultTenant_OFFLINE < table replication factor: 5"));
+    }
+  }
+
+  private void validateRealtimeTableCreationWithInvalidReplication(String rawTableName)
+      throws IOException {
+    DEFAULT_INSTANCE.addDummySchema(rawTableName);
+    TableConfig realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setServerTenant("DefaultTenant")
+            .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .setNumReplicas(5).build();
+
+    try {
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Number of instances: 4 with tag: DefaultTenant_REALTIME < table replication: 5"));
+    }
+  }
+
+  private static InstanceAssignmentConfig getInstanceAssignmentConfig(String tag, int numReplicaGroups,
+      int numInstancesPerReplicaGroup) {
+    InstanceTagPoolConfig instanceTagPoolConfig = new InstanceTagPoolConfig(tag, false, 0, null);
+    List<String> constraints = new ArrayList<>();
+    constraints.add("constraints1");
+    InstanceConstraintConfig instanceConstraintConfig = new InstanceConstraintConfig(constraints);
+    InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 1, numReplicaGroups, numInstancesPerReplicaGroup, 1, 1, true,
+            null);
+    return new InstanceAssignmentConfig(instanceTagPoolConfig, instanceConstraintConfig,
+        instanceReplicaGroupPartitionConfig,
+        InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.name(), false);
+  }
+

Review Comment:
   (nit) Remove empty line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

