Jackie-Jiang commented on code in PR #8255: URL: https://github.com/apache/pinot/pull/8255#discussion_r854662341
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java: ########## @@ -85,7 +87,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig, && LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) { PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore); if (partitionSegmentPruner != null) { - segmentPruners.add(getPartitionSegmentPruner(tableConfig, propertyStore)); + segmentPruners.add(partitionSegmentPruner); Review Comment: Good catch ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java: ########## @@ -18,266 +18,5 @@ */ package org.apache.pinot.broker.routing.segmentpruner; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; -import org.apache.helix.AccessOption; -import org.apache.helix.ZNRecord; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; -import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.request.Expression; -import org.apache.pinot.common.request.Function; -import org.apache.pinot.common.request.Identifier; -import org.apache.pinot.common.request.PinotQuery; -import org.apache.pinot.common.utils.request.FilterQueryTree; -import org.apache.pinot.common.utils.request.RequestUtils; -import org.apache.pinot.pql.parsers.pql2.ast.FilterKind; -import org.apache.pinot.segment.spi.partition.PartitionFunction; -import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; -import 
org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; -import org.apache.pinot.spi.utils.CommonConstants.Segment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The {@code PartitionSegmentPruner} prunes segments based on the their partition metadata stored in ZK. The pruner - * supports queries with filter (or nested filter) of EQUALITY and IN predicates. - */ -public class PartitionSegmentPruner implements SegmentPruner { - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionSegmentPruner.class); - private static final PartitionInfo INVALID_PARTITION_INFO = new PartitionInfo(null, null); - - private final String _tableNameWithType; - private final String _partitionColumn; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; - private final String _segmentZKMetadataPathPrefix; - private final Map<String, PartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>(); - - public PartitionSegmentPruner(String tableNameWithType, String partitionColumn, - ZkHelixPropertyStore<ZNRecord> propertyStore) { - _tableNameWithType = tableNameWithType; - _partitionColumn = partitionColumn; - _propertyStore = propertyStore; - _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/"; - } - - @Override - public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { - // Bulk load partition info for all online segments - int numSegments = onlineSegments.size(); - List<String> segments = new ArrayList<>(numSegments); - List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); - for (String segment : onlineSegments) { - segments.add(segment); - segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); - } - List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); - for (int i = 0; i < numSegments; i++) { - String segment = segments.get(i); - PartitionInfo 
partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)); - if (partitionInfo != null) { - _partitionInfoMap.put(segment, partitionInfo); - } - } - } - - /** - * NOTE: Returns {@code null} when the ZNRecord is missing (could be transient Helix issue). Returns - * {@link #INVALID_PARTITION_INFO} when the segment does not have valid partition metadata in its ZK metadata, - * in which case we won't retry later. - */ - @Nullable - private PartitionInfo extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) { - if (znRecord == null) { - LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType); - return null; - } - - String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA); - if (partitionMetadataJson == null) { - LOGGER.warn("Failed to find segment partition metadata for segment: {}, table: {}", segment, _tableNameWithType); - return INVALID_PARTITION_INFO; - } - - SegmentPartitionMetadata segmentPartitionMetadata; - try { - segmentPartitionMetadata = SegmentPartitionMetadata.fromJsonString(partitionMetadataJson); - } catch (Exception e) { - LOGGER.warn("Caught exception while extracting segment partition metadata for segment: {}, table: {}", segment, - _tableNameWithType, e); - return INVALID_PARTITION_INFO; - } - - ColumnPartitionMetadata columnPartitionMetadata = - segmentPartitionMetadata.getColumnPartitionMap().get(_partitionColumn); - if (columnPartitionMetadata == null) { - LOGGER.warn("Failed to find column partition metadata for column: {}, segment: {}, table: {}", _partitionColumn, - segment, _tableNameWithType); - return INVALID_PARTITION_INFO; - } - - return new PartitionInfo(PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata.getFunctionName(), - columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig()), - columnPartitionMetadata.getPartitions()); - } - - 
@Override - public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, - Set<String> onlineSegments) { - // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed - // ones. The refreshed segment ZK metadata change won't be picked up. - for (String segment : onlineSegments) { - _partitionInfoMap.computeIfAbsent(segment, k -> extractPartitionInfoFromSegmentZKMetadataZNRecord(k, - _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT))); - } - _partitionInfoMap.keySet().retainAll(onlineSegments); - } - - @Override - public synchronized void refreshSegment(String segment) { - PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, - _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)); - if (partitionInfo != null) { - _partitionInfoMap.put(segment, partitionInfo); - } else { - _partitionInfoMap.remove(segment); - } - } - - @Override - public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) { - PinotQuery pinotQuery = brokerRequest.getPinotQuery(); - if (pinotQuery != null) { - // SQL - - Expression filterExpression = pinotQuery.getFilterExpression(); - if (filterExpression == null) { - return segments; - } - Set<String> selectedSegments = new HashSet<>(); - for (String segment : segments) { - PartitionInfo partitionInfo = _partitionInfoMap.get(segment); - if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO || isPartitionMatch(filterExpression, - partitionInfo)) { - selectedSegments.add(segment); - } - } - return selectedSegments; - } else { - // PQL - FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest); - if (filterQueryTree == null) { - return segments; - } - Set<String> selectedSegments = new HashSet<>(); - for (String segment : segments) { - PartitionInfo partitionInfo = _partitionInfoMap.get(segment); - 
if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO || isPartitionMatch(filterQueryTree, - partitionInfo)) { - selectedSegments.add(segment); - } - } - return selectedSegments; - } - } - - private boolean isPartitionMatch(Expression filterExpression, PartitionInfo partitionInfo) { - Function function = filterExpression.getFunctionCall(); - FilterKind filterKind = FilterKind.valueOf(function.getOperator()); - List<Expression> operands = function.getOperands(); - switch (filterKind) { - case AND: - for (Expression child : operands) { - if (!isPartitionMatch(child, partitionInfo)) { - return false; - } - } - return true; - case OR: - for (Expression child : operands) { - if (isPartitionMatch(child, partitionInfo)) { - return true; - } - } - return false; - case EQUALS: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_partitionColumn)) { - return partitionInfo._partitions.contains( - partitionInfo._partitionFunction.getPartition(operands.get(1).getLiteral().getFieldValue().toString())); - } else { - return true; - } - } - case IN: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_partitionColumn)) { - int numOperands = operands.size(); - for (int i = 1; i < numOperands; i++) { - if (partitionInfo._partitions.contains(partitionInfo._partitionFunction.getPartition( - operands.get(i).getLiteral().getFieldValue().toString()))) { - return true; - } - } - return false; - } else { - return true; - } - } - default: - return true; - } - } - - @Deprecated - private boolean isPartitionMatch(FilterQueryTree filterQueryTree, PartitionInfo partitionInfo) { - switch (filterQueryTree.getOperator()) { - case AND: - for (FilterQueryTree child : filterQueryTree.getChildren()) { - if (!isPartitionMatch(child, partitionInfo)) { - return false; - } - } - return true; - case OR: - for (FilterQueryTree child : filterQueryTree.getChildren()) 
{ - if (isPartitionMatch(child, partitionInfo)) { - return true; - } - } - return false; - case EQUALITY: - case IN: - if (filterQueryTree.getColumn().equals(_partitionColumn)) { - for (String value : filterQueryTree.getValue()) { - if (partitionInfo._partitions.contains(partitionInfo._partitionFunction.getPartition(value))) { - return true; - } - } - return false; - } - return true; - default: - return true; - } - } - - private static class PartitionInfo { - final PartitionFunction _partitionFunction; - final Set<Integer> _partitions; - - PartitionInfo(PartitionFunction partitionFunction, Set<Integer> partitions) { - _partitionFunction = partitionFunction; - _partitions = partitions; - } - } +public interface PartitionSegmentPruner extends SegmentPruner { Review Comment: Let's remove this file since this extra interface does not provide extra value (it should show as a rename from `PartitionSegmentPruner` to `SinglePartitionColumnSegmentPruner`) ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java: ########## @@ -178,6 +187,15 @@ private TableTaskConfig getMultiLevelConcatTaskConfig() { return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs)); } + private SegmentPartitionConfig getMultiColumnSegmentPartitionConfig() { Review Comment: (minor) ```suggestion private SegmentPartitionConfig getMultiColumnsSegmentPartitionConfig() { ``` ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java: ########## @@ -351,33 +352,41 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { mergeConfigs, taskConfigs)); } } else { - // For partitioned table, schedule separate tasks for each partition + // For partitioned table, schedule separate tasks for each partitionId (partitionId is constructed from + // partitions of 
all partition columns. There should be exact match between partition columns of segment and + // partition columns of table configuration, and there is only partition per column in segment metadata). + // Other segments which do not meet these conditions are considered as outlier segments, and additional tasks + // are generated for them. Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); - Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s", - tableConfig.getTableName()); - Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next(); - String partitionColumn = partitionEntry.getKey(); - + List<String> partitionColumns = new ArrayList<>(columnPartitionMap.keySet()); for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) { - Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>(); - // Handle segments that have multiple partitions or no partition info + Map<List<Integer>, List<SegmentZKMetadata>> partitionToSegments = new LinkedHashMap<>(); List<SegmentZKMetadata> outlierSegments = new ArrayList<>(); for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) { SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata(); - if (segmentPartitionMetadata == null - || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) { + List<Integer> partitions = new ArrayList<>(); + if (segmentPartitionMetadata != null && CollectionUtils.isEqualCollection(partitionColumns, Review Comment: `CollectionUtils.isEqualCollection()` is much slower than `Set.equals()`. 
Suggest keeping both a `partitionColumnSet` (for the fast `Set.equals()` check against the segment's partition columns) and a `partitionColumnList` (to preserve column order when building the partition list). ########## pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java: ########## @@ -558,7 +563,8 @@ protected void waitForDocsLoaded(long timeoutMs, boolean raiseError) { @Override public Boolean apply(@Nullable Void aVoid) { try { - return getCurrentCountStarResult() == countStarResult; + long currentCountStarResult = getCurrentCountStarResult(); + return currentCountStarResult == countStarResult; Review Comment: (minor) This change seems unrelated to this PR. ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java: ########## @@ -351,33 +352,41 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { mergeConfigs, taskConfigs)); } } else { - // For partitioned table, schedule separate tasks for each partition + // For partitioned table, schedule separate tasks for each partitionId (partitionId is constructed from + // partitions of all partition columns. There should be exact match between partition columns of segment and + // partition columns of table configuration, and there is only partition per column in segment metadata). + // Other segments which do not meet these conditions are considered as outlier segments, and additional tasks + // are generated for them.
Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); - Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s", - tableConfig.getTableName()); - Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next(); - String partitionColumn = partitionEntry.getKey(); - + List<String> partitionColumns = new ArrayList<>(columnPartitionMap.keySet()); for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) { - Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>(); - // Handle segments that have multiple partitions or no partition info + Map<List<Integer>, List<SegmentZKMetadata>> partitionToSegments = new LinkedHashMap<>(); Review Comment: I don't think we need a linked map here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org