This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6a47311c03 [multistage][feature] add pipeline breaker stats collector (#10958) 6a47311c03 is described below commit 6a47311c03fdc020a4e05a3acf5cacb7843e3307 Author: Rong Rong <ro...@apache.org> AuthorDate: Fri Jun 23 19:57:16 2023 -0700 [multistage][feature] add pipeline breaker stats collector (#10958) - [multistage][pb][stats] add stats support for pipeline breaker results - add stats test collector - refactor the test setup format to be easier to understand --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../runtime/plan/OpChainExecutionContext.java | 11 +++- .../plan/pipeline/PipelineBreakerExecutor.java | 17 +++--- .../plan/pipeline/PipelineBreakerOperator.java | 1 - .../plan/pipeline/PipelineBreakerResult.java | 12 +++- .../executor/OpChainSchedulerServiceTest.java | 2 +- .../runtime/executor/RoundRobinSchedulerTest.java | 2 +- .../operator/MailboxReceiveOperatorTest.java | 58 +++++++++++--------- .../runtime/operator/MailboxSendOperatorTest.java | 2 +- .../pinot/query/runtime/operator/OpChainTest.java | 8 +-- .../query/runtime/operator/OperatorTestUtil.java | 23 +++++++- .../operator/SortedMailboxReceiveOperatorTest.java | 64 ++++++++++++---------- .../plan/pipeline/PipelineBreakerExecutorTest.java | 23 ++++++-- 12 files changed, 139 insertions(+), 84 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index b23f41400a..b179900e0c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -24,6 +24,7 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.VirtualServerAddress; import 
org.apache.pinot.query.runtime.operator.OpChainId; import org.apache.pinot.query.runtime.operator.OpChainStats; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; /** @@ -45,7 +46,7 @@ public class OpChainExecutionContext { @VisibleForTesting public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata, - boolean traceEnabled) { + PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) { _mailboxService = mailboxService; _requestId = requestId; _stageId = stageId; @@ -54,13 +55,17 @@ public class OpChainExecutionContext { _stageMetadata = stageMetadata; _id = new OpChainId(requestId, server.workerId(), stageId); _stats = new OpChainStats(_id.toString()); + if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) { + _stats.getOperatorStatsMap().putAll( + pipelineBreakerResult.getOpChainStats().getOperatorStatsMap()); + } _traceEnabled = traceEnabled; } public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) { this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(), - physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), - physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled()); + physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(), + physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled()); } public MailboxService getMailboxService() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index fa33192a64..c148b37b0c 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -77,9 +77,7 @@ public class PipelineBreakerExecutor { PhysicalPlanContext physicalPlanContext = new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs, distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled); - Map<Integer, List<TransferableBlock>> resultMap = - PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext); - return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap); + return PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext); } catch (Exception e) { LOGGER.error("Unable to create pipeline breaker results for Req: " + requestId + ", Stage: " + distributedStagePlan.getStageId(), e); @@ -91,14 +89,14 @@ public class PipelineBreakerExecutor { resultMap.put(key, Collections.singletonList(errorBlock)); } } - return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap); + return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap, null); } } else { return null; } } - private static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler, + private static PipelineBreakerResult execute(OpChainSchedulerService scheduler, PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext) throws Exception { Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>(); @@ -111,11 +109,11 @@ public class PipelineBreakerExecutor { OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext); pipelineWorkerMap.put(key, tempOpChain.getRoot()); } - return runMailboxReceivePipelineBreaker(scheduler, pipelineWorkerMap, physicalPlanContext); + return 
runMailboxReceivePipelineBreaker(scheduler, context, pipelineWorkerMap, physicalPlanContext); } - private static Map<Integer, List<TransferableBlock>> runMailboxReceivePipelineBreaker( - OpChainSchedulerService scheduler, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap, + private static PipelineBreakerResult runMailboxReceivePipelineBreaker(OpChainSchedulerService scheduler, + PipelineBreakerContext context, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap, PhysicalPlanContext physicalPlanContext) throws Exception { PipelineBreakerOperator pipelineBreakerOperator = new PipelineBreakerOperator( @@ -126,7 +124,8 @@ public class PipelineBreakerExecutor { scheduler.register(pipelineBreakerOpChain); long timeoutMs = physicalPlanContext.getDeadlineMs() - System.currentTimeMillis(); if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) { - return pipelineBreakerOperator.getResultMap(); + return new PipelineBreakerResult(context.getNodeIdMap(), pipelineBreakerOperator.getResultMap(), + pipelineBreakerOpChain.getStats()); } else { throw new IOException("Exception occur when awaiting breaker results!"); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java index 438ce15a37..c9d4d58682 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java @@ -93,7 +93,6 @@ class PipelineBreakerOperator extends MultiStageOperator { return _errorBlock; } List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>()); - // TODO: only data block is handled, we also need to handle metadata block from upstream in the future. 
if (!block.isEndOfStreamBlock()) { blockList.add(block); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java index 8388337ff1..fe0f2cba61 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java @@ -20,8 +20,10 @@ package org.apache.pinot.query.runtime.plan.pipeline; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.operator.OpChainStats; /** @@ -30,10 +32,13 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; public class PipelineBreakerResult { private final Map<PlanNode, Integer> _nodeIdMap; private final Map<Integer, List<TransferableBlock>> _resultMap; + private final OpChainStats _opChainStats; - public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap) { + public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap, + OpChainStats opChainStats) { _nodeIdMap = nodeIdMap; _resultMap = resultMap; + _opChainStats = opChainStats; } public Map<PlanNode, Integer> getNodeIdMap() { @@ -43,4 +48,9 @@ public class PipelineBreakerResult { public Map<Integer, List<TransferableBlock>> getResultMap() { return _resultMap; } + + @Nullable + public OpChainStats getOpChainStats() { + return _opChainStats; + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index 332ceaaaab..5088decbbe 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest { private OpChain getChain(MultiStageOperator operator) { VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1); - OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, true); + OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, null, true); return new OpChain(context, operator, ImmutableList.of()); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java index 798eb534ba..1c400c6e0f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java @@ -180,6 +180,6 @@ public class RoundRobinSchedulerTest { private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) { return new OpChainExecutionContext(null, requestId, stageId, - new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, true); + new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, null, true); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index fed5ce422e..48ace11a3f 100644 --- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -75,20 +75,25 @@ public class MailboxReceiveOperatorTest { VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0); VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1); _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map( - s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); - // sending stage is 0, receiving stage is 1 + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s) + .addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())) + .addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())) + .build()).collect(Collectors.toList())).build(); _stageMetadata1 = 
new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map( - s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s) + .addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())) + .addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())) + .build()).collect(Collectors.toList())).build(); } @AfterMethod @@ -105,7 +110,7 @@ public class MailboxReceiveOperatorTest { Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()) .collect(Collectors.toList())).build(); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata); //noinspection resource new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1); } @@ -113,7 +118,7 @@ public class MailboxReceiveOperatorTest { @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") public void shouldThrowRangeDistributionNotSupported() { OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 
Long.MAX_VALUE, null, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null); //noinspection resource new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1); } @@ -125,8 +130,8 @@ public class MailboxReceiveOperatorTest { // Short timeoutMs should result in timeout OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, - _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, + _stageMetadata1); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { Thread.sleep(100L); TransferableBlock mailbox = receiveOp.nextBlock(); @@ -136,8 +141,9 @@ public class MailboxReceiveOperatorTest { } // Longer timeout or default timeout (10s) doesn't result in timeout - context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L, - _stageMetadata1, false); + context = + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L, + _stageMetadata1); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { Thread.sleep(100L); TransferableBlock mailbox = receiveOp.nextBlock(); @@ -150,7 +156,7 @@ public class MailboxReceiveOperatorTest { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { assertTrue(receiveOp.nextBlock().isNoOpBlock()); } @@ -162,7 +168,7 @@ 
public class MailboxReceiveOperatorTest { when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { assertTrue(receiveOp.nextBlock().isEndOfStreamBlock()); } @@ -176,7 +182,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { List<Object[]> actualRows = receiveOp.nextBlock().getContainer(); assertEquals(actualRows.size(), 1); @@ -193,7 +199,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage))); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { TransferableBlock block = receiveOp.nextBlock(); assertTrue(block.isErrorBlock()); @@ -211,7 +217,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, 
_stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, 1)) { List<Object[]> actualRows = receiveOp.nextBlock().getContainer(); @@ -234,7 +240,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, 1)) { // Receive first block from server1 @@ -259,7 +265,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, 1)) { TransferableBlock block = receiveOp.nextBlock(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 2b74bb727a..c35714aa16 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -184,7 +184,7 @@ public class MailboxSendOperatorTest { new 
WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build(); OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, - stageMetadata, false); + stageMetadata, null, false); return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index 910cc38523..e7fdb70f3b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -192,7 +192,7 @@ public class OpChainTest { int receivedStageId = 2; int senderStageId = 1; OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, true); + System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true); Stack<MultiStageOperator> operators = getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); @@ -206,7 +206,7 @@ public class OpChainTest { OpChainExecutionContext secondStageContext = new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, true); + System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true); MailboxReceiveOperator secondStageReceiveOp = new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1); @@ -231,7 +231,7 @@ public class OpChainTest { int receivedStageId = 2; int senderStageId = 1; OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, 
false); + System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false); Stack<MultiStageOperator> operators = getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); @@ -243,7 +243,7 @@ public class OpChainTest { OpChainExecutionContext secondStageContext = new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, false); + System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false); MailboxReceiveOperator secondStageReceiveOp = new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 7ecd344ab3..55878658f7 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -18,13 +18,18 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory; @@ -41,6 +46,12 @@ public class OperatorTestUtil { public static final String OP_1 = "op1"; public static final String OP_2 = "op2"; + 
public static Map<String, String> getDummyStats(long requestId, int stageId, VirtualServerAddress serverAddress) { + OperatorStats operatorStats = new OperatorStats(requestId, stageId, serverAddress); + String statsId = new OpChainId(requestId, serverAddress.workerId(), stageId).toString(); + return OperatorUtils.getMetadataFromOperatorStats(ImmutableMap.of(statsId, operatorStats)); + } + static { MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory().registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA) .registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA).addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0)) @@ -62,18 +73,24 @@ public class OperatorTestUtil { return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW); } + public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, + VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) { + return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, stageMetadata, null, false); + } + public static OpChainExecutionContext getDefaultContext() { VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); - return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, true); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, true); } public static OpChainExecutionContext getDefaultContextWithTracingDisabled() { VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); - return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, false); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, false); } public static OpChainExecutionContext getContext(long requestId, int stageId, VirtualServerAddress virtualServerAddress) { - return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, true); + 
return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, null, + true); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java index 25ef09f279..80cf7ed1c4 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java @@ -85,20 +85,25 @@ public class SortedMailboxReceiveOperatorTest { VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0); VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1); _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map( - s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); - // sending stage is 0, receiving stage is 1 + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s) + .addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + 
ImmutableList.of(server1, server2), ImmutableMap.of())) + .addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())) + .build()).collect(Collectors.toList())).build(); _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map( - s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s) + .addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())) + .addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())) + .build()).collect(Collectors.toList())).build(); } @AfterMethod @@ -115,7 +120,7 @@ public class SortedMailboxReceiveOperatorTest { Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()) .collect(Collectors.toList())).build(); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata); //noinspection resource 
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1); @@ -124,7 +129,7 @@ public class SortedMailboxReceiveOperatorTest { @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") public void shouldThrowRangeDistributionNotSupported() { OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1); @@ -134,8 +139,8 @@ public class SortedMailboxReceiveOperatorTest { public void shouldThrowOnEmptyCollationKey() { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, - _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, + _stageMetadata1); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false, 1); @@ -147,8 +152,8 @@ public class SortedMailboxReceiveOperatorTest { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); // Short timeoutMs should result in timeout OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, - _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, + _stageMetadata1); try 
(SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -160,8 +165,9 @@ public class SortedMailboxReceiveOperatorTest { } // Longer timeout or default timeout (10s) doesn't result in timeout - context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L, - _stageMetadata1, false); + context = + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L, + _stageMetadata1); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -175,7 +181,7 @@ public class SortedMailboxReceiveOperatorTest { public void shouldReceiveSingletonNullMailbox() { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -188,7 +194,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); 
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -203,7 +209,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -221,7 +227,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox1.poll()).thenReturn( TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage))); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -240,7 +246,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try 
(SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -263,7 +269,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -288,7 +294,7 @@ public class SortedMailboxReceiveOperatorTest { OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -319,7 +325,7 @@ public class SortedMailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); try 
(SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections, false, 1)) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index 56d78effe4..6680b56942 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -87,8 +87,7 @@ public class PipelineBreakerExecutorTest { .addMailBoxInfoMap(2, new MailboxMetadata( ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(_server), ImmutableMap.of())) - .build()) - .collect(Collectors.toList())).build(); + .build()).collect(Collectors.toList())).build(); @BeforeClass public void setUpClass() { @@ -127,7 +126,7 @@ public class PipelineBreakerExecutorTest { Object[] row2 = new Object[]{2, 3}; when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1), OperatorTestUtil.block(DATA_SCHEMA, row2), - TransferableBlockUtils.getEndOfStreamTransferableBlock()); + TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server))); PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, @@ -138,6 +137,10 @@ public class PipelineBreakerExecutorTest { Assert.assertNotNull(pipelineBreakerResult); Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1); Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(), 2); + + // should collect stats from previous stage here + 
Assert.assertNotNull(pipelineBreakerResult.getOpChainStats()); + Assert.assertEquals(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap().size(), 1); } @Test @@ -160,9 +163,9 @@ public class PipelineBreakerExecutorTest { Object[] row1 = new Object[]{1, 1}; Object[] row2 = new Object[]{2, 3}; when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1), - TransferableBlockUtils.getEndOfStreamTransferableBlock()); + TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server))); when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2), - TransferableBlockUtils.getEndOfStreamTransferableBlock()); + TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 2, _server))); PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, @@ -176,6 +179,10 @@ public class PipelineBreakerExecutorTest { Assert.assertEquals(it.next().size(), 1); Assert.assertEquals(it.next().size(), 1); Assert.assertFalse(it.hasNext()); + + // should collect stats from previous stage here + Assert.assertNotNull(pipelineBreakerResult.getOpChainStats()); + Assert.assertEquals(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap().size(), 2); } @Test @@ -199,6 +206,9 @@ public class PipelineBreakerExecutorTest { Assert.assertEquals(resultBlocks.size(), 1); Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock()); Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock()); + + // should have null stats from previous stage here + Assert.assertNull(pipelineBreakerResult.getOpChainStats()); } @Test @@ -268,6 +278,9 @@ public class PipelineBreakerExecutorTest { Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock()); Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock()); } + + // should have null stats from previous stage here + 
Assert.assertNull(pipelineBreakerResult.getOpChainStats()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org