This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 54b74803e4 Fix the way of fetching the segment zk metadata for task generators (#11832) 54b74803e4 is described below commit 54b74803e467c42176a406ac4b65ab6c74643693 Author: Seunghyun Lee <seungh...@startree.ai> AuthorDate: Thu Oct 19 09:13:16 2023 -0700 Fix the way of fetching the segment zk metadata for task generators (#11832) * Fix the way of fetching the segment zk metadata for task generators Currently, we fetch the list of segment zk metadata directly from the property store. This can cause the issue when the segment deletion process fails in the middle (deleted the segment from idealstate while the zk metadata is not cleaned up). 1. In order to avoid scheduling dangling segments, we should compute the intersection of segment zk metadata and idealstate. 2. Enhance the outputDir path formulation to become less error prone * Addressing the comments --- .../helix/core/PinotHelixResourceManager.java | 79 +++--- .../helix/core/minion/ClusterInfoAccessor.java | 5 + .../core/minion/generator/BaseTaskGenerator.java | 26 ++ .../BaseMultipleSegmentsConversionExecutor.java | 3 +- .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 17 +- .../mergerollup/MergeRollupTaskGenerator.java | 2 +- .../minion/tasks/purge/PurgeTaskGenerator.java | 4 +- .../RealtimeToOfflineSegmentsTaskGenerator.java | 2 +- .../SegmentGenerationAndPushTaskGenerator.java | 2 +- .../UpsertCompactionTaskGenerator.java | 2 +- .../mergerollup/MergeRollupTaskGeneratorTest.java | 289 ++++++++++++--------- ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 40 +++ .../UpsertCompactionTaskGeneratorTest.java | 19 ++ 13 files changed, 328 insertions(+), 162 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index a85401d706..95db8d35f9 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -235,8 +235,8 @@ public class PinotHelixResourceManager { private final LineageManager _lineageManager; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, - boolean isSingleTenantCluster, boolean enableBatchMessageMode, - int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) { + boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, + boolean enableTieredSegmentAssignment, LineageManager lineageManager) { _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL); _helixClusterName = helixClusterName; _dataDir = dataDir; @@ -315,8 +315,8 @@ public class PinotHelixResourceManager { // Initialize TableCache HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build(); - Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope, - Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY)); + Map<String, String> configs = + _helixAdmin.getConfig(helixConfigScope, Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY)); boolean caseInsensitive = Boolean.parseBoolean(configs.getOrDefault(Helix.ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE))); _tableCache = new TableCache(_propertyStore, caseInsensitive); @@ -470,8 +470,8 @@ public class PinotHelixResourceManager { } public List<String> getAllBrokerInstances() { - return HelixHelper.getAllInstances(_helixAdmin, _helixClusterName).stream() - .filter(InstanceTypeUtils::isBroker).collect(Collectors.toList()); + return HelixHelper.getAllInstances(_helixAdmin, _helixClusterName).stream().filter(InstanceTypeUtils::isBroker) + .collect(Collectors.toList()); } 
public List<InstanceConfig> getAllBrokerInstanceConfigs() { @@ -808,21 +808,33 @@ public class PinotHelixResourceManager { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType); List<String> segments = new ArrayList<>(idealState.getPartitionSet()); - if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) { - return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, segments) : segments; - } else { - List<String> selectedSegments = new ArrayList<>(); - List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType); - for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { - String segmentName = segmentZKMetadata.getSegmentName(); - if (segments.contains(segmentName) && isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, - excludeOverlapping)) { + List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType); + List<String> selectedSegments = new ArrayList<>(); + ArrayList<String> filteredSegments = new ArrayList<>(); + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Compute the intersection of segmentZK metadata and idealstate for valid segments + if (!segments.contains(segmentName)) { + filteredSegments.add(segmentName); + continue; + } + // No need to filter by time if the time range is not specified + if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) { + selectedSegments.add(segmentName); + } else { + // Filter by time if the time range is specified + if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) { selectedSegments.add(segmentName); } } - return shouldExcludeReplacedSegments ? 
excludeReplacedSegments(tableNameWithType, selectedSegments) - : selectedSegments; } + LOGGER.info( + "Successfully computed the segments for table : {}. # of filtered segments: {}, the filtered segment list: " + + "{}. Only showing up to 100 filtered segments.", tableNameWithType, filteredSegments.size(), + (filteredSegments.size() > 0) ? filteredSegments.subList(0, Math.min(filteredSegments.size(), 100)) + : filteredSegments); + return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, selectedSegments) + : selectedSegments; } /** @@ -3182,8 +3194,8 @@ public class PinotHelixResourceManager { @VisibleForTesting void updateTargetTier(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) { List<TierConfig> tierCfgs = tableConfig.getTierConfigsList(); - List<Tier> sortedTiers = tierCfgs == null ? Collections.emptyList() - : TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager); + List<Tier> sortedTiers = + tierCfgs == null ? Collections.emptyList() : TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager); LOGGER.info("For rebalanceId: {}, updating target tiers for segments of table: {} with tierConfigs: {}", rebalanceJobId, tableNameWithType, sortedTiers); for (String segmentName : getSegmentsFor(tableNameWithType, true)) { @@ -3463,9 +3475,9 @@ public class PinotHelixResourceManager { // Add segments for proactive clean-up. 
segmentsToCleanUp.addAll(segmentsToForEntryToRevert); - } else if (lineageEntry.getState() == LineageEntryState.COMPLETED - && "REFRESH".equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) - && CollectionUtils.isEqualCollection(segmentsFrom, lineageEntry.getSegmentsTo())) { + } else if (lineageEntry.getState() == LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase( + IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && CollectionUtils.isEqualCollection( + segmentsFrom, lineageEntry.getSegmentsTo())) { // This part of code assumes that we only allow at most 2 data snapshots at a time by proactively // deleting the older snapshots (for REFRESH tables). // @@ -3514,8 +3526,8 @@ public class PinotHelixResourceManager { segmentLineage.addLineageEntry(segmentLineageEntryId, new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); - _lineageManager - .updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, segmentLineage); + _lineageManager.updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, + segmentLineage); // Write back to the lineage entry to the property store if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { // Trigger the proactive segment clean up if needed. 
Once the lineage is updated in the property store, it @@ -3768,8 +3780,7 @@ public class PinotHelixResourceManager { * @param customMap */ private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, String lineageEntryId, - LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, - ZkHelixPropertyStore<ZNRecord> propertyStore, + LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, ZkHelixPropertyStore<ZNRecord> propertyStore, LineageUpdateType lineageUpdateType, Map<String, String> customMap) { for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) { // Fetch the segment lineage @@ -3792,16 +3803,16 @@ public class PinotHelixResourceManager { segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate); switch (lineageUpdateType) { case START: - _lineageManager - .updateLineageForStartReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate); + _lineageManager.updateLineageForStartReplaceSegments(tableConfig, lineageEntryId, customMap, + segmentLineageToUpdate); break; case END: - _lineageManager - .updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate); + _lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, + segmentLineageToUpdate); break; case REVERT: - _lineageManager - .updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate); + _lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, + segmentLineageToUpdate); break; default: } @@ -4034,8 +4045,8 @@ public class PinotHelixResourceManager { } } for (TableConfig tableConfig : getAllTableConfigs()) { - String tag = TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(), - tableConfig.getTableType()); + String tag = + TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(), tableConfig.getTableType()); 
tagMinInstanceMap.put(tag, Math.max(tagMinInstanceMap.getOrDefault(tag, 0), tableConfig.getReplication())); String brokerTag = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()); tagMinInstanceMap.put(brokerTag, 1); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java index 954c2ced38..3ec8ea5147 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.Executor; import javax.annotation.Nullable; import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.task.TaskState; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -101,6 +102,10 @@ public class ClusterInfoAccessor { return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType); } + public IdealState getIdealState(String tableNameWithType) { + return _pinotHelixResourceManager.getTableIdealState(tableNameWithType); + } + /** * Get shared executor */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java index 51cbe542bf..b35bb54423 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java @@ -18,9 +18,13 @@ */ package 
org.apache.pinot.controller.helix.core.minion.generator; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.helix.model.IdealState; import org.apache.helix.task.JobConfig; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.controller.api.exception.UnknownTaskTypeException; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.core.common.MinionConstants; @@ -89,6 +93,28 @@ public abstract class BaseTaskGenerator implements PinotTaskGenerator { return MinionConstants.DEFAULT_MAX_ATTEMPTS_PER_TASK; } + /** + * Returns the list of segment zk metadata for available segments in the table. The list does NOT filter out inactive + * segments based on the lineage. In order to compute the valid segments, we look at both idealstate and segment + * zk metadata in the property store and compute the intersection. In this way, we can avoid picking the dangling + * segments. + * + * @param tableNameWithType + * @return the list of segment zk metadata for available segments in the table. 
+ */ + public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) { + IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType); + Set<String> segmentsForTable = idealState.getPartitionSet(); + List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>(); + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) { + selectedSegmentZKMetadataList.add(segmentZKMetadata); + } + } + return selectedSegmentZKMetadataList; + } + @Override public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs) throws Exception { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 99b1341f6d..6b439add13 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -393,7 +393,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe throws Exception { URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { - URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName()); + URI outputSegmentTarURI = + 
URI.create(MinionTaskUtils.normalizeDirectoryURI(outputSegmentDirURI) + localSegmentTarFile.getName()); if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists( outputSegmentTarURI)) { throw new RuntimeException(String.format("Output file: %s already exists. " diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 0d8c1e375c..a46f903ed9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -29,6 +29,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory; public class MinionTaskUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class); + private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; + private MinionTaskUtils() { } @@ -84,7 +87,8 @@ public class MinionTaskUtils { singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.toString()); } else { - URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName); + URI outputDirURI = URI.create( + normalizeDirectoryURI(clusterInfoAccessor.getDataDir()) + TableNameBuilder.extractRawTableName(tableName)); String outputDirURIScheme = outputDirURI.getScheme(); if 
(!isLocalOutputDir(outputDirURIScheme)) { @@ -115,4 +119,15 @@ public class MinionTaskUtils { public static PinotFS getLocalPinotFs() { return new LocalPinotFS(); } + + public static String normalizeDirectoryURI(URI dirURI) { + return normalizeDirectoryURI(dirURI.toString()); + } + + public static String normalizeDirectoryURI(String dirInStr) { + if (!dirInStr.endsWith(DEFAULT_DIR_PATH_TERMINATOR)) { + return dirInStr + DEFAULT_DIR_PATH_TERMINATOR; + } + return dirInStr; + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index 7d87abc96d..0b09bf6d35 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -157,7 +157,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { LOGGER.info("Start generating task configs for table: {} for task: {}", tableNameWithType, taskType); // Get all segment metadata - List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + List<SegmentZKMetadata> allSegments = getSegmentsZKMetadataForTable(tableNameWithType); // Filter segments based on status List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus = filterSegmentsBasedOnStatus(tableConfig.getTableType(), allSegments); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java index c5cbe4de6e..02b9a47283 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java @@ -90,7 +90,7 @@ public class PurgeTaskGenerator extends BaseTaskGenerator { } List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); if (tableConfig.getTableType() == TableType.REALTIME) { - List<SegmentZKMetadata> segmentsZKMetadataAll = _clusterInfoAccessor.getSegmentsZKMetadata(tableName); + List<SegmentZKMetadata> segmentsZKMetadataAll = getSegmentsZKMetadataForTable(tableName); for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataAll) { CommonConstants.Segment.Realtime.Status status = segmentZKMetadata.getStatus(); if (status.isCompleted()) { @@ -98,7 +98,7 @@ public class PurgeTaskGenerator extends BaseTaskGenerator { } } } else { - segmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(tableName); + segmentsZKMetadata = getSegmentsZKMetadataForTable(tableName); } List<SegmentZKMetadata> purgedSegmentsZKMetadata = new ArrayList<>(); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java index 51be9b1ce3..750cb15416 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java +++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java @@ -246,7 +246,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator { */ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata, Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer> allPartitions) { - List<SegmentZKMetadata> segmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName); + List<SegmentZKMetadata> segmentsZKMetadata = getSegmentsZKMetadataForTable(realtimeTableName); Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>(); for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java index 268f84456a..c5e662e285 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java @@ -149,7 +149,7 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator { List<SegmentZKMetadata> segmentsZKMetadata = Collections.emptyList(); // For append mode, we don't create segments for input file URIs already created. 
if (BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType)) { - segmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName); + segmentsZKMetadata = getSegmentsZKMetadataForTable(offlineTableName); } Set<String> existingSegmentInputFiles = getExistingSegmentInputFiles(segmentsZKMetadata); Set<String> inputFilesFromRunningTasks = getInputFilesFromRunningTasks(offlineTableName); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index aa84ffefa5..590102319e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -232,7 +232,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { List<SegmentZKMetadata> completedSegments = new ArrayList<>(); String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod); - List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + List<SegmentZKMetadata> allSegments = getSegmentsZKMetadataForTable(tableNameWithType); for (SegmentZKMetadata segment : allSegments) { CommonConstants.Segment.Realtime.Status status = segment.getStatus(); // initial segments selection based on status and age diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java index c367bda735..6c1372f56e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import org.apache.helix.model.IdealState; import org.apache.helix.task.TaskState; import org.apache.pinot.common.lineage.LineageEntry; import org.apache.pinot.common.lineage.LineageEntryState; @@ -84,40 +85,31 @@ public class MergeRollupTaskGeneratorTest { @Test public void testValidateIfMergeRollupCanBeEnabledOrNot() { - TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) - .setTableName(RAW_TABLE_NAME) - .setTimeColumnName(TIME_COLUMN_NAME) - .build(); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .build(); assertTrue(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE)); IngestionConfig ingestionConfig = new IngestionConfig(); - ingestionConfig.setBatchIngestionConfig( - new BatchIngestionConfig(Collections.emptyList(), "REFRESH", "daily")); - tableConfig = new TableConfigBuilder(TableType.OFFLINE) - .setTableName(RAW_TABLE_NAME) - .setTimeColumnName(TIME_COLUMN_NAME) - .setIngestionConfig(ingestionConfig) - .build(); + ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(Collections.emptyList(), "REFRESH", 
"daily")); + tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .setIngestionConfig(ingestionConfig).build(); assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE)); - tableConfig = new TableConfigBuilder(TableType.REALTIME) - .setTableName(RAW_TABLE_NAME) - .setTimeColumnName(TIME_COLUMN_NAME) - .build(); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .build(); assertTrue(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE)); - tableConfig = new TableConfigBuilder(TableType.REALTIME) - .setTableName(RAW_TABLE_NAME) - .setTimeColumnName(TIME_COLUMN_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) - .build(); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build(); assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE)); - tableConfig = new TableConfigBuilder(TableType.REALTIME) - .setTableName(RAW_TABLE_NAME) - .setTimeColumnName(TIME_COLUMN_NAME) - .setDedupConfig(new DedupConfig(true, HashFunction.MD5)) - .build(); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .setDedupConfig(new DedupConfig(true, HashFunction.MD5)).build(); assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE)); } @@ -135,13 +127,17 @@ public class MergeRollupTaskGeneratorTest { realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); SegmentZKMetadata realtimeTableSegmentMetadata2 = getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null); - 
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) - .thenReturn(Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( + Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn( + getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList("testTable__0", "server0", "ONLINE"))); SegmentZKMetadata offlineTableSegmentMetadata = getSegmentZKMetadata("testTable__0", 5000, 50_000, TimeUnit.MILLISECONDS, null); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(offlineTableSegmentMetadata)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(offlineTableSegmentMetadata)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList("testTable__0__0__0", "testTable__1__0__0"))); MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -173,9 +169,8 @@ public class MergeRollupTaskGeneratorTest { maxNumRecordsPerSegments); } - private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig, String mergeLevel, - String mergeType, String partitionBucketTimePeriod, String roundBucketTimePeriod, - String maxNumRecordsPerSegments) { + private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig, String mergeLevel, String mergeType, + String partitionBucketTimePeriod, String roundBucketTimePeriod, String maxNumRecordsPerSegments) { assertEquals(pinotTaskConfig.get(MinionConstants.TABLE_NAME_KEY), OFFLINE_TABLE_NAME); assertTrue("true".equalsIgnoreCase(pinotTaskConfig.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY))); assertEquals(pinotTaskConfig.get(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY), 
mergeLevel); @@ -228,15 +223,16 @@ public class MergeRollupTaskGeneratorTest { taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs); TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap); ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(Collections.emptyList())); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(Collections.emptyList())); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(new IdealState(OFFLINE_TABLE_NAME)); mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvide); MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertNull(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)); + assertNull(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)); assertEquals(pinotTaskConfigs.size(), 0); } @@ -261,12 +257,14 @@ public class MergeRollupTaskGeneratorTest { getSegmentZKMetadata(segmentName1, currentTime - 500_000L, currentTime, TimeUnit.MILLISECONDS, null); metadata1.setTotalDocs(0); when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists.newArrayList(metadata1)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1))); MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - 
assertNull(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)); + assertNull(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)); assertEquals(pinotTaskConfigs.size(), 0); } @@ -290,6 +288,8 @@ public class MergeRollupTaskGeneratorTest { SegmentZKMetadata metadata1 = getSegmentZKMetadata(segmentName1, currentTime - 500_000L, currentTime, TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists.newArrayList(metadata1)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1))); MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -321,8 +321,10 @@ public class MergeRollupTaskGeneratorTest { SegmentZKMetadata metadata2 = getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, TimeUnit.MILLISECONDS, "download2"); metadata2.setTotalDocs(4000000L); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2))); // Single task MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); @@ -338,8 +340,11 @@ public class MergeRollupTaskGeneratorTest { SegmentZKMetadata metadata3 = getSegmentZKMetadata(segmentName3, 86_400_000L, 110_000_000L, TimeUnit.MILLISECONDS, null); metadata3.setTotalDocs(5000000L); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3)); + 
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2, segmentName3))); + pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); assertEquals(pinotTaskConfigs.size(), 2); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d", @@ -380,8 +385,11 @@ public class MergeRollupTaskGeneratorTest { getSegmentZKMetadata(segmentName5, 345_600_000L, 346_000_000L, TimeUnit.MILLISECONDS, "download5"); // No spilled over data - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5))); + MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); @@ -395,8 +403,11 @@ public class MergeRollupTaskGeneratorTest { String segmentName6 = "testTable__6"; SegmentZKMetadata metadata6 = getSegmentZKMetadata(segmentName6, 172_800_000L, 260_000_000L, TimeUnit.MILLISECONDS, null); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + 
Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5, segmentName6))); + pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); assertEquals(pinotTaskConfigs.size(), 2); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d", @@ -405,8 +416,10 @@ public class MergeRollupTaskGeneratorTest { null, "1000000"); // Has time bucket without overlapping segments - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, metadata5)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata4, metadata5)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5))); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); assertEquals(pinotTaskConfigs.size(), 3); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d", @@ -419,8 +432,11 @@ public class MergeRollupTaskGeneratorTest { metadata1.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); metadata2.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); metadata4.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, 
metadata5, metadata6)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5, segmentName6))); + pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); assertEquals(pinotTaskConfigs.size(), 3); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000"); @@ -436,9 +452,8 @@ public class MergeRollupTaskGeneratorTest { TreeMap<String, Long> waterMarkMap = new TreeMap<>(); // Watermark for daily is at 30 days since epoch waterMarkMap.put(DAILY, 2_592_000_000L); - when(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)) - .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord()); + when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord()); String segmentName7 = "testTable__7"; String segmentName8 = "testTable__8"; @@ -448,8 +463,11 @@ public class MergeRollupTaskGeneratorTest { getSegmentZKMetadata(segmentName8, 2_592_000_000L, 2_600_000_000L, TimeUnit.MILLISECONDS, "download8"); metadata7.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); metadata8.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata7, metadata8)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + 
Lists.newArrayList(metadata7, metadata8)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName7, segmentName8))); + pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); assertEquals(pinotTaskConfigs.size(), 1); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName7, MONTHLY, "concat", "30d", null, "1000000"); @@ -494,8 +512,10 @@ public class MergeRollupTaskGeneratorTest { metadata4.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap("memberId", new ColumnPartitionMetadata("murmur", 10, Collections.singleton(1), null)))); ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3, metadata4)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4))); MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -574,17 +594,19 @@ public class MergeRollupTaskGeneratorTest { SegmentZKMetadata metadata2 = getSegmentZKMetadata(segmentName2, 345_600_000L, 400_000_000L, TimeUnit.MILLISECONDS, null); ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2)); mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvide); + 
when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2))); // Cold start, set watermark to smallest segment metadata start time round off to the nearest bucket boundary MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 86_400_000L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 86_400_000L); assertEquals(pinotTaskConfigs.size(), 1); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000"); @@ -595,9 +617,9 @@ public class MergeRollupTaskGeneratorTest { MinionConstants.MergeRollupTask.TASK_TYPE, -1); metadata1.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 345_600_000L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 345_600_000L); assertEquals(pinotTaskConfigs.size(), 1); checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, 
DAILY, "concat", "1d", null, "1000000"); @@ -607,9 +629,9 @@ public class MergeRollupTaskGeneratorTest { MinionConstants.MergeRollupTask.TASK_TYPE, -1); metadata2.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 345_600_000L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 345_600_000L); assertEquals(pinotTaskConfigs.size(), 0); } @@ -640,9 +662,8 @@ public class MergeRollupTaskGeneratorTest { ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); Map<String, Long> waterMarkMap = new TreeMap<>(); waterMarkMap.put(DAILY, 86_400_000L); - when(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)) - .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord()); + when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord()); Map<String, TaskState> taskStatesMap = new HashMap<>(); String taskName = "Task_MergeRollupTask_" + System.currentTimeMillis(); @@ -651,12 +672,15 @@ public class MergeRollupTaskGeneratorTest { taskConfigs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, DAILY); taskConfigs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName1); when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(taskStatesMap); - 
when(mockClusterInfoProvide.getTaskConfigs(taskName)) - .thenReturn(Lists.newArrayList(new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, taskConfigs))); + when(mockClusterInfoProvide.getTaskConfigs(taskName)).thenReturn( + Lists.newArrayList(new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, taskConfigs))); // If same task and table, IN_PROGRESS, then don't generate again - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2))); + taskStatesMap.put(taskName, TaskState.IN_PROGRESS); MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -664,8 +688,11 @@ public class MergeRollupTaskGeneratorTest { assertTrue(pinotTaskConfigs.isEmpty()); // If same task and table, IN_PROGRESS, but older than 1 day, generate - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2))); + String oldTaskName = "Task_MergeRollupTask_" + (System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3)); taskStatesMap.remove(taskName); taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS); @@ -674,10 +701,12 @@ public class MergeRollupTaskGeneratorTest { checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000"); // If same task and table, but 
COMPLETED, generate - mergedMetadata1 - .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, mergedMetadata1)); + mergedMetadata1.setCustomMap( + ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, mergedMetadata1)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn( + getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2, mergedSegmentName1))); SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), new LineageEntry(Collections.singletonList(segmentName1), Collections.singletonList(mergedSegmentName1), @@ -730,17 +759,19 @@ public class MergeRollupTaskGeneratorTest { getSegmentZKMetadata(segmentName5, 2_592_000_000L, 2_592_020_000L, TimeUnit.MILLISECONDS, null); // starts 30 days since epoch ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5))); mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvide); // Cold start only schedule daily merge tasks MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator(); 
generator.init(mockClusterInfoProvide); List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 86_400_000L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 86_400_000L); assertEquals(pinotTaskConfigs.size(), 1); Map<String, String> taskConfigsDaily1 = pinotTaskConfigs.get(0).getConfigs(); checkPinotTaskConfig(taskConfigsDaily1, segmentName1 + "," + segmentName2 + "," + segmentName3, DAILY, "concat", @@ -751,17 +782,19 @@ public class MergeRollupTaskGeneratorTest { String segmentNameMergedDaily1 = "merged_testTable__1__2__3"; SegmentZKMetadata metadataMergedDaily1 = getSegmentZKMetadata(segmentNameMergedDaily1, 86_400_000L, 110_000_000L, TimeUnit.MILLISECONDS, null); - metadataMergedDaily1 - .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)) - .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1)); + metadataMergedDaily1.setCustomMap( + ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5, + segmentNameMergedDaily1))); SegmentLineage 
segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), new LineageEntry(Arrays.asList(segmentName1, segmentName2, segmentName3), Collections.singletonList(segmentNameMergedDaily1), LineageEntryState.COMPLETED, 11111L)); when(mockClusterInfoProvide.getSegmentLineage(OFFLINE_TABLE_NAME)).thenReturn(segmentLineage); - Map<String, TaskState> taskStatesMap = new HashMap<>(); String taskName1 = "Task_MergeRollupTask_1"; taskStatesMap.put(taskName1, TaskState.COMPLETED); @@ -771,9 +804,9 @@ public class MergeRollupTaskGeneratorTest { pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 2_505_600_000L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 2_505_600_000L); assertEquals(pinotTaskConfigs.size(), 1); Map<String, String> taskConfigsDaily2 = pinotTaskConfigs.get(0).getConfigs(); checkPinotTaskConfig(taskConfigsDaily2, segmentName4, DAILY, "concat", "1d", null, "1000000"); @@ -782,16 +815,19 @@ public class MergeRollupTaskGeneratorTest { String segmentNameMergedDaily2 = "merged_testTable__4_1"; SegmentZKMetadata metadataMergedDaily2 = getSegmentZKMetadata(segmentNameMergedDaily2, 2_505_600_000L, 2_591_999_999L, TimeUnit.MILLISECONDS, null); - metadataMergedDaily2 - .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); + metadataMergedDaily2.setCustomMap( + ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); String segmentNameMergedDaily3 = "merged_testTable__4_2"; SegmentZKMetadata 
metadataMergedDaily3 = getSegmentZKMetadata(segmentNameMergedDaily3, 2_592_000_000L, 2_592_010_000L, TimeUnit.MILLISECONDS, null); - metadataMergedDaily3 - .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists - .newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1, metadataMergedDaily2, - metadataMergedDaily3)); + metadataMergedDaily3.setCustomMap( + ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1, + metadataMergedDaily2, metadataMergedDaily3)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5, + segmentNameMergedDaily1, segmentNameMergedDaily2, segmentNameMergedDaily3))); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), new LineageEntry(Collections.singletonList(segmentName4), @@ -804,12 +840,12 @@ public class MergeRollupTaskGeneratorTest { pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 2_592_000_000L); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(MONTHLY).longValue(), 0L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + 
mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 2_592_000_000L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(MONTHLY).longValue(), 0L); assertEquals(pinotTaskConfigs.size(), 2); Map<String, String> taskConfigsDaily3 = pinotTaskConfigs.get(0).getConfigs(); Map<String, String> taskConfigsMonthly1 = pinotTaskConfigs.get(1).getConfigs(); @@ -823,16 +859,20 @@ public class MergeRollupTaskGeneratorTest { String segmentNameMergedDaily4 = "merged_testTable__4_2__5"; SegmentZKMetadata metadataMergedDaily4 = getSegmentZKMetadata(segmentNameMergedDaily4, 2_592_000_000L, 2_592_020_000L, TimeUnit.MILLISECONDS, null); - metadataMergedDaily4 - .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); + metadataMergedDaily4.setCustomMap( + ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY)); String segmentNameMergedMonthly1 = "merged_testTable__1__2__3__4_1"; SegmentZKMetadata metadataMergedMonthly1 = getSegmentZKMetadata(segmentNameMergedMonthly1, 86_400_000L, 2_591_999_999L, TimeUnit.MILLISECONDS, null); - metadataMergedMonthly1 - .setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, MONTHLY)); - when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists - .newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1, metadataMergedDaily2, - metadataMergedDaily3, metadataMergedDaily4, metadataMergedMonthly1)); + metadataMergedMonthly1.setCustomMap( + ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, MONTHLY)); + when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn( + 
Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadataMergedDaily1, + metadataMergedDaily2, metadataMergedDaily3, metadataMergedDaily4, metadataMergedMonthly1)); + when(mockClusterInfoProvide.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(getIdealState(OFFLINE_TABLE_NAME, + Lists.newArrayList(segmentName1, segmentName2, segmentName3, segmentName4, segmentName5, + segmentNameMergedDaily1, segmentNameMergedDaily2, segmentNameMergedDaily3, segmentNameMergedDaily4, + segmentNameMergedMonthly1))); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), new LineageEntry(Arrays.asList(segmentNameMergedDaily3, segmentName5), @@ -852,12 +892,12 @@ public class MergeRollupTaskGeneratorTest { pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(DAILY).longValue(), 2_592_000_000L); // 30 days since epoch - assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide - .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap() - .get(MONTHLY).longValue(), 0L); + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(DAILY).longValue(), 2_592_000_000L); // 30 days since epoch + assertEquals(MergeRollupTaskMetadata.fromZNRecord( + mockClusterInfoProvide.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, + OFFLINE_TABLE_NAME)).getWatermarkMap().get(MONTHLY).longValue(), 0L); assertEquals(pinotTaskConfigs.size(), 0); } @@ -871,4 +911,13 @@ public class MergeRollupTaskGeneratorTest { segmentZKMetadata.setTotalDocs(1000); return segmentZKMetadata; } + + private IdealState 
getIdealState(String tableName, List<String> segmentNames) { + IdealState idealState = new IdealState(tableName); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + for (String segmentName : segmentNames) { + idealState.setPartitionState(segmentName, "Server_0", "ONLINE"); + } + return idealState; + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java index 0fe1fb8c89..84169f3b93 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.helix.model.IdealState; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; @@ -85,6 +86,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 5000, 50_000, TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)) + .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata.getSegmentName()))); RealtimeToOfflineSegmentsTaskGenerator 
generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -139,6 +142,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)) + .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata.getSegmentName()))); RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -175,6 +180,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>()); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList()); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)) + .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList())); RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -186,6 +193,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { getSegmentZKMetadata("testTable__0__0__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)) + .thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata1.getSegmentName()))); generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -199,6 +208,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { getSegmentZKMetadata("testTable__1__1__13456", Status.IN_PROGRESS, -1, 
-1, TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2, segmentZKMetadata3)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName(), + segmentZKMetadata3.getSegmentName()))); generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -223,6 +235,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { TimeUnit.MILLISECONDS, "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName()))); // StartTime calculated using segment metadata Map<String, Map<String, String>> taskConfigsMap = new HashMap<>(); @@ -248,6 +262,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName()))); + generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig)); @@ -279,6 +296,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { TimeUnit.MILLISECONDS, "download2"); // 05-21-2020T08:00:00 UTC to 
05-22-2020T08:00:00 UTC when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName()))); // Default configs Map<String, Map<String, String>> taskConfigsMap = new HashMap<>(); @@ -372,6 +391,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { getSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName()))); RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -386,6 +407,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { getSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName()))); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig)); assertTrue(pinotTaskConfigs.isEmpty()); @@ -396,6 +419,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { getSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null); 
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName()))); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig)); assertEquals(pinotTaskConfigs.size(), 1); } @@ -419,6 +444,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { TimeUnit.MILLISECONDS, "download2"); // 05-23-2020T08:00:00 UTC to 05-24-2020T08:00:00 UTC when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( Collections.singletonList(segmentZKMetadata)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata.getSegmentName()))); RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -450,6 +477,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + Lists.newArrayList(segmentZKMetadata.getSegmentName()))); RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator(); generator.init(mockClusterInfoProvide); @@ -472,6 +501,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { TimeUnit.MILLISECONDS, null); when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)) .thenReturn(Lists.newArrayList(segmentZKMetadata)); + when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME, + 
Lists.newArrayList(segmentZKMetadata.getSegmentName()))); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig)); assertTrue(pinotTaskConfigs.isEmpty()); @@ -487,4 +518,13 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { realtimeSegmentZKMetadata.setDownloadUrl(downloadURL); return realtimeSegmentZKMetadata; } + + private IdealState getIdealState(String tableName, List<String> segmentNames) { + IdealState idealState = new IdealState(tableName); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + for (String segmentName: segmentNames) { + idealState.setPartitionState(segmentName, "Server_0", "ONLINE"); + } + return idealState; + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index 9c93979d9f..03908a958e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.core.common.MinionConstants; @@ -130,6 +131,9 @@ public class UpsertCompactionTaskGeneratorTest { public void testGenerateTasksWithNoSegments() { when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( 
Lists.newArrayList(Collections.emptyList())); + when(_mockClusterInfoAccessor.getIdealState(REALTIME_TABLE_NAME)).thenReturn( + getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(Collections.emptyList()))); + _taskGenerator.init(_mockClusterInfoAccessor); List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig)); @@ -143,6 +147,9 @@ public class UpsertCompactionTaskGeneratorTest { consumingSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( Lists.newArrayList(consumingSegment)); + when(_mockClusterInfoAccessor.getIdealState(REALTIME_TABLE_NAME)).thenReturn( + getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList("testTable__0"))); + _taskGenerator.init(_mockClusterInfoAccessor); List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig)); @@ -154,6 +161,9 @@ public class UpsertCompactionTaskGeneratorTest { public void testGenerateTasksWithNewlyCompletedSegment() { when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( Lists.newArrayList(_completedSegment)); + when(_mockClusterInfoAccessor.getIdealState(REALTIME_TABLE_NAME)).thenReturn( + getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(_completedSegment.getSegmentName()))); + _taskGenerator.init(_mockClusterInfoAccessor); List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig)); @@ -270,4 +280,13 @@ public class UpsertCompactionTaskGeneratorTest { } return compactionConfigs; } + + private IdealState getIdealState(String tableName, List<String> segmentNames) { + IdealState idealState = new IdealState(tableName); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + for (String segmentName: segmentNames) { + idealState.setPartitionState(segmentName, "Server_0", "ONLINE"); + } + return idealState; + } } 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org