Jackie-Jiang commented on code in PR #13951: URL: https://github.com/apache/pinot/pull/13951#discussion_r1958720448
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ########## @@ -1284,4 +1286,18 @@ private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpH validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders)); } } + + /* + For the given table config, calculates a target assignment, if possible. Otherwise, throws an exception + */ + private void validateInstanceAssignment(TableConfig tableConfig) { + TableRebalancer tableRebalancer = new TableRebalancer(_pinotHelixResourceManager.getHelixZkManager()); + try { + tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true); + } catch (Exception e) { + LOGGER.error("Exception calculating instance partitions for table: {}", tableConfig.getTableName()); Review Comment: Since we are throwing the exception out, no need to log the error here. ########## pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java: ########## @@ -137,7 +137,7 @@ public static InstancePartitions computeDefaultInstancePartitions(HelixManager h throw new IllegalStateException(); } return computeDefaultInstancePartitionsForTag(helixManager, tableConfig.getTableName(), - instancePartitionsType.toString(), serverTag); + instancePartitionsType.toString(), serverTag, tableConfig.getReplication()); Review Comment: Let's directly pass in `tableConfig` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ########## @@ -1284,4 +1286,18 @@ private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpH validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders)); } } + + /* + For the given table config, calculates a target assignment, if possible. 
Otherwise, throws an exception + */ + private void validateInstanceAssignment(TableConfig tableConfig) { + TableRebalancer tableRebalancer = new TableRebalancer(_pinotHelixResourceManager.getHelixZkManager()); + try { + tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true); + } catch (Exception e) { + LOGGER.error("Exception calculating instance partitions for table: {}", tableConfig.getTableName()); + throw new RuntimeException("Exception calculating instance partitions for table:" + tableConfig.getTableName()); + } + } + Review Comment: (nit) Remove empty line ########## pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java: ########## @@ -161,6 +161,24 @@ public static InstancePartitions computeDefaultInstancePartitionsForTag(HelixMan return instancePartitions; } + public static InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager, Review Comment: We can replace the old method. I checked all the usages, and it is safe to use the new method in `TierConfigUtils`. @klsince `TierConfigUtils.getTieredInstancePartitionsForSegment()` shouldn't always use default instance partitions; it should use the tier instance partitions if they exist. Can you please take a look? ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ########## @@ -1284,4 +1286,18 @@ private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpH validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders)); } } + + /* + For the given table config, calculates a target assignment, if possible. 
Otherwise, throws an exception + */ + private void validateInstanceAssignment(TableConfig tableConfig) { + TableRebalancer tableRebalancer = new TableRebalancer(_pinotHelixResourceManager.getHelixZkManager()); + try { + tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true); + } catch (Exception e) { + LOGGER.error("Exception calculating instance partitions for table: {}", tableConfig.getTableName()); + throw new RuntimeException("Exception calculating instance partitions for table:" + tableConfig.getTableName()); Review Comment: Add a space after `:` ########## pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java: ########## @@ -746,6 +752,218 @@ public void testUnrecognizedProperties() "unrecognizedProperties\":{\"/illegalKey1\":1," + "\"/illegalKey2/illegalKey3\":2}}")); } + /** + * Validates the behavior of the system when creating or updating tables with invalid replication factors. + * This method tests both REALTIME and OFFLINE table configurations. + * + * The method performs the following steps: + * 1. Attempts to create a REALTIME table with an invalid replication factor of 5, which exceeds the number of + * available instances. The creation is expected to fail, and the test verifies that the exception message + * contains the expected error. + * 2. Attempts to create an OFFLINE table with the same invalid replication factor. The creation is expected to + * fail, and the test verifies that the exception message contains the expected error. + * 3. Creates REALTIME and OFFLINE tables with a valid replication factor of 1 to establish a baseline for further + * testing. These creations are expected to succeed. + * 4. Attempts to update the replication factor of the previously created REALTIME and OFFLINE tables to the + * invalid value of 5. These updates are expected to fail, and the test verifies that the appropriate error + * messages are returned. 
+ * @throws Exception + */ + @Test + public void validateInvalidTableReplication() + throws Exception { + + String rawTableName = "testTable"; + + validateRealtimeTableCreationWithInvalidReplication(rawTableName); + validateOfflineTableCreationWithInvalidReplication(rawTableName); + + createRealtimeTableWithValidReplication(rawTableName); + createOfflineTableWithValidReplication(rawTableName); + + validateRealtimeTableUpdateReplicationToInvalidValue(rawTableName); + validateOfflineTableUpdateReplicationToInvalidValue(rawTableName); + } + + /** + * Validates the behavior of the system when creating or updating tables with invalid replica group configurations. + * This method tests the REALTIME table configuration. + * + * The method performs the following steps: + * 1. Attempts to create a REALTIME table with an invalid replica group configuration. The creation is expected to + * fail, and the test verifies that the exception message contains the expected error. + * 2. Creates a new REALTIME table with a valid replica group configuration to establish a baseline for further + * testing. This creation is expected to succeed. + * 3. Attempts to update the replica group configuration of the previously created REALTIME table to an invalid + * value. The update is expected to fail, and the test verifies that the appropriate error message is returned. + * + * @throws Exception if any error occurs during the validation process + */ + @Test + public void validateInvalidReplicaGroupConfig() + throws Exception { + + // Create a REALTIME table with an invalid replication factor. Creation should fail. 
+ String tableName = "testTable"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>(); + instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(), + getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 2)); + + TableConfig realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap).setNumReplicas(10).build(); + + try { + sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString()); + } catch (Exception e) { + assertTrue(e.getMessage().contains( + "Invalid table config for table testTable_REALTIME: java.lang.IllegalStateException: Not enough qualified " + + "instances, ask for: (numInstancesPerReplicaGroup: 2) * (numReplicaGroups: 4) = 8, having only 4\" " + + "while sending request")); + } + + //Create a new valid table and update it with invalid replica group config + instanceAssignmentConfigMap.clear(); + instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(), + getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 1)); + + realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap).setNumReplicas(10).build(); + + try { + sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString()); + } catch (Exception e) { + fail("Preconditions failure: Could not create a REALTIME 
table with a valid replica group config as a " + + "precondition to testing config updates"); + } + + //now, update it with an invalid RG config, the update should fail + instanceAssignmentConfigMap.clear(); + instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(), + getInstanceAssignmentConfig("DefaultTenant_REALTIME", 1, 8)); + + try { + sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString()); + } catch (Exception e) { + assertTrue(e.getMessage().contains( + "Invalid table config for table testTable_REALTIME: java.lang.IllegalStateException: Not enough qualified " + + "instances, ask for: (numInstancesPerReplicaGroup: 8) * (numReplicaGroups: 1) = 8, having only 4\" " + + "while sending request")); + } + } + + private void validateOfflineTableUpdateReplicationToInvalidValue(String rawTableName) { + TableConfig offlineTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setNumReplicas(5).build(); + + try { + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + } catch (Exception e) { + assertTrue(e.getMessage().contains( + "Invalid table config for table testTable_OFFLINE: java.lang.IllegalStateException: Number of instances: 4" + + " with tag: DefaultTenant_OFFLINE < table replication factor: 5")); + } + } + + private void validateRealtimeTableUpdateReplicationToInvalidValue(String rawTableName) { + TableConfig realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .setNumReplicas(5).build(); + + try { + sendPostRequest(_createTableUrl, 
realtimeTableConfig.toJsonString()); + } catch (Exception e) { + assertTrue(e.getMessage().contains( + "Invalid table config for table testTable_REALTIME: java.lang.IllegalStateException: Number of instances: 4" + + " with tag: DefaultTenant_REALTIME < table replication factor: 5")); + } + } + + private void createOfflineTableWithValidReplication(String rawTableName) { + TableConfig offlineTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setNumReplicas(1).build(); + + try { + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + } catch (Exception e) { + fail("Preconditions failure: Could not create a REALTIME table with valid replication factor of 1 as a " + + "precondition to testing config updates"); + } + } + + private void createRealtimeTableWithValidReplication(String rawTableName) { + TableConfig realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .setNumReplicas(1).build(); + + try { + sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString()); + } catch (Exception e) { + fail("Preconditions failure: Could not create a REALTIME table with valid replication factor of 1 as a " + + "precondition to testing config updates"); + } + } + + private void validateOfflineTableCreationWithInvalidReplication(String tableName) { + TableConfig offlineTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + 
.setNumReplicas(5).build(); + + try { + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + } catch (Exception e) { + assertTrue(e.getMessage() + .contains("Number of instances: 4 with tag: DefaultTenant_OFFLINE < table replication factor: 5")); + } + } + + private void validateRealtimeTableCreationWithInvalidReplication(String rawTableName) + throws IOException { + DEFAULT_INSTANCE.addDummySchema(rawTableName); + TableConfig realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setServerTenant("DefaultTenant") + .setTimeColumnName("timeColumn").setTimeType("DAYS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .setNumReplicas(5).build(); + + try { + sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString()); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Number of instances: 4 with tag: DefaultTenant_REALTIME < table replication: 5")); + } + } + + private static InstanceAssignmentConfig getInstanceAssignmentConfig(String tag, int numReplicaGroups, + int numInstancesPerReplicaGroup) { + InstanceTagPoolConfig instanceTagPoolConfig = new InstanceTagPoolConfig(tag, false, 0, null); + List<String> constraints = new ArrayList<>(); + constraints.add("constraints1"); + InstanceConstraintConfig instanceConstraintConfig = new InstanceConstraintConfig(constraints); + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 1, numReplicaGroups, numInstancesPerReplicaGroup, 1, 1, true, + null); + return new InstanceAssignmentConfig(instanceTagPoolConfig, instanceConstraintConfig, + instanceReplicaGroupPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.name(), false); + } + Review Comment: (nit) Remove empty line -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org