This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 555e5a0443 Combine the read access for replication config (#9849) 555e5a0443 is described below commit 555e5a04439c614bea8915c9e69327d7390260de Author: Seunghyun Lee <seungh...@startree.ai> AuthorDate: Wed Nov 23 22:38:10 2022 -0800 Combine the read access for replication config (#9849) * Combine the read access for replication config Currently, we have a separate configuration for replication. Offline and HLC reads from `replication` and LLC reads from `replicasPerPartition`. This PR combines the read access for the replication config. * Addressed comments --- .../assignment/InstanceAssignmentConfigUtils.java | 2 +- .../pinot/common/utils/config/TableConfigTest.java | 68 +++++++++++++++++++--- .../api/resources/PinotTableRestletResource.java | 5 +- .../controller/helix/SegmentStatusChecker.java | 7 +-- .../helix/core/PinotHelixResourceManager.java | 13 +---- .../helix/core/PinotTableIdealStateBuilder.java | 16 ++--- .../assignment/segment/BaseSegmentAssignment.java | 7 +-- .../segment/OfflineSegmentAssignment.java | 6 -- .../segment/RealtimeSegmentAssignment.java | 6 -- .../BalancedNumSegmentAssignmentStrategy.java | 7 +-- .../ReplicaGroupSegmentAssignmentStrategy.java | 6 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 2 +- .../api/PinotTableRestletResourceTest.java | 5 +- .../api/TableConfigsRestletResourceTest.java | 4 +- .../controller/helix/PinotResourceManagerTest.java | 7 ++- ...altimeNonReplicaGroupSegmentAssignmentTest.java | 7 ++- ...NonReplicaGroupTieredSegmentAssignmentTest.java | 5 +- .../RealtimeReplicaGroupSegmentAssignmentTest.java | 5 +- .../SegmentAssignmentStrategyFactoryTest.java | 12 ++-- .../helix/core/retention/RetentionManagerTest.java | 8 ++- .../segment/local/utils/TableConfigUtils.java | 14 ++--- .../SegmentsValidationAndRetentionConfig.java | 12 ++++ .../apache/pinot/spi/config/table/TableConfig.java | 28 +++++++++ .../apache/pinot/tools/PinotNumReplicaChanger.java | 2 +- .../command/RealtimeProvisioningHelperCommand.java | 2 +- 25 files changed, 158 insertions(+), 98 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java index 8689b06e22..6a0ae1188e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java @@ -106,7 +106,7 @@ public class InstanceAssignmentConfigUtils { InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig; SegmentsValidationAndRetentionConfig segmentConfig = tableConfig.getValidationConfig(); - int numReplicaGroups = segmentConfig.getReplicationNumber(); + int numReplicaGroups = tableConfig.getReplication(); ReplicaGroupStrategyConfig replicaGroupStrategyConfig = segmentConfig.getReplicaGroupStrategyConfig(); Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to find the replica-group strategy config"); String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java index ffeef06aca..100cd62c59 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java @@ -22,31 +22,38 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; public class TableConfigTest { + private static final String TEST_OFFLINE_TABLE_NAME = "testllc_OFFLINE"; + private static final String TEST_REALTIME_HLC_TABLE_NAME = "testhlc_REALTIME"; + private static final String TEST_REALTIME_LLC_TABLE_NAME = "testllc_REALTIME"; + @DataProvider public Object[][] configs() throws IOException { try (Stream<Path> configs = Files.list(Paths.get("src/test/resources/testConfigs"))) { return configs.map(path -> { - try { - return Files.readAllBytes(path); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .map(config -> new Object[]{config}) - .toArray(Object[][]::new); + try { + return Files.readAllBytes(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).map(config -> new Object[]{config}).toArray(Object[][]::new); } } @@ -56,4 +63,49 @@ public class TableConfigTest { TableConfig tableConfig = JsonUtils.DEFAULT_READER.forType(TableConfig.class).readValue(config); assertTrue(StringUtils.isNotBlank(tableConfig.getTableName())); } + + @Test + public void testGetReplication() { + TableConfig offlineTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_OFFLINE_TABLE_NAME).setNumReplicas(2).build(); + assertEquals(2, offlineTableConfig.getReplication()); + + offlineTableConfig.getValidationConfig().setReplication("4"); + assertEquals(4, offlineTableConfig.getReplication()); + + offlineTableConfig.getValidationConfig().setReplicasPerPartition("3"); + assertEquals(4, offlineTableConfig.getReplication()); + + TableConfig realtimeHLCTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_HLC_TABLE_NAME) + .setStreamConfigs(getStreamConfigMap("highlevel")).setNumReplicas(2).build(); + assertEquals(2, realtimeHLCTableConfig.getReplication()); + + realtimeHLCTableConfig.getValidationConfig().setReplication("4"); + assertEquals(4, realtimeHLCTableConfig.getReplication()); + + realtimeHLCTableConfig.getValidationConfig().setReplicasPerPartition("3"); + assertEquals(4, realtimeHLCTableConfig.getReplication()); + + TableConfig realtimeLLCTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_LLC_TABLE_NAME) + .setStreamConfigs(getStreamConfigMap("lowlevel")).setLLC(true).setNumReplicas(2).build(); + + assertEquals(2, realtimeLLCTableConfig.getReplication()); + + realtimeLLCTableConfig.getValidationConfig().setReplication("4"); + assertEquals(2, realtimeLLCTableConfig.getReplication()); + + realtimeLLCTableConfig.getValidationConfig().setReplicasPerPartition("3"); + assertEquals(3, realtimeLLCTableConfig.getReplication()); + } + + private Map<String, String> getStreamConfigMap(String consumerType) { + Map<String, String> configMap = new HashMap<>(); + configMap.put("streamType", "kafka"); + configMap.put("stream.kafka.consumer.type", consumerType); + configMap.put("stream.kafka.topic.name", "test"); + configMap.put("stream.kafka.decoder.class.name", "test"); + return configMap; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 532a5e66cb..54f9b57250 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -98,7 +98,6 @@ import org.apache.pinot.controller.util.TableIngestionStatusHelper; import org.apache.pinot.controller.util.TableMetadataReader; import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.segment.local.utils.TableConfigUtils; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableStats; import org.apache.pinot.spi.config.table.TableStatus; @@ -811,9 +810,7 @@ public class PinotTableRestletResource { String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - SegmentsValidationAndRetentionConfig segmentsConfig = - tableConfig != null ? tableConfig.getValidationConfig() : null; - int numReplica = segmentsConfig == null ? 1 : Integer.parseInt(segmentsConfig.getReplication()); + int numReplica = tableConfig == null ? 1 : tableConfig.getReplication(); String segmentsMetadata; try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index bcdcf876f0..941bb8f0b1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -143,12 +143,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, 0); return; } - int replication; - if (tableConfig.getTableType() == TableType.REALTIME) { - replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); - } else { - replication = tableConfig.getValidationConfig().getReplicationNumber(); - } + int replication = tableConfig.getReplication(); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, replication); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index d2b2947293..efb209795e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -136,7 +136,6 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; -import org.apache.pinot.segment.local.utils.ReplicationUtils; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.ConfigUtils; import org.apache.pinot.spi.config.instance.Instance; @@ -1474,7 +1473,6 @@ public class PinotHelixResourceManager { } validateTableTenantConfig(tableConfig); - SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig(); TableType tableType = tableConfig.getTableType(); switch (tableType) { @@ -1482,7 +1480,7 @@ public class PinotHelixResourceManager { // now lets build an ideal state LOGGER.info("building empty ideal state for table : " + tableNameWithType); final IdealState offlineIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, - Integer.parseInt(segmentsConfig.getReplication()), _enableBatchMessageMode); + tableConfig.getReplication(), _enableBatchMessageMode); LOGGER.info("adding table via the admin"); try { @@ -1795,7 +1793,7 @@ public class PinotHelixResourceManager { // Update IdealState replication IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); - String replicationConfigured = segmentsConfig.getReplication(); + String replicationConfigured = Integer.toString(tableConfig.getReplication()); if (!idealState.getReplicas().equals(replicationConfigured)) { HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> { assert is != null; @@ -3742,12 +3740,7 @@ public class PinotHelixResourceManager { Set<String> serverInstances = getAllInstancesForServerTenant(tenantConfig.getServer()); return serverInstances.size(); } - - if (ReplicationUtils.useReplicasPerPartition(tableConfig)) { - return Integer.parseInt(tableConfig.getValidationConfig().getReplicasPerPartition()); - } - - return tableConfig.getValidationConfig().getReplicationNumber(); + return tableConfig.getReplication(); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index beb4f796de..ac24151d67 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -92,31 +92,25 @@ public class PinotTableIdealStateBuilder { List<String> realtimeInstances = HelixHelper.getInstancesWithTag(helixManager, TagNameUtils.extractConsumingServerTag(realtimeTableConfig.getTenantConfig())); IdealState idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, 1, enableBatchMessageMode); - if (realtimeInstances.size() % Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()) != 0) { + if (realtimeInstances.size() % realtimeTableConfig.getReplication() != 0) { throw new RuntimeException( "Number of instance in current tenant should be an integer multiples of the number of replications"); } setupInstanceConfigForHighLevelConsumer(realtimeTableName, realtimeInstances.size(), - Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()), - IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig), zkHelixPropertyStore, realtimeInstances); + realtimeTableConfig.getReplication(), IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig), + zkHelixPropertyStore, realtimeInstances); return idealState; } public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState, boolean enableBatchMessageMode) { - // Validate replicasPerPartition here. - final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition(); - if (replicasPerPartitionStr == null || replicasPerPartitionStr.isEmpty()) { - throw new RuntimeException("Null or empty value for replicasPerPartition, expected a number"); - } final int nReplicas; try { - nReplicas = Integer.valueOf(replicasPerPartitionStr); + nReplicas = realtimeTableConfig.getReplication(); } catch (NumberFormatException e) { - throw new InvalidTableConfigException( - "Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e); + throw new InvalidTableConfigException("Invalid value for replicasPerPartition, expected a number.", e); } if (idealState == null) { idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java index c91efb904a..fc66bce53e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java @@ -75,7 +75,7 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment { _helixManager = helixManager; _tableNameWithType = tableConfig.getTableName(); _tableConfig = tableConfig; - _replication = getReplication(tableConfig); + _replication = tableConfig.getReplication(); ReplicaGroupStrategyConfig replicaGroupStrategyConfig = tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; @@ -89,11 +89,6 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment { } } - /** - * Returns the replication of the table. - */ - protected abstract int getReplication(TableConfig tableConfig); - /** * Rebalances tiers and returns a pair of tier assignments and non-tier assignment. */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java index ec04b728fb..36f784515a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java @@ -30,7 +30,6 @@ import org.apache.pinot.common.tier.Tier; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.AllServersSegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.RebalanceConfigConstants; @@ -40,11 +39,6 @@ import org.apache.pinot.spi.utils.RebalanceConfigConstants; */ public class OfflineSegmentAssignment extends BaseSegmentAssignment { - @Override - protected int getReplication(TableConfig tableConfig) { - return tableConfig.getValidationConfig().getReplicationNumber(); - } - @Override public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index 4f44aa1e44..c3e4f4c239 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -31,7 +31,6 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.RebalanceConfigConstants; @@ -74,11 +73,6 @@ import org.apache.pinot.spi.utils.RebalanceConfigConstants; */ public class RealtimeSegmentAssignment extends BaseSegmentAssignment { - @Override - protected int getReplication(TableConfig tableConfig) { - return tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); - } - @Override public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java index 1d26abc296..e9c540da78 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java @@ -26,7 +26,6 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +50,7 @@ public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentSt _tableNameWithType = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); - // Number of replicas per partition of low-level consumers check is for the real time tables only - // TODO: Cleanup required once we fetch the replication number from table config depending on table type - _replication = tableConfig.getTableType() == TableType.REALTIME - ? validationAndRetentionConfig.getReplicasPerPartitionNumber() - : validationAndRetentionConfig.getReplicationNumber(); + _replication = tableConfig.getReplication(); LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " + "{} with replication: {}", _tableNameWithType, _replication); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java index 94069dc8c2..d5a4d0e027 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java @@ -53,11 +53,7 @@ class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy _tableName = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); - // Number of replicas per partition of low-level consumers check is for the real time tables only - // TODO: Cleanup required once we fetch the replication number from table config depending on table type - _replication = tableConfig.getTableType() == TableType.REALTIME - ? validationAndRetentionConfig.getReplicasPerPartitionNumber() - : validationAndRetentionConfig.getReplicationNumber(); + _replication = tableConfig.getReplication(); ReplicaGroupStrategyConfig replicaGroupStrategyConfig = validationAndRetentionConfig.getReplicaGroupStrategyConfig(); _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 762ffdc421..b7ab051ab2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1339,7 +1339,7 @@ public class PinotLLCRealtimeSegmentManager { private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) { if (instancePartitions.getNumReplicaGroups() == 1) { // Non-replica-group based - return tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); + return tableConfig.getReplication(); } else { // Replica-group based return instancePartitions.getNumReplicaGroups(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java index 2b85ed477d..8339e3025f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java @@ -229,7 +229,7 @@ public class PinotTableRestletResourceTest extends ControllerTest { sendPostRequest(_createTableUrl, tableJSONConfigString); // table creation should succeed TableConfig tableConfig = getTableConfig(tableName, "OFFLINE"); - assertEquals(tableConfig.getValidationConfig().getReplicationNumber(), + assertEquals(tableConfig.getReplication(), Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS)); DEFAULT_INSTANCE.addDummySchema(tableName); @@ -237,8 +237,7 @@ public class PinotTableRestletResourceTest extends ControllerTest { _realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString(); sendPostRequest(_createTableUrl, tableJSONConfigString); tableConfig = getTableConfig(tableName, "REALTIME"); - assertEquals(tableConfig.getValidationConfig().getReplicationNumber(), - Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS)); + assertEquals(tableConfig.getReplication(), Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS)); DEFAULT_INSTANCE.getHelixResourceManager().deleteOfflineTable(tableName); DEFAULT_INSTANCE.getHelixResourceManager().deleteRealtimeTable(tableName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java index fad767d123..a24e774981 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java @@ -306,9 +306,9 @@ public class TableConfigsRestletResourceTest extends ControllerTest { response = sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableConfigsGet(tableName)); tableConfigsResponse = JsonUtils.stringToObject(response, TableConfigs.class); Assert.assertEquals(tableConfigsResponse.getTableName(), tableName); - Assert.assertEquals(tableConfigsResponse.getOffline().getValidationConfig().getReplicationNumber(), + Assert.assertEquals(tableConfigsResponse.getOffline().getReplication(), DEFAULT_MIN_NUM_REPLICAS); - Assert.assertEquals(tableConfigsResponse.getRealtime().getValidationConfig().getReplicasPerPartitionNumber(), + Assert.assertEquals(tableConfigsResponse.getRealtime().getReplication(), DEFAULT_MIN_NUM_REPLICAS); sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableConfigsDelete(tableName)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java index 6e4230c39a..445e43a871 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java @@ -79,15 +79,16 @@ public class PinotResourceManagerTest { Schema dummySchema = TEST_INSTANCE.createDummySchema(invalidRealtimeTable); TEST_INSTANCE.addSchema(dummySchema); - Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); // Missing replicasPerPartition TableConfig invalidRealtimeTableConfig = - new TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).setTableName(invalidRealtimeTable) + new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRealtimeTable) .setSchemaName(dummySchema.getSchemaName()).build(); + try { TEST_INSTANCE.getHelixResourceManager().addTable(invalidRealtimeTableConfig); Assert.fail( - "Table creation should have thrown exception due to missing replicasPerPartition in validation config"); + "Table creation should have thrown exception due to missing stream config and replicasPerPartition in " + + "validation config"); } catch (Exception e) { // expected } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 4e20967949..2bf6fd11e0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -30,6 +30,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; @@ -79,9 +80,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { System.currentTimeMillis()).getSegmentName()); } + Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setLLC(true).build(); + .setLLC(true).setStreamConfigs(streamConfigs).build(); _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig); _instancePartitionsMap = new TreeMap<>(); @@ -113,9 +115,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { @Test public void testReplicationForSegmentAssignment() { + Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setLLC(true).build(); + .setLLC(true).setStreamConfigs(streamConfigs).build(); // Update the replication by changing the NUM_REPLICAS_PER_PARTITION tableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_PER_PARTITION); SegmentAssignment segmentAssignment = diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java index c3f1ae30fe..73cb94c4fa 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.tier.Tier; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.tier.TierSegmentSelector; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; @@ -117,9 +118,11 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest { TierFactory.PINOT_SERVER_STORAGE_TYPE, TAG_B_NAME, null, null), new TierConfig(TIER_C_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null, TierFactory.PINOT_SERVER_STORAGE_TYPE, TAG_C_NAME, null, null)); + + Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setTierConfigList(tierConfigList).setLLC(true).build(); + .setTierConfigList(tierConfigList).setLLC(true).setStreamConfigs(streamConfigs).build(); _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig); _instancePartitionsMap = new TreeMap<>(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index 11dbe233ae..713b4c442a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -31,6 +31,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -83,9 +84,11 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { System.currentTimeMillis()).getSegmentName()); } + Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) + .setLLC(true).setStreamConfigs(streamConfigs) + .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build(); _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java index b0f12240f5..619d61ef82 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -98,14 +99,15 @@ public class SegmentAssignmentStrategyFactoryTest { @Test public void testBalancedNumSegmentAssignmentStrategyForRealtimeTables() { - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true).build(); + Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true) + .setStreamConfigs(streamConfigs).build(); InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME); instancePartitions.setInstances(0, 0, INSTANCES); - SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory - .getSegmentAssignmentStrategy(null, tableConfig, InstancePartitionsType.COMPLETED.toString(), - instancePartitions); + SegmentAssignmentStrategy segmentAssignmentStrategy = + SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(null, tableConfig, + InstancePartitionsType.COMPLETED.toString(), instancePartitions); Assert.assertNotNull(segmentAssignmentStrategy); Assert.assertTrue(segmentAssignmentStrategy instanceof BalancedNumSegmentAssignmentStrategy); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index b5d1162c53..85aba90f36 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.retention; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.helix.HelixAdmin; import org.apache.helix.model.IdealState; @@ -34,6 +35,7 @@ import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; import org.apache.pinot.controller.helix.core.SegmentDeletionManager; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.metrics.PinotMetricUtils; @@ -152,8 +154,10 @@ public class RetentionManagerTest { } private TableConfig createRealtimeTableConfig1(int replicaCount) { + Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); return new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setLLC(true) - .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setNumReplicas(replicaCount).build(); + .setStreamConfigs(streamConfigs).setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") + .setNumReplicas(replicaCount).build(); } private void setupPinotHelixResourceManager(TableConfig tableConfig, final List<String> removedSegments, @@ -233,7 +237,7 @@ public class RetentionManagerTest { private PinotHelixResourceManager setupSegmentMetadata(TableConfig tableConfig, final long now, final int nSegments, List<String> segmentsToBeDeleted) { - final int replicaCount = Integer.valueOf(tableConfig.getValidationConfig().getReplicasPerPartition()); + final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 2f15d9b2b4..7aaf01c787 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -972,6 +972,9 @@ public final class TableConfigUtils { } /** + * TODO: After deprecating "replicasPerPartition", we can change this function's behavior to always overwrite + * config to "replication" only. + * * Ensure that the table config has the minimum number of replicas set as per cluster configs. * If is doesn't, set the required amount of replication in the table config */ @@ -992,7 +995,7 @@ public final class TableConfigUtils { if (verifyReplication) { int requestReplication; try { - requestReplication = segmentsConfig.getReplicationNumber(); + requestReplication = tableConfig.getReplication(); if (requestReplication < defaultTableMinReplicas) { LOGGER.info("Creating table with minimum replication factor of: {} instead of requested replication: {}", defaultTableMinReplicas, requestReplication); @@ -1004,12 +1007,9 @@ public final class TableConfigUtils { } if (verifyReplicasPerPartition) { - String replicasPerPartitionStr = segmentsConfig.getReplicasPerPartition(); - if (replicasPerPartitionStr == null) { - throw new IllegalStateException("Field replicasPerPartition needs to be specified"); - } + int replicasPerPartition; try { - int replicasPerPartition = Integer.parseInt(replicasPerPartitionStr); + replicasPerPartition = tableConfig.getReplication(); if (replicasPerPartition < defaultTableMinReplicas) { LOGGER.info( "Creating table with minimum replicasPerPartition of: {} instead of requested replicasPerPartition: {}", @@ -1017,7 +1017,7 @@ public final class TableConfigUtils { segmentsConfig.setReplicasPerPartition(String.valueOf(defaultTableMinReplicas)); } } catch (NumberFormatException e) { - throw new IllegalStateException("Invalid value for replicasPerPartition: '" + replicasPerPartitionStr + "'", e); + throw new IllegalStateException("Invalid replicasPerPartition number", e); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java index 9595c879c4..849c1e8902 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java @@ -124,6 +124,9 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { _segmentPushType = segmentPushType; } + /** + * Try to Use {@link TableConfig#getReplication()} + */ public String getReplication() { return _replication; } @@ -142,6 +145,9 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { _schemaName = schemaName; } + /** + * Try to Use {@link TableConfig#getReplication()} + */ public String getReplicasPerPartition() { return _replicasPerPartition; } @@ -166,11 +172,17 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { _completionConfig = completionConfig; } + /** + * Try to Use {@link TableConfig#getReplication()} + */ @JsonIgnore public int getReplicationNumber() { return Integer.parseInt(_replication); } + /** + * Try to Use {@link TableConfig#getReplication()} + */ @JsonIgnore public int getReplicasPerPartitionNumber() { return Integer.parseInt(_replicasPerPartition); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java index e9e0032cda..6c478b7995 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java @@ -31,6 +31,8 @@ import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -354,4 +356,30 @@ public class TableConfig extends BaseJsonConfig { public void setSegmentAssignmentConfigMap(Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap) { _segmentAssignmentConfigMap = segmentAssignmentConfigMap; } + + @JsonIgnore + public int getReplication() { + int replication = 0; + if (_tableType == TableType.REALTIME) { + StreamConfig streamConfig = new StreamConfig(_tableName, IngestionConfigUtils.getStreamConfigMap(this)); + if (streamConfig.hasHighLevelConsumerType()) { + // In case of HLC, we read from "replication" + replication = Integer.parseInt(_validationConfig.getReplication()); + } else { + // To keep the backward compatibility, we read from "replicasPerPartition" in case of LLC + String replicasPerPartitionStr = _validationConfig.getReplicasPerPartition(); + try { + replication = Integer.parseInt(replicasPerPartitionStr); + } catch (NumberFormatException e) { + // If numReplicasPerPartition is not being used or specified, read the value from replication + String replicationStr = _validationConfig.getReplication(); + replication = Integer.parseInt(replicationStr); + } + } + } else { + // In case of OFFLINE tables, we read from "replication" + replication = Integer.parseInt(_validationConfig.getReplication()); + } + return replication; + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java index 30a6284b73..528453115e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java @@ -53,7 +53,7 @@ public class PinotNumReplicaChanger extends PinotZKChanger { // Get the number of replicas in the tableconfig. final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); final TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName); - final int newNumReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication()); + final int newNumReplicas = offlineTableConfig.getReplication(); // Now get the idealstate, and get the number of replicas in it. IdealState currentIdealState = _helixAdmin.getResourceIdealState(_clusterName, offlineTableName); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java index a3ca0caca0..d86dcc2a91 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java @@ -221,7 +221,7 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand StringBuilder note = new StringBuilder(); note.append("\nNote:\n"); - int numReplicas = tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); + int numReplicas = tableConfig.getReplication(); int tableRetentionHours = (int) TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit()) .toHours(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue())); if (_retentionHours > 0) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org