This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 51bf75efa6 10608: Changes for adding partitionColumn in replicaGroupPartitionConfig (#10656) 51bf75efa6 is described below commit 51bf75efa65cbe8bd8b497eb20e34869205d74e8 Author: Abhishek Sharma <abhishek.sha...@spothero.com> AuthorDate: Fri May 26 20:19:57 2023 -0400 10608: Changes for adding partitionColumn in replicaGroupPartitionConfig (#10656) --- .../assignment/InstanceAssignmentConfigUtils.java | 4 +- .../common/utils/config/TableConfigUtils.java | 25 ++++++++++ .../common/utils/config/TableConfigSerDeTest.java | 2 +- .../common/utils/config/TableConfigUtilsTest.java | 25 ++++++++++ .../assignment/segment/BaseSegmentAssignment.java | 6 +-- .../ReplicaGroupSegmentAssignmentStrategy.java | 6 +-- ...anceAssignmentRestletResourceStatelessTest.java | 6 +-- .../instance/InstanceAssignmentTest.java | 58 ++++++++++++---------- .../TableRebalancerClusterStatelessTest.java | 4 +- .../segment/local/utils/TableConfigUtils.java | 20 ++++++++ .../segment/local/utils/TableConfigUtilsTest.java | 42 ++++++++++++++++ .../InstanceReplicaGroupPartitionConfig.java | 14 +++++- 12 files changed, 169 insertions(+), 43 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java index b571918c0c..b37429c527 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java @@ -114,12 +114,12 @@ public class InstanceAssignmentConfigUtils { Preconditions.checkState(numPartitions > 0, "Number of partitions for column: %s is not properly configured", partitionColumn); replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, numPartitions, - replicaGroupStrategyConfig.getNumInstancesPerPartition(), minimizeDataMovement); + replicaGroupStrategyConfig.getNumInstancesPerPartition(), minimizeDataMovement, partitionColumn); } else { // If partition column is not configured, use replicaGroupStrategyConfig.getNumInstancesPerPartition() as // number of instances per replica-group for backward-compatibility replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, - replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement); + replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null); } return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java index 0546dcb66a..0a59696b10 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.DimensionTableConfig; @@ -38,6 +39,7 @@ import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -444,4 +446,27 @@ public class TableConfigUtils { return hasPreConfiguredInstancePartitions(tableConfig) && tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType); } + + /** + * Get the partition column from tableConfig instance assignment config map. + * @param tableConfig table config + * @return partition column + */ + public static String getPartitionColumn(TableConfig tableConfig) { + // check InstanceAssignmentConfigMap is null or empty, + if (!MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) { + for (InstanceAssignmentConfig instanceAssignmentConfig : tableConfig.getInstanceAssignmentConfigMap().values()) { + //check InstanceAssignmentConfig has the InstanceReplicaGroupPartitionConfig with non-empty partitionColumn + if (StringUtils.isNotEmpty(instanceAssignmentConfig.getReplicaGroupPartitionConfig().getPartitionColumn())) { + return instanceAssignmentConfig.getReplicaGroupPartitionConfig().getPartitionColumn(); + } + } + } + + // for backward-compatibility, If partitionColumn value isn't there in InstanceReplicaGroupPartitionConfig + // check ReplicaGroupStrategyConfig for partitionColumn + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = + tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); + return replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; + } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index 6436058b59..eb1d31bab0 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -211,7 +211,7 @@ public class TableConfigSerDeTest { InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null), new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")), - new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false)); + new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null)); TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java index 3e00e9fdeb..9d1f86dc23 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -48,6 +49,7 @@ import org.testng.annotations.Test; public class TableConfigUtilsTest { private static final String TABLE_NAME = "testTable"; + private static final String PARTITION_COLUMN = "partitionColumn"; /** * Test the {@link TableConfigUtils#convertFromLegacyTableConfig(TableConfig)} utility. @@ -140,6 +142,29 @@ public class TableConfigUtilsTest { Assert.assertEquals(tierTblCfg, tableConfig); } + @Test + public void testGetPartitionColumnWithoutAnyConfig() { + // without instanceAssignmentConfigMap + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build(); + Assert.assertNull(TableConfigUtils.getPartitionColumn(tableConfig)); + } + + @Test + public void testGetPartitionColumnWithReplicaGroupConfig() { + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = + new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1); + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build(); + + // setting up ReplicaGroupStrategyConfig for backward compatibility test. + SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig(); + validationConfig.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); + tableConfig.setValidationConfig(validationConfig); + + Assert.assertEquals(PARTITION_COLUMN, TableConfigUtils.getPartitionColumn(tableConfig)); + } + /** * Helper method to create a test StreamConfigs map. * @return Map containing Stream Configs diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java index fc66bce53e..b89fd497be 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java @@ -28,9 +28,9 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.tier.Tier; +import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory; -import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; @@ -76,9 +76,7 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment { _tableNameWithType = tableConfig.getTableName(); _tableConfig = tableConfig; _replication = tableConfig.getReplication(); - ReplicaGroupStrategyConfig replicaGroupStrategyConfig = - tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); - _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; + _partitionColumn = TableConfigUtils.getPartitionColumn(_tableConfig); if (_partitionColumn == null) { _logger.info("Initialized with replication: {} without partition column for table: {} ", _replication, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java index d5a4d0e027..3ff0807220 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java @@ -27,8 +27,8 @@ import java.util.Random; import java.util.TreeMap; import org.apache.helix.HelixManager; import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; -import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -54,9 +54,7 @@ class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); _replication = tableConfig.getReplication(); - ReplicaGroupStrategyConfig replicaGroupStrategyConfig = - validationAndRetentionConfig.getReplicaGroupStrategyConfig(); - _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; + _partitionColumn = TableConfigUtils.getPartitionColumn(_tableConfig); if (_partitionColumn == null) { LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy " + "with replication: {} without partition column for table: {} ", _replication, _tableName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java index ef95da5135..570c59438c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java @@ -118,7 +118,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control // Add OFFLINE instance assignment config to the offline table config InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); offlineTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(offlineTableConfig); @@ -136,7 +136,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control // Add CONSUMING instance assignment config to the real-time table config InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); realtimeTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(realtimeTableConfig); @@ -164,7 +164,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control null))); InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>(); instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig); instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index 53bd2da317..4335d80b14 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -350,7 +350,7 @@ public class InstanceAssignmentTest { // Assign to 2 replica-groups so that each replica-group is assigned to one pool int numReplicaGroups = numPools; InstanceReplicaGroupPartitionConfig replicaPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build(); @@ -448,7 +448,7 @@ public class InstanceAssignmentTest { // Assign instances from 2 pools to 3 replica-groups numReplicaGroups = numPools; - replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false); + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); @@ -477,7 +477,7 @@ public class InstanceAssignmentTest { // Reset the number of replica groups to 2 and pools to 2. numReplicaGroups = 2; numPools = 2; - replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true); + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); @@ -554,7 +554,7 @@ public class InstanceAssignmentTest { // Assign instances from 2 pools to 3 replica-groups numReplicaGroups = 3; - replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true); + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); @@ -633,7 +633,7 @@ public class InstanceAssignmentTest { // Reduce number of replica groups from 3 to 2. numReplicaGroups = 2; - replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true); + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); @@ -760,7 +760,7 @@ public class InstanceAssignmentTest { InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, false, 0, null); InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -849,7 +849,8 @@ public class InstanceAssignmentTest { } tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null); - replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false); + replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null + ); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -862,7 +863,8 @@ public class InstanceAssignmentTest { } // Enable replica-group - replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false); + replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null + ); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -874,7 +876,7 @@ public class InstanceAssignmentTest { assertEquals(e.getMessage(), "Number of replica-groups must be positive"); } - replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false); + replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -887,7 +889,7 @@ public class InstanceAssignmentTest { "Not enough qualified instances from pool: 0, cannot select 6 replica-groups from 5 instances"); } - replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false); + replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -899,7 +901,7 @@ public class InstanceAssignmentTest { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); } - replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false); + replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -912,7 +914,7 @@ public class InstanceAssignmentTest { "Number of instances per partition: 3 must be smaller or equal to number of instances per replica-group: 2"); } - replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false); + replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); @@ -948,7 +950,8 @@ public class InstanceAssignmentTest { } tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); InstanceReplicaGroupPartitionConfig replicaPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, + 0, false, null); try { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) @@ -977,7 +980,8 @@ public class InstanceAssignmentTest { } tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); replicaPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, + 0, 0, false, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, @@ -1009,7 +1013,8 @@ public class InstanceAssignmentTest { } tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); replicaPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, + numInstancesPerReplicaGroup, 0, 0, false, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, @@ -1049,7 +1054,8 @@ public class InstanceAssignmentTest { tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); replicaPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, + numInstancesPerReplicaGroup, 0, 0, false, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, @@ -1087,7 +1093,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 7 instances InstanceReplicaGroupPartitionConfig replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, false); + numInstancesPerPartition, false, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, @@ -1159,7 +1165,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 7 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, true); + numInstancesPerPartition, true, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, @@ -1237,7 +1243,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 7 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, false); + numInstancesPerPartition, false, null); String partitionColumnName = "partition"; SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig( Collections.singletonMap(partitionColumnName, new ColumnPartitionConfig("Modulo", numPartitionsSegment, null))); @@ -1310,7 +1316,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 3 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, false); + numInstancesPerPartition, false, null); // Do not rotate for testing InstanceConstraintConfig instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); @@ -1367,7 +1373,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 3 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, true); + numInstancesPerPartition, true, null); // Do not rotate for testing instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); tableConfig = @@ -1433,7 +1439,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 5 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, false); + numInstancesPerPartition, false, null); // Do not rotate instance sequence in pool (for testing) instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); // Do not rotate pool sequence (for testing) @@ -1499,7 +1505,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 5 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, true); + numInstancesPerPartition, true, null); // Do not rotate instance sequence in pool (for testing) instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); // Do not rotate pool sequence (for testing) @@ -1571,7 +1577,7 @@ public class InstanceAssignmentTest { // Assign to 3 replica-groups so that each replica-group is assigned 1 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, false); + numInstancesPerPartition, false, null); // Do not rotate for testing instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); tableConfig = @@ -1621,7 +1627,7 @@ public class InstanceAssignmentTest { // Assign to 6 replica-groups so that each replica-group is assigned 2 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, false); + numInstancesPerPartition, false, null); // Do not rotate instance sequence in pool (for testing) instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); // Do not rotate pool sequence (for testing) @@ -1685,7 +1691,7 @@ public class InstanceAssignmentTest { // Assign to 6 replica-groups so that each replica-group is assigned 2 instances replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, - numInstancesPerPartition, true); + numInstancesPerPartition, true, null); // Do not rotate instance sequence in pool (for testing) instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")); // Do not rotate pool sequence (for testing) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 4b9e869dcf..91f27160c0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null); InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); _helixResourceManager.updateTableConfig(tableConfig); @@ -477,7 +477,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME), false, 0, null); InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false); + new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME, new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); _helixResourceManager.updateTableConfig(tableConfig); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index c679759b87..9ad82bc23c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -60,6 +60,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; @@ -141,6 +142,7 @@ public final class TableConfigUtils { validateIndexingConfig(tableConfig.getIndexingConfig(), schema); validateFieldConfigList(tableConfig.getFieldConfigList(), tableConfig.getIndexingConfig(), schema); validateInstancePartitionsTypeMapConfig(tableConfig); + validatePartitionedReplicaGroupInstance(tableConfig); if (!skipTypes.contains(ValidationType.UPSERT)) { validateUpsertAndDedupConfig(tableConfig, schema); validatePartialUpsertStrategies(tableConfig, schema); @@ -605,6 +607,24 @@ public final class TableConfigUtils { } } + /** + * Detects whether both replicaGroupStrategyConfig and replicaGroupPartitionConfig are set for a given + * table. Validation fails because the table would ignore replicaGroupStrategyConfig + * when the replicaGroupPartitionConfig is already set. + */ + @VisibleForTesting + static void validatePartitionedReplicaGroupInstance(TableConfig tableConfig) { + if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null + || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) { + return; + } + for (Map.Entry<String, InstanceAssignmentConfig> entry: tableConfig.getInstanceAssignmentConfigMap().entrySet()) { + boolean isNullReplicaGroupPartitionConfig = entry.getValue().getReplicaGroupPartitionConfig() == null; + Preconditions.checkState(isNullReplicaGroupPartitionConfig, + "Both replicaGroupStrategyConfig and replicaGroupPartitionConfig is provided"); + } + } + /** * Validates metrics aggregation when upsert config is enabled * - Metrics aggregation cannot be enabled when Upsert Config is enabled. diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 963e23438e..138ae2db1a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -32,6 +32,7 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; @@ -42,6 +43,7 @@ import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; @@ -1749,6 +1751,46 @@ public class TableConfigUtilsTest { } } + @Test + public void testValidatePartitionedReplicaGroupInstance() { + String partitionColumn = "testPartitionCol"; + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = + new ReplicaGroupStrategyConfig(partitionColumn, 2); + + TableConfig tableConfigWithoutReplicaGroupStrategyConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .build(); + // Call validate with a table-config without replicaGroupStrategyConfig or replicaGroupPartitionConfig. + TableConfigUtils.validatePartitionedReplicaGroupInstance(tableConfigWithoutReplicaGroupStrategyConfig); + + TableConfig tableConfigWithReplicaGroupStrategyConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + tableConfigWithReplicaGroupStrategyConfig.getValidationConfig() + .setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); + + // Call validate with a table-config with replicaGroupStrategyConfig and without replicaGroupPartitionConfig. + TableConfigUtils.validatePartitionedReplicaGroupInstance(tableConfigWithReplicaGroupStrategyConfig); + + InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class); + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 2, 0, false, partitionColumn); + Mockito.doReturn(instanceReplicaGroupPartitionConfig) + .when(instanceAssignmentConfig).getReplicaGroupPartitionConfig(); + + TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(TABLE_NAME).setInstanceAssignmentConfigMap( + ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig)).build(); + invalidTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); + + try { + // Call validate with a table-config with replicaGroupStrategyConfig and replicaGroupPartitionConfig. + TableConfigUtils.validatePartitionedReplicaGroupInstance(invalidTableConfig); + Assert.fail("Validation should have failed since both replicaGroupStrategyConfig " + + "and replicaGroupPartitionConfig are set"); + } catch (IllegalStateException ignored) { + } + } + private Map<String, String> getStreamConfigs() { Map<String, String> streamConfigs = new HashMap<>(); streamConfigs.put("streamType", "kafka"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java index 95102e77c4..adc72e8f1c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java @@ -21,6 +21,7 @@ package org.apache.pinot.spi.config.table.assignment; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; @@ -51,6 +52,10 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { + "instances if not " + "specified") private final int _numInstancesPerPartition; + @JsonPropertyDescription( + "Name of the column used for partition, if not provided table level replica group will be used") + private final String _partitionColumn; + private final boolean _minimizeDataMovement; @JsonCreator @@ -59,7 +64,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { @JsonProperty("numInstancesPerReplicaGroup") int numInstancesPerReplicaGroup, @JsonProperty("numPartitions") int numPartitions, @JsonProperty("numInstancesPerPartition") int numInstancesPerPartition, - @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) { + @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement, + @Nullable @JsonProperty("partitionColumn") String partitionColumn) { _replicaGroupBased = replicaGroupBased; _numInstances = numInstances; _numReplicaGroups = numReplicaGroups; @@ -67,6 +73,7 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { _numPartitions = numPartitions; _numInstancesPerPartition = numInstancesPerPartition; _minimizeDataMovement = minimizeDataMovement; + _partitionColumn = partitionColumn; } public boolean isReplicaGroupBased() { @@ -96,4 +103,9 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { public boolean isMinimizeDataMovement() { return _minimizeDataMovement; } + + @Nullable + public String getPartitionColumn() { + return _partitionColumn; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org