Jackie-Jiang commented on code in PR #8255:
URL: https://github.com/apache/pinot/pull/8255#discussion_r854662341


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java:
##########
@@ -85,7 +87,7 @@ public static List<SegmentPruner> 
getSegmentPruners(TableConfig tableConfig,
             && 
LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName)))
 {
           PartitionSegmentPruner partitionSegmentPruner = 
getPartitionSegmentPruner(tableConfig, propertyStore);
           if (partitionSegmentPruner != null) {
-            segmentPruners.add(getPartitionSegmentPruner(tableConfig, 
propertyStore));
+            segmentPruners.add(partitionSegmentPruner);

Review Comment:
   Good catch



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java:
##########
@@ -18,266 +18,5 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.request.Function;
-import org.apache.pinot.common.request.Identifier;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.utils.request.FilterQueryTree;
-import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
-import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
-import org.apache.pinot.spi.utils.CommonConstants.Segment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The {@code PartitionSegmentPruner} prunes segments based on the their 
partition metadata stored in ZK. The pruner
- * supports queries with filter (or nested filter) of EQUALITY and IN 
predicates.
- */
-public class PartitionSegmentPruner implements SegmentPruner {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionSegmentPruner.class);
-  private static final PartitionInfo INVALID_PARTITION_INFO = new 
PartitionInfo(null, null);
-
-  private final String _tableNameWithType;
-  private final String _partitionColumn;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final String _segmentZKMetadataPathPrefix;
-  private final Map<String, PartitionInfo> _partitionInfoMap = new 
ConcurrentHashMap<>();
-
-  public PartitionSegmentPruner(String tableNameWithType, String 
partitionColumn,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    _tableNameWithType = tableNameWithType;
-    _partitionColumn = partitionColumn;
-    _propertyStore = propertyStore;
-    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + 
"/";
-  }
-
-  @Override
-  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
-    // Bulk load partition info for all online segments
-    int numSegments = onlineSegments.size();
-    List<String> segments = new ArrayList<>(numSegments);
-    List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
-    for (String segment : onlineSegments) {
-      segments.add(segment);
-      segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
-    for (int i = 0; i < numSegments; i++) {
-      String segment = segments.get(i);
-      PartitionInfo partitionInfo = 
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
-      if (partitionInfo != null) {
-        _partitionInfoMap.put(segment, partitionInfo);
-      }
-    }
-  }
-
-  /**
-   * NOTE: Returns {@code null} when the ZNRecord is missing (could be 
transient Helix issue). Returns
-   *       {@link #INVALID_PARTITION_INFO} when the segment does not have 
valid partition metadata in its ZK metadata,
-   *       in which case we won't retry later.
-   */
-  @Nullable
-  private PartitionInfo 
extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment, @Nullable 
ZNRecord znRecord) {
-    if (znRecord == null) {
-      LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: 
{}", segment, _tableNameWithType);
-      return null;
-    }
-
-    String partitionMetadataJson = 
znRecord.getSimpleField(Segment.PARTITION_METADATA);
-    if (partitionMetadataJson == null) {
-      LOGGER.warn("Failed to find segment partition metadata for segment: {}, 
table: {}", segment, _tableNameWithType);
-      return INVALID_PARTITION_INFO;
-    }
-
-    SegmentPartitionMetadata segmentPartitionMetadata;
-    try {
-      segmentPartitionMetadata = 
SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while extracting segment partition 
metadata for segment: {}, table: {}", segment,
-          _tableNameWithType, e);
-      return INVALID_PARTITION_INFO;
-    }
-
-    ColumnPartitionMetadata columnPartitionMetadata =
-        segmentPartitionMetadata.getColumnPartitionMap().get(_partitionColumn);
-    if (columnPartitionMetadata == null) {
-      LOGGER.warn("Failed to find column partition metadata for column: {}, 
segment: {}, table: {}", _partitionColumn,
-          segment, _tableNameWithType);
-      return INVALID_PARTITION_INFO;
-    }
-
-    return new 
PartitionInfo(PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata.getFunctionName(),
-        columnPartitionMetadata.getNumPartitions(), 
columnPartitionMetadata.getFunctionConfig()),
-        columnPartitionMetadata.getPartitions());
-  }
-
-  @Override
-  public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
-      Set<String> onlineSegments) {
-    // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
-    //       ones. The refreshed segment ZK metadata change won't be picked up.
-    for (String segment : onlineSegments) {
-      _partitionInfoMap.computeIfAbsent(segment, k -> 
extractPartitionInfoFromSegmentZKMetadataZNRecord(k,
-          _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, 
AccessOption.PERSISTENT)));
-    }
-    _partitionInfoMap.keySet().retainAll(onlineSegments);
-  }
-
-  @Override
-  public synchronized void refreshSegment(String segment) {
-    PartitionInfo partitionInfo = 
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT));
-    if (partitionInfo != null) {
-      _partitionInfoMap.put(segment, partitionInfo);
-    } else {
-      _partitionInfoMap.remove(segment);
-    }
-  }
-
-  @Override
-  public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
-    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
-    if (pinotQuery != null) {
-      // SQL
-
-      Expression filterExpression = pinotQuery.getFilterExpression();
-      if (filterExpression == null) {
-        return segments;
-      }
-      Set<String> selectedSegments = new HashSet<>();
-      for (String segment : segments) {
-        PartitionInfo partitionInfo = _partitionInfoMap.get(segment);
-        if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO 
|| isPartitionMatch(filterExpression,
-            partitionInfo)) {
-          selectedSegments.add(segment);
-        }
-      }
-      return selectedSegments;
-    } else {
-      // PQL
-      FilterQueryTree filterQueryTree = 
RequestUtils.generateFilterQueryTree(brokerRequest);
-      if (filterQueryTree == null) {
-        return segments;
-      }
-      Set<String> selectedSegments = new HashSet<>();
-      for (String segment : segments) {
-        PartitionInfo partitionInfo = _partitionInfoMap.get(segment);
-        if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO 
|| isPartitionMatch(filterQueryTree,
-            partitionInfo)) {
-          selectedSegments.add(segment);
-        }
-      }
-      return selectedSegments;
-    }
-  }
-
-  private boolean isPartitionMatch(Expression filterExpression, PartitionInfo 
partitionInfo) {
-    Function function = filterExpression.getFunctionCall();
-    FilterKind filterKind = FilterKind.valueOf(function.getOperator());
-    List<Expression> operands = function.getOperands();
-    switch (filterKind) {
-      case AND:
-        for (Expression child : operands) {
-          if (!isPartitionMatch(child, partitionInfo)) {
-            return false;
-          }
-        }
-        return true;
-      case OR:
-        for (Expression child : operands) {
-          if (isPartitionMatch(child, partitionInfo)) {
-            return true;
-          }
-        }
-        return false;
-      case EQUALS: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && 
identifier.getName().equals(_partitionColumn)) {
-          return partitionInfo._partitions.contains(
-              
partitionInfo._partitionFunction.getPartition(operands.get(1).getLiteral().getFieldValue().toString()));
-        } else {
-          return true;
-        }
-      }
-      case IN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && 
identifier.getName().equals(_partitionColumn)) {
-          int numOperands = operands.size();
-          for (int i = 1; i < numOperands; i++) {
-            if 
(partitionInfo._partitions.contains(partitionInfo._partitionFunction.getPartition(
-                operands.get(i).getLiteral().getFieldValue().toString()))) {
-              return true;
-            }
-          }
-          return false;
-        } else {
-          return true;
-        }
-      }
-      default:
-        return true;
-    }
-  }
-
-  @Deprecated
-  private boolean isPartitionMatch(FilterQueryTree filterQueryTree, 
PartitionInfo partitionInfo) {
-    switch (filterQueryTree.getOperator()) {
-      case AND:
-        for (FilterQueryTree child : filterQueryTree.getChildren()) {
-          if (!isPartitionMatch(child, partitionInfo)) {
-            return false;
-          }
-        }
-        return true;
-      case OR:
-        for (FilterQueryTree child : filterQueryTree.getChildren()) {
-          if (isPartitionMatch(child, partitionInfo)) {
-            return true;
-          }
-        }
-        return false;
-      case EQUALITY:
-      case IN:
-        if (filterQueryTree.getColumn().equals(_partitionColumn)) {
-          for (String value : filterQueryTree.getValue()) {
-            if 
(partitionInfo._partitions.contains(partitionInfo._partitionFunction.getPartition(value)))
 {
-              return true;
-            }
-          }
-          return false;
-        }
-        return true;
-      default:
-        return true;
-    }
-  }
-
-  private static class PartitionInfo {
-    final PartitionFunction _partitionFunction;
-    final Set<Integer> _partitions;
-
-    PartitionInfo(PartitionFunction partitionFunction, Set<Integer> 
partitions) {
-      _partitionFunction = partitionFunction;
-      _partitions = partitions;
-    }
-  }
+public interface PartitionSegmentPruner extends SegmentPruner {

Review Comment:
   Let's remove this file, since this extra interface adds no value. The change 
should then show up as a rename from `PartitionSegmentPruner` to 
`SinglePartitionColumnSegmentPruner`.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java:
##########
@@ -178,6 +187,15 @@ private TableTaskConfig getMultiLevelConcatTaskConfig() {
     return new 
TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE,
 tableTaskConfigs));
   }
 
+  private SegmentPartitionConfig getMultiColumnSegmentPartitionConfig() {

Review Comment:
   (minor)
   ```suggestion
     private SegmentPartitionConfig getMultiColumnsSegmentPartitionConfig() {
   ```



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,33 +352,41 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each 
partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match 
between partition columns of segment and
+          // partition columns of table configuration, and there is only 
partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered 
as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot 
partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = 
columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          List<String> partitionColumns = new 
ArrayList<>(columnPartitionMap.keySet());
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : 
selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new 
HashMap<>();
-            // Handle segments that have multiple partitions or no partition 
info
+            Map<List<Integer>, List<SegmentZKMetadata>> partitionToSegments = 
new LinkedHashMap<>();
             List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
             for (SegmentZKMetadata selectedSegment : 
selectedSegmentsPerBucket) {
               SegmentPartitionMetadata segmentPartitionMetadata = 
selectedSegment.getPartitionMetadata();
-              if (segmentPartitionMetadata == null
-                  || 
segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+              List<Integer> partitions = new ArrayList<>();
+              if (segmentPartitionMetadata != null && 
CollectionUtils.isEqualCollection(partitionColumns,

Review Comment:
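   `CollectionUtils.isEqualCollection()` is much slower than `Set.equals()`. 
Suggest keeping both a `partitionColumnSet` (for the equality check against the 
segment's partition columns) and a `partitionColumnList` (for building the 
partition ID in a consistent column order).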



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -558,7 +563,8 @@ protected void waitForDocsLoaded(long timeoutMs, boolean 
raiseError) {
       @Override
       public Boolean apply(@Nullable Void aVoid) {
         try {
-          return getCurrentCountStarResult() == countStarResult;
+          long currentCountStarResult = getCurrentCountStarResult();
+          return currentCountStarResult == countStarResult;

Review Comment:
   (minor) This change seems unrelated to this PR.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,33 +352,41 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each 
partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match 
between partition columns of segment and
+          // partition columns of table configuration, and there is only 
partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered 
as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot 
partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = 
columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          List<String> partitionColumns = new 
ArrayList<>(columnPartitionMap.keySet());
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : 
selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new 
HashMap<>();
-            // Handle segments that have multiple partitions or no partition 
info
+            Map<List<Integer>, List<SegmentZKMetadata>> partitionToSegments = 
new LinkedHashMap<>();

Review Comment:
   I don't think we need a `LinkedHashMap` here; a plain `HashMap` should be 
sufficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to