This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 54b74803e4 Fix the way of fetching the segment zk metadata for task 
generators (#11832)
54b74803e4 is described below

commit 54b74803e467c42176a406ac4b65ab6c74643693
Author: Seunghyun Lee <seungh...@startree.ai>
AuthorDate: Thu Oct 19 09:13:16 2023 -0700

    Fix the way of fetching the segment zk metadata for task generators (#11832)
    
    * Fix the way of fetching the segment zk metadata for task generators
    
    Currently, we fetch the list of segment zk metadata directly from
    the property store. This can cause issues when the segment
    deletion process fails midway (the segment is removed from the
    idealstate while its zk metadata is not cleaned up).
    
    1. In order to avoid scheduling dangling segments, we should compute
    the intersection of segment zk metadata and idealstate.
    2. Enhance the outputDir path formulation to be less error-prone
    
    * Addressing the comments
---
 .../helix/core/PinotHelixResourceManager.java      |  79 +++---
 .../helix/core/minion/ClusterInfoAccessor.java     |   5 +
 .../core/minion/generator/BaseTaskGenerator.java   |  26 ++
 .../BaseMultipleSegmentsConversionExecutor.java    |   3 +-
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java |  17 +-
 .../mergerollup/MergeRollupTaskGenerator.java      |   2 +-
 .../minion/tasks/purge/PurgeTaskGenerator.java     |   4 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   2 +-
 .../SegmentGenerationAndPushTaskGenerator.java     |   2 +-
 .../UpsertCompactionTaskGenerator.java             |   2 +-
 .../mergerollup/MergeRollupTaskGeneratorTest.java  | 289 ++++++++++++---------
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java |  40 +++
 .../UpsertCompactionTaskGeneratorTest.java         |  19 ++
 13 files changed, 328 insertions(+), 162 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a85401d706..95db8d35f9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -235,8 +235,8 @@ public class PinotHelixResourceManager {
   private final LineageManager _lineageManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
-      boolean isSingleTenantCluster, boolean enableBatchMessageMode,
-      int deletedSegmentsRetentionInDays, boolean 
enableTieredSegmentAssignment, LineageManager lineageManager) {
+      boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
+      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -315,8 +315,8 @@ public class PinotHelixResourceManager {
     // Initialize TableCache
     HelixConfigScope helixConfigScope =
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
-    Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope,
-        Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY));
+    Map<String, String> configs =
+        _helixAdmin.getConfig(helixConfigScope, 
Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY));
     boolean caseInsensitive = 
Boolean.parseBoolean(configs.getOrDefault(Helix.ENABLE_CASE_INSENSITIVE_KEY,
         Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
     _tableCache = new TableCache(_propertyStore, caseInsensitive);
@@ -470,8 +470,8 @@ public class PinotHelixResourceManager {
   }
 
   public List<String> getAllBrokerInstances() {
-    return HelixHelper.getAllInstances(_helixAdmin, _helixClusterName).stream()
-        .filter(InstanceTypeUtils::isBroker).collect(Collectors.toList());
+    return HelixHelper.getAllInstances(_helixAdmin, 
_helixClusterName).stream().filter(InstanceTypeUtils::isBroker)
+        .collect(Collectors.toList());
   }
 
   public List<InstanceConfig> getAllBrokerInstanceConfigs() {
@@ -808,21 +808,33 @@ public class PinotHelixResourceManager {
     IdealState idealState = getTableIdealState(tableNameWithType);
     Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
     List<String> segments = new ArrayList<>(idealState.getPartitionSet());
-    if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
-      return shouldExcludeReplacedSegments ? 
excludeReplacedSegments(tableNameWithType, segments) : segments;
-    } else {
-      List<String> selectedSegments = new ArrayList<>();
-      List<SegmentZKMetadata> segmentZKMetadataList = 
getSegmentsZKMetadata(tableNameWithType);
-      for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-        String segmentName = segmentZKMetadata.getSegmentName();
-        if (segments.contains(segmentName) && 
isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp,
-            excludeOverlapping)) {
+    List<SegmentZKMetadata> segmentZKMetadataList = 
getSegmentsZKMetadata(tableNameWithType);
+    List<String> selectedSegments = new ArrayList<>();
+    ArrayList<String> filteredSegments = new ArrayList<>();
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      // Compute the intersection of segmentZK metadata and idealstate for 
valid segments
+      if (!segments.contains(segmentName)) {
+        filteredSegments.add(segmentName);
+        continue;
+      }
+      // No need to filter by time if the time range is not specified
+      if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
+        selectedSegments.add(segmentName);
+      } else {
+        // Filter by time if the time range is specified
+        if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, 
endTimestamp, excludeOverlapping)) {
           selectedSegments.add(segmentName);
         }
       }
-      return shouldExcludeReplacedSegments ? 
excludeReplacedSegments(tableNameWithType, selectedSegments)
-          : selectedSegments;
     }
+    LOGGER.info(
+        "Successfully computed the segments for table : {}. # of filtered 
segments: {}, the filtered segment list: "
+            + "{}. Only showing up to 100 filtered segments.", 
tableNameWithType, filteredSegments.size(),
+        (filteredSegments.size() > 0) ? filteredSegments.subList(0, 
Math.min(filteredSegments.size(), 100))
+            : filteredSegments);
+    return shouldExcludeReplacedSegments ? 
excludeReplacedSegments(tableNameWithType, selectedSegments)
+        : selectedSegments;
   }
 
   /**
@@ -3182,8 +3194,8 @@ public class PinotHelixResourceManager {
   @VisibleForTesting
   void updateTargetTier(String rebalanceJobId, String tableNameWithType, 
TableConfig tableConfig) {
     List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
-    List<Tier> sortedTiers = tierCfgs == null ? Collections.emptyList()
-        : TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager);
+    List<Tier> sortedTiers =
+        tierCfgs == null ? Collections.emptyList() : 
TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager);
     LOGGER.info("For rebalanceId: {}, updating target tiers for segments of 
table: {} with tierConfigs: {}",
         rebalanceJobId, tableNameWithType, sortedTiers);
     for (String segmentName : getSegmentsFor(tableNameWithType, true)) {
@@ -3463,9 +3475,9 @@ public class PinotHelixResourceManager {
 
               // Add segments for proactive clean-up.
               segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
-            } else if (lineageEntry.getState() == LineageEntryState.COMPLETED
-                && 
"REFRESH".equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))
-                && CollectionUtils.isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsTo())) {
+            } else if (lineageEntry.getState() == LineageEntryState.COMPLETED 
&& "REFRESH".equalsIgnoreCase(
+                
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && 
CollectionUtils.isEqualCollection(
+                segmentsFrom, lineageEntry.getSegmentsTo())) {
               // This part of code assumes that we only allow at most 2 data 
snapshots at a time by proactively
               // deleting the older snapshots (for REFRESH tables).
               //
@@ -3514,8 +3526,8 @@ public class PinotHelixResourceManager {
         segmentLineage.addLineageEntry(segmentLineageEntryId,
             new LineageEntry(segmentsFrom, segmentsTo, 
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
 
-        _lineageManager
-            .updateLineageForStartReplaceSegments(tableConfig, 
segmentLineageEntryId, customMap, segmentLineage);
+        _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
segmentLineageEntryId, customMap,
+            segmentLineage);
         // Write back to the lineage entry to the property store
         if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion)) {
           // Trigger the proactive segment clean up if needed. Once the 
lineage is updated in the property store, it
@@ -3768,8 +3780,7 @@ public class PinotHelixResourceManager {
    * @param customMap
    */
   private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, 
String lineageEntryId,
-      LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch,
-      ZkHelixPropertyStore<ZNRecord> propertyStore,
+      LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
     for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
       // Fetch the segment lineage
@@ -3792,16 +3803,16 @@ public class PinotHelixResourceManager {
       segmentLineageToUpdate.updateLineageEntry(lineageEntryId, 
lineageEntryToUpdate);
       switch (lineageUpdateType) {
         case START:
-          _lineageManager
-              .updateLineageForStartReplaceSegments(tableConfig, 
lineageEntryId, customMap, segmentLineageToUpdate);
+          _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
lineageEntryId, customMap,
+              segmentLineageToUpdate);
           break;
         case END:
-          _lineageManager
-              .updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, 
customMap, segmentLineageToUpdate);
+          _lineageManager.updateLineageForEndReplaceSegments(tableConfig, 
lineageEntryId, customMap,
+              segmentLineageToUpdate);
           break;
         case REVERT:
-          _lineageManager
-              .updateLineageForRevertReplaceSegments(tableConfig, 
lineageEntryId, customMap, segmentLineageToUpdate);
+          _lineageManager.updateLineageForRevertReplaceSegments(tableConfig, 
lineageEntryId, customMap,
+              segmentLineageToUpdate);
           break;
         default:
       }
@@ -4034,8 +4045,8 @@ public class PinotHelixResourceManager {
       }
     }
     for (TableConfig tableConfig : getAllTableConfigs()) {
-      String tag = 
TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(),
-          tableConfig.getTableType());
+      String tag =
+          
TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(), 
tableConfig.getTableType());
       tagMinInstanceMap.put(tag, Math.max(tagMinInstanceMap.getOrDefault(tag, 
0), tableConfig.getReplication()));
       String brokerTag = 
TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
       tagMinInstanceMap.put(brokerTag, 1);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 954c2ced38..3ec8ea5147 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -101,6 +102,10 @@ public class ClusterInfoAccessor {
     return 
ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(),
 tableNameWithType);
   }
 
+  public IdealState getIdealState(String tableNameWithType) {
+    return _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+  }
+
   /**
    * Get shared executor
    */
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
index 51cbe542bf..b35bb54423 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pinot.controller.helix.core.minion.generator;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
@@ -89,6 +93,28 @@ public abstract class BaseTaskGenerator implements 
PinotTaskGenerator {
     return MinionConstants.DEFAULT_MAX_ATTEMPTS_PER_TASK;
   }
 
+  /**
+   * Returns the list of segment zk metadata for available segments in the 
table. The list does NOT filter out inactive
+   * segments based on the lineage. In order to compute the valid segments, we 
look at both idealstate and segment
+   * zk metadata in the property store and compute the intersection. In this 
way, we can avoid picking the dangling
+   * segments.
+   *
+   * @param tableNameWithType
+   * @return the list of segment zk metadata for available segments in the 
table.
+   */
+  public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String 
tableNameWithType) {
+    IdealState idealState = 
_clusterInfoAccessor.getIdealState(tableNameWithType);
+    Set<String> segmentsForTable = idealState.getPartitionSet();
+    List<SegmentZKMetadata> segmentZKMetadataList = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
+        selectedSegmentZKMetadataList.add(segmentZKMetadata);
+      }
+    }
+    return selectedSegmentZKMetadataList;
+  }
+
   @Override
   public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
       throws Exception {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 99b1341f6d..6b439add13 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -393,7 +393,8 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       throws Exception {
     URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
     try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI)) {
-      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + 
localSegmentTarFile.getName());
+      URI outputSegmentTarURI =
+          
URI.create(MinionTaskUtils.normalizeDirectoryURI(outputSegmentDirURI) + 
localSegmentTarFile.getName());
       if 
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) 
&& outputFileFS.exists(
           outputSegmentTarURI)) {
         throw new RuntimeException(String.format("Output file: %s already 
exists. "
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 0d8c1e375c..a46f903ed9 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 public class MinionTaskUtils {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionTaskUtils.class);
 
+  private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
+
   private MinionTaskUtils() {
   }
 
@@ -84,7 +87,8 @@ public class MinionTaskUtils {
         singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
             BatchConfigProperties.SegmentPushType.TAR.toString());
       } else {
-        URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + 
tableName);
+        URI outputDirURI = URI.create(
+            normalizeDirectoryURI(clusterInfoAccessor.getDataDir()) + 
TableNameBuilder.extractRawTableName(tableName));
         String outputDirURIScheme = outputDirURI.getScheme();
 
         if (!isLocalOutputDir(outputDirURIScheme)) {
@@ -115,4 +119,15 @@ public class MinionTaskUtils {
   public static PinotFS getLocalPinotFs() {
     return new LocalPinotFS();
   }
+
+  public static String normalizeDirectoryURI(URI dirURI) {
+    return normalizeDirectoryURI(dirURI.toString());
+  }
+
+  public static String normalizeDirectoryURI(String dirInStr) {
+    if (!dirInStr.endsWith(DEFAULT_DIR_PATH_TERMINATOR)) {
+      return dirInStr + DEFAULT_DIR_PATH_TERMINATOR;
+    }
+    return dirInStr;
+  }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 7d87abc96d..0b09bf6d35 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -157,7 +157,7 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
       LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableNameWithType, taskType);
 
       // Get all segment metadata
-      List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+      List<SegmentZKMetadata> allSegments = 
getSegmentsZKMetadataForTable(tableNameWithType);
       // Filter segments based on status
       List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus
           = filterSegmentsBasedOnStatus(tableConfig.getTableType(), 
allSegments);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index c5cbe4de6e..02b9a47283 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -90,7 +90,7 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
       }
       List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
       if (tableConfig.getTableType() == TableType.REALTIME) {
-        List<SegmentZKMetadata> segmentsZKMetadataAll = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+        List<SegmentZKMetadata> segmentsZKMetadataAll = 
getSegmentsZKMetadataForTable(tableName);
         for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataAll) {
           CommonConstants.Segment.Realtime.Status status = 
segmentZKMetadata.getStatus();
           if (status.isCompleted()) {
@@ -98,7 +98,7 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
           }
         }
       } else {
-        segmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+        segmentsZKMetadata = getSegmentsZKMetadataForTable(tableName);
       }
 
       List<SegmentZKMetadata> purgedSegmentsZKMetadata = new ArrayList<>();
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 51be9b1ce3..750cb15416 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -246,7 +246,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends 
BaseTaskGenerator {
    */
   private void getCompletedSegmentsInfo(String realtimeTableName, 
List<SegmentZKMetadata> completedSegmentsZKMetadata,
       Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer> 
allPartitions) {
-    List<SegmentZKMetadata> segmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName);
+    List<SegmentZKMetadata> segmentsZKMetadata = 
getSegmentsZKMetadataForTable(realtimeTableName);
 
     Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
index 268f84456a..c5e662e285 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
@@ -149,7 +149,7 @@ public class SegmentGenerationAndPushTaskGenerator extends 
BaseTaskGenerator {
           List<SegmentZKMetadata> segmentsZKMetadata = Collections.emptyList();
           // For append mode, we don't create segments for input file URIs 
already created.
           if 
(BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType))
 {
-            segmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
+            segmentsZKMetadata = 
getSegmentsZKMetadataForTable(offlineTableName);
           }
           Set<String> existingSegmentInputFiles = 
getExistingSegmentInputFiles(segmentsZKMetadata);
           Set<String> inputFilesFromRunningTasks = 
getInputFilesFromRunningTasks(offlineTableName);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index aa84ffefa5..590102319e 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -232,7 +232,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
     List<SegmentZKMetadata> completedSegments = new ArrayList<>();
     String bufferPeriod = 
taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
     long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
-    List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    List<SegmentZKMetadata> allSegments = 
getSegmentsZKMetadataForTable(tableNameWithType);
     for (SegmentZKMetadata segment : allSegments) {
       CommonConstants.Segment.Realtime.Status status = segment.getStatus();
       // initial segments selection based on status and age
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index c367bda735..6c1372f56e 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.lineage.LineageEntry;
 import org.apache.pinot.common.lineage.LineageEntryState;
@@ -84,40 +85,31 @@ public class MergeRollupTaskGeneratorTest {
 
   @Test
   public void testValidateIfMergeRollupCanBeEnabledOrNot() {
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
-      .setTableName(RAW_TABLE_NAME)
-      .setTimeColumnName(TIME_COLUMN_NAME)
-      .build();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .build();
     assertTrue(MergeRollupTaskGenerator.validate(tableConfig, 
MinionConstants.MergeRollupTask.TASK_TYPE));
 
     IngestionConfig ingestionConfig = new IngestionConfig();
-    ingestionConfig.setBatchIngestionConfig(
-            new BatchIngestionConfig(Collections.emptyList(), "REFRESH", 
"daily"));
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE)
-        .setTableName(RAW_TABLE_NAME)
-        .setTimeColumnName(TIME_COLUMN_NAME)
-        .setIngestionConfig(ingestionConfig)
-        .build();
+    ingestionConfig.setBatchIngestionConfig(new 
BatchIngestionConfig(Collections.emptyList(), "REFRESH", "daily"));
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setIngestionConfig(ingestionConfig).build();
     assertFalse(MergeRollupTaskGenerator.validate(tableConfig, 
MinionConstants.MergeRollupTask.TASK_TYPE));
 
-    tableConfig = new TableConfigBuilder(TableType.REALTIME)
-        .setTableName(RAW_TABLE_NAME)
-        .setTimeColumnName(TIME_COLUMN_NAME)
-        .build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .build();
     assertTrue(MergeRollupTaskGenerator.validate(tableConfig, 
MinionConstants.MergeRollupTask.TASK_TYPE));
 
-    tableConfig = new TableConfigBuilder(TableType.REALTIME)
-        .setTableName(RAW_TABLE_NAME)
-        .setTimeColumnName(TIME_COLUMN_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
     assertFalse(MergeRollupTaskGenerator.validate(tableConfig, 
MinionConstants.MergeRollupTask.TASK_TYPE));
 
-    tableConfig = new TableConfigBuilder(TableType.REALTIME)
-        .setTableName(RAW_TABLE_NAME)
-        .setTimeColumnName(TIME_COLUMN_NAME)
-        .setDedupConfig(new DedupConfig(true, HashFunction.MD5))
-        .build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setDedupConfig(new DedupConfig(true, HashFunction.MD5)).build();
     assertFalse(MergeRollupTaskGenerator.validate(tableConfig, 
MinionConstants.MergeRollupTask.TASK_TYPE));
   }
 
@@ -135,13 +127,17 @@ public class MergeRollupTaskGeneratorTest {
     
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
     SegmentZKMetadata realtimeTableSegmentMetadata2 =
         getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000, 
TimeUnit.MILLISECONDS, null);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(realtimeTableSegmentMetadata1, 
realtimeTableSegmentMetadata2));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(realtimeTableSegmentMetadata1, 
realtimeTableSegmentMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+        getIdealState(REALTIME_TABLE_NAME, 
Lists.newArrayList("testTable__0__0__0", "testTable__1__0__0")));
 
     SegmentZKMetadata offlineTableSegmentMetadata =
         getSegmentZKMetadata("testTable__0", 5000, 50_000, 
TimeUnit.MILLISECONDS, null);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(offlineTableSegmentMetadata));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(offlineTableSegmentMetadata));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList("testTable__0")));
 
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -173,9 +169,8 @@ public class MergeRollupTaskGeneratorTest {
         maxNumRecordsPerSegments);
   }
 
-  private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig, 
String mergeLevel,
-      String mergeType, String partitionBucketTimePeriod, String 
roundBucketTimePeriod,
-      String maxNumRecordsPerSegments) {
+  private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig, 
String mergeLevel, String mergeType,
+      String partitionBucketTimePeriod, String roundBucketTimePeriod, String 
maxNumRecordsPerSegments) {
     assertEquals(pinotTaskConfig.get(MinionConstants.TABLE_NAME_KEY), 
OFFLINE_TABLE_NAME);
     
assertTrue("true".equalsIgnoreCase(pinotTaskConfig.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY)));
     
assertEquals(pinotTaskConfig.get(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY),
 mergeLevel);
@@ -228,15 +223,16 @@ public class MergeRollupTaskGeneratorTest {
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, 
tableTaskConfigs);
     TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, 
taskConfigsMap);
     ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(Collections.emptyList()));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(Collections.emptyList()));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(new 
IdealState(OFFLINE_TABLE_NAME));
     mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvide);
 
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertNull(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME));
+    
assertNull(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+        OFFLINE_TABLE_NAME));
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
@@ -261,12 +257,14 @@ public class MergeRollupTaskGeneratorTest {
         getSegmentZKMetadata(segmentName1, currentTime - 500_000L, 
currentTime, TimeUnit.MILLISECONDS, null);
     metadata1.setTotalDocs(0);
     
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists.newArrayList(metadata1));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1)));
 
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertNull(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME));
+    
assertNull(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+        OFFLINE_TABLE_NAME));
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
@@ -290,6 +288,8 @@ public class MergeRollupTaskGeneratorTest {
     SegmentZKMetadata metadata1 =
         getSegmentZKMetadata(segmentName1, currentTime - 500_000L, 
currentTime, TimeUnit.MILLISECONDS, null);
     
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists.newArrayList(metadata1));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1)));
 
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -321,8 +321,10 @@ public class MergeRollupTaskGeneratorTest {
     SegmentZKMetadata metadata2 =
         getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, 
TimeUnit.MILLISECONDS, "download2");
     metadata2.setTotalDocs(4000000L);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2)));
 
     // Single task
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
@@ -338,8 +340,11 @@ public class MergeRollupTaskGeneratorTest {
     SegmentZKMetadata metadata3 =
         getSegmentZKMetadata(segmentName3, 86_400_000L, 110_000_000L, 
TimeUnit.MILLISECONDS, null);
     metadata3.setTotalDocs(5000000L);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2, segmentName3)));
+
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat", "1d",
@@ -380,8 +385,11 @@ public class MergeRollupTaskGeneratorTest {
         getSegmentZKMetadata(segmentName5, 345_600_000L, 346_000_000L, 
TimeUnit.MILLISECONDS, "download5");
 
     // No spilled over data
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5)));
+
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
@@ -395,8 +403,11 @@ public class MergeRollupTaskGeneratorTest {
     String segmentName6 = "testTable__6";
     SegmentZKMetadata metadata6 =
         getSegmentZKMetadata(segmentName6, 172_800_000L, 260_000_000L, 
TimeUnit.MILLISECONDS, null);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5, metadata6));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5, metadata6));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5, segmentName6)));
+
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat", "1d",
@@ -405,8 +416,10 @@ public class MergeRollupTaskGeneratorTest {
         null, "1000000");
 
     // Has time bucket without overlapping segments
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, 
metadata5));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata4, metadata5));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5)));
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat", "1d",
@@ -419,8 +432,11 @@ public class MergeRollupTaskGeneratorTest {
     
metadata1.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
     
metadata2.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
     
metadata4.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5, metadata6));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5, metadata6));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5, segmentName6)));
+
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName3, 
DAILY, "concat", "1d", null, "1000000");
@@ -436,9 +452,8 @@ public class MergeRollupTaskGeneratorTest {
     TreeMap<String, Long> waterMarkMap = new TreeMap<>();
     // Watermark for daily is at 30 days since epoch
     waterMarkMap.put(DAILY, 2_592_000_000L);
-    when(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME))
-        .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, 
waterMarkMap).toZNRecord());
+    
when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+        OFFLINE_TABLE_NAME)).thenReturn(new 
MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
 
     String segmentName7 = "testTable__7";
     String segmentName8 = "testTable__8";
@@ -448,8 +463,11 @@ public class MergeRollupTaskGeneratorTest {
         getSegmentZKMetadata(segmentName8, 2_592_000_000L, 2_600_000_000L, 
TimeUnit.MILLISECONDS, "download8");
     
metadata7.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
     
metadata8.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata7, metadata8));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata7, metadata8));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName7, 
segmentName8)));
+
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName7, 
MONTHLY, "concat", "30d", null, "1000000");
@@ -494,8 +512,10 @@ public class MergeRollupTaskGeneratorTest {
     metadata4.setPartitionMetadata(new 
SegmentPartitionMetadata(Collections.singletonMap("memberId",
         new ColumnPartitionMetadata("murmur", 10, Collections.singleton(1), 
null))));
     ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2, segmentName3, segmentName4)));
 
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -574,17 +594,19 @@ public class MergeRollupTaskGeneratorTest {
     SegmentZKMetadata metadata2 =
         getSegmentZKMetadata(segmentName2, 345_600_000L, 400_000_000L, 
TimeUnit.MILLISECONDS, null);
     ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2));
     mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvide);
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2)));
 
     // Cold start, set watermark to smallest segment metadata start time round 
off to the nearest bucket boundary
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 86_400_000L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 
86_400_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, 
DAILY, "concat", "1d", null, "1000000");
 
@@ -595,9 +617,9 @@ public class MergeRollupTaskGeneratorTest {
         MinionConstants.MergeRollupTask.TASK_TYPE, -1);
     
metadata1.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 345_600_000L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 
345_600_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, 
DAILY, "concat", "1d", null, "1000000");
 
@@ -607,9 +629,9 @@ public class MergeRollupTaskGeneratorTest {
         MinionConstants.MergeRollupTask.TASK_TYPE, -1);
     
metadata2.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 345_600_000L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 
345_600_000L);
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
@@ -640,9 +662,8 @@ public class MergeRollupTaskGeneratorTest {
     ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
     Map<String, Long> waterMarkMap = new TreeMap<>();
     waterMarkMap.put(DAILY, 86_400_000L);
-    when(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME))
-        .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, 
waterMarkMap).toZNRecord());
+    
when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+        OFFLINE_TABLE_NAME)).thenReturn(new 
MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
 
     Map<String, TaskState> taskStatesMap = new HashMap<>();
     String taskName = "Task_MergeRollupTask_" + System.currentTimeMillis();
@@ -651,12 +672,15 @@ public class MergeRollupTaskGeneratorTest {
     taskConfigs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, DAILY);
     taskConfigs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName1);
     
when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(taskStatesMap);
-    when(mockClusterInfoProvide.getTaskConfigs(taskName))
-        .thenReturn(Lists.newArrayList(new 
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, taskConfigs)));
+    when(mockClusterInfoProvide.getTaskConfigs(taskName)).thenReturn(
+        Lists.newArrayList(new 
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, taskConfigs)));
 
     // If same task and table, IN_PROGRESS, then don't generate again
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2)));
+
     taskStatesMap.put(taskName, TaskState.IN_PROGRESS);
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -664,8 +688,11 @@ public class MergeRollupTaskGeneratorTest {
     assertTrue(pinotTaskConfigs.isEmpty());
 
     // If same task and table, IN_PROGRESS, but older than 1 day, generate
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2)));
+
     String oldTaskName = "Task_MergeRollupTask_" + (System.currentTimeMillis() 
- TimeUnit.DAYS.toMillis(3));
     taskStatesMap.remove(taskName);
     taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS);
@@ -674,10 +701,12 @@ public class MergeRollupTaskGeneratorTest {
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, 
DAILY, "concat", "1d", null, "1000000");
 
     // If same task and table, but COMPLETED, generate
-    mergedMetadata1
-        
.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, mergedMetadata1));
+    mergedMetadata1.setCustomMap(
+        
ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, mergedMetadata1));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, 
segmentName2, mergedSegmentName1)));
     SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
     
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
         new LineageEntry(Collections.singletonList(segmentName1), 
Collections.singletonList(mergedSegmentName1),
@@ -730,17 +759,19 @@ public class MergeRollupTaskGeneratorTest {
         getSegmentZKMetadata(segmentName5, 2_592_000_000L, 2_592_020_000L, 
TimeUnit.MILLISECONDS,
             null); // starts 30 days since epoch
     ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5)));
     mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvide);
 
     // Cold start only schedule daily merge tasks
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 86_400_000L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 
86_400_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     Map<String, String> taskConfigsDaily1 = 
pinotTaskConfigs.get(0).getConfigs();
     checkPinotTaskConfig(taskConfigsDaily1, segmentName1 + "," + segmentName2 
+ "," + segmentName3, DAILY, "concat",
@@ -751,17 +782,19 @@ public class MergeRollupTaskGeneratorTest {
     String segmentNameMergedDaily1 = "merged_testTable__1__2__3";
     SegmentZKMetadata metadataMergedDaily1 =
         getSegmentZKMetadata(segmentNameMergedDaily1, 86_400_000L, 
110_000_000L, TimeUnit.MILLISECONDS, null);
-    metadataMergedDaily1
-        
.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
-        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5, metadataMergedDaily1));
+    metadataMergedDaily1.setCustomMap(
+        
ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5, metadataMergedDaily1));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5,
+            segmentNameMergedDaily1)));
 
     SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
     
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
         new LineageEntry(Arrays.asList(segmentName1, segmentName2, 
segmentName3),
             Collections.singletonList(segmentNameMergedDaily1), 
LineageEntryState.COMPLETED, 11111L));
     
when(mockClusterInfoProvide.getSegmentLineage(OFFLINE_TABLE_NAME)).thenReturn(segmentLineage);
-
     Map<String, TaskState> taskStatesMap = new HashMap<>();
     String taskName1 = "Task_MergeRollupTask_1";
     taskStatesMap.put(taskName1, TaskState.COMPLETED);
@@ -771,9 +804,9 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
 
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 2_505_600_000L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 
2_505_600_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     Map<String, String> taskConfigsDaily2 = 
pinotTaskConfigs.get(0).getConfigs();
     checkPinotTaskConfig(taskConfigsDaily2, segmentName4, DAILY, "concat", 
"1d", null, "1000000");
@@ -782,16 +815,19 @@ public class MergeRollupTaskGeneratorTest {
     String segmentNameMergedDaily2 = "merged_testTable__4_1";
     SegmentZKMetadata metadataMergedDaily2 =
         getSegmentZKMetadata(segmentNameMergedDaily2, 2_505_600_000L, 
2_591_999_999L, TimeUnit.MILLISECONDS, null);
-    metadataMergedDaily2
-        
.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
+    metadataMergedDaily2.setCustomMap(
+        
ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
     String segmentNameMergedDaily3 = "merged_testTable__4_2";
     SegmentZKMetadata metadataMergedDaily3 =
         getSegmentZKMetadata(segmentNameMergedDaily3, 2_592_000_000L, 
2_592_010_000L, TimeUnit.MILLISECONDS, null);
-    metadataMergedDaily3
-        
.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
-    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists
-        .newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, 
metadataMergedDaily1, metadataMergedDaily2,
-            metadataMergedDaily3));
+    metadataMergedDaily3.setCustomMap(
+        
ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
 DAILY));
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5, metadataMergedDaily1,
+            metadataMergedDaily2, metadataMergedDaily3));
+    
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, 
segmentName4, segmentName5,
+            segmentNameMergedDaily1, segmentNameMergedDaily2, 
segmentNameMergedDaily3)));
 
     
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
         new LineageEntry(Collections.singletonList(segmentName4),
@@ -804,12 +840,12 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
 
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 2_592_000_000L);
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, 
OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(MONTHLY).longValue(), 0L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 
2_592_000_000L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(MONTHLY).longValue(), 
0L);
     assertEquals(pinotTaskConfigs.size(), 2);
     Map<String, String> taskConfigsDaily3 = 
pinotTaskConfigs.get(0).getConfigs();
     Map<String, String> taskConfigsMonthly1 = 
pinotTaskConfigs.get(1).getConfigs();
@@ -823,16 +859,20 @@ public class MergeRollupTaskGeneratorTest {
     String segmentNameMergedDaily4 = "merged_testTable__4_2__5";
     SegmentZKMetadata metadataMergedDaily4 =
         getSegmentZKMetadata(segmentNameMergedDaily4, 2_592_000_000L, 2_592_020_000L, TimeUnit.MILLISECONDS, null);
-    metadataMergedDaily4
-        .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadataMergedDaily4.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
     String segmentNameMergedMonthly1 = "merged_testTable__1__2__3__4_1";
     SegmentZKMetadata metadataMergedMonthly1 =
         getSegmentZKMetadata(segmentNameMergedMonthly1, 86_400_000L, 2_591_999_999L, TimeUnit.MILLISECONDS, null);
-    metadataMergedMonthly1
-        .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, MONTHLY));
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists
-        .newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1, metadataMergedDaily2,
-            metadataMergedDaily3, metadataMergedDaily4, metadataMergedMonthly1));
+    metadataMergedMonthly1.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, MONTHLY));
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1,
+            metadataMergedDaily2, metadataMergedDaily3, metadataMergedDaily4, metadataMergedMonthly1));
+    when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME,
+        Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5,
+            segmentNameMergedDaily1, segmentNameMergedDaily2, segmentNameMergedDaily3, segmentNameMergedDaily4,
+            segmentNameMergedMonthly1)));
 
     segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
         new LineageEntry(Arrays.asList(segmentNameMergedDaily3, segmentName5),
@@ -852,12 +892,12 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
 
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(DAILY).longValue(), 2_592_000_000L); // 30 days since epoch
-    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
-        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
-        .get(MONTHLY).longValue(), 0L);
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 2_592_000_000L); // 30 days since epoch
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(
+        mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE,
+            OFFLINE_TABLE_NAME)).getWatermarkMap().get(MONTHLY).longValue(), 0L);
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
@@ -871,4 +911,13 @@ public class MergeRollupTaskGeneratorTest {
     segmentZKMetadata.setTotalDocs(1000);
     return segmentZKMetadata;
   }
+
+  private IdealState getIdealState(String tableName, List<String> segmentNames) {
+    IdealState idealState = new IdealState(tableName);
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    for (String segmentName : segmentNames) {
+      idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
+    }
+    return idealState;
+  }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 0fe1fb8c89..84169f3b93 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
@@ -85,6 +86,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 5000, 50_000, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME))
+        .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata.getSegmentName())));
 
     RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -139,6 +142,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
             null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME))
+        .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata.getSegmentName())));
 
     RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -175,6 +180,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME))
+        .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList()));
 
     RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -186,6 +193,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         getSegmentZKMetadata("testTable__0__0__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME))
+        .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata1.getSegmentName())));
 
     generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -199,6 +208,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         getSegmentZKMetadata("testTable__1__1__13456", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2, segmentZKMetadata3));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName(),
+            segmentZKMetadata3.getSegmentName())));
 
     generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -223,6 +235,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
             TimeUnit.MILLISECONDS, "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
 
     // StartTime calculated using segment metadata
     Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
@@ -248,6 +262,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
+
     generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
@@ -279,6 +296,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
             TimeUnit.MILLISECONDS, "download2"); // 05-21-2020T08:00:00 UTC to 05-22-2020T08:00:00 UTC
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
 
     // Default configs
     Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
@@ -372,6 +391,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         getSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
 
     RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -386,6 +407,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         getSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
 
@@ -396,6 +419,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         getSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
   }
@@ -419,6 +444,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
             TimeUnit.MILLISECONDS, "download2"); // 05-23-2020T08:00:00 UTC to 05-24-2020T08:00:00 UTC
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
         Collections.singletonList(segmentZKMetadata));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata.getSegmentName())));
 
     RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -450,6 +477,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
             TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata.getSegmentName())));
 
     RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
@@ -472,6 +501,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
             TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(segmentZKMetadata));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+        Lists.newArrayList(segmentZKMetadata.getSegmentName())));
 
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
@@ -487,4 +518,13 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     realtimeSegmentZKMetadata.setDownloadUrl(downloadURL);
     return realtimeSegmentZKMetadata;
   }
+
+  private IdealState getIdealState(String tableName, List<String> segmentNames) {
+    IdealState idealState = new IdealState(tableName);
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    for (String segmentName: segmentNames) {
+      idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
+    }
+    return idealState;
+  }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 9c93979d9f..03908a958e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
@@ -130,6 +131,9 @@ public class UpsertCompactionTaskGeneratorTest {
   public void testGenerateTasksWithNoSegments() {
     when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
         Lists.newArrayList(Collections.emptyList()));
+    when(_mockClusterInfoAccessor.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+        getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(Collections.emptyList())));
+
     _taskGenerator.init(_mockClusterInfoAccessor);
 
     List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
@@ -143,6 +147,9 @@ public class UpsertCompactionTaskGeneratorTest {
     consumingSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
     when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
         Lists.newArrayList(consumingSegment));
+    when(_mockClusterInfoAccessor.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+        getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList("testTable__0")));
+
     _taskGenerator.init(_mockClusterInfoAccessor);
 
     List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
@@ -154,6 +161,9 @@ public class UpsertCompactionTaskGeneratorTest {
   public void testGenerateTasksWithNewlyCompletedSegment() {
     when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
         Lists.newArrayList(_completedSegment));
+    when(_mockClusterInfoAccessor.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+        getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(_completedSegment.getSegmentName())));
+
     _taskGenerator.init(_mockClusterInfoAccessor);
 
     List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
@@ -270,4 +280,13 @@ public class UpsertCompactionTaskGeneratorTest {
     }
     return compactionConfigs;
   }
+
+  private IdealState getIdealState(String tableName, List<String> segmentNames) {
+    IdealState idealState = new IdealState(tableName);
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    for (String segmentName: segmentNames) {
+      idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
+    }
+    return idealState;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to