davecromberge commented on code in PR #14856:
URL: https://github.com/apache/pinot/pull/14856#discussion_r1932256452


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java:
##########
@@ -78,16 +79,59 @@ public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, S
     return levelToConfigMap;
   }
 
+  /**
+   * Returns a lookup key composed of the current merge level / key combination
+   * @param key the key of the value within the task configuration.
+   * @param taskConfig the current merge rollup task configuration used for sourcing the merge level.
+   * @return composite lookup key if the merge level is configured. Otherwise, return original key.
+   */
+  public static String buildMergeLevelKeyPrefix(String key, Map<String, String> taskConfig) {
+    String mergeLevel = taskConfig.get(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY);
+    if (mergeLevel == null) {
+      return key;
+    } else {
+      return mergeLevel + "." + key;
+    }
+  }
+
   /**
    * Extracts an array of dimensions to reduce/erase from the task config.
    * <p>The config for the dimensions to erase should be a comma-separated string value.
    */
   public static Set<String> getDimensionsToErase(Map<String, String> taskConfig) {
-    if (taskConfig == null || taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY) == null) {
+    if (taskConfig == null) {
       return new HashSet<>();
     }
-    return Arrays.stream(taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY).split(","))
+    String key = buildMergeLevelKeyPrefix(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, taskConfig);
+    String dimensionsToErase = taskConfig.get(key);
+
+    if (dimensionsToErase == null) {
+      return new HashSet<>();
+    }
+    return Arrays.stream(dimensionsToErase.split(","))
         .map(String::trim)
         .collect(Collectors.toSet());
   }
+
+  /**
+   * Returns a map from column name to the aggregation function parameters associated with it based on the task config.
+   */
+  public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) {
+    Map<String, Map<String, String>> aggregationFunctionParameters = new HashMap<>();
+    String prefix = buildMergeLevelKeyPrefix(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX, taskConfig);

Review Comment:
   Yes. The customisation of the merge rollup behaviour is based on the merge level. The `buildMergeLevelKeyPrefix` helper function reads the current level from the task configuration and prepends it to the key, which is a convenient way to scope any configuration lookup to that level.

   An alternative design would have been to strip the level prefix from the task configuration entirely and merge the level-specific entries with the rest of the configuration. That approach assumes a single task is scoped to a single level. However, as described in the PR, it could break existing implementations.
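
To illustrate the level-scoped lookup described above, here is a minimal standalone sketch rather than the code in the PR. The class name `MergeLevelKeyDemo`, the literal key strings (`mergeLevel`, `eraseDimensionValues`) and the level names (`1day`, `1hour`) are assumptions chosen for the example; the real constants live in `MinionConstants` and may differ.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MergeLevelKeyDemo {
  // Assumed literal values for illustration only; the PR reads these from MinionConstants.
  public static final String MERGE_LEVEL_KEY = "mergeLevel";
  public static final String ERASE_DIMENSION_VALUES_KEY = "eraseDimensionValues";

  // Prefixes the key with the current merge level, or returns the key unchanged
  // when no merge level is present in the task config.
  public static String buildMergeLevelKeyPrefix(String key, Map<String, String> taskConfig) {
    String mergeLevel = taskConfig.get(MERGE_LEVEL_KEY);
    return mergeLevel == null ? key : mergeLevel + "." + key;
  }

  // Looks up the comma-separated dimensions under the level-scoped key.
  public static Set<String> getDimensionsToErase(Map<String, String> taskConfig) {
    if (taskConfig == null) {
      return new HashSet<>();
    }
    String value = taskConfig.get(buildMergeLevelKeyPrefix(ERASE_DIMENSION_VALUES_KEY, taskConfig));
    if (value == null) {
      return new HashSet<>();
    }
    return Arrays.stream(value.split(",")).map(String::trim).collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<String, String> taskConfig = new HashMap<>();
    taskConfig.put(MERGE_LEVEL_KEY, "1day");
    taskConfig.put("1day.eraseDimensionValues", "colA, colB");
    taskConfig.put("1hour.eraseDimensionValues", "colC");

    // Only the entry scoped to the current level (1day) is consulted;
    // the 1hour entry is ignored.
    System.out.println(getDimensionsToErase(taskConfig));

    // Without a merge level, the lookup falls back to the unprefixed key.
    taskConfig.remove(MERGE_LEVEL_KEY);
    taskConfig.put(ERASE_DIMENSION_VALUES_KEY, "colD");
    System.out.println(getDimensionsToErase(taskConfig));
  }
}
```

Because the fallback returns the original key when no level is set, a task config that only carries unprefixed entries keeps resolving as before, which is the backward-compatibility concern raised in the comment.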