This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ba014509414 Adding a new property minNumSegmentsPerTask for
UpsertCompactMerge task (#17104)
ba014509414 is described below
commit ba014509414823d42651ac297a1de3a0e77bbb8d
Author: tarun11Mavani <[email protected]>
AuthorDate: Tue Nov 11 01:05:43 2025 +0530
Adding a new property minNumSegmentsPerTask for UpsertCompactMerge task
(#17104)
---
.../apache/pinot/core/common/MinionConstants.java | 12 ++++++
.../UpsertCompactMergeTaskGenerator.java | 39 ++++++++++--------
.../segment/local/utils/TableConfigUtils.java | 37 ++++++++++++++++-
.../segment/local/utils/TableConfigUtilsTest.java | 47 ++++++++++++++++++++++
4 files changed, 117 insertions(+), 18 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 293f4ed5784..1f784fb368b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -316,6 +316,18 @@ public class MinionConstants {
*/
public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;
+ /**
+ * minimum number of segments to process in a single task
+ */
+ public static final String MIN_NUM_SEGMENTS_PER_TASK_KEY =
"minNumSegmentsPerTask";
+
+ /**
+ * default minimum number of segments to process in a single task.
+ * Keeping the default at 2 means that this task is not run when there is only one segment which can be merged.
+ * If this is set to 1, this task can also do single-segment compaction, i.e. act as UpsertCompactionTask as well.
+ */
+ public static final long DEFAULT_MIN_NUM_SEGMENTS_PER_TASK = 2;
+
public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
public static final String MAX_ZK_CREATION_TIME_MILLIS_KEY =
"maxZKCreationTimeMillis";
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 13119e6f08f..5a5705bcf05 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -201,6 +201,9 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
// Get max number of subtasks for this table
int maxTasks = getAndUpdateMaxNumSubTasks(taskConfigs,
MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS, tableNameWithType);
+ long minNumSegments = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MIN_NUM_SEGMENTS_PER_TASK_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MIN_NUM_SEGMENTS_PER_TASK)));
for (Map.Entry<Integer, List<List<SegmentMergerMetadata>>> entry
:
segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) {
if (numTasks == maxTasks) {
@@ -211,9 +214,9 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
if (groups.isEmpty()) {
continue;
}
- // there are no groups with more than 1 segment to merge
- // TODO this can be later removed if we want to just do single-segment
compaction from this task
- if (groups.get(0).size() <= 1) {
+ // Skip when no group has at least minNumSegmentsPerTask segments to merge. Groups are already sorted, so we
+ // just check the first entry.
+ if (groups.get(0).size() < minNumSegments) {
continue;
}
// TODO see if multiple groups of same partition can be added
@@ -254,6 +257,9 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
long maxNumSegments = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
+ long minNumSegments = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MIN_NUM_SEGMENTS_PER_TASK_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MIN_NUM_SEGMENTS_PER_TASK)));
// default to Long.MAX_VALUE to avoid size-based compaction by default
long outputSegmentMaxSizeInBytes = Long.MAX_VALUE;
@@ -381,20 +387,21 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
// Sort groups by total invalidDocs in descending order, if invalidDocs
count are same, prefer group with
// higher number of small segments in them
- // remove the groups having only 1 segments in them
- // TODO this check can be later removed if we want single-segment
compaction from this task itself
+ // remove the groups having less than minNumSegments segments in them
List<List<SegmentMergerMetadata>> compactMergeGroups =
- groups.stream().filter(x -> x.size() > 1).sorted((group1, group2) ->
{
- long invalidDocsSum1 =
group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
- long invalidDocsSum2 =
group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
- if (invalidDocsSum2 < invalidDocsSum1) {
- return -1;
- } else if (invalidDocsSum2 == invalidDocsSum1) {
- return Long.compare(group2.size(), group1.size());
- } else {
- return 1;
- }
- }).collect(Collectors.toList());
+ groups.stream()
+ .filter(x -> x.size() >= minNumSegments)
+ .sorted((group1, group2) -> {
+ long invalidDocsSum1 =
group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+ long invalidDocsSum2 =
group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+ if (invalidDocsSum2 < invalidDocsSum1) {
+ return -1;
+ } else if (invalidDocsSum2 == invalidDocsSum1) {
+ return Long.compare(group2.size(), group1.size());
+ } else {
+ return 1;
+ }
+ }).collect(Collectors.toList());
if (!compactMergeGroups.isEmpty()) {
groupedSegments.put(partitionID, compactMergeGroups);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 16c89b24c9b..9ddbe74eed2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -190,6 +190,8 @@ public final class TableConfigUtils {
validatePartialUpsertStrategies(tableConfig, schema);
}
+ validateTaskConfig(tableConfig);
+
if (_enforcePoolBasedAssignment) {
validateInstancePoolsNReplicaGroups(tableConfig);
}
@@ -1136,6 +1138,37 @@ public final class TableConfigUtils {
}
}
+ /**
+ * Validates that UpsertCompactMergeTask with minNumSegmentsPerTask <= 1 is not combined with UpsertCompactionTask.
+ */
+ @VisibleForTesting
+ static void validateTaskConfig(TableConfig tableConfig) {
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig == null || taskConfig.getTaskTypeConfigsMap() == null) {
+ return;
+ }
+
+ Map<String, Map<String, String>> taskTypeConfigsMap =
taskConfig.getTaskTypeConfigsMap();
+
+ String minNumSegmentsPerTaskKey = "minNumSegmentsPerTask";
+ if (taskTypeConfigsMap.containsKey(UPSERT_COMPACT_MERGE_TASK_TYPE)
+ && taskTypeConfigsMap.containsKey(UPSERT_COMPACTION_TASK_TYPE)) {
+
+ Map<String, String> upsertCompactMergeConfig =
taskTypeConfigsMap.get(UPSERT_COMPACT_MERGE_TASK_TYPE);
+
+ if (upsertCompactMergeConfig != null) {
+ long minNumSegments = Long.parseLong(
+ upsertCompactMergeConfig.getOrDefault(minNumSegmentsPerTaskKey,
String.valueOf(2)));
+
+ Preconditions.checkState(minNumSegments > 1, String.format(
+ "When %s.%s is set to 1, %s should not be configured to avoid
indeterministic behavior. "
+ + "Please remove %s configuration or set %s to a value greater
than 1.",
+ UPSERT_COMPACT_MERGE_TASK_TYPE, minNumSegmentsPerTaskKey,
UPSERT_COMPACTION_TASK_TYPE,
+ UPSERT_COMPACTION_TASK_TYPE, minNumSegmentsPerTaskKey));
+ }
+ }
+ }
+
/**
* Validates the tier configs
* Checks for the right segmentSelectorType and its required properties
@@ -1372,7 +1405,7 @@ public final class TableConfigUtils {
}
String column = columnPair.getColumn();
if (!column.equals(AggregationFunctionColumnPair.STAR)) {
- aggregatedColumns.add(column);
+ aggregatedColumns.add(column);
} else if (columnPair.getFunctionType() !=
AggregationFunctionType.COUNT) {
throw new IllegalStateException("Non-COUNT function set the column
as '*' in the functionColumnPair: "
+ functionColumnPair + ". Please configure an actual column
for the function");
@@ -1401,7 +1434,7 @@ public final class TableConfigUtils {
}
String column = columnPair.getColumn();
if (!column.equals(AggregationFunctionColumnPair.STAR)) {
- aggregatedColumns.add(column);
+ aggregatedColumns.add(column);
} else if (columnPair.getFunctionType() !=
AggregationFunctionType.COUNT) {
throw new IllegalStateException("Non-COUNT function set the column
as '*' in the aggregationConfig for "
+ "function: " + aggregationConfig.getAggregationFunction()
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 30ea8126ace..5bf25d9fd95 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -3562,4 +3562,51 @@ public class TableConfigUtilsTest {
assertEquals(e.getMessage(), "MetadataTTL: 50000(ms) must be smaller
than the minimum segmentAge: 50000(ms)");
}
}
+
+ @Test
+ public void testValidateTaskConfigConflict() {
+ // Test that minNumSegmentsPerTask=1 with both UpsertCompactMergeTask and
UpsertCompactionTask configured fails
+ Map<String, String> upsertCompactMergeConfig = new HashMap<>();
+ upsertCompactMergeConfig.put("bufferTimePeriod", "5d");
+ upsertCompactMergeConfig.put("minNumSegmentsPerTask", "1");
+
+ Map<String, String> upsertCompactionConfig = new HashMap<>();
+ upsertCompactionConfig.put("bufferTimePeriod", "5d");
+
+ Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+ taskTypeConfigsMap.put("UpsertCompactMergeTask", upsertCompactMergeConfig);
+ taskTypeConfigsMap.put("UpsertCompactionTask", upsertCompactionConfig);
+
+ TableConfig tableConfigWithConflict = new
TableConfigBuilder(TableType.REALTIME)
+ .setTableName(TABLE_NAME)
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(taskTypeConfigsMap))
+ .build();
+
+ assertThrows(IllegalStateException.class, () ->
TableConfigUtils.validateTaskConfig(tableConfigWithConflict));
+
+ // Test that minNumSegmentsPerTask=2 with both tasks configured passes
+ upsertCompactMergeConfig.put("minNumSegmentsPerTask", "2");
+ taskTypeConfigsMap.put("UpsertCompactMergeTask", upsertCompactMergeConfig);
+
+ TableConfig tableConfigValid = new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(TABLE_NAME)
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(taskTypeConfigsMap))
+ .build();
+
+ // Should not throw
+ TableConfigUtils.validateTaskConfig(tableConfigValid);
+
+ // Test that only UpsertCompactMergeTask with minNumSegmentsPerTask=1
(without UpsertCompactionTask) passes
+ taskTypeConfigsMap.remove("UpsertCompactionTask");
+ upsertCompactMergeConfig.put("minNumSegmentsPerTask", "1");
+ taskTypeConfigsMap.put("UpsertCompactMergeTask", upsertCompactMergeConfig);
+
+ TableConfig tableConfigOnlyCompactMerge = new
TableConfigBuilder(TableType.REALTIME)
+ .setTableName(TABLE_NAME)
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(taskTypeConfigsMap))
+ .build();
+
+ // Should not throw
+ TableConfigUtils.validateTaskConfig(tableConfigOnlyCompactMerge);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]