This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 555e5a0443 Combine the read access for replication config (#9849)
555e5a0443 is described below

commit 555e5a04439c614bea8915c9e69327d7390260de
Author: Seunghyun Lee <seungh...@startree.ai>
AuthorDate: Wed Nov 23 22:38:10 2022 -0800

    Combine the read access for replication config (#9849)
    
    * Combine the read access for replication config
    
    Currently, we have a separate configuration for replication.
    Offline and HLC reads from `replication` and LLC reads from
    `replicasPerPartition`. This PR combines the read access for
    the replication config.
    
    * Addressed comments
---
 .../assignment/InstanceAssignmentConfigUtils.java  |  2 +-
 .../pinot/common/utils/config/TableConfigTest.java | 68 +++++++++++++++++++---
 .../api/resources/PinotTableRestletResource.java   |  5 +-
 .../controller/helix/SegmentStatusChecker.java     |  7 +--
 .../helix/core/PinotHelixResourceManager.java      | 13 +----
 .../helix/core/PinotTableIdealStateBuilder.java    | 16 ++---
 .../assignment/segment/BaseSegmentAssignment.java  |  7 +--
 .../segment/OfflineSegmentAssignment.java          |  6 --
 .../segment/RealtimeSegmentAssignment.java         |  6 --
 .../BalancedNumSegmentAssignmentStrategy.java      |  7 +--
 .../ReplicaGroupSegmentAssignmentStrategy.java     |  6 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  2 +-
 .../api/PinotTableRestletResourceTest.java         |  5 +-
 .../api/TableConfigsRestletResourceTest.java       |  4 +-
 .../controller/helix/PinotResourceManagerTest.java |  7 ++-
 ...altimeNonReplicaGroupSegmentAssignmentTest.java |  7 ++-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |  5 +-
 .../RealtimeReplicaGroupSegmentAssignmentTest.java |  5 +-
 .../SegmentAssignmentStrategyFactoryTest.java      | 12 ++--
 .../helix/core/retention/RetentionManagerTest.java |  8 ++-
 .../segment/local/utils/TableConfigUtils.java      | 14 ++---
 .../SegmentsValidationAndRetentionConfig.java      | 12 ++++
 .../apache/pinot/spi/config/table/TableConfig.java | 28 +++++++++
 .../apache/pinot/tools/PinotNumReplicaChanger.java |  2 +-
 .../command/RealtimeProvisioningHelperCommand.java |  2 +-
 25 files changed, 158 insertions(+), 98 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index 8689b06e22..6a0ae1188e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -106,7 +106,7 @@ public class InstanceAssignmentConfigUtils {
 
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig;
     SegmentsValidationAndRetentionConfig segmentConfig = 
tableConfig.getValidationConfig();
-    int numReplicaGroups = segmentConfig.getReplicationNumber();
+    int numReplicaGroups = tableConfig.getReplication();
     ReplicaGroupStrategyConfig replicaGroupStrategyConfig = 
segmentConfig.getReplicaGroupStrategyConfig();
     Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to 
find the replica-group strategy config");
     String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
index ffeef06aca..100cd62c59 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
@@ -22,31 +22,38 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
 
 public class TableConfigTest {
 
+  private static final String TEST_OFFLINE_TABLE_NAME = "testllc_OFFLINE";
+  private static final String TEST_REALTIME_HLC_TABLE_NAME = 
"testhlc_REALTIME";
+  private static final String TEST_REALTIME_LLC_TABLE_NAME = 
"testllc_REALTIME";
+
   @DataProvider
   public Object[][] configs()
       throws IOException {
     try (Stream<Path> configs = 
Files.list(Paths.get("src/test/resources/testConfigs"))) {
       return configs.map(path -> {
-            try {
-              return Files.readAllBytes(path);
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          })
-          .map(config -> new Object[]{config})
-          .toArray(Object[][]::new);
+        try {
+          return Files.readAllBytes(path);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }).map(config -> new Object[]{config}).toArray(Object[][]::new);
     }
   }
 
@@ -56,4 +63,49 @@ public class TableConfigTest {
     TableConfig tableConfig = 
JsonUtils.DEFAULT_READER.forType(TableConfig.class).readValue(config);
     assertTrue(StringUtils.isNotBlank(tableConfig.getTableName()));
   }
+
+  @Test
+  public void testGetReplication() {
+    TableConfig offlineTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_OFFLINE_TABLE_NAME).setNumReplicas(2).build();
+    assertEquals(2, offlineTableConfig.getReplication());
+
+    offlineTableConfig.getValidationConfig().setReplication("4");
+    assertEquals(4, offlineTableConfig.getReplication());
+
+    offlineTableConfig.getValidationConfig().setReplicasPerPartition("3");
+    assertEquals(4, offlineTableConfig.getReplication());
+
+    TableConfig realtimeHLCTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_HLC_TABLE_NAME)
+            
.setStreamConfigs(getStreamConfigMap("highlevel")).setNumReplicas(2).build();
+    assertEquals(2, realtimeHLCTableConfig.getReplication());
+
+    realtimeHLCTableConfig.getValidationConfig().setReplication("4");
+    assertEquals(4, realtimeHLCTableConfig.getReplication());
+
+    realtimeHLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
+    assertEquals(4, realtimeHLCTableConfig.getReplication());
+
+    TableConfig realtimeLLCTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_LLC_TABLE_NAME)
+            
.setStreamConfigs(getStreamConfigMap("lowlevel")).setLLC(true).setNumReplicas(2).build();
+
+    assertEquals(2, realtimeLLCTableConfig.getReplication());
+
+    realtimeLLCTableConfig.getValidationConfig().setReplication("4");
+    assertEquals(2, realtimeLLCTableConfig.getReplication());
+
+    realtimeLLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
+    assertEquals(3, realtimeLLCTableConfig.getReplication());
+  }
+
+  private Map<String, String> getStreamConfigMap(String consumerType) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("streamType", "kafka");
+    configMap.put("stream.kafka.consumer.type", consumerType);
+    configMap.put("stream.kafka.topic.name", "test");
+    configMap.put("stream.kafka.decoder.class.name", "test");
+    return configMap;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 532a5e66cb..54f9b57250 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -98,7 +98,6 @@ import 
org.apache.pinot.controller.util.TableIngestionStatusHelper;
 import org.apache.pinot.controller.util.TableMetadataReader;
 import org.apache.pinot.core.auth.ManualAuthorization;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableStats;
 import org.apache.pinot.spi.config.table.TableStatus;
@@ -811,9 +810,7 @@ public class PinotTableRestletResource {
     String tableNameWithType =
         
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, tableType, LOGGER).get(0);
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    SegmentsValidationAndRetentionConfig segmentsConfig =
-        tableConfig != null ? tableConfig.getValidationConfig() : null;
-    int numReplica = segmentsConfig == null ? 1 : 
Integer.parseInt(segmentsConfig.getReplication());
+    int numReplica = tableConfig == null ? 1 : tableConfig.getReplication();
 
     String segmentsMetadata;
     try {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index bcdcf876f0..941bb8f0b1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -143,12 +143,7 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.REPLICATION_FROM_CONFIG, 0);
       return;
     }
-    int replication;
-    if (tableConfig.getTableType() == TableType.REALTIME) {
-      replication = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-    } else {
-      replication = tableConfig.getValidationConfig().getReplicationNumber();
-    }
+    int replication = tableConfig.getReplication();
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.REPLICATION_FROM_CONFIG, replication);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d2b2947293..efb209795e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -136,7 +136,6 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
-import org.apache.pinot.segment.local.utils.ReplicationUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.ConfigUtils;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -1474,7 +1473,6 @@ public class PinotHelixResourceManager {
     }
 
     validateTableTenantConfig(tableConfig);
-    SegmentsValidationAndRetentionConfig segmentsConfig = 
tableConfig.getValidationConfig();
     TableType tableType = tableConfig.getTableType();
 
     switch (tableType) {
@@ -1482,7 +1480,7 @@ public class PinotHelixResourceManager {
         // now lets build an ideal state
         LOGGER.info("building empty ideal state for table : " + 
tableNameWithType);
         final IdealState offlineIdealState = 
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType,
-            Integer.parseInt(segmentsConfig.getReplication()), 
_enableBatchMessageMode);
+            tableConfig.getReplication(), _enableBatchMessageMode);
         LOGGER.info("adding table via the admin");
 
         try {
@@ -1795,7 +1793,7 @@ public class PinotHelixResourceManager {
 
         // Update IdealState replication
         IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
-        String replicationConfigured = segmentsConfig.getReplication();
+        String replicationConfigured = 
Integer.toString(tableConfig.getReplication());
         if (!idealState.getReplicas().equals(replicationConfigured)) {
           HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is 
-> {
             assert is != null;
@@ -3742,12 +3740,7 @@ public class PinotHelixResourceManager {
       Set<String> serverInstances = 
getAllInstancesForServerTenant(tenantConfig.getServer());
       return serverInstances.size();
     }
-
-    if (ReplicationUtils.useReplicasPerPartition(tableConfig)) {
-      return 
Integer.parseInt(tableConfig.getValidationConfig().getReplicasPerPartition());
-    }
-
-    return tableConfig.getValidationConfig().getReplicationNumber();
+    return tableConfig.getReplication();
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index beb4f796de..ac24151d67 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -92,31 +92,25 @@ public class PinotTableIdealStateBuilder {
     List<String> realtimeInstances = 
HelixHelper.getInstancesWithTag(helixManager,
         
TagNameUtils.extractConsumingServerTag(realtimeTableConfig.getTenantConfig()));
     IdealState idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, 
1, enableBatchMessageMode);
-    if (realtimeInstances.size() % 
Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()) != 
0) {
+    if (realtimeInstances.size() % realtimeTableConfig.getReplication() != 0) {
       throw new RuntimeException(
           "Number of instance in current tenant should be an integer multiples 
of the number of replications");
     }
     setupInstanceConfigForHighLevelConsumer(realtimeTableName, 
realtimeInstances.size(),
-        
Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()),
-        IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig), 
zkHelixPropertyStore, realtimeInstances);
+        realtimeTableConfig.getReplication(), 
IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig),
+        zkHelixPropertyStore, realtimeInstances);
     return idealState;
   }
 
   public static void 
buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager 
pinotLLCRealtimeSegmentManager,
       String realtimeTableName, TableConfig realtimeTableConfig, IdealState 
idealState,
       boolean enableBatchMessageMode) {
-
     // Validate replicasPerPartition here.
-    final String replicasPerPartitionStr = 
realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
-    if (replicasPerPartitionStr == null || replicasPerPartitionStr.isEmpty()) {
-      throw new RuntimeException("Null or empty value for 
replicasPerPartition, expected a number");
-    }
     final int nReplicas;
     try {
-      nReplicas = Integer.valueOf(replicasPerPartitionStr);
+      nReplicas = realtimeTableConfig.getReplication();
     } catch (NumberFormatException e) {
-      throw new InvalidTableConfigException(
-          "Invalid value for replicasPerPartition, expected a number: " + 
replicasPerPartitionStr, e);
+      throw new InvalidTableConfigException("Invalid value for 
replicasPerPartition, expected a number.", e);
     }
     if (idealState == null) {
       idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, 
nReplicas, enableBatchMessageMode);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
index c91efb904a..fc66bce53e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
@@ -75,7 +75,7 @@ public abstract class BaseSegmentAssignment implements 
SegmentAssignment {
     _helixManager = helixManager;
     _tableNameWithType = tableConfig.getTableName();
     _tableConfig = tableConfig;
-    _replication = getReplication(tableConfig);
+    _replication = tableConfig.getReplication();
     ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
         tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
     _partitionColumn = replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
@@ -89,11 +89,6 @@ public abstract class BaseSegmentAssignment implements 
SegmentAssignment {
     }
   }
 
-  /**
-   * Returns the replication of the table.
-   */
-  protected abstract int getReplication(TableConfig tableConfig);
-
   /**
    * Rebalances tiers and returns a pair of tier assignments and non-tier 
assignment.
    */
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index ec04b728fb..36f784515a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.tier.Tier;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.AllServersSegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 
@@ -40,11 +39,6 @@ import org.apache.pinot.spi.utils.RebalanceConfigConstants;
  */
 public class OfflineSegmentAssignment extends BaseSegmentAssignment {
 
-  @Override
-  protected int getReplication(TableConfig tableConfig) {
-    return tableConfig.getValidationConfig().getReplicationNumber();
-  }
-
   @Override
   public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 4f44aa1e44..c3e4f4c239 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -31,7 +31,6 @@ import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.RebalanceConfigConstants;
@@ -74,11 +73,6 @@ import org.apache.pinot.spi.utils.RebalanceConfigConstants;
  */
 public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
 
-  @Override
-  protected int getReplication(TableConfig tableConfig) {
-    return tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-  }
-
   @Override
   public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
index 1d26abc296..e9c540da78 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
@@ -26,7 +26,6 @@ import org.apache.pinot.common.assignment.InstancePartitions;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,11 +50,7 @@ public class BalancedNumSegmentAssignmentStrategy implements 
SegmentAssignmentSt
     _tableNameWithType = tableConfig.getTableName();
     SegmentsValidationAndRetentionConfig validationAndRetentionConfig = 
tableConfig.getValidationConfig();
     Preconditions.checkState(validationAndRetentionConfig != null, "Validation 
Config is null");
-    // Number of replicas per partition of low-level consumers check is for 
the real time tables only
-    // TODO: Cleanup required once we fetch the replication number from table 
config depending on table type
-    _replication = tableConfig.getTableType() == TableType.REALTIME
-        ? validationAndRetentionConfig.getReplicasPerPartitionNumber()
-        : validationAndRetentionConfig.getReplicationNumber();
+    _replication = tableConfig.getReplication();
     LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " 
+ "{} with replication: {}",
         _tableNameWithType, _replication);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
index 94069dc8c2..d5a4d0e027 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
@@ -53,11 +53,7 @@ class ReplicaGroupSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy
     _tableName = tableConfig.getTableName();
     SegmentsValidationAndRetentionConfig validationAndRetentionConfig = 
tableConfig.getValidationConfig();
     Preconditions.checkState(validationAndRetentionConfig != null, "Validation 
Config is null");
-    // Number of replicas per partition of low-level consumers check is for 
the real time tables only
-    // TODO: Cleanup required once we fetch the replication number from table 
config depending on table type
-    _replication = tableConfig.getTableType() == TableType.REALTIME
-        ? validationAndRetentionConfig.getReplicasPerPartitionNumber()
-        : validationAndRetentionConfig.getReplicationNumber();
+    _replication = tableConfig.getReplication();
     ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
         validationAndRetentionConfig.getReplicaGroupStrategyConfig();
     _partitionColumn = replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 762ffdc421..b7ab051ab2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1339,7 +1339,7 @@ public class PinotLLCRealtimeSegmentManager {
   private int getNumReplicas(TableConfig tableConfig, InstancePartitions 
instancePartitions) {
     if (instancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based
-      return tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+      return tableConfig.getReplication();
     } else {
       // Replica-group based
       return instancePartitions.getNumReplicaGroups();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 2b85ed477d..8339e3025f 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -229,7 +229,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     sendPostRequest(_createTableUrl, tableJSONConfigString);
     // table creation should succeed
     TableConfig tableConfig = getTableConfig(tableName, "OFFLINE");
-    assertEquals(tableConfig.getValidationConfig().getReplicationNumber(),
+    assertEquals(tableConfig.getReplication(),
         Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS));
 
     DEFAULT_INSTANCE.addDummySchema(tableName);
@@ -237,8 +237,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
         
_realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
     tableConfig = getTableConfig(tableName, "REALTIME");
-    assertEquals(tableConfig.getValidationConfig().getReplicationNumber(),
-        Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS));
+    assertEquals(tableConfig.getReplication(), Math.max(tableReplication, 
DEFAULT_MIN_NUM_REPLICAS));
 
     DEFAULT_INSTANCE.getHelixResourceManager().deleteOfflineTable(tableName);
     DEFAULT_INSTANCE.getHelixResourceManager().deleteRealtimeTable(tableName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
index fad767d123..a24e774981 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
@@ -306,9 +306,9 @@ public class TableConfigsRestletResourceTest extends 
ControllerTest {
     response = 
sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableConfigsGet(tableName));
     tableConfigsResponse = JsonUtils.stringToObject(response, 
TableConfigs.class);
     Assert.assertEquals(tableConfigsResponse.getTableName(), tableName);
-    
Assert.assertEquals(tableConfigsResponse.getOffline().getValidationConfig().getReplicationNumber(),
+    Assert.assertEquals(tableConfigsResponse.getOffline().getReplication(),
         DEFAULT_MIN_NUM_REPLICAS);
-    
Assert.assertEquals(tableConfigsResponse.getRealtime().getValidationConfig().getReplicasPerPartitionNumber(),
+    Assert.assertEquals(tableConfigsResponse.getRealtime().getReplication(),
         DEFAULT_MIN_NUM_REPLICAS);
     
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableConfigsDelete(tableName));
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 6e4230c39a..445e43a871 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -79,15 +79,16 @@ public class PinotResourceManagerTest {
     Schema dummySchema = TEST_INSTANCE.createDummySchema(invalidRealtimeTable);
     TEST_INSTANCE.addSchema(dummySchema);
 
-    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     // Missing replicasPerPartition
     TableConfig invalidRealtimeTableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).setTableName(invalidRealtimeTable)
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(invalidRealtimeTable)
             .setSchemaName(dummySchema.getSchemaName()).build();
+
     try {
       
TEST_INSTANCE.getHelixResourceManager().addTable(invalidRealtimeTableConfig);
       Assert.fail(
-          "Table creation should have thrown exception due to missing 
replicasPerPartition in validation config");
+          "Table creation should have thrown exception due to missing stream 
config and replicasPerPartition in "
+              + "validation config");
     } catch (Exception e) {
       // expected
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 4e20967949..2bf6fd11e0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -79,9 +80,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
           System.currentTimeMillis()).getSegmentName());
     }
 
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setLLC(true).build();
+            .setLLC(true).setStreamConfigs(streamConfigs).build();
     _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig);
 
     _instancePartitionsMap = new TreeMap<>();
@@ -113,9 +115,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
 
   @Test
   public void testReplicationForSegmentAssignment() {
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setLLC(true).build();
+            .setLLC(true).setStreamConfigs(streamConfigs).build();
     // Update the replication by changing the NUM_REPLICAS_PER_PARTITION
     
tableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_PER_PARTITION);
     SegmentAssignment segmentAssignment =
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index c3f1ae30fe..73cb94c4fa 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
@@ -117,9 +118,11 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
             TierFactory.PINOT_SERVER_STORAGE_TYPE, TAG_B_NAME, null, null),
         new TierConfig(TIER_C_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"30d", null,
             TierFactory.PINOT_SERVER_STORAGE_TYPE, TAG_C_NAME, null, null));
+
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setTierConfigList(tierConfigList).setLLC(true).build();
+            
.setTierConfigList(tierConfigList).setLLC(true).setStreamConfigs(streamConfigs).build();
     _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfig);
 
     _instancePartitionsMap = new TreeMap<>();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 11dbe233ae..713b4c442a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -31,6 +31,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -83,9 +84,11 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
           System.currentTimeMillis()).getSegmentName());
     }
 
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+            .setLLC(true).setStreamConfigs(streamConfigs)
+            
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
             .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
     _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
index b0f12240f5..619d61ef82 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -98,14 +99,15 @@ public class SegmentAssignmentStrategyFactoryTest {
 
   @Test
   public void testBalancedNumSegmentAssignmentStrategyForRealtimeTables() {
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true).build();
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true)
+        .setStreamConfigs(streamConfigs).build();
     InstancePartitions instancePartitions = new 
InstancePartitions(INSTANCE_PARTITIONS_NAME);
     instancePartitions.setInstances(0, 0, INSTANCES);
 
-    SegmentAssignmentStrategy segmentAssignmentStrategy = 
SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(null, tableConfig, 
InstancePartitionsType.COMPLETED.toString(),
-            instancePartitions);
+    SegmentAssignmentStrategy segmentAssignmentStrategy =
+        SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(null, 
tableConfig,
+            InstancePartitionsType.COMPLETED.toString(), instancePartitions);
     Assert.assertNotNull(segmentAssignmentStrategy);
 
     Assert.assertTrue(segmentAssignmentStrategy instanceof 
BalancedNumSegmentAssignmentStrategy);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index b5d1162c53..85aba90f36 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.retention;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.IdealState;
@@ -34,6 +35,7 @@ import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -152,8 +154,10 @@ public class RetentionManagerTest {
   }
 
   private TableConfig createRealtimeTableConfig1(int replicaCount) {
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     return new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setLLC(true)
-        
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setNumReplicas(replicaCount).build();
+        
.setStreamConfigs(streamConfigs).setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+        .setNumReplicas(replicaCount).build();
   }
 
   private void setupPinotHelixResourceManager(TableConfig tableConfig, final 
List<String> removedSegments,
@@ -233,7 +237,7 @@ public class RetentionManagerTest {
 
   private PinotHelixResourceManager setupSegmentMetadata(TableConfig 
tableConfig, final long now, final int nSegments,
       List<String> segmentsToBeDeleted) {
-    final int replicaCount = 
Integer.valueOf(tableConfig.getValidationConfig().getReplicasPerPartition());
+    final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 2f15d9b2b4..7aaf01c787 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -972,6 +972,9 @@ public final class TableConfigUtils {
   }
 
   /**
+   * TODO: After deprecating "replicasPerPartition", we can change this 
function's behavior to always overwrite
+   * config to "replication" only.
+   *
    * Ensure that the table config has the minimum number of replicas set as 
per cluster configs.
    * If is doesn't, set the required amount of replication in the table config
    */
@@ -992,7 +995,7 @@ public final class TableConfigUtils {
     if (verifyReplication) {
       int requestReplication;
       try {
-        requestReplication = segmentsConfig.getReplicationNumber();
+        requestReplication = tableConfig.getReplication();
         if (requestReplication < defaultTableMinReplicas) {
           LOGGER.info("Creating table with minimum replication factor of: {} 
instead of requested replication: {}",
               defaultTableMinReplicas, requestReplication);
@@ -1004,12 +1007,9 @@ public final class TableConfigUtils {
     }
 
     if (verifyReplicasPerPartition) {
-      String replicasPerPartitionStr = 
segmentsConfig.getReplicasPerPartition();
-      if (replicasPerPartitionStr == null) {
-        throw new IllegalStateException("Field replicasPerPartition needs to 
be specified");
-      }
+      int replicasPerPartition;
       try {
-        int replicasPerPartition = Integer.parseInt(replicasPerPartitionStr);
+        replicasPerPartition = tableConfig.getReplication();
         if (replicasPerPartition < defaultTableMinReplicas) {
           LOGGER.info(
               "Creating table with minimum replicasPerPartition of: {} instead 
of requested replicasPerPartition: {}",
@@ -1017,7 +1017,7 @@ public final class TableConfigUtils {
           
segmentsConfig.setReplicasPerPartition(String.valueOf(defaultTableMinReplicas));
         }
       } catch (NumberFormatException e) {
-        throw new IllegalStateException("Invalid value for 
replicasPerPartition: '" + replicasPerPartitionStr + "'", e);
+        throw new IllegalStateException("Invalid replicasPerPartition number", 
e);
       }
     }
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 9595c879c4..849c1e8902 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -124,6 +124,9 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
     _segmentPushType = segmentPushType;
   }
 
+  /**
+   * Try to Use {@link TableConfig#getReplication()}
+   */
   public String getReplication() {
     return _replication;
   }
@@ -142,6 +145,9 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
     _schemaName = schemaName;
   }
 
+  /**
+   * Try to Use {@link TableConfig#getReplication()}
+   */
   public String getReplicasPerPartition() {
     return _replicasPerPartition;
   }
@@ -166,11 +172,17 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
     _completionConfig = completionConfig;
   }
 
+  /**
+   * Try to Use {@link TableConfig#getReplication()}
+   */
   @JsonIgnore
   public int getReplicationNumber() {
     return Integer.parseInt(_replication);
   }
 
+  /**
+   * Try to Use {@link TableConfig#getReplication()}
+   */
   @JsonIgnore
   public int getReplicasPerPartitionNumber() {
     return Integer.parseInt(_replicasPerPartition);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index e9e0032cda..6c478b7995 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -31,6 +31,8 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -354,4 +356,30 @@ public class TableConfig extends BaseJsonConfig {
   public void setSegmentAssignmentConfigMap(Map<String, 
SegmentAssignmentConfig> segmentAssignmentConfigMap) {
     _segmentAssignmentConfigMap = segmentAssignmentConfigMap;
   }
+
+  @JsonIgnore
+  public int getReplication() {
+    int replication = 0;
+    if (_tableType == TableType.REALTIME) {
+      StreamConfig streamConfig = new StreamConfig(_tableName, 
IngestionConfigUtils.getStreamConfigMap(this));
+      if (streamConfig.hasHighLevelConsumerType()) {
+        // In case of HLC, we read from "replication"
+        replication = Integer.parseInt(_validationConfig.getReplication());
+      } else {
+        // To keep the backward compatibility, we read from 
"replicasPerPartition" in case of LLC
+        String replicasPerPartitionStr = 
_validationConfig.getReplicasPerPartition();
+        try {
+          replication = Integer.parseInt(replicasPerPartitionStr);
+        } catch (NumberFormatException e) {
+          // If numReplicasPerPartition is not being used or specified, read 
the value from replication
+          String replicationStr = _validationConfig.getReplication();
+          replication = Integer.parseInt(replicationStr);
+        }
+      }
+    } else {
+      // In case of OFFLINE tables, we read from "replication"
+      replication = Integer.parseInt(_validationConfig.getReplication());
+    }
+    return replication;
+  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java
index 30a6284b73..528453115e 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java
@@ -53,7 +53,7 @@ public class PinotNumReplicaChanger extends PinotZKChanger {
     // Get the number of replicas in the tableconfig.
     final String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
     final TableConfig offlineTableConfig = 
ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
-    final int newNumReplicas = 
Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
+    final int newNumReplicas = offlineTableConfig.getReplication();
 
     // Now get the idealstate, and get the number of replicas in it.
     IdealState currentIdealState = 
_helixAdmin.getResourceIdealState(_clusterName, offlineTableName);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
index a3ca0caca0..d86dcc2a91 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
@@ -221,7 +221,7 @@ public class RealtimeProvisioningHelperCommand extends 
AbstractBaseAdminCommand
 
     StringBuilder note = new StringBuilder();
     note.append("\nNote:\n");
-    int numReplicas = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+    int numReplicas = tableConfig.getReplication();
     int tableRetentionHours = (int) 
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit())
         
.toHours(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
     if (_retentionHours > 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to