This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new cd35332 Use query timeout for planning phase (#5990) cd35332 is described below commit cd35332a51c0c4997e8fdffca0dad9d090d0cf66 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Sep 8 14:32:08 2020 -0700 Use query timeout for planning phase (#5990) Currently the `CombinePlanNode` is using fixed 10 seconds as the timeout for the multi-threaded query planning, and the query planning time is not counted into the timeout for the query. This PR: - Replaces the fixed 10 seconds timeout with the query timeout for the query planning phase - Counts the query planning time into the query timeout --- .../combine/AggregationOnlyCombineOperator.java | 4 ++-- .../core/operator/combine/BaseCombineOperator.java | 10 ++++----- .../operator/combine/GroupByCombineOperator.java | 11 +++++----- .../combine/GroupByOrderByCombineOperator.java | 11 +++++----- .../combine/SelectionOnlyCombineOperator.java | 4 ++-- .../combine/SelectionOrderByCombineOperator.java | 9 +++----- .../apache/pinot/core/plan/CombinePlanNode.java | 25 +++++++++------------- .../core/plan/maker/InstancePlanMakerImplV2.java | 4 ++-- .../apache/pinot/core/plan/maker/PlanMaker.java | 2 +- .../query/executor/ServerQueryExecutorV1Impl.java | 10 ++++----- .../combine/SelectionCombineOperatorTest.java | 6 ++++-- .../pinot/core/plan/CombinePlanNodeTest.java | 16 ++++++++------ .../org/apache/pinot/queries/BaseQueriesTest.java | 4 ++-- .../apache/pinot/queries/DistinctQueriesTest.java | 8 +++---- 14 files changed, 60 insertions(+), 64 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java index 83a474e..c3ec21b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java @@ -34,8 +34,8 @@ public class AggregationOnlyCombineOperator extends BaseCombineOperator { private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator"; public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext, - ExecutorService executorService, long timeOutMs) { - super(operators, queryContext, executorService, timeOutMs); + ExecutorService executorService, long endTimeMs) { + super(operators, queryContext, executorService, endTimeMs); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 2d10a35..7e60844 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -50,20 +50,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul protected final List<Operator> _operators; protected final QueryContext _queryContext; protected final ExecutorService _executorService; - protected final long _timeOutMs; + protected final long _endTimeMs; public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, - long timeOutMs) { + long endTimeMs) { _operators = operators; _queryContext = queryContext; _executorService = executorService; - _timeOutMs = timeOutMs; + _endTimeMs = endTimeMs; } @Override protected IntermediateResultsBlock getNextBlock() { - long startTimeMs = System.currentTimeMillis(); - long endTimeMs = startTimeMs + _timeOutMs; int numOperators = _operators.size(); int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators); @@ -124,7 +122,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul int 
numBlocksMerged = 0; while (numBlocksMerged < numOperators) { IntermediateResultsBlock blockToMerge = - blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (blockToMerge == null) { // Query times out, skip merging the remaining results blocks LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index 2b54984..ff84f39 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -67,17 +67,17 @@ public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBloc private final List<Operator> _operators; private final QueryContext _queryContext; private final ExecutorService _executorService; - private final long _timeOutMs; + private final long _endTimeMs; // Limit on number of groups stored, beyond which no new group will be created private final int _innerSegmentNumGroupsLimit; private final int _interSegmentNumGroupsLimit; public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, - long timeOutMs, int innerSegmentNumGroupsLimit) { + long endTimeMs, int innerSegmentNumGroupsLimit) { _operators = operators; _queryContext = queryContext; _executorService = executorService; - _timeOutMs = timeOutMs; + _endTimeMs = endTimeMs; _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit; _interSegmentNumGroupsLimit = (int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE); @@ -189,11 +189,12 @@ public class GroupByCombineOperator extends 
BaseOperator<IntermediateResultsBloc } try { - boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS); + long timeoutMs = _endTimeMs - System.currentTimeMillis(); + boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); if (!opCompleted) { // If this happens, the broker side should already timed out, just log the error and return String errorMessage = String - .format("Timed out while combining group-by results after %dms, queryContext = %s", _timeOutMs, + .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs, _queryContext); LOGGER.error(errorMessage); return new IntermediateResultsBlock(new TimeoutException(errorMessage)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 8fb174c..37af764 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -66,18 +66,18 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu private final List<Operator> _operators; private final QueryContext _queryContext; private final ExecutorService _executorService; - private final long _timeOutMs; + private final long _endTimeMs; private final int _indexedTableCapacity; private final Lock _initLock; private DataSchema _dataSchema; private ConcurrentIndexedTable _indexedTable; public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, - ExecutorService executorService, long timeOutMs) { + ExecutorService executorService, long endTimeMs) { _operators = operators; _queryContext = queryContext; _executorService = executorService; - _timeOutMs = timeOutMs; + _endTimeMs = endTimeMs; _initLock = new ReentrantLock(); 
_indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext); } @@ -220,11 +220,12 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu } try { - boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS); + long timeoutMs = _endTimeMs - System.currentTimeMillis(); + boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); if (!opCompleted) { // If this happens, the broker side should already timed out, just log the error and return String errorMessage = String - .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", _timeOutMs, + .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, _queryContext); LOGGER.error(errorMessage); return new IntermediateResultsBlock(new TimeoutException(errorMessage)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java index 07aad33..2da4c56 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java @@ -42,8 +42,8 @@ public class SelectionOnlyCombineOperator extends BaseCombineOperator { private final int _numRowsToKeep; public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext, - ExecutorService executorService, long timeOutMs) { - super(operators, queryContext, executorService, timeOutMs); + ExecutorService executorService, long endTimeMs) { + super(operators, queryContext, executorService, endTimeMs); _numRowsToKeep = queryContext.getLimit(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java index af5b18e..f95908f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java @@ -69,8 +69,8 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator { private final int _numRowsToKeep; public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, - ExecutorService executorService, long timeOutMs) { - super(operators, queryContext, executorService, timeOutMs); + ExecutorService executorService, long endTimeMs) { + super(operators, queryContext, executorService, endTimeMs); _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset(); } @@ -91,9 +91,6 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator { } private IntermediateResultsBlock minMaxValueBasedCombine() { - long startTimeMs = System.currentTimeMillis(); - long endTimeMs = startTimeMs + _timeOutMs; - List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions(); assert orderByExpressions != null; int numOrderByExpressions = orderByExpressions.size(); @@ -270,7 +267,7 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator { int numBlocksMerged = 0; while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) { IntermediateResultsBlock blockToMerge = - blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (blockToMerge == null) { // Query times out, skip merging the remaining results blocks LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java index 4f01fea..b6d9155 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java @@ -51,12 +51,10 @@ public class CombinePlanNode implements PlanNode { // Try to schedule 10 plans for each thread, or evenly distribute plans to all MAX_NUM_THREADS_PER_QUERY threads private static final int TARGET_NUM_PLANS_PER_THREAD = 10; - private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000; - private final List<PlanNode> _planNodes; private final QueryContext _queryContext; private final ExecutorService _executorService; - private final long _timeOutMs; + private final long _endTimeMs; private final int _numGroupsLimit; /** @@ -65,15 +63,15 @@ public class CombinePlanNode implements PlanNode { * @param planNodes List of underlying plan nodes * @param queryContext Query context * @param executorService Executor service - * @param timeOutMs Time out in milliseconds for query execution (not for planning phase) + * @param endTimeMs End time in milliseconds for the query * @param numGroupsLimit Limit of number of groups stored in each segment */ public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService, - long timeOutMs, int numGroupsLimit) { + long endTimeMs, int numGroupsLimit) { _planNodes = planNodes; _queryContext = queryContext; _executorService = executorService; - _timeOutMs = timeOutMs; + _endTimeMs = endTimeMs; _numGroupsLimit = numGroupsLimit; } @@ -91,9 +89,6 @@ public class CombinePlanNode implements PlanNode { } else { // Large number of plan nodes, run them in parallel - // Calculate the time out timestamp - long endTimeMs = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN; - int numThreads = Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / TARGET_NUM_PLANS_PER_THREAD, 
MAX_NUM_THREADS_PER_QUERY); @@ -136,7 +131,7 @@ public class CombinePlanNode implements PlanNode { try { for (Future future : futures) { List<Operator> ops = - (List<Operator>) future.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + (List<Operator>) future.get(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); operators.addAll(ops); } } catch (Exception e) { @@ -163,22 +158,22 @@ public class CombinePlanNode implements PlanNode { if (QueryContextUtils.isAggregationQuery(_queryContext)) { if (_queryContext.getGroupByExpressions() == null) { // Aggregation only - return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs); } else { // Aggregation group-by QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions()); if (queryOptions.isGroupByModeSQL()) { - return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs); } - return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit); + return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit); } } else { if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) { // Selection only - return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs); } else { // Selection order-by - return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs); } } } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index caae551..746b08a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -97,13 +97,13 @@ public class InstancePlanMakerImplV2 implements PlanMaker { @Override public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, - ExecutorService executorService, long timeOutMs) { + ExecutorService executorService, long endTimeMs) { List<PlanNode> planNodes = new ArrayList<>(indexSegments.size()); for (IndexSegment indexSegment : indexSegments) { planNodes.add(makeSegmentPlanNode(indexSegment, queryContext)); } CombinePlanNode combinePlanNode = - new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit); + new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit); return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java index 553a6d8..b4a316d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java @@ -37,7 +37,7 @@ public interface PlanMaker { * Returns an instance level {@link Plan} which contains the logical execution plan for multiple segments. */ Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, ExecutorService executorService, - long timeoutMs); + long endTimeMs); /** * Returns a segment level {@link PlanNode} which contains the logical execution plan for one segment. 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 46d2579..df29292 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -100,7 +100,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { if (schedulerWaitTimer != null) { schedulerWaitTimer.stopAndRecord(); } - long querySchedulingTimeMs = System.currentTimeMillis() - timerContext.getQueryArrivalTimeMs(); + long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs(); + long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs; TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING); long requestId = queryRequest.getRequestId(); @@ -116,10 +117,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { queryTimeoutMs = timeoutFromQueryOptions; } } - long remainingTimeMs = queryTimeoutMs - querySchedulingTimeMs; // Query scheduler wait time already exceeds query timeout, directly return - if (remainingTimeMs <= 0) { + if (querySchedulingTimeMs >= queryTimeoutMs) { _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1); String errorMessage = String .format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs, @@ -213,8 +213,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { for (SegmentDataManager segmentDataManager : segmentDataManagers) { indexSegments.add(segmentDataManager.getSegment()); } - Plan globalQueryPlan = - _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, remainingTimeMs); + long endTimeMs = queryArrivalTimeMs + queryTimeoutMs; + Plan globalQueryPlan = 
_planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs); planBuildTimer.stopAndRecord(); TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java index ac4ecf5..5ee3da2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.readers.GenericRowRecordReader; import org.apache.pinot.core.indexsegment.IndexSegment; @@ -227,8 +228,9 @@ public class SelectionCombineOperatorTest { for (IndexSegment indexSegment : _indexSegments) { planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext)); } - CombinePlanNode combinePlanNode = - new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR, + System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); return combinePlanNode.run().nextBlock(); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java index ed8fb6e..b0d5d13 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; +import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; @@ -55,7 +56,8 @@ public class CombinePlanNodeTest { return null; }); } - CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 1000, + CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, + System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); combinePlanNode.run(); Assert.assertEquals(numPlans, count.get()); @@ -64,15 +66,13 @@ public class CombinePlanNodeTest { @Test public void testSlowPlanNode() { - // Warning: this test is slow (take 10 seconds). 
- AtomicBoolean notInterrupted = new AtomicBoolean(); List<PlanNode> planNodes = new ArrayList<>(); for (int i = 0; i < 20; i++) { planNodes.add(() -> { try { - Thread.sleep(20000); + Thread.sleep(10000); } catch (InterruptedException e) { // Thread should be interrupted throw new RuntimeException(e); @@ -81,8 +81,9 @@ public class CombinePlanNodeTest { return null; }); } - CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + CombinePlanNode combinePlanNode = + new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100, + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); try { combinePlanNode.run(); } catch (RuntimeException e) { @@ -102,7 +103,8 @@ public class CombinePlanNodeTest { throw new RuntimeException("Inner exception message."); }); } - CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0, + CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, + System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); try { combinePlanNode.run(); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index 619302c..86c144d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -198,8 +198,8 @@ public abstract class BaseQueriesTest { */ private BrokerResponseNative getBrokerResponse(QueryContext queryContext, PlanMaker planMaker) { // Server side. 
- Plan plan = planMaker - .makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); + Plan plan = planMaker.makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE, + System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); DataTable instanceResponse = plan.execute(); // Broker side. diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java index 19afd73..f419cfd 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java @@ -35,7 +35,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.response.broker.SelectionResults; import org.apache.pinot.common.segment.ReadMode; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.common.utils.DataTable; @@ -305,7 +305,6 @@ public class DistinctQueriesTest extends BaseQueriesTest { } } - /** * Test DISTINCT query within a single segment. * <p>The following query types are tested: @@ -759,6 +758,7 @@ public class DistinctQueriesTest extends BaseQueriesTest { }; testDistinctInterSegmentHelper(pqlQueries, sqlQueries); } + /** * Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have * 2 copies of segment1. 
@@ -768,10 +768,10 @@ public class DistinctQueriesTest extends BaseQueriesTest { // Server side DataTable instanceResponse0 = PLAN_MAKER .makeInstancePlan(Arrays.asList(segment0, segment0), queryContext, EXECUTOR_SERVICE, - CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute(); + System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute(); DataTable instanceResponse1 = PLAN_MAKER .makeInstancePlan(Arrays.asList(segment1, segment1), queryContext, EXECUTOR_SERVICE, - CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute(); + System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute(); // Broker side BrokerReduceService brokerReduceService = new BrokerReduceService(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org