This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b670c552f5 Fix pipeline breaker error handling (#11411) b670c552f5 is described below commit b670c552f5b8ab8cbf6d1eaf79c79a0d926c7ebe Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Aug 23 11:43:45 2023 -0700 Fix pipeline breaker error handling (#11411) --- .../apache/pinot/query/mailbox/MailboxService.java | 2 - .../apache/pinot/query/runtime/QueryRunner.java | 56 ++++++---- .../plan/pipeline/PipelineBreakerExecutor.java | 56 +++++----- .../plan/pipeline/PipelineBreakerOperator.java | 116 +++++++++------------ .../plan/pipeline/PipelineBreakerResult.java | 9 +- .../query/service/dispatch/QueryDispatcher.java | 12 +-- .../pinot/query/runtime/QueryRunnerTest.java | 46 ++++---- .../pinot/query/runtime/QueryRunnerTestBase.java | 10 +- .../plan/pipeline/PipelineBreakerExecutorTest.java | 28 ++--- 9 files changed, 166 insertions(+), 169 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java index c95c7c3f23..7c171f1bf4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.mailbox; -import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; @@ -100,7 +99,6 @@ public class MailboxService { * not open the underlying channel or acquire any additional resources. Instead, it will initialize lazily when the * data is sent for the first time. */ - @VisibleForTesting public SendingMailbox getSendingMailbox(String hostname, int port, String mailboxId, long deadlineMs) { if (_hostname.equals(hostname) && _port == port) { return new InMemorySendingMailbox(mailboxId, this, deadlineMs); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index edcd0a0fdc..b66c71de48 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -36,9 +36,12 @@ import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.query.mailbox.MailboxIdUtils; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils; import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator; @@ -144,34 +147,47 @@ public class QueryRunner { long deadlineMs = System.currentTimeMillis() + timeoutMs; // run pre-stage execution for all pipeline breakers - PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, - _mailboxService, distributedStagePlan, deadlineMs, requestId, isTraceEnabled); + PipelineBreakerResult pipelineBreakerResult = + PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, deadlineMs, + requestId, isTraceEnabled); + + // Send error block to all the receivers if pipeline breaker fails + if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { + TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); + LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, + distributedStagePlan.getStageId(), errorBlock.getDataBlock().getExceptions()); + int receiverStageId = ((MailboxSendNode) distributedStagePlan.getStageRoot()).getReceiverStageId(); + MailboxMetadata mailboxMetadata = distributedStagePlan.getStageMetadata().getWorkerMetadataList() + .get(distributedStagePlan.getServer().workerId()).getMailBoxInfosMap().get(receiverStageId); + List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata); + for (int i = 0; i < mailboxIds.size(); i++) { + try { + _mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(), + mailboxMetadata.getVirtualAddress(i).port(), mailboxIds.get(i), deadlineMs).send(errorBlock); + } catch (TimeoutException e) { + LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxIds.get(i), + requestId, distributedStagePlan.getStageId(), e); + } catch (Exception e) { + LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", + mailboxIds.get(i), requestId, distributedStagePlan.getStageId(), e); + } + } + return; + } // Set Join Overflow configs to StageMetadata from request setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap); // run OpChain + OpChain opChain; if (DistributedStagePlan.isLeafStage(distributedStagePlan)) { - try { - OpChain rootOperator = compileLeafStage(requestId, distributedStagePlan, requestMetadataMap, - pipelineBreakerResult, deadlineMs, isTraceEnabled); - _scheduler.register(rootOperator); - } catch (Exception e) { - LOGGER.error("Error executing leaf stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e); - _scheduler.cancel(requestId); - throw e; - } + opChain = compileLeafStage(requestId, distributedStagePlan, requestMetadataMap, pipelineBreakerResult, deadlineMs, + isTraceEnabled); } else { - try { - OpChain rootOperator = compileIntermediateStage(requestId, distributedStagePlan, requestMetadataMap, - pipelineBreakerResult, deadlineMs, isTraceEnabled); - _scheduler.register(rootOperator); - } catch (Exception e) { - LOGGER.error("Error executing intermediate stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e); - _scheduler.cancel(requestId); - throw e; - } + opChain = compileIntermediateStage(requestId, distributedStagePlan, requestMetadataMap, pipelineBreakerResult, + deadlineMs, isTraceEnabled); } + _scheduler.register(opChain); } private void setJoinOverflowConfigs(DistributedStagePlan distributedStagePlan, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index 6149b758fe..69663f5d33 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -18,13 +18,13 @@ */ package org.apache.pinot.query.runtime.plan.pipeline; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; @@ -45,11 +45,11 @@ import org.slf4j.LoggerFactory; * Utility class to run pipeline breaker execution and collects the results. */ public class PipelineBreakerExecutor { - private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutor.class); private PipelineBreakerExecutor() { - // do not instantiate. } + private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutor.class); + /** * Execute a pipeline breaker and collect the results (synchronously). Currently, pipeline breaker executor can only * execute mailbox receive pipeline breaker. @@ -64,10 +64,10 @@ public class PipelineBreakerExecutor { * - If exception occurs, exception block will be wrapped in {@link TransferableBlock} and assigned to each PB node. * - Normal stats will be attached to each PB node and downstream execution should return with stats attached. */ - public static PipelineBreakerResult executePipelineBreakers( - OpChainSchedulerService scheduler, - MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs, - long requestId, boolean isTraceEnabled) { + @Nullable + public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, + MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs, long requestId, + boolean isTraceEnabled) { PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(); PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext); if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) { @@ -76,31 +76,25 @@ public class PipelineBreakerExecutor { // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage // OpChain receive-mail callbacks. // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information - OpChainExecutionContext opChainContext = new OpChainExecutionContext(mailboxService, requestId, - stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(), deadlineMs, - distributedStagePlan.getStageMetadata(), null, isTraceEnabled); + OpChainExecutionContext opChainContext = + new OpChainExecutionContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), + distributedStagePlan.getServer(), deadlineMs, distributedStagePlan.getStageMetadata(), null, + isTraceEnabled); PhysicalPlanContext physicalPlanContext = new PhysicalPlanContext(opChainContext, null); return PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext); } catch (Exception e) { - LOGGER.error("Unable to create pipeline breaker results for Req: " + requestId + ", Stage: " - + distributedStagePlan.getStageId(), e); - // Create all error blocks for all pipeline breaker nodes. - TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e); - Map<Integer, List<TransferableBlock>> resultMap = new HashMap<>(); - for (int key : pipelineBreakerContext.getNodeIdMap().values()) { - if (pipelineBreakerContext.getPipelineBreakerMap().containsKey(key)) { - resultMap.put(key, Collections.singletonList(errorBlock)); - } - } - return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap, null); + LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId, + distributedStagePlan.getStageId(), e); + return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), Collections.emptyMap(), + TransferableBlockUtils.getErrorTransferableBlock(e), null); } } else { return null; } } - private static PipelineBreakerResult execute(OpChainSchedulerService scheduler, - PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext) + private static PipelineBreakerResult execute(OpChainSchedulerService scheduler, PipelineBreakerContext context, + PhysicalPlanContext physicalPlanContext) throws Exception { Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>(); for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) { @@ -119,18 +113,20 @@ public class PipelineBreakerExecutor { PipelineBreakerContext context, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap, PhysicalPlanContext physicalPlanContext) throws Exception { - PipelineBreakerOperator pipelineBreakerOperator = new PipelineBreakerOperator( - physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap); + PipelineBreakerOperator pipelineBreakerOperator = + new PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap); CountDownLatch latch = new CountDownLatch(1); - OpChain pipelineBreakerOpChain = new OpChain(physicalPlanContext.getOpChainExecutionContext(), - pipelineBreakerOperator, physicalPlanContext.getReceivingMailboxIds(), (id) -> latch.countDown()); + OpChain pipelineBreakerOpChain = + new OpChain(physicalPlanContext.getOpChainExecutionContext(), pipelineBreakerOperator, + physicalPlanContext.getReceivingMailboxIds(), (id) -> latch.countDown()); scheduler.register(pipelineBreakerOpChain); long timeoutMs = physicalPlanContext.getDeadlineMs() - System.currentTimeMillis(); if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) { return new PipelineBreakerResult(context.getNodeIdMap(), pipelineBreakerOperator.getResultMap(), - pipelineBreakerOpChain.getStats()); + pipelineBreakerOperator.getErrorBlock(), pipelineBreakerOpChain.getStats()); } else { - throw new IOException("Exception occur when awaiting breaker results!"); + throw new TimeoutException( + String.format("Timed out waiting for pipeline breaker results after: %dms", timeoutMs)); } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java index 1aab124135..9fe2588827 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java @@ -18,41 +18,34 @@ */ package org.apache.pinot.query.runtime.plan.pipeline; -import com.google.common.collect.ImmutableSet; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; -import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import javax.annotation.Nullable; -import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class PipelineBreakerOperator extends MultiStageOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerOperator.class); private static final String EXPLAIN_NAME = "PIPELINE_BREAKER"; - private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries; - private final Map<Integer, List<TransferableBlock>> _resultMap; - private final ImmutableSet<Integer> _expectedKeySet; - private TransferableBlock _finalBlock; - public PipelineBreakerOperator(OpChainExecutionContext context, - Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) { + private final Map<Integer, Operator<TransferableBlock>> _workerMap; + + private Map<Integer, List<TransferableBlock>> _resultMap; + private TransferableBlock _errorBlock; + + public PipelineBreakerOperator(OpChainExecutionContext context, Map<Integer, Operator<TransferableBlock>> workerMap) { super(context); + _workerMap = workerMap; _resultMap = new HashMap<>(); - _expectedKeySet = ImmutableSet.copyOf(pipelineWorkerMap.keySet()); - _workerEntries = new ArrayDeque<>(); - _workerEntries.addAll(pipelineWorkerMap.entrySet()); - for (int workerKey : _expectedKeySet) { + for (int workerKey : workerMap.keySet()) { _resultMap.put(workerKey, new ArrayList<>()); } } @@ -62,6 +55,10 @@ class PipelineBreakerOperator extends MultiStageOperator { } @Nullable + public TransferableBlock getErrorBlock() { + return _errorBlock; + } + @Override public String toExplainString() { return EXPLAIN_NAME; @@ -69,63 +66,44 @@ class PipelineBreakerOperator extends MultiStageOperator { @Override protected TransferableBlock getNextBlock() { - // Poll from every mailbox operator: - // - Return the first content block - // - If no content block found but there are mailboxes not finished, try again - // - If all content blocks are already returned, return end-of-stream block - while (!_workerEntries.isEmpty()) { - if (_finalBlock != null) { - return _finalBlock; - } - if (System.currentTimeMillis() > _context.getDeadlineMs()) { - _finalBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); - constructErrorResponse(_finalBlock); - return _finalBlock; - } - - Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.getLast(); - TransferableBlock block = worker.getValue().nextBlock(); - - if (block == null) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("==[PB]== Null block on " + _context.getId() + " worker " + worker.getKey()); + if (_errorBlock != null) { + return _errorBlock; + } + // NOTE: Put an empty list for each worker in case there is no data block returned from that worker + if (_workerMap.size() == 1) { + Map.Entry<Integer, Operator<TransferableBlock>> entry = _workerMap.entrySet().iterator().next(); + List<TransferableBlock> dataBlocks = new ArrayList<>(); + _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks); + Operator<TransferableBlock> operator = entry.getValue(); + TransferableBlock block = operator.nextBlock(); + while (!block.isSuccessfulEndOfStreamBlock()) { + if (block.isErrorBlock()) { + _errorBlock = block; + return block; } - continue; + dataBlocks.add(block); + block = operator.nextBlock(); } - - // Release the mailbox worker when the block is end-of-stream - if (block.isSuccessfulEndOfStreamBlock()) { - _workerEntries.removeLast(); - continue; + } else { + _resultMap = new HashMap<>(); + for (int workerKey : _workerMap.keySet()) { + _resultMap.put(workerKey, new ArrayList<>()); } - - if (block.isErrorBlock()) { - _finalBlock = block; - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("==[PB]== Returned block from : " + _context.getId() + " block: " + block); - } - _resultMap.get(worker.getKey()).add(block); - return block; - } - if (System.currentTimeMillis() > _context.getDeadlineMs()) { - _finalBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); - return _finalBlock; - } else if (_finalBlock == null) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("==[PB]== Finished : " + _context.getId()); + // Keep polling from every operator in round-robin fashion + Queue<Map.Entry<Integer, Operator<TransferableBlock>>> entries = new ArrayDeque<>(_workerMap.entrySet()); + while (!entries.isEmpty()) { + Map.Entry<Integer, Operator<TransferableBlock>> entry = entries.poll(); + TransferableBlock block = entry.getValue().nextBlock(); + if (block.isErrorBlock()) { + _errorBlock = block; + return block; + } + if (block.isDataBlock()) { + _resultMap.get(entry.getKey()).add(block); + entries.offer(entry); + } } - _finalBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(); - } - return _finalBlock; - } - - /** - * Setting all result map to error if any of the pipeline breaker returns an ERROR. - */ - private void constructErrorResponse(TransferableBlock errorBlock) { - for (int key : _expectedKeySet) { - _resultMap.put(key, Collections.singletonList(errorBlock)); } + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java index fe0f2cba61..2e2b003e34 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java @@ -32,12 +32,14 @@ import org.apache.pinot.query.runtime.operator.OpChainStats; public class PipelineBreakerResult { private final Map<PlanNode, Integer> _nodeIdMap; private final Map<Integer, List<TransferableBlock>> _resultMap; + private final TransferableBlock _errorBlock; private final OpChainStats _opChainStats; public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap, - OpChainStats opChainStats) { + @Nullable TransferableBlock errorBlock, @Nullable OpChainStats opChainStats) { _nodeIdMap = nodeIdMap; _resultMap = resultMap; + _errorBlock = errorBlock; _opChainStats = opChainStats; } @@ -49,6 +51,11 @@ public class PipelineBreakerResult { return _resultMap; } + @Nullable + public TransferableBlock getErrorBlock() { + return _errorBlock; + } + @Nullable public OpChainStats getOpChainStats() { return _opChainStats; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 0d94a4d099..50ed9f7cf5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.service.dispatch; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import io.grpc.Deadline; import java.util.ArrayList; import java.util.HashMap; @@ -211,8 +212,11 @@ public class QueryDispatcher { PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(scheduler, mailboxService, reducerStagePlan, System.currentTimeMillis() + timeoutMs, requestId, traceEnabled); - if (pipelineBreakerResult == null) { - throw new RuntimeException("Broker reducer error during query execution!"); + Preconditions.checkState(pipelineBreakerResult != null, "Pipeline breaker result should not be null"); + if (pipelineBreakerResult.getErrorBlock() != null) { + throw new RuntimeException( + "Received error query execution result block: " + pipelineBreakerResult.getErrorBlock().getDataBlock() + .getExceptions()); } collectStats(dispatchableSubPlan, pipelineBreakerResult.getOpChainStats(), statsAggregatorMap); List<TransferableBlock> resultDataBlocks = pipelineBreakerResult.getResultMap().get(0); @@ -245,10 +249,6 @@ public class QueryDispatcher { List<Object[]> resultRows = new ArrayList<>(); DataSchema resultSchema = toResultSchema(sourceSchema, fields); for (TransferableBlock transferableBlock : queryResult) { - if (transferableBlock.isErrorBlock()) { - throw new RuntimeException( - "Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions()); - } DataBlock dataBlock = transferableBlock.getDataBlock(); int numColumns = resultSchema.getColumnNames().length; int numRows = dataBlock.getNumberOfRows(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index 7ee9ed8732..3c73434c15 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -19,14 +19,15 @@ package org.apache.pinot.query.runtime; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryEnvironmentTestBase; import org.apache.pinot.query.QueryServerEnclosure; import org.apache.pinot.query.mailbox.MailboxService; @@ -44,6 +45,8 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -188,12 +191,18 @@ public class QueryRunnerTest extends QueryRunnerTestBase { */ @Test(dataProvider = "testDataWithSqlExecutionExceptions") public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) { - long requestId = RANDOM_REQUEST_ID_GEN.nextLong(); - DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); - Map<String, String> requestMetadataMap = - ImmutableMap.of(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId), - CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, - String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS)); + long requestId = REQUEST_ID_GEN.getAndIncrement(); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql); + QueryEnvironment.QueryPlannerResult queryPlannerResult = + _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId); + DispatchableSubPlan dispatchableSubPlan = queryPlannerResult.getQueryPlan(); + Map<String, String> requestMetadataMap = new HashMap<>(); + requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId)); + Long timeoutMs = QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions()); + requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, + String.valueOf(timeoutMs != null ? timeoutMs : CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS)); + requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, "true"); + requestMetadataMap.putAll(sqlNodeAndOptions.getOptions()); int reducerStageId = -1; for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) { if (dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment() @@ -210,17 +219,16 @@ public class QueryRunnerTest extends QueryRunnerTestBase { Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)), _mailboxService, _reducerScheduler, null, false); - } catch (RuntimeException rte) { - Assert.assertTrue(rte.getMessage().contains("Received error query execution result block")); - // TODO: The actual message is (usually) something like: - // Received error query execution result block: {200=QueryExecutionError: - // java.lang.IllegalArgumentException: Illegal Json Path: $['path'] does not match document - // at org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.throwPathNotFoundException(...) - // at org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.processValue(...) - // at org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.evaluateBlock(...) - // at org.apache.pinot.core.common.DataFetcher$ColumnValueReader.readIntValues(DataFetcher.java:489)} - Assert.assertTrue(rte.getMessage().contains(exceptionMsg), "Exception should contain: " + exceptionMsg - + "! but found: " + rte.getMessage()); + Assert.fail("Should have thrown exception!"); + } catch (RuntimeException e) { + // NOTE: The actual message is (usually) something like: + // Received error query execution result block: {200=QueryExecutionError: + // Query execution error on: Server_localhost_12345 + // java.lang.IllegalArgumentException: Illegal Json Path: $['path'] does not match document + String exceptionMessage = e.getMessage(); + Assert.assertTrue(exceptionMessage.startsWith("Received error query execution result block: ")); + Assert.assertTrue(exceptionMessage.contains(exceptionMsg), + "Exception should contain: " + exceptionMsg + ", but found: " + exceptionMessage); } } @@ -288,7 +296,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase { // Timeout exception should occur with this option: new Object[]{ "SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col1 = c.col1", - "timeout" + "Timeout" }, // Function with incorrect argument signature should throw runtime exception when casting string to numeric diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index d8589c42a0..7dcf8bb759 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -37,14 +37,15 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryServerEnclosure; @@ -87,7 +88,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { protected static final String SEGMENT_BREAKER_KEY = "__SEGMENT_BREAKER_KEY__"; protected static final String SEGMENT_BREAKER_STR = "------"; protected static final GenericRow SEGMENT_BREAKER_ROW = new GenericRow(); - protected static final Random RANDOM_REQUEST_ID_GEN = new Random(); + protected static final AtomicLong REQUEST_ID_GEN = new AtomicLong(); protected QueryEnvironment _queryEnvironment; protected String _reducerHostname; protected int _reducerGrpcPort; @@ -108,15 +109,16 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { * ser/de dispatches. */ protected List<Object[]> queryRunner(String sql, Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) { - long requestId = RANDOM_REQUEST_ID_GEN.nextLong(); + long requestId = REQUEST_ID_GEN.getAndIncrement(); SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql); QueryEnvironment.QueryPlannerResult queryPlannerResult = _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId); DispatchableSubPlan dispatchableSubPlan = queryPlannerResult.getQueryPlan(); Map<String, String> requestMetadataMap = new HashMap<>(); requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId)); + Long timeoutMs = QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions()); requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, - String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS)); + String.valueOf(timeoutMs != null ? timeoutMs : CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS)); requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, "true"); requestMetadataMap.putAll(sqlNodeAndOptions.getOptions()); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index 7f33dc45a4..b5ba82e48d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -136,6 +136,7 @@ public class PipelineBreakerExecutorTest { // then // should have single PB result, receive 2 data blocks, EOS block shouldn't be included Assert.assertNotNull(pipelineBreakerResult); + Assert.assertNull(pipelineBreakerResult.getErrorBlock()); Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1); Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(), 2); @@ -176,6 +177,7 @@ public class PipelineBreakerExecutorTest { // then // should have two PB result, receive 2 data blocks, one each, EOS block shouldn't be included Assert.assertNotNull(pipelineBreakerResult); + Assert.assertNull(pipelineBreakerResult.getErrorBlock()); Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2); Iterator<List<TransferableBlock>> it = pipelineBreakerResult.getResultMap().values().iterator(); Assert.assertEquals(it.next().size(), 1); @@ -201,8 +203,9 @@ public class PipelineBreakerExecutorTest { System.currentTimeMillis() + 10_000L, 0, false); // then - // should contain only failure error blocks + // should return empty block list Assert.assertNotNull(pipelineBreakerResult); + Assert.assertNull(pipelineBreakerResult.getErrorBlock()); Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1); List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next(); Assert.assertEquals(resultBlocks.size(), 0); @@ -233,11 +236,9 @@ public class PipelineBreakerExecutorTest { // then // should contain only failure error blocks Assert.assertNotNull(pipelineBreakerResult); - Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1); - List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next(); - Assert.assertEquals(resultBlocks.size(), 1); - Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock()); - Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock()); + TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); + Assert.assertNotNull(errorBlock); + Assert.assertTrue(errorBlock.isErrorBlock()); } @Test @@ -311,17 +312,8 @@ public class PipelineBreakerExecutorTest { // then // should fail even if one of the 2 PB doesn't contain error block from sender. Assert.assertNotNull(pipelineBreakerResult); - Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2); - - boolean errorFound = false; - for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) { - if (!resultBlocks.isEmpty()) { - TransferableBlock lastBlock = resultBlocks.get(resultBlocks.size() - 1); - if (lastBlock.isErrorBlock()) { - errorFound = true; - } - } - } - Assert.assertTrue(errorFound, "An error block should be the last block on at least one of the result map entries"); + TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); + Assert.assertNotNull(errorBlock); + Assert.assertTrue(errorBlock.isErrorBlock()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org