This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new acc24a1 Combine operators: remove redundant variables and override logger in subclass (#6690) acc24a1 is described below commit acc24a105f3c2d728ed2589e4838108769e82e2c Author: Liang Mingqiang <mili...@linkedin.com> AuthorDate: Wed Mar 17 18:08:32 2021 -0700 Combine operators: remove redundant variables and override logger in subclass (#6690) - remove redundant variables in subclass - override logger in subclass for easy debugging - remove unnecessary constructor in BaaseCombineOperator since subclass has access to protected variables in supper class. --- .../core/operator/combine/BaseCombineOperator.java | 9 ++++----- .../operator/combine/GroupByCombineOperator.java | 1 + .../combine/GroupByOrderByCombineOperator.java | 1 + ...MaxValueBasedSelectionOrderByCombineOperator.java | 20 ++++++-------------- .../combine/SelectionOnlyCombineOperator.java | 3 +++ .../combine/SelectionOrderByCombineOperator.java | 3 +++ 6 files changed, 18 insertions(+), 19 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 1ed5fc1..aef6cd1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -45,23 +45,22 @@ import org.slf4j.LoggerFactory; */ @SuppressWarnings("rawtypes") public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> { - protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class); protected final List<Operator> _operators; protected final QueryContext _queryContext; protected final ExecutorService _executorService; protected final long _endTimeMs; - protected final int _numOperators; - protected int _numThreads; // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined // behavior (even JVM crash) when processing queries against it. protected final Phaser _phaser = new Phaser(1); - protected Future[] _futures; // Use a _blockingQueue to store the per-segment result - private final BlockingQueue<IntermediateResultsBlock> _blockingQueue; + protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue; + protected int _numThreads; + protected Future[] _futures; public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long endTimeMs) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index ff40b2e..dadb23a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -79,6 +79,7 @@ public class GroupByCombineOperator extends BaseCombineOperator { public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long endTimeMs, int innerSegmentNumGroupsLimit) { + // GroupByCombineOperator use numOperators as numThreads super(operators, queryContext, executorService, endTimeMs, operators.size()); _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit; _interSegmentNumGroupsLimit = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 99ba9df..38f1a54 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -79,6 +79,7 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long endTimeMs, int trimThreshold) { + // GroupByOrderByCombineOperator use numOperators as numThreads super(operators, queryContext, executorService, endTimeMs, operators.size()); _initLock = new ReentrantLock(); _trimSize = GroupByUtils.getTableCapacity(_queryContext); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java index 901d6bc..29359f6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java @@ -22,11 +22,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.PriorityQueue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -40,6 +37,8 @@ import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.OrderByExpressionContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -54,18 +53,14 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils; */ @SuppressWarnings({"rawtypes", "unchecked"}) public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombineOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(MinMaxValueBasedSelectionOrderByCombineOperator.class); private static final String OPERATOR_NAME = "MinMaxValueBasedSelectionOrderByCombineOperator"; - // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this // special IntermediateResultsBlock into the BlockingQueue to awake the main thread private static final IntermediateResultsBlock LAST_RESULTS_BLOCK = new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), Collections.emptyList()); - private final int _numOperators; - private final int numThreads; - // Use a BlockingQueue to store the per-segment result - private final BlockingQueue<IntermediateResultsBlock> _blockingQueue; // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue) private final AtomicInteger _numOperatorsSkipped = new AtomicInteger(); private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>(); @@ -76,9 +71,6 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) { super(operators, queryContext, executorService, endTimeMs); _minMaxValueContexts = minMaxValueContexts; - _numOperators = _operators.size(); - numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators); - _blockingQueue = new ArrayBlockingQueue<>(_numOperators); _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset(); } @@ -118,7 +110,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine // segment result is merged. Comparable threadBoundaryValue = null; - for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += numThreads) { + for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) { // Calculate the boundary value from global boundary and thread boundary Comparable boundaryValue = _globalBoundaryValue.get(); if (boundaryValue == null) { @@ -146,7 +138,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine if (minMaxValueContext._minValue != null) { int result = minMaxValueContext._minValue.compareTo(boundaryValue); if (result > 0 || (result == 0 && numOrderByExpressions == 1)) { - _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads); + _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads); _blockingQueue.offer(LAST_RESULTS_BLOCK); return; } @@ -157,7 +149,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine if (minMaxValueContext._maxValue != null) { int result = minMaxValueContext._maxValue.compareTo(boundaryValue); if (result < 0 || (result == 0 && numOrderByExpressions == 1)) { - _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads); + _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads); _blockingQueue.offer(LAST_RESULTS_BLOCK); return; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java index 2da4c56..f2d4d06 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java @@ -27,6 +27,8 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -37,6 +39,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils; */ @SuppressWarnings("rawtypes") public class SelectionOnlyCombineOperator extends BaseCombineOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOnlyCombineOperator.class); private static final String OPERATOR_NAME = "SelectionOnlyCombineOperator"; private final int _numRowsToKeep; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java index be899fb..dfd34e9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java @@ -32,6 +32,8 @@ import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.OrderByExpressionContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -43,6 +45,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils; */ @SuppressWarnings({"rawtypes", "unchecked"}) public class SelectionOrderByCombineOperator extends BaseCombineOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOrderByCombineOperator.class); private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator"; private final List<Operator> _operators; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org