This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-oss-abstraction
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 3dba068168ad0ae9184d9586a15f7ea4a29721f2
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Fri Mar 22 16:00:43 2024 -0700

    Extract methods for Pinot table ideal state
---
 .../MultiStageReplicaGroupSelector.java            |  13 +-
 .../FullAutoInstancePartitionsUtils.java           |  52 +++++++
 .../common/assignment/InstancePartitionsUtils.java |  34 +++--
 .../assignment/InstancePartitionsUtilsHelper.java  |  58 ++++++++
 .../InstancePartitionsUtilsHelperFactory.java      |  46 ++++++
 .../pinot/common/utils/config/TierConfigUtils.java |   7 +-
 .../pinot/controller/BaseControllerStarter.java    |  10 ++
 .../apache/pinot/controller/ControllerConf.java    |  11 ++
 .../PinotInstanceAssignmentRestletResource.java    |  59 ++++----
 .../api/resources/PinotTenantRestletResource.java  |  12 +-
 .../helix/core/PinotHelixResourceManager.java      |  98 +++++-------
 .../helix/core/PinotTableIdealStateHelper.java     | 145 ------------------
 .../instance/InstanceAssignmentDriver.java         |   8 +-
 .../DefaultPinotTableIdealStateHelper.java         | 151 +++++++++++++++++++
 .../FullAutoPinotTableIdealStateHelper.java        | 165 +++++++++++++++++++++
 .../PinotTableIdealStateHelper.java                |  87 +++++++++++
 .../PinotTableIdealStateHelperFactory.java         |  50 +++++++
 .../realtime/MissingConsumingSegmentFinder.java    |   4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  97 +++---------
 .../helix/core/rebalance/TableRebalancer.java      |  80 +++++-----
 .../helix/core/util/HelixSetupUtils.java           |  15 ++
 ...anceAssignmentRestletResourceStatelessTest.java |   4 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |   8 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |   8 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   6 +-
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../helix/core/retention/RetentionManagerTest.java |  10 +-
 .../tools/admin/command/MoveReplicaGroup.java      |  20 +--
 28 files changed, 845 insertions(+), 417 deletions(-)
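The call-site pattern repeated throughout the diff below: static InstancePartitionsUtils calls are routed through InstancePartitionsUtilsHelperFactory, and table ideal-state handling moves behind the new PinotTableIdealStateHelper abstraction. A minimal before/after sketch of the instance-partitions change (illustrative only; propertyStore and tableNameWithType are placeholder variables, not taken from any one file below):

    // Before: direct static utility call
    InstancePartitions offline = InstancePartitionsUtils.fetchInstancePartitions(propertyStore,
        InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, TableType.OFFLINE.name()));

    // After: the helper instance comes from the factory, so a FULL-AUTO aware
    // implementation can be swapped in without touching the call site
    InstancePartitionsUtilsHelper helper = InstancePartitionsUtilsHelperFactory.create();
    InstancePartitions offlineViaHelper = helper.fetchInstancePartitions(propertyStore,
        helper.getInstancePartitionsName(tableNameWithType, TableType.OFFLINE.name()));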

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 15fb525a8c..3f794c4017 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -34,7 +34,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -145,12 +145,13 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
     Preconditions.checkNotNull(tableType);
     InstancePartitions instancePartitions;
     if (tableType.equals(TableType.OFFLINE)) {
-      instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-          InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType, tableType.name()));
+      instancePartitions = InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore,
+          InstancePartitionsUtilsHelperFactory.create()
+              .getInstancePartitionsName(_tableNameWithType, tableType.name()));
     } else {
-      instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-          InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType,
-              InstancePartitionsType.CONSUMING.name()));
+      instancePartitions = InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore,
+          InstancePartitionsUtilsHelperFactory.create()
+              .getInstancePartitionsName(_tableNameWithType, InstancePartitionsType.CONSUMING.name()));
     }
     Preconditions.checkNotNull(instancePartitions);
     return instancePartitions;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/FullAutoInstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/FullAutoInstancePartitionsUtils.java
new file mode 100644
index 0000000000..2821cc1c99
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/FullAutoInstancePartitionsUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.assignment;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+
+
+public class FullAutoInstancePartitionsUtils extends InstancePartitionsUtils {
+
+  @Override
+  public InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
+      InstancePartitionsType instancePartitionsType) {
+    // Use the corresponding method to compute default instance partitions or no-op.
+    return super.computeDefaultInstancePartitions(helixManager, tableConfig, instancePartitionsType);
+  }
+
+  @Override
+  public InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager, String tableNameWithType,
+      String instancePartitionsType, String serverTag) {
+    // Use the corresponding method to compute default instance partitions for tags or no-op.
+    return super.computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType, instancePartitionsType,
+        serverTag);
+  }
+
+  @Override
+  public void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, ConfigAccessor configAccessor,
+      String helixClusterName, InstancePartitions instancePartitions) {
+    // Use the corresponding method to persist instance partitions.
+    super.persistInstancePartitions(propertyStore, configAccessor, helixClusterName, instancePartitions);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index f8bbd08934..3e3ae7c667 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -43,9 +43,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 /**
  * Utility class for instance partitions.
  */
-public class InstancePartitionsUtils {
-  private InstancePartitionsUtils() {
-  }
+public class InstancePartitionsUtils implements InstancePartitionsUtilsHelper {
 
   public static final char TYPE_SUFFIX_SEPARATOR = '_';
   public static final String TIER_SUFFIX = "__TIER__";
@@ -54,14 +52,16 @@ public class InstancePartitionsUtils {
    * Returns the name of the instance partitions for the given table name (with or without type suffix) and instance
    * partitions type.
    */
-  public static String getInstancePartitionsName(String tableName, String instancePartitionsType) {
+  @Override
+  public String getInstancePartitionsName(String tableName, String instancePartitionsType) {
     return TableNameBuilder.extractRawTableName(tableName) + TYPE_SUFFIX_SEPARATOR + instancePartitionsType;
   }
 
   /**
    * Fetches the instance partitions from Helix property store if it exists, or computes it for backward-compatibility.
    */
-  public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
+  @Override
+  public InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
       InstancePartitionsType instancePartitionsType) {
     String tableNameWithType = tableConfig.getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -88,15 +88,17 @@ public class InstancePartitionsUtils {
   /**
    * Fetches the instance partitions from Helix property store.
    */
+  @Override
   @Nullable
-  public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+  public InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
       String instancePartitionsName) {
     String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName);
     ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
     return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
   }
 
-  public static String getInstancePartitionsNameForTier(String tableName, String tierName) {
+  @Override
+  public String getInstancePartitionsNameForTier(String tableName, String tierName) {
     return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX + tierName;
   }
 
@@ -106,7 +108,8 @@ public class InstancePartitionsUtils {
    * This method is useful when we use a table with instancePartitionsMap since in that case
    * the value of a table's instance partitions are copied over from an existing instancePartitions.
    */
-  public static InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore,
+  @Override
+  public InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore,
       String instancePartitionsName, String newName) {
     InstancePartitions instancePartitions = fetchInstancePartitions(propertyStore, instancePartitionsName);
     Preconditions.checkNotNull(instancePartitions,
@@ -121,7 +124,8 @@ public class InstancePartitionsUtils {
    * <p>Choose both enabled and disabled instances with the server tag as the qualified instances to avoid unexpected
    * data shuffling when instances get disabled.
    */
-  public static InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
+  @Override
+  public InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
       InstancePartitionsType instancePartitionsType) {
     TenantConfig tenantConfig = tableConfig.getTenantConfig();
     String serverTag;
@@ -148,7 +152,8 @@ public class InstancePartitionsUtils {
    * <p>Choose both enabled and disabled instances with the server tag as the qualified instances to avoid unexpected
    * data shuffling when instances get disabled.
    */
-  public static InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager,
+  @Override
+  public InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager,
       String tableNameWithType, String instancePartitionsType, String serverTag) {
     List<String> instances = HelixHelper.getInstancesWithTag(helixManager, serverTag);
     int numInstances = instances.size();
@@ -166,7 +171,8 @@ public class InstancePartitionsUtils {
   /**
    * Persists the instance partitions to Helix property store.
    */
-  public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+  @Override
+  public void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
       ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions) {
     String path = ZKMetadataProvider
         .constructPropertyStorePathForInstancePartitions(instancePartitions.getInstancePartitionsName());
@@ -185,7 +191,8 @@ public class InstancePartitionsUtils {
   /**
    * Removes the instance partitions from Helix property store.
    */
-  public static void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+  @Override
+  public void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
       String instancePartitionsName) {
     String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName);
     if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
@@ -193,7 +200,8 @@ public class InstancePartitionsUtils {
     }
   }
 
-  public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+  @Override
+  public void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
       String tableNameWithType) {
     List<InstancePartitions> instancePartitions = ZKMetadataProvider.getAllInstancePartitions(propertyStore);
     instancePartitions.stream().filter(instancePartition -> instancePartition.getInstancePartitionsName()
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelper.java
new file mode 100644
index 0000000000..66a4c5e157
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.assignment;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+
+
+public interface InstancePartitionsUtilsHelper {
+
+  String getInstancePartitionsName(String tableName, String instancePartitionsType);
+
+  InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
+      InstancePartitionsType instancePartitionsType);
+
+  InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+      String instancePartitionsName);
+
+  InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
+      InstancePartitionsType instancePartitionsType);
+
+  InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager,
+      String tableNameWithType, String instancePartitionsType, String serverTag);
+
+  String getInstancePartitionsNameForTier(String tableName, String tierName);
+
+  void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+      ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions);
+
+  InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore,
+      String instancePartitionsName, String newName);
+
+  void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+      String instancePartitionsName);
+
+  void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType);
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelperFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelperFactory.java
new file mode 100644
index 0000000000..be3587a374
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelperFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.assignment;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class InstancePartitionsUtilsHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePartitionsUtilsHelperFactory.class);
+
+  private static InstancePartitionsUtilsHelper _instance = null;
+
+  private static PinotConfiguration _pinotConfiguration;
+
+  private InstancePartitionsUtilsHelperFactory() {
+  }
+
+  public static void init(PinotConfiguration pinotConfiguration) {
+    _pinotConfiguration = pinotConfiguration;
+  }
+
+  public static InstancePartitionsUtilsHelper create() {
+    if (_instance == null) {
+      _instance = new InstancePartitionsUtils();
+    }
+    return _instance;
+  }
+}
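As written above, create() lazily instantiates the default InstancePartitionsUtils and does not yet consult the configuration stored by init(), so FullAutoInstancePartitionsUtils is not selected from it in this commit. A hedged sketch of the wiring, mirroring the BaseControllerStarter change further down (pinotConfiguration is a placeholder):

    // Controller startup: hand the controller configuration to the factory
    InstancePartitionsUtilsHelperFactory.init(pinotConfiguration);
    // Call sites then share the factory-provided helper instance
    InstancePartitionsUtilsHelper helper = InstancePartitionsUtilsHelperFactory.create();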
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index 31e32113e7..94298917be 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -28,7 +28,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.tier.FixedTierSegmentSelector;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
@@ -79,8 +79,9 @@ public final class TierConfigUtils {
       if (tier.getSegmentSelector().selectSegment(tableNameWithType, segmentName)) {
         // Compute default instance partitions
         PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
-        return InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType,
-            tier.getName(), storage.getServerTag());
+        return InstancePartitionsUtilsHelperFactory.create()
+            .computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType, tier.getName(),
+                storage.getServerTag());
       }
     }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 281c397401..8e2ca24a22 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -57,6 +57,7 @@ import org.apache.http.config.SocketConfig;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -88,6 +89,7 @@ import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -200,6 +202,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL, _helixClusterName, ServiceRole.CONTROLLER);
 
     setupHelixSystemProperties();
+    // Initialize the ideal state helper for Pinot tables.
+    PinotTableIdealStateHelperFactory.init(_config);
     HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression());
     _listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config);
     _controllerMode = _config.getControllerMode();
@@ -408,6 +412,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     initControllerFilePathProvider();
     initSegmentFetcherFactory();
     initPinotCrypterFactory();
+    initInstancePartitionUtilsHelperFactory();
 
     LOGGER.info("Initializing QueryRewriterFactory");
     QueryRewriterFactory.init(
@@ -732,6 +737,11 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     }
   }
 
+  private void initInstancePartitionUtilsHelperFactory() {
+    LOGGER.info("Initializing InstancePartitionUtilsHelperFactory");
+    InstancePartitionsUtilsHelperFactory.init(_config);
+  }
+
   /**
    * Registers, connects to Helix cluster as PARTICIPANT role, and adds listeners.
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 4598b48eeb..3cfc4705fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -275,6 +275,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username";
   public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password";
   public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class";
+  public static final String PINOT_TABLE_IDEALSTATE_HELPER_CLASS = "controller.pinot.table.idealstate.class";
   // Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push
   // protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one.
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
@@ -298,6 +299,8 @@ public class ControllerConf extends PinotConfiguration {
   private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
   private static final String DEFAULT_LINEAGE_MANAGER =
       "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
+  private static final String DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS =
+      "org.apache.pinot.controller.helix.core.idealstatehelper.DefaultPinotTableIdealStateHelper";
   private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes
   private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1;
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
@@ -872,6 +875,14 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
   }
 
+  public String getPinotTableIdealstateHelperClass() {
+    return getProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS);
+  }
+
+  public void setPinotTableIdealstateHelperClass(String pinotTableIdealstateHelperClass) {
+    setProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, pinotTableIdealstateHelperClass);
+  }
+
   public long getSegmentUploadTimeoutInMillis() {
     return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
   }
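ControllerConf now exposes controller.pinot.table.idealstate.class, defaulting to DefaultPinotTableIdealStateHelper. Presumably PinotTableIdealStateHelperFactory, which is initialized from this config in BaseControllerStarter above, uses it to pick the implementation; a hedged example of overriding it with the FULL-AUTO helper named in the diffstat (controllerConf is a placeholder, and whether the factory actually reads this property is not shown in this excerpt):

    controllerConf.setPinotTableIdealstateHelperClass(
        "org.apache.pinot.controller.helix.core.idealstatehelper.FullAutoPinotTableIdealStateHelper");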
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 6fd157d620..73306c1f35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -50,7 +50,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
@@ -97,7 +97,7 @@ public class PinotInstanceAssignmentRestletResource {
     if (tableType != TableType.REALTIME) {
       if (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null) {
         InstancePartitions offlineInstancePartitions =
-            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+            InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
         if (offlineInstancePartitions != null) {
           instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstancePartitions);
@@ -107,7 +107,7 @@ public class PinotInstanceAssignmentRestletResource {
     if (tableType != TableType.OFFLINE) {
       if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) {
         InstancePartitions consumingInstancePartitions =
-            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+            InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
         if (consumingInstancePartitions != null) {
           instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(), consumingInstancePartitions);
@@ -115,7 +115,7 @@ public class PinotInstanceAssignmentRestletResource {
       }
       if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) {
         InstancePartitions completedInstancePartitions =
-            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+            InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
         if (completedInstancePartitions != null) {
           instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
@@ -130,10 +130,10 @@ public class PinotInstanceAssignmentRestletResource {
       if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
         for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
           if (type == null || type.equals(tierConfig.getName())) {
-            InstancePartitions instancePartitions =
-                InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
-                    InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
-                        tierConfig.getName()));
+            InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create()
+                .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                    InstancePartitionsUtilsHelperFactory.create()
+                        .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName()));
             if (instancePartitions != null) {
               instancePartitionsMap.put(tierConfig.getName(), instancePartitions);
             }
@@ -245,9 +245,10 @@ public class PinotInstanceAssignmentRestletResource {
       TableConfig tableConfig, List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
     String tableNameWithType = tableConfig.getTableName();
     if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
-      InstancePartitions existingInstancePartitions =
-          InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
-              InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
+      InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+          .fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+              InstancePartitionsUtilsHelperFactory.create()
+                  .getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
       instancePartitionsMap.put(instancePartitionsType.toString(),
           new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
               existingInstancePartitions));
@@ -255,14 +256,15 @@ public class PinotInstanceAssignmentRestletResource {
       if (InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType)) {
         // fetch the existing instance partitions, if the table, this is referenced in the new instance partitions
         // generation for minimum difference
-        InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions(
-            _resourceManager.getHelixZkManager().getHelixPropertyStore(),
-            InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
+        InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+            .fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+                InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(tableNameWithType,
+                    instancePartitionsType.toString()));
         String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
         // fetch the pre-configured instance partitions, the renaming part is irrelevant as we are not really
         // preserving this preConfigured, but only using it as a reference to generate the new instance partitions
-        InstancePartitions preConfigured =
-            InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+        InstancePartitions preConfigured = InstancePartitionsUtilsHelperFactory.create()
+            .fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
                 tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
                 instancePartitionsType.getInstancePartitionsName(rawTableName));
         instancePartitionsMap.put(instancePartitionsType.toString(),
@@ -270,8 +272,8 @@ public class PinotInstanceAssignmentRestletResource {
                 existingInstancePartitions, preConfigured));
       } else {
         String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
-        instancePartitionsMap.put(instancePartitionsType.toString(),
-            InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+        instancePartitionsMap.put(instancePartitionsType.toString(), InstancePartitionsUtilsHelperFactory.create()
+            .fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
                 tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
                 instancePartitionsType.getInstancePartitionsName(rawTableName)));
       }
@@ -285,10 +287,10 @@ public class PinotInstanceAssignmentRestletResource {
       for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
         if ((tierConfig.getName().equals(tierName) || tierName == null)
             && tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()) != null) {
-          InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions(
-              _resourceManager.getHelixZkManager().getHelixPropertyStore(),
-              InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
-                  tierConfig.getName()));
+          InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+              .fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+                  InstancePartitionsUtilsHelperFactory.create()
+                      .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName()));
 
           instancePartitionsMap.put(tierConfig.getName(),
               new InstanceAssignmentDriver(tableConfig).assignInstances(tierConfig.getName(), instanceConfigs,
@@ -301,7 +303,7 @@ public class PinotInstanceAssignmentRestletResource {
   private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-      InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(),
+      InstancePartitionsUtilsHelperFactory.create().persistInstancePartitions(_resourceManager.getPropertyStore(),
           _resourceManager.getHelixZkManager().getConfigAccessor(), _resourceManager.getHelixClusterName(),
           instancePartitions);
     } catch (Exception e) {
@@ -352,7 +354,8 @@ public class PinotInstanceAssignmentRestletResource {
     for (TableConfig tableConfig : tableConfigs) {
       if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
         for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
-          if (InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())
+          if (InstancePartitionsUtilsHelperFactory.create()
+              .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())
               .equals(instancePartitionsName)) {
             persistInstancePartitionsHelper(instancePartitions);
             return Collections.singletonMap(tierConfig.getName(), instancePartitions);
@@ -399,9 +402,8 @@ public class PinotInstanceAssignmentRestletResource {
       if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
         for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
           if (instancePartitionsType == null || instancePartitionsType.equals(tierConfig.getName())) {
-            removeInstancePartitionsHelper(
-                InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
-                    tierConfig.getName()));
+            removeInstancePartitionsHelper(InstancePartitionsUtilsHelperFactory.create()
+                .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName()));
           }
         }
       }
@@ -412,7 +414,8 @@ public class PinotInstanceAssignmentRestletResource {
   private void removeInstancePartitionsHelper(String instancePartitionsName) {
     try {
       LOGGER.info("Removing instance partitions: {}", instancePartitionsName);
-      InstancePartitionsUtils.removeInstancePartitions(_resourceManager.getPropertyStore(), instancePartitionsName);
+      InstancePartitionsUtilsHelperFactory.create()
+          .removeInstancePartitions(_resourceManager.getPropertyStore(), instancePartitionsName);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Caught Exception while removing the instance partitions",
           Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 1fea68c75f..7ef327df18 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -52,7 +52,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -305,9 +305,8 @@ public class PinotTenantRestletResource {
       @QueryParam("instancePartitionType") String instancePartitionType) {
     String tenantNameWithType = InstancePartitionsType.valueOf(instancePartitionType)
         .getInstancePartitionsName(tenantName);
-    InstancePartitions instancePartitions =
-        InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
-            tenantNameWithType);
+    InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create()
+        .fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), tenantNameWithType);
 
     if (instancePartitions == null) {
       throw new ControllerApplicationException(LOGGER,
@@ -358,8 +357,9 @@ public class PinotTenantRestletResource {
   private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-      InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
-          _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(),
+      InstancePartitionsUtilsHelperFactory.create()
+          .persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
+              _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(),
           _pinotHelixResourceManager.getHelixClusterName(), instancePartitions);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 57c75d7618..98a8c467f5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -90,7 +90,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
@@ -142,6 +142,8 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper;
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -232,11 +234,14 @@ public class PinotHelixResourceManager {
   private SegmentDeletionManager _segmentDeletionManager;
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
+
+  private final PinotTableIdealStateHelper _pinotTableIdealStateHelper;
   private final LineageManager _lineageManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays,
-      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+      boolean enableTieredSegmentAssignment, PinotTableIdealStateHelper pinotTableIdealStateHelper,
+      LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -258,6 +263,7 @@ public class PinotHelixResourceManager {
     for (int i = 0; i < _tableUpdaterLocks.length; i++) {
       _tableUpdaterLocks[i] = new Object();
     }
+    _pinotTableIdealStateHelper = pinotTableIdealStateHelper;
     _lineageManager = lineageManager;
   }
 
@@ -265,7 +271,7 @@ public class PinotHelixResourceManager {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
         controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf));
+        PinotTableIdealStateHelperFactory.create(), LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -1583,9 +1589,7 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState =
-        PinotTableIdealStateHelper.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-            _enableBatchMessageMode);
+    IdealState idealState = _pinotTableIdealStateHelper.buildEmptyIdealStateFor(tableConfig, _enableBatchMessageMode);
 //    if (tableType == TableType.REALTIME) {
 //      idealState =
 //           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
@@ -1763,7 +1767,7 @@ public class PinotHelixResourceManager {
     List<InstancePartitionsType> instancePartitionsTypesToAssign = new ArrayList<>();
     for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) {
       if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
-        if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+        if (override || InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore,
            instancePartitionsType.getInstancePartitionsName(rawTableName)) == null) {
           instancePartitionsTypesToAssign.add(instancePartitionsType);
         }
@@ -1786,22 +1790,24 @@ public class PinotHelixResourceManager {
         } else {
           String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
           if (isPreConfigurationBasedAssignment) {
-            InstancePartitions preConfiguredInstancePartitions =
-                InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
-                    referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
+            InstancePartitions preConfiguredInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+                .fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName,
+                    instancePartitionsType.getInstancePartitionsName(rawTableName));
             instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null,
                 preConfiguredInstancePartitions);
             LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions,
                 preConfiguredInstancePartitions);
           } else {
-            instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
-                referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
+            instancePartitions = InstancePartitionsUtilsHelperFactory.create()
+                .fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName,
+                    instancePartitionsType.getInstancePartitionsName(rawTableName));
             LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
                 referenceInstancePartitionsName);
           }
         }
-        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
-            _helixClusterName, instancePartitions);
+        InstancePartitionsUtilsHelperFactory.create()
+            .persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), _helixClusterName,
+                instancePartitions);
       }
     }
 
@@ -1810,17 +1816,18 @@ public class PinotHelixResourceManager {
         && tableConfig.getInstanceAssignmentConfigMap() != null) {
       for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
         if (tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName())) {
-          if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-              InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName()))
-              == null) {
+          if (override || InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore,
+              InstancePartitionsUtilsHelperFactory.create()
+                  .getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName())) == null) {
             LOGGER.info("Calculating instance partitions for tier: {}, table : {}", tierConfig.getName(),
                 tableNameWithType);
             InstancePartitions instancePartitions =
                 instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
                     tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
             LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
-                _helixClusterName, instancePartitions);
+            InstancePartitionsUtilsHelperFactory.create()
+                .persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), _helixClusterName,
+                    instancePartitions);
           }
         }
       }
@@ -1971,18 +1978,18 @@ public class PinotHelixResourceManager {
 
     // Remove instance partitions
     if (tableType == TableType.OFFLINE) {
-      InstancePartitionsUtils.removeInstancePartitions(_propertyStore, tableNameWithType);
+      InstancePartitionsUtilsHelperFactory.create().removeInstancePartitions(_propertyStore, tableNameWithType);
     } else {
       String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-      InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+      InstancePartitionsUtilsHelperFactory.create().removeInstancePartitions(_propertyStore,
           InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
-      InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+      InstancePartitionsUtilsHelperFactory.create().removeInstancePartitions(_propertyStore,
           InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
     }
     LOGGER.info("Deleting table {}: Removed instance partitions", tableNameWithType);
 
     // Remove tier instance partitions
-    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, tableNameWithType);
+    InstancePartitionsUtilsHelperFactory.create().removeTierInstancePartitions(_propertyStore, tableNameWithType);
     LOGGER.info("Deleting table {}: Removed tier instance partitions", tableNameWithType);
 
     // Remove segment lineage
@@ -2256,36 +2263,8 @@ public class PinotHelixResourceManager {
       SegmentAssignment segmentAssignment =
           SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
       synchronized (getTableUpdaterLock(tableNameWithType)) {
-        Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
-          assert idealState != null;
-          Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
-          Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
-          if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) {
-            LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
-                tableNameWithType);
-          } else {
-            List<String> assignedInstances =
-                segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
-            LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
-                tableNameWithType);
-            TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-            if (tableType == TableType.REALTIME) {
-              // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
-              currentAssignmentList.put(segmentName, Collections.emptyList()
-                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
-//              currentAssignment.put(segmentName,
-//                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
-            } else {
-              // TODO: Assess whether to pass in an empty instance list or to set the preferred list
-              currentAssignmentList.put(segmentName, Collections.emptyList()
-                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
-            }
-            // currentAssignment.put(segmentName,
-            //     SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
-          }
-          return idealState;
-        });
+        PinotTableIdealStateHelperFactory.create()
+            .assignSegment(_helixZkManager, tableNameWithType, segmentName, segmentAssignment, instancePartitionsMap);
         LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType);
       }
     } catch (Exception e) {
@@ -2306,22 +2285,23 @@ public class PinotHelixResourceManager {
       TableConfig tableConfig) {
     if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
       return Collections.singletonMap(InstancePartitionsType.OFFLINE,
-          InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
+          InstancePartitionsUtilsHelperFactory.create().fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
               InstancePartitionsType.OFFLINE));
     }
     if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
       // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
       // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
       return Collections.singletonMap(InstancePartitionsType.CONSUMING,
-          InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
+          InstancePartitionsUtilsHelperFactory.create().fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
               InstancePartitionsType.CONSUMING));
     }
     // for non-upsert realtime tables, if COMPLETED instance partitions is available or tag override for
     // completed segments is provided in the tenant config, COMPLETED instance partitions type is used
     // otherwise CONSUMING instance partitions type is used.
     InstancePartitionsType instancePartitionsType = InstancePartitionsType.COMPLETED;
-    InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-        InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
+    InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create()
+        .fetchInstancePartitions(_propertyStore, InstancePartitionsUtilsHelperFactory.create()
+            .getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
     if (instancePartitions != null) {
       return Collections.singletonMap(instancePartitionsType, instancePartitions);
     }
@@ -2329,8 +2309,8 @@ public class PinotHelixResourceManager {
     if (tagOverrideConfig == null || tagOverrideConfig.getRealtimeCompleted() 
== null) {
       instancePartitionsType = InstancePartitionsType.CONSUMING;
     }
-    return Collections.singletonMap(instancePartitionsType,
-        
InstancePartitionsUtils.computeDefaultInstancePartitions(_helixZkManager, 
tableConfig, instancePartitionsType));
+    return Collections.singletonMap(instancePartitionsType, 
InstancePartitionsUtilsHelperFactory.create()
+        .computeDefaultInstancePartitions(_helixZkManager, tableConfig, 
instancePartitionsType));
   }
 
   public boolean isUpsertTable(String tableName) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
deleted file mode 100644
index 37c4b7555b..0000000000
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core;
-
-import java.util.List;
-import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.FullAutoModeISBuilder;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.spi.utils.retry.RetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PinotTableIdealStateHelper {
-  private PinotTableIdealStateHelper() {
-  }
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableIdealStateHelper.class);
-  private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
-      RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
-
-  public static IdealState buildEmptyIdealStateFor(String tableNameWithType, 
int numReplicas,
-      boolean enableBatchMessageMode) {
-    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", 
tableNameWithType, numReplicas);
-    CustomModeISBuilder customModeIdealStateBuilder = new 
CustomModeISBuilder(tableNameWithType);
-    customModeIdealStateBuilder
-        
.setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
-        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1);
-    IdealState idealState = customModeIdealStateBuilder.build();
-    idealState.setInstanceGroupTag(tableNameWithType);
-    idealState.setBatchMessageMode(enableBatchMessageMode);
-    return idealState;
-  }
-
-  public static IdealState buildEmptyFullAutoIdealStateFor(String 
tableNameWithType, int numReplicas,
-      boolean enableBatchMessageMode) {
-    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: 
{}", tableNameWithType, numReplicas);
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    String stateModel;
-    if (tableType == null) {
-      throw new RuntimeException("Failed to get table type from table name: " 
+ tableNameWithType);
-    } else if (TableType.OFFLINE.equals(tableType)) {
-      stateModel =
-          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    } else {
-      stateModel =
-          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    }
-
-    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, 
crushed auto-rebalance by default
-    // TODO: The state model used only works for OFFLINE tables today. Add 
support for REALTIME state model too
-    FullAutoModeISBuilder idealStateBuilder = new 
FullAutoModeISBuilder(tableNameWithType);
-    idealStateBuilder
-        .setStateModel(stateModel)
-        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
-        // TODO: Revisit the rebalance strategy to use (maybe we add a custom 
one)
-        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
-    // The below config guarantees if active number of replicas is no less 
than minimum active replica, there will
-    // not be partition movements happened.
-    // Set min active replicas to 0 and rebalance delay to 5 minutes so that 
if any master goes offline, Helix
-    // controller waits at most 5 minutes and then re-calculate the 
participant assignment.
-    // TODO: Assess which of these values need to be tweaked, removed, and 
what additional values that need to be added
-    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
-    idealStateBuilder.setRebalanceDelay(300_000);
-    idealStateBuilder.enableDelayRebalance();
-    // Set instance group tag
-    IdealState idealState = idealStateBuilder.build();
-    idealState.setInstanceGroupTag(tableNameWithType);
-    idealState.setBatchMessageMode(enableBatchMessageMode);
-    return idealState;
-  }
-
-  /**
-   * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
-   * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
-   *
-   * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
-   *
-   * 1)
-   * The current {@link PartitionGroupConsumptionStatus} is used to determine 
the offsets that have been consumed for
-   * a partition group.
-   * An example of where the offsets would be used:
-   * e.g. If partition group 1 contains shardId 1, with status DONE and 
endOffset 150. There's 2 possibilities:
-   * 1) the stream indicates that shardId's last offset is 200.
-   * This tells Pinot that partition group 1 still has messages which haven't 
been consumed, and must be included in
-   * the response.
-   * 2) the stream indicates that shardId's last offset is 150,
-   * This tells Pinot that all messages of partition group 1 have been 
consumed, and it need not be included in the
-   * response.
-   * Thus, this call will skip a partition group when it has reached end of 
life and all messages from that partition
-   * group have been consumed.
-   *
-   * The current {@link PartitionGroupConsumptionStatus} is also used to know 
about existing groupings of partitions,
-   * and accordingly make the new partition groups.
-   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains 
shards 0,1,2
-   * and partition group 2 has status DONE and contains shards 3,4.
-   * In the above example, the 
<code>partitionGroupConsumptionStatusList</code> indicates that
-   * the collection of shards in partition group 1, should remain unchanged in 
the response,
-   * whereas shards 3,4 can be added to new partition groups if needed.
-   *
-   * @param streamConfig the streamConfig from the tableConfig
-   * @param partitionGroupConsumptionStatusList List of {@link 
PartitionGroupConsumptionStatus} for the current
-   *                                            partition groups.
-   *                                          The size of this list is equal 
to the number of partition groups,
-   *                                          and is created using the latest 
segment zk metadata.
-   */
-  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
-    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
-    try {
-      
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
-      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
-    } catch (Exception e) {
-      Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}", streamConfig.getTopicName(),
-          streamConfig.getTableNameWithType(), fetcherException);
-      throw new RuntimeException(fetcherException);
-    }
-  }
-}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..8248a04283 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -25,7 +25,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
@@ -76,9 +76,9 @@ public class InstanceAssignmentDriver {
 
   public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
-    return getInstancePartitions(
-        InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null);
+    return getInstancePartitions(InstancePartitionsUtilsHelperFactory.create()
+            .getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), instanceAssignmentConfig,
+        instanceConfigs, existingInstancePartitions, null);
   }
 
   private InstancePartitions getInstancePartitions(String instancePartitionsName,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..e6cef02d99
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultPinotTableIdealStateHelper implements PinotTableIdealStateHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPinotTableIdealStateHelper.class);
+  private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
+      RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
+
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType);
+    customModeIdealStateBuilder
+        .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1);
+    IdealState idealState = customModeIdealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
+  @Override
+  public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> {
+      assert idealState != null;
+      Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+      if (currentAssignment.containsKey(segmentName)) {
+        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+            tableNameWithType);
+      } else {
+        List<String> assignedInstances =
+            segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
+        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+            tableNameWithType);
+        currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+            CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE));
+      }
+      return idealState;
+    });
+  }
+
+  @Override
+  public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    if (committingSegmentName != null) {
+      // Change committing segment state to ONLINE
+      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+      instanceStatesMap.put(committingSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instances,
+          CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE));
+      LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
+    }
+
+    // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit)
+    // that ended up creating multiple consuming segments for the same stream partition, named somewhat like
+    // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the
+    // Zookeeper Stat object before updating the segment metadata.
+    // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263
+    // The following check prevents the table from going into such a state (but does not prevent the root cause
+    // of attempting such a zk update).
+    if (newSegmentName != null) {
+      LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
+      int partitionId = newLLCSegmentName.getPartitionGroupId();
+      int seqNum = newLLCSegmentName.getSequenceNumber();
+      for (String segmentNameStr : instanceStatesMap.keySet()) {
+        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
+        if (llcSegmentName == null) {
+          // skip the segment name if the name is not in low-level consumer format
+          // such segment name can appear for uploaded segment
+          LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr);
+          continue;
+        }
+        if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) {
+          String errorMsg =
+              String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
+          LOGGER.error(errorMsg);
+          throw new HelixHelper.PermanentUpdaterException(errorMsg);
+        }
+      }
+      // Assign instances to the new segment and add instances as state CONSUMING
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
+          CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING));
+      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+    }
+  }
+
+  @Override
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
+    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
+        new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList);
+    try {
+      DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
+      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+    } catch (Exception e) {
+      Exception fetcherException = partitionGroupMetadataFetcher.getException();
+      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(),
+          streamConfig.getTableNameWithType(), fetcherException);
+      throw new RuntimeException(fetcherException);
+    }
+  }
+}
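For reference, a minimal sketch of the map-field shape the default (CUSTOM mode) helper writes for an assigned segment. This is illustrative only and not part of the patch; the instance names are hypothetical, and the call mirrors the SegmentAssignmentUtils usage in the class above.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
    import org.apache.pinot.spi.utils.CommonConstants;

    public class CustomModeAssignmentSketch {
      public static void main(String[] args) {
        // Hypothetical server instance names; any Pinot server instances would do here
        List<String> assignedInstances = Arrays.asList("Server_host1_8098", "Server_host2_8098");
        // Same call the default helper makes when it adds a segment to the IdealState map fields
        Map<String, String> instanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
            CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
        // Expected shape: each assigned instance mapped to ONLINE,
        // e.g. {Server_host1_8098=ONLINE, Server_host2_8098=ONLINE}
        System.out.println(instanceStateMap);
      }
    }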
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..7b1889fdd7
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FullAutoPinotTableIdealStateHelper extends DefaultPinotTableIdealStateHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullAutoPinotTableIdealStateHelper.class);
+
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+
+    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    String stateModel;
+    if (tableType == null) {
+      throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+    } else if (TableType.OFFLINE.equals(tableType)) {
+      stateModel =
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    } else {
+      stateModel =
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    }
+
+    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, CrushED auto-rebalance by default
+    // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
+    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+    idealStateBuilder
+        .setStateModel(stateModel)
+        .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
+        // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    // The config below guarantees that as long as the number of active replicas is no less than the minimum active
+    // replica count, no partition movement happens.
+    // Set min active replicas to numReplicas - 1 and the rebalance delay to 5 minutes, so that if an instance goes
+    // offline, the Helix controller waits at most 5 minutes before re-calculating the participant assignment.
+    // TODO: Assess which of these values need to be tweaked or removed, and what additional values need to be added
+    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
+    idealStateBuilder.setRebalanceDelay(300_000);
+    idealStateBuilder.enableDelayRebalance();
+    // Set instance group tag
+    IdealState idealState = idealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
+  @Override
+  public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> {
+      assert idealState != null;
+      Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+      Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
+      if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) {
+        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+            tableNameWithType);
+      } else {
+        List<String> assignedInstances =
+            segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
+        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+            tableNameWithType);
+        currentAssignmentList.put(segmentName, Collections.emptyList());
+      }
+      return idealState;
+    });
+  }
+
+  @Override
+  public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    Map<String, List<String>> segmentList = idealState.getRecord().getListFields();
+    // TODO: Need to figure out the best way to handle committed segments' state change
+    if (committingSegmentName != null) {
+      // Change committing segment state to ONLINE
+//      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+//      instanceStatesMap.put(committingSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+//      instanceStatesList.put(newSegmentName, Collections.emptyList()
+//          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+      LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
+    }
+
+    // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit)
+    // that ended up creating multiple consuming segments for the same stream partition, named somewhat like
+    // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the
+    // Zookeeper Stat object before updating the segment metadata.
+    // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263
+    // The following check prevents the table from going into such a state (but does not prevent the root cause
+    // of attempting such a zk update).
+    if (newSegmentName != null) {
+      LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
+      int partitionId = newLLCSegmentName.getPartitionGroupId();
+      int seqNum = newLLCSegmentName.getSequenceNumber();
+      for (String segmentNameStr : instanceStatesMap.keySet()) {
+        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
+        if (llcSegmentName == null) {
+          // skip the segment name if the name is not in low-level consumer format
+          // such segment name can appear for uploaded segment
+          LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr);
+          continue;
+        }
+        if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) {
+          String errorMsg =
+              String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
+          LOGGER.error(errorMsg);
+          throw new HelixHelper.PermanentUpdaterException(errorMsg);
+        }
+      }
+      // Assign instances to the new segment and add instances as state CONSUMING
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
+//      instanceStatesMap.put(newSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+      // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment the lines below to update
+      //       the list. Assess whether we should set an empty InstanceStateList for the segment or not, i.e. do we
+      //       support this preferred list concept, and does Helix FULL-AUTO even allow a preferred list (from code
+      //       reading it looks like it does)
+      segmentList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+    }
+  }
+}
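For reference, a minimal sketch of the difference in how the two helpers record a segment in the IdealState: the default helper writes instance-to-state map fields directly, while the FULL-AUTO helper only adds a list-field entry with an empty preferred list and leaves the map fields to Helix's rebalancer. The class and method names below are illustrative only and not part of the patch; the calls use the Helix IdealState/ZNRecord accessors already shown above.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.helix.model.IdealState;

    public class IdealStateFieldsSketch {
      // True if Pinot wrote instance states directly (CUSTOM mode / default helper behavior)
      public static boolean hasPinotManagedStates(IdealState idealState, String segmentName) {
        Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
        return mapFields.containsKey(segmentName);
      }

      // True if the segment is tracked for FULL-AUTO placement (list-field entry, possibly an empty preferred list)
      public static boolean isTrackedForFullAuto(IdealState idealState, String segmentName) {
        Map<String, List<String>> listFields = idealState.getRecord().getListFields();
        return listFields.containsKey(segmentName);
      }

      // Mirrors the FULL-AUTO helper: record the segment with an empty preferred list only
      public static void trackForFullAuto(IdealState idealState, String segmentName) {
        idealState.getRecord().getListFields().put(segmentName, Collections.emptyList());
      }
    }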
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..a30872df83
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public interface PinotTableIdealStateHelper {
+
+  /**
+   * Builds an empty ideal state for the Pinot table.
+   * @param tableConfig table config.
+   * @param enableBatchMessageMode whether to enable batch message mode when building the ideal state.
+   */
+  IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode);
+
+  void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
+
+  void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
+
+  /**
+   * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups of the stream,
+   * with the help of the {@link PartitionGroupConsumptionStatus} of the current partition groups.
+   *
+   * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
+   *
+   * 1)
+   * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for
+   * a partition group.
+   * An example of where the offsets would be used:
+   * e.g. if partition group 1 contains shardId 1, with status DONE and endOffset 150, there are 2 possibilities:
+   * 1) the stream indicates that the shardId's last offset is 200.
+   * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in
+   * the response.
+   * 2) the stream indicates that the shardId's last offset is 150.
+   * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the
+   * response.
+   * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition
+   * group have been consumed.
+   *
+   * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions,
+   * and accordingly make the new partition groups.
+   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2
+   * and partition group 2 has status DONE and contains shards 3,4.
+   * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that
+   * the collection of shards in partition group 1 should remain unchanged in the response,
+   * whereas shards 3,4 can be added to new partition groups if needed.
+   *
+   * @param streamConfig the streamConfig from the tableConfig
+   * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current
+   *                                            partition groups. The size of this list is equal to the number of
+   *                                            partition groups, and it is created using the latest segment zk
+   *                                            metadata.
+   */
+  List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList);
+}
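For reference, a minimal sketch of calling getPartitionGroupMetadataList with an empty consumption-status list, so every partition group of the stream is returned; this mirrors how MissingConsumingSegmentFinder uses the helper further below. The wrapper class is hypothetical and not part of the patch.

    import java.util.Collections;
    import java.util.List;
    import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper;
    import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
    import org.apache.pinot.spi.stream.PartitionGroupMetadata;
    import org.apache.pinot.spi.stream.StreamConfig;

    public class PartitionGroupMetadataSketch {
      // Fetch metadata for all partition groups when nothing has been consumed yet
      public static List<PartitionGroupMetadata> fetchAll(StreamConfig streamConfig) {
        PinotTableIdealStateHelper helper = PinotTableIdealStateHelperFactory.create();
        // An empty status list means no existing partition groups are tracked, so none are skipped
        return helper.getPartitionGroupMetadataList(streamConfig, Collections.emptyList());
      }
    }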
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
new file mode 100644
index 0000000000..9327ecba04
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PinotTableIdealStateHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelperFactory.class);
+  private static PinotTableIdealStateHelper _instance = null;
+  private static ControllerConf _controllerConf;
+
+  private PinotTableIdealStateHelperFactory() {
+  }
+
+  public static void init(ControllerConf controllerConf) {
+    _controllerConf = controllerConf;
+  }
+
+  public static PinotTableIdealStateHelper create() {
+    if (_instance == null) {
+      String pinotTableIdealstateHelperClassPath = _controllerConf.getPinotTableIdealstateHelperClass();
+      try {
+        _instance = (PinotTableIdealStateHelper) Class.forName(pinotTableIdealstateHelperClassPath).newInstance();
+      } catch (Exception e) {
+        LOGGER.error("PinotTableIdealStateHelper not found: {}", pinotTableIdealstateHelperClassPath);
+        throw new RuntimeException(e);
+      }
+    }
+    return _instance;
+  }
+}
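For reference, a minimal sketch of how the factory is expected to be wired: init once with the controller configuration, after which create() reflectively instantiates and caches the configured helper (default or FULL-AUTO). The wrapper class is illustrative only and not part of the patch.

    import org.apache.pinot.controller.ControllerConf;
    import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper;
    import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;

    public class IdealStateHelperWiringSketch {
      public static PinotTableIdealStateHelper wire(ControllerConf controllerConf) {
        // Typically done once at controller startup, before any create() call
        PinotTableIdealStateHelperFactory.init(controllerConf);
        // Instantiates the class named by the controller config and caches it as a singleton
        return PinotTableIdealStateHelperFactory.create();
      }
    }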
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index c9850856cd..94d158220c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -36,7 +36,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -79,7 +79,7 @@ public class MissingConsumingSegmentFinder {
     _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
     streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
     try {
-      PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
+      PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
           .forEach(metadata -> {
             _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
           });
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 40215c43a4..a30731274c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -50,7 +50,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.messages.ForceCommitMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
@@ -68,9 +68,9 @@ import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.resources.Constants;
 import org.apache.pinot.controller.api.resources.PauseStatus;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -336,7 +336,7 @@ public class PinotLLCRealtimeSegmentManager {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
               numPartitionGroups, numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName,
+      updateInstanceStatesForNewConsumingSegment(idealState, null, segmentName,
           segmentAssignment, instancePartitionsMap);
     }
 
@@ -362,7 +362,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
     try {
-      return InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig,
+      return InstancePartitionsUtilsHelperFactory.create().fetchOrComputeInstancePartitions(_helixManager, tableConfig,
           InstancePartitionsType.CONSUMING);
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), 
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES,
@@ -817,7 +817,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
       List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
-    return PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig,
+    return PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig,
         currentPartitionGroupConsumptionStatusList);
   }
 
@@ -1000,8 +1000,7 @@ public class PinotLLCRealtimeSegmentManager {
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
-      updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
-          idealState.getRecord().getListFields(), committingSegmentName,
+      updateInstanceStatesForNewConsumingSegment(idealState, committingSegmentName,
           isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -1012,61 +1011,12 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @VisibleForTesting
-  void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
-      Map<String, List<String>> instanceStatesList,
-      @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+  void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    // TODO: Need to figure out the best way to handle committed segments' 
state change
-    if (committingSegmentName != null) {
-      // Change committing segment state to ONLINE
-//      Set<String> instances = 
instanceStatesMap.get(committingSegmentName).keySet();
-//      instanceStatesMap.put(committingSegmentName,
-//          SegmentAssignmentUtils.getInstanceStateMap(instances, 
SegmentStateModel.ONLINE));
-//      instanceStatesList.put(newSegmentName, Collections.emptyList()
-//          
/*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
-      LOGGER.info("Updating segment: {} to ONLINE state", 
committingSegmentName);
-    }
-
-    // There used to be a race condition in pinot (caused by heavy GC on the 
controller during segment commit)
-    // that ended up creating multiple consuming segments for the same stream 
partition, named somewhat like
-    // tableName__1__25__20210920T190005Z and 
tableName__1__25__20210920T190007Z. It was fixed by checking the
-    // Zookeeper Stat object before updating the segment metadata.
-    // These conditions can happen again due to manual operations considered 
as fixes in Issues #5559 and #5263
-    // The following check prevents the table from going into such a state 
(but does not prevent the root cause
-    // of attempting such a zk update).
-    if (newSegmentName != null) {
-      LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
-      int partitionId = newLLCSegmentName.getPartitionGroupId();
-      int seqNum = newLLCSegmentName.getSequenceNumber();
-      for (String segmentNameStr : instanceStatesMap.keySet()) {
-        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
-        if (llcSegmentName == null) {
-          // skip the segment name if the name is not in low-level consumer 
format
-          // such segment name can appear for uploaded segment
-          LOGGER.debug("Skip segment name {} not in low-level consumer 
format", segmentNameStr);
-          continue;
-        }
-        if (llcSegmentName.getPartitionGroupId() == partitionId && 
llcSegmentName.getSequenceNumber() == seqNum) {
-          String errorMsg =
-              String.format("Segment %s is a duplicate of existing segment 
%s", newSegmentName, segmentNameStr);
-          LOGGER.error(errorMsg);
-          throw new HelixHelper.PermanentUpdaterException(errorMsg);
-        }
-      }
-      // Assign instances to the new segment and add instances as state 
CONSUMING
-      List<String> instancesAssigned =
-          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, 
instancePartitionsMap);
-      // No need to check for tableType as offline tables can never go to 
CONSUMING state. All callers are for REALTIME
-//      instanceStatesMap.put(newSegmentName,
-//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
-      // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the 
map. Uncomment below lines to update list.
-      //       Assess whether we should set am empty InstanceStateList for the 
segment or not. i.e. do we support
-      //       this preferred list concept, and does Helix-Auto even allow 
preferred list concept (from code reading it
-      //       looks like it does)
-      instanceStatesList.put(newSegmentName, Collections.emptyList()
-          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
-      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", 
newSegmentName, instancesAssigned);
-    }
+    PinotTableIdealStateHelperFactory.create()
+        .updateInstanceStatesForNewConsumingSegment(idealState, committingSegmentName, newSegmentName,
+            segmentAssignment, instancePartitionsMap);
   }
 
   /*
@@ -1222,14 +1172,14 @@ public class PinotLLCRealtimeSegmentManager {
                       
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, 
newLLCSegmentName, currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
-                  newSegmentName, segmentAssignment, instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(idealState, latestSegmentName, newSegmentName,
+                  segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
               LOGGER.info("PartitionGroup: {} has reached end of life. 
Updating ideal state for segment: {}. "
                       + "Skipping creation of new ZK metadata and new segment 
in ideal state", partitionGroupId,
                   latestSegmentName);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instanceStatesList, latestSegmentName,
-                  null, segmentAssignment, instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(idealState, 
latestSegmentName, null, segmentAssignment,
+                  instancePartitionsMap);
             }
           }
           // else, the metadata should be IN_PROGRESS, which is the right 
state for a consuming segment.
@@ -1253,7 +1203,7 @@ public class PinotLLCRealtimeSegmentManager {
                     partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+                newPartitionGroupMetadataList, instancePartitions, idealState,
                 segmentAssignment, instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
@@ -1272,7 +1222,7 @@ public class PinotLLCRealtimeSegmentManager {
                         partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
                         latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+                    newPartitionGroupMetadataList, instancePartitions, idealState,
                     segmentAssignment, instancePartitionsMap, startOffset);
               } else {
                 LOGGER.error(
@@ -1320,8 +1270,8 @@ public class PinotLLCRealtimeSegmentManager {
                 partitionGroupId, realtimeTableName);
             _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
           }
-          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment,
-              latestSegmentName, segmentAssignment, instancePartitionsMap);
+          updateInstanceStatesForNewConsumingSegment(idealState, previousConsumingSegment, latestSegmentName,
+              segmentAssignment, instancePartitionsMap);
         } else {
           LOGGER.error("Got unexpected status: {} in segment ZK metadata for 
segment: {}",
               latestSegmentZKMetadata.getStatus(), latestSegmentName);
@@ -1336,8 +1286,8 @@ public class PinotLLCRealtimeSegmentManager {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupMetadata, currentTimeMs, instancePartitions,
                 numPartitions, numReplicas);
-        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instanceStatesList, null, newSegmentName,
-            segmentAssignment, instancePartitionsMap);
+        updateInstanceStatesForNewConsumingSegment(idealState, null, 
newSegmentName, segmentAssignment,
+            instancePartitionsMap);
       }
     }
 
@@ -1347,8 +1297,7 @@ public class PinotLLCRealtimeSegmentManager {
   private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig,
       SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
       List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
-      Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList,
-      SegmentAssignment segmentAssignment,
+      IdealState idealState, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
     int numPartitions = newPartitionGroupMetadataList.size();
@@ -1359,7 +1308,7 @@ public class PinotLLCRealtimeSegmentManager {
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
         latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName,
+    updateInstanceStatesForNewConsumingSegment(idealState, null, newSegmentName,
         segmentAssignment, instancePartitionsMap);
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index f97aa7b06a..dcba5949d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -48,7 +48,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
@@ -589,11 +589,11 @@ public class TableRebalancer {
             "COMPLETED segments should not be relocated, skipping 
fetching/computing COMPLETED instance partitions "
                 + "for table: {}", tableNameWithType);
         if (!dryRun) {
-          String instancePartitionsName = 
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
-              InstancePartitionsType.COMPLETED.toString());
+          String instancePartitionsName = 
InstancePartitionsUtilsHelperFactory.create()
+              .getInstancePartitionsName(tableNameWithType, 
InstancePartitionsType.COMPLETED.toString());
           LOGGER.info("Removing instance partitions: {} from ZK if it exists", 
instancePartitionsName);
-          
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
-              instancePartitionsName);
+          InstancePartitionsUtilsHelperFactory.create()
+              .removeInstancePartitions(_helixManager.getHelixPropertyStore(), 
instancePartitionsName);
         }
       }
     }
@@ -606,10 +606,10 @@ public class TableRebalancer {
   private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig tableConfig,
       InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun) {
     String tableNameWithType = tableConfig.getTableName();
-    String instancePartitionsName =
-        InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString());
-    InstancePartitions existingInstancePartitions =
-        InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
+    String instancePartitionsName = InstancePartitionsUtilsHelperFactory.create()
+        .getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString());
+    InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+        .fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
 
     if (reassignInstances) {
       if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
@@ -630,14 +630,15 @@ public class TableRebalancer {
           instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
           if (!dryRun && !instancePartitionsUnchanged) {
             LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
-            InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
+            InstancePartitionsUtilsHelperFactory.create()
+                .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(),
+                    _helixManager.getClusterName(), instancePartitions);
           }
         } else {
           String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
           if (isPreConfigurationBasedAssignment) {
-            InstancePartitions preConfiguredInstancePartitions =
-                InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+            InstancePartitions preConfiguredInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+                .fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
                     referenceInstancePartitionsName, instancePartitionsName);
             instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
                 _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
@@ -646,19 +647,21 @@ public class TableRebalancer {
             if (!dryRun && !instancePartitionsUnchanged) {
               LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions,
                   preConfiguredInstancePartitions);
-              InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                  _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
+              InstancePartitionsUtilsHelperFactory.create()
+                  .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(),
+                      _helixManager.getClusterName(), instancePartitions);
             }
           } else {
-            instancePartitions =
-                InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+            instancePartitions = InstancePartitionsUtilsHelperFactory.create()
+                .fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
                     referenceInstancePartitionsName, instancePartitionsName);
             instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
             if (!dryRun && !instancePartitionsUnchanged) {
               LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
                   referenceInstancePartitionsName);
-              InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                  _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
+              InstancePartitionsUtilsHelperFactory.create()
+                  .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(),
+                      _helixManager.getClusterName(), instancePartitions);
             }
           }
         }
@@ -666,22 +669,20 @@ public class TableRebalancer {
       } else {
         LOGGER.info("{} instance assignment is not allowed, using default 
instance partitions for table: {}",
             instancePartitionsType, tableNameWithType);
-        InstancePartitions instancePartitions =
-            
InstancePartitionsUtils.computeDefaultInstancePartitions(_helixManager, 
tableConfig,
-                instancePartitionsType);
+        InstancePartitions instancePartitions = 
InstancePartitionsUtilsHelperFactory.create()
+            .computeDefaultInstancePartitions(_helixManager, tableConfig, 
instancePartitionsType);
         boolean noExistingInstancePartitions = existingInstancePartitions == 
null;
         if (!dryRun && !noExistingInstancePartitions) {
           LOGGER.info("Removing instance partitions: {} from ZK", 
instancePartitionsName);
-          
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
-              instancePartitionsName);
+          InstancePartitionsUtilsHelperFactory.create()
+              .removeInstancePartitions(_helixManager.getHelixPropertyStore(), 
instancePartitionsName);
         }
         return Pair.of(instancePartitions, noExistingInstancePartitions);
       }
     } else {
       LOGGER.info("Fetching/computing {} instance partitions for table: {}", 
instancePartitionsType, tableNameWithType);
-      return Pair.of(
-          
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, 
tableConfig, instancePartitionsType),
-          true);
+      return Pair.of(InstancePartitionsUtilsHelperFactory.create()
+          .fetchOrComputeInstancePartitions(_helixManager, tableConfig, 
instancePartitionsType), true);
     }
   }
 
@@ -730,9 +731,9 @@ public class TableRebalancer {
     String tableNameWithType = tableConfig.getTableName();
     String tierName = tier.getName();
     String instancePartitionsName =
-        InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierName);
-    InstancePartitions existingInstancePartitions =
-        InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
+        InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsNameForTier(tableNameWithType, tierName);
+    InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create()
+        .fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
 
     if (reassignInstances) {
       Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
@@ -743,14 +744,13 @@ public class TableRebalancer {
             "Instance assignment config for tier: {} does not exist for table: 
{}, using default instance partitions",
             tierName, tableNameWithType);
         PinotServerTierStorage storage = (PinotServerTierStorage) 
tier.getStorage();
-        InstancePartitions instancePartitions =
-            
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, 
tableNameWithType, tierName,
-                storage.getServerTag());
+        InstancePartitions instancePartitions = 
InstancePartitionsUtilsHelperFactory.create()
+            .computeDefaultInstancePartitionsForTag(_helixManager, 
tableNameWithType, tierName, storage.getServerTag());
         boolean noExistingInstancePartitions = existingInstancePartitions == 
null;
         if (!dryRun && !noExistingInstancePartitions) {
           LOGGER.info("Removing instance partitions: {} from ZK", 
instancePartitionsName);
-          
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
-              instancePartitionsName);
+          InstancePartitionsUtilsHelperFactory.create()
+              .removeInstancePartitions(_helixManager.getHelixPropertyStore(), 
instancePartitionsName);
         }
         return Pair.of(instancePartitions, noExistingInstancePartitions);
       } else {
@@ -763,8 +763,9 @@ public class TableRebalancer {
         boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
         if (!dryRun && !instancePartitionsUnchanged) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
-          InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-              _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
+          InstancePartitionsUtilsHelperFactory.create()
+              .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(),
+                  _helixManager.getClusterName(), instancePartitions);
         }
         return Pair.of(instancePartitions, instancePartitionsUnchanged);
       }
@@ -773,9 +774,8 @@ public class TableRebalancer {
         return Pair.of(existingInstancePartitions, true);
       } else {
         PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
-        InstancePartitions instancePartitions =
-            InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType, tierName,
-                storage.getServerTag());
+        InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create()
+            .computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType, tierName, storage.getServerTag());
         return Pair.of(instancePartitions, true);
       }
     }
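
Throughout this class the former static InstancePartitionsUtils calls now go through InstancePartitionsUtilsHelperFactory.create(), so the rebalancer uses whichever helper implementation the factory hands back. A minimal sketch of the resulting fetch-or-compute-then-persist pattern is below; it only uses methods that appear in this patch, and the local helper variable, the choice of OFFLINE, and the surrounding context are assumptions for illustration, not code from TableRebalancer.

    // Sketch, assuming create() returns the InstancePartitionsUtilsHelper added alongside the factory.
    InstancePartitionsUtilsHelper helper = InstancePartitionsUtilsHelperFactory.create();
    String name = helper.getInstancePartitionsName(tableNameWithType, InstancePartitionsType.OFFLINE.toString());
    InstancePartitions existing = helper.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), name);
    if (existing == null) {
      // Fall back to default instance partitions and persist them to ZK.
      InstancePartitions computed =
          helper.computeDefaultInstancePartitions(_helixManager, tableConfig, InstancePartitionsType.OFFLINE);
      helper.persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(),
          _helixManager.getClusterName(), computed);
    }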
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index a14010f17f..778c264d54 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -48,6 +48,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,6 +168,20 @@ public class HelixSetupUtils {
       helixDataAccessor.createStateModelDef(
           PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
+
+    String segmentStateModelName =
+        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition defaultStateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    if (defaultStateModelDefinition == null || isUpdateStateModel) {
+      if (defaultStateModelDefinition == null) {
+        LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
+      } else {
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
+      }
+      helixDataAccessor.createStateModelDef(
+          PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    }
   }
 
   private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
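
The added block follows the same pattern as the existing per-type state model registration: look up the unified segment state model and create (or refresh) it when it is absent or when isUpdateStateModel is set. Condensed, the logic amounts to the sketch below, reusing the helixAdmin and helixDataAccessor objects the surrounding method already has in scope.

    // Condensed sketch of the added registration step.
    String name = PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
    StateModelDefinition existing = helixAdmin.getStateModelDef(helixClusterName, name);
    if (existing == null || isUpdateStateModel) {
      // Register (or update) the unified segment state model, which includes the CONSUMING state.
      helixDataAccessor.createStateModelDef(
          PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
    }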
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index dedc79384e..a95b1bb40d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.ControllerConf;
@@ -238,7 +238,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, TIER_NAME)));
     assertEquals(instancePartitionsMap.size(), 1);
     assertEquals(instancePartitionsMap.get(TIER_NAME).getInstancePartitionsName(),
-        InstancePartitionsUtils.getInstancePartitionsNameForTier(RAW_TABLE_NAME, TIER_NAME));
+        InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsNameForTier(RAW_TABLE_NAME, TIER_NAME));
 
     // Remove the instance partitions for both offline and real-time table
     sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
index c9a42536d3..e74de87dfd 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
@@ -77,21 +77,21 @@ public class OfflineNonReplicaGroupTieredSegmentAssignmentTest {
   private static final List<String> INSTANCES_TIER_A =
       SegmentAssignmentTestUtils.getNameList(TIER_A_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_A);
   private static final String TIER_A_INSTANCE_PARTITIONS_NAME =
-      InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME);
+      InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME);
 
   private static final String TIER_B_INSTANCE_NAME_PREFIX = "tierB_instance_";
   private static final int NUM_INSTANCES_TIER_B = 4;
   private static final List<String> INSTANCES_TIER_B =
       SegmentAssignmentTestUtils.getNameList(TIER_B_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_B);
   private static final String TIER_B_INSTANCE_PARTITIONS_NAME =
-      InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME);
+      InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME);
 
   private static final String TIER_C_INSTANCE_NAME_PREFIX = "tierC_instance_";
   private static final int NUM_INSTANCES_TIER_C = 3;
   private static final List<String> INSTANCES_TIER_C =
       SegmentAssignmentTestUtils.getNameList(TIER_C_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_C);
   private static final String TIER_C_INSTANCE_PARTITIONS_NAME =
-      InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME);
+      InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME);
 
   private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap;
   private Map<String, InstancePartitions> _tierInstancePartitionsMap;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index e7b5895a47..48c37afb8b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
@@ -80,21 +80,21 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
   private static final List<String> INSTANCES_TIER_A =
       SegmentAssignmentTestUtils.getNameList(TIER_A_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_A);
   private static final String TIER_A_INSTANCE_PARTITIONS_NAME =
-      InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME);
+      InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME);
 
   private static final String TIER_B_INSTANCE_NAME_PREFIX = "tierB_instance_";
   private static final int NUM_INSTANCES_TIER_B = 4;
   private static final List<String> INSTANCES_TIER_B =
       SegmentAssignmentTestUtils.getNameList(TIER_B_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_B);
   private static final String TIER_B_INSTANCE_PARTITIONS_NAME =
-      InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME);
+      InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME);
 
   private static final String TIER_C_INSTANCE_NAME_PREFIX = "tierC_instance_";
   private static final int NUM_INSTANCES_TIER_C = 3;
   private static final List<String> INSTANCES_TIER_C =
       SegmentAssignmentTestUtils.getNameList(TIER_C_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_C);
   private static final String TIER_C_INSTANCE_PARTITIONS_NAME =
-      InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME);
+      InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME);
 
   private List<String> _segments;
   private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 26641d515c..ee90649080 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1220,11 +1220,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
     void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
         String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
-          _idealState.getRecord().getListFields(), committingSegmentName, null,
+      updateInstanceStatesForNewConsumingSegment(_idealState, committingSegmentName, null,
           segmentAssignment, instancePartitionsMap);
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
-          _idealState.getRecord().getListFields(), null, newSegmentName,
+      updateInstanceStatesForNewConsumingSegment(_idealState, null, newSegmentName,
           segmentAssignment, instancePartitionsMap);
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d679c0380..1702fb6063 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.helix.ControllerTest;
@@ -257,7 +257,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     rebalanceConfig.setReassignInstances(true);
    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+    assertNull(InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore,
         InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
 
     // All servers should be assigned to the table
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index dc988ad669..017b0b2a5a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -270,9 +270,7 @@ public class RetentionManagerTest {
     final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
-
-    IdealState idealState =
-        PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true);
+    IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);
@@ -334,9 +332,7 @@ public class RetentionManagerTest {
     final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
-
-    IdealState idealState =
-        PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true);
+    IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);
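
Both test methods now build the empty ideal state through the factory, which takes the TableConfig directly instead of a table name plus replica count. The usage reduces to the sketch below; note that the meaning of the trailing boolean is inferred from the old call sites (it appears to correspond to the former batch-message-mode flag) rather than stated in this patch.

    // Sketch: empty IdealState for a realtime table via the new factory-backed helper.
    IdealState idealState =
        PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true);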
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
index b14cb9e240..08de258d62 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
@@ -46,7 +46,6 @@ import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -211,23 +210,16 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState input) {
-        Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields();
         Map<String, List<String>> existingListField = input.getRecord().getListFields();
 
-        TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName);
         for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) {
           // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
-          if (tableType == TableType.REALTIME) {
-            // TODO: Update listField only once REALTIME uses FULL-AUTO
-            existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
-          } else {
-            String segmentName = segmentEntry.getKey();
-            Map<String, String> segmentMapping = segmentEntry.getValue();
-            List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet());
-            Collections.sort(listOfHosts);
-            // TODO: Assess if we want to add the preferred list of hosts or not
-            existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */);
-          }
+          String segmentName = segmentEntry.getKey();
+          Map<String, String> segmentMapping = segmentEntry.getValue();
+          List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet());
+          Collections.sort(listOfHosts);
+          // TODO: Assess if we want to add the preferred list of hosts or not
+          existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */);
         }
         return input;
       }
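
With the REALTIME special case removed, the updater always records the proposed assignment in the IdealState's list fields and leaves the preferred-host list empty. For context, an updater of this shape is applied through HelixHelper.updateIdealState, roughly as in the sketch below; the manager, table name, and retry policy values are placeholders and the exact overload may differ.

    // Sketch of how such an updater is typically applied (placeholder values, not verbatim from this file).
    HelixHelper.updateIdealState(helixManager, tableNameWithType, input -> {
      Map<String, List<String>> listFields = input.getRecord().getListFields();
      for (Map.Entry<String, Map<String, String>> entry : proposedIdealState.entrySet()) {
        // Preferred hosts are intentionally left empty for now (see the TODO above).
        listFields.put(entry.getKey(), Collections.emptyList());
      }
      return input;
    }, RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0));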

