This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 73830ae3ce [multistage][cleanup] clean up PhysicalPlanContext usage (#10950) 73830ae3ce is described below commit 73830ae3ce94428b7e9991d1bd160bd62a14b7c1 Author: Rong Rong <ro...@apache.org> AuthorDate: Tue Jun 20 21:28:05 2023 -0700 [multistage][cleanup] clean up PhysicalPlanContext usage (#10950) * [clean up] server plan should not extend physical plan, it should encapsulate * [clean up] reformat opChainExecutionContext, constructor to only take PhysicalPlanContext directly * [clean up] remove timeoutMs as argument and member var from plan context --- .../apache/pinot/query/runtime/QueryRunner.java | 31 ++++---- .../runtime/plan/OpChainExecutionContext.java | 12 +-- .../query/runtime/plan/PhysicalPlanContext.java | 13 +--- .../plan/pipeline/PipelineBreakerExecutor.java | 4 +- .../plan/server/ServerPlanRequestContext.java | 28 +++---- .../plan/server/ServerPlanRequestUtils.java | 21 ++---- .../plan/server/ServerPlanRequestVisitor.java | 6 +- .../query/service/dispatch/QueryDispatcher.java | 14 ++-- .../executor/OpChainSchedulerServiceTest.java | 2 +- .../runtime/executor/RoundRobinSchedulerTest.java | 2 +- .../operator/MailboxReceiveOperatorTest.java | 87 ++++++++-------------- .../runtime/operator/MailboxSendOperatorTest.java | 2 +- .../pinot/query/runtime/operator/OpChainTest.java | 41 ++++------ .../query/runtime/operator/OperatorTestUtil.java | 10 +-- .../operator/SortedMailboxReceiveOperatorTest.java | 85 +++++++-------------- 15 files changed, 134 insertions(+), 224 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 914c6c2ed0..aa16f0b220 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -151,7 +151,7 @@ 
public class QueryRunner { PipelineBreakerResult pipelineBreakerResult; try { pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, - distributedStagePlan, timeoutMs, deadlineMs, requestId, isTraceEnabled); + distributedStagePlan, deadlineMs, requestId, isTraceEnabled); } catch (Exception e) { LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId, distributedStagePlan.getStageId(), e); @@ -174,7 +174,7 @@ public class QueryRunner { try { PlanNode stageRoot = distributedStagePlan.getStageRoot(); OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot, - new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs, + new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs, distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled)); _scheduler.register(rootOperator); } catch (Exception e) { @@ -203,19 +203,18 @@ public class QueryRunner { PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) { boolean isTraceEnabled = Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); + PhysicalPlanContext planContext = new PhysicalPlanContext(_mailboxService, requestId, + distributedStagePlan.getStageId(), deadlineMs, distributedStagePlan.getServer(), + distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled); List<ServerPlanRequestContext> serverPlanRequestContexts = - constructServerQueryRequests(distributedStagePlan, requestMetadataMap, pipelineBreakerResult, - _helixPropertyStore, _mailboxService, deadlineMs); + constructServerQueryRequests(planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore); List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size()); for (ServerPlanRequestContext requestContext : 
serverPlanRequestContexts) { serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(), new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis())); } MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); - OpChainExecutionContext opChainExecutionContext = - new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(), - distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(), - isTraceEnabled); + OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext); MultiStageOperator leafStageOperator = new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest, serverQueryRequests, sendNode.getDataSchema()); @@ -226,9 +225,9 @@ public class QueryRunner { return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList()); } - private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, - Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, - ZkHelixPropertyStore<ZNRecord> helixPropertyStore, MailboxService mailboxService, long deadlineMs) { + private static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext, + DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, + ZkHelixPropertyStore<ZNRecord> helixPropertyStore) { StageMetadata stageMetadata = distributedStagePlan.getStageMetadata(); WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata(); String rawTableName = StageMetadata.getTableName(stageMetadata); @@ -244,17 +243,15 @@ public class QueryRunner { TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); - requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap, - pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), - TableType.OFFLINE, tableEntry.getValue(), deadlineMs)); + requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig, + schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue())); } else if (TableType.REALTIME.name().equals(tableType)) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); - requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap, - pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), - TableType.REALTIME, tableEntry.getValue(), deadlineMs)); + requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig, + schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue())); } else { throw new IllegalArgumentException("Unsupported table type key: " + tableType); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index 5da20c4e70..b23f41400a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime.plan; +import 
com.google.common.annotations.VisibleForTesting; import java.util.function.Consumer; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.VirtualServerAddress; @@ -35,21 +36,20 @@ public class OpChainExecutionContext { private final long _requestId; private final int _stageId; private final VirtualServerAddress _server; - private final long _timeoutMs; private final long _deadlineMs; private final StageMetadata _stageMetadata; private final OpChainId _id; private final OpChainStats _stats; private final boolean _traceEnabled; + @VisibleForTesting public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, - VirtualServerAddress server, long timeoutMs, long deadlineMs, StageMetadata stageMetadata, + VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata, boolean traceEnabled) { _mailboxService = mailboxService; _requestId = requestId; _stageId = stageId; _server = server; - _timeoutMs = timeoutMs; _deadlineMs = deadlineMs; _stageMetadata = stageMetadata; _id = new OpChainId(requestId, server.workerId(), stageId); @@ -59,7 +59,7 @@ public class OpChainExecutionContext { public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) { this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(), - physicalPlanContext.getServer(), physicalPlanContext.getTimeoutMs(), physicalPlanContext.getDeadlineMs(), + physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled()); } @@ -83,10 +83,6 @@ public class OpChainExecutionContext { return _server; } - public long getTimeoutMs() { - return _timeoutMs; - } - public long getDeadlineMs() { return _deadlineMs; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java index 2da5772930..a2fb6f4643 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java @@ -29,8 +29,6 @@ public class PhysicalPlanContext { protected final MailboxService _mailboxService; protected final long _requestId; protected final int _stageId; - // TODO: Timeout is not needed since deadline is already present. - private final long _timeoutMs; private final long _deadlineMs; protected final VirtualServerAddress _server; protected final StageMetadata _stageMetadata; @@ -39,13 +37,12 @@ public class PhysicalPlanContext { private final OpChainExecutionContext _opChainExecutionContext; private final boolean _traceEnabled; - public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, - long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, - PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) { + public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs, + VirtualServerAddress server, StageMetadata stageMetadata, PipelineBreakerResult pipelineBreakerResult, + boolean traceEnabled) { _mailboxService = mailboxService; _requestId = requestId; _stageId = stageId; - _timeoutMs = timeoutMs; _deadlineMs = deadlineMs; _server = server; _stageMetadata = stageMetadata; @@ -62,10 +59,6 @@ public class PhysicalPlanContext { return _stageId; } - public long getTimeoutMs() { - return _timeoutMs; - } - public long getDeadlineMs() { return _deadlineMs; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index a59a310fed..0202127314 100644 
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -53,7 +53,7 @@ public class PipelineBreakerExecutor { * Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker. */ public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, - MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, + MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs, long requestId, boolean isTraceEnabled) throws Exception { PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext( @@ -65,7 +65,7 @@ public class PipelineBreakerExecutor { // receive-mail callbacks. // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information PhysicalPlanContext physicalPlanContext = - new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs, + new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs, distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled); Map<Integer, List<TransferableBlock>> resultMap = PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java index 5c7bc12503..fb1c3af0c1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -20,12 +20,7 @@ package 
org.apache.pinot.query.runtime.plan.server; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; -import org.apache.pinot.core.routing.TimeBoundaryInfo; -import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.spi.config.table.TableType; @@ -33,22 +28,21 @@ import org.apache.pinot.spi.config.table.TableType; * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into * {@link PinotQuery} to execute on server. */ -public class ServerPlanRequestContext extends PhysicalPlanContext { - protected TableType _tableType; - protected TimeBoundaryInfo _timeBoundaryInfo; +public class ServerPlanRequestContext { + private final PhysicalPlanContext _planContext; + private final TableType _tableType; - protected PinotQuery _pinotQuery; - protected InstanceRequest _instanceRequest; + private PinotQuery _pinotQuery; + private InstanceRequest _instanceRequest; - public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, - long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, - PipelineBreakerResult pipelineBreakerResult, PinotQuery pinotQuery, - TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) { - super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, pipelineBreakerResult, - traceEnabled); + public ServerPlanRequestContext(PhysicalPlanContext planContext, PinotQuery pinotQuery, TableType tableType) { + _planContext = planContext; _pinotQuery = pinotQuery; _tableType = tableType; - _timeBoundaryInfo = timeBoundaryInfo; + } + + public PhysicalPlanContext getPlanContext() { + return 
_planContext; } public TableType getTableType() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 7a0454798b..13659ae810 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -36,11 +36,10 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.TimeBoundaryInfo; -import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; -import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; +import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -70,10 +69,9 @@ public class ServerPlanRequestUtils { // do not instantiate. 
} - public static ServerPlanRequestContext build(MailboxService mailboxService, DistributedStagePlan stagePlan, - Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, - TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType, - List<String> segmentList, long deadlineMs) { + public static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan, + Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, + TableType tableType, List<String> segmentList) { // Before-visit: construct the ServerPlanRequestContext baseline // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing. long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + ( @@ -89,13 +87,10 @@ public class ServerPlanRequestUtils { } LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit); pinotQuery.setExplain(false); - ServerPlanRequestContext context = - new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs, - stagePlan.getServer(), stagePlan.getStageMetadata(), pipelineBreakerResult, pinotQuery, tableType, - timeBoundaryInfo, traceEnabled); + ServerPlanRequestContext serverContext = new ServerPlanRequestContext(planContext, pinotQuery, tableType); // visit the plan and create query physical plan. - ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), context); + ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext); // Post-visit: finalize context. // 1. 
global rewrite/optimize @@ -128,8 +123,8 @@ public class ServerPlanRequestUtils { instanceRequest.setSearchSegments(segmentList); instanceRequest.setQuery(brokerRequest); - context.setInstanceRequest(instanceRequest); - return context; + serverContext.setInstanceRequest(instanceRequest); + return serverContext; } /** diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java index 7cda76bfa2..8fa6df2756 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java @@ -43,6 +43,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode; import org.apache.pinot.query.planner.plannode.ValueNode; import org.apache.pinot.query.planner.plannode.WindowNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -109,8 +110,9 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla staticSide = node.getInputs().get(1); } staticSide.visit(this, context); - int resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide); - List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().getOrDefault( + PipelineBreakerResult pipelineBreakerResult = context.getPlanContext().getPipelineBreakerResult(); + int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide); + List<TransferableBlock> transferableBlocks = pipelineBreakerResult.getResultMap().getOrDefault( resultMapId, Collections.emptyList()); List<Object[]> resultDataContainer = new ArrayList<>(); DataSchema dataSchema = 
dynamicSide.getDataSchema(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index d7eb25a944..fb1dc05619 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -58,6 +58,7 @@ import org.apache.pinot.query.runtime.operator.OpChainStats; import org.apache.pinot.query.runtime.operator.OperatorStats; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.service.QueryConfig; @@ -193,12 +194,13 @@ public class QueryDispatcher { DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId); MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot(); VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0); - OpChainExecutionContext context = - new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, - System.currentTimeMillis() + timeoutMs, - new StageMetadata.Builder().setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList()) - .addCustomProperties(reduceStagePlanFragment.getCustomProperties()).build(), - traceEnabled); + StageMetadata brokerStageMetadata = new StageMetadata.Builder() + .setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList()) + .addCustomProperties(reduceStagePlanFragment.getCustomProperties()) + .build(); + PhysicalPlanContext 
planContext = new PhysicalPlanContext(mailboxService, requestId, reduceStageId, + System.currentTimeMillis() + timeoutMs, server, brokerStageMetadata, null, traceEnabled); + OpChainExecutionContext context = new OpChainExecutionContext(planContext); MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId()); List<DataBlock> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, dispatchableSubPlan, diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index fbbab9e4d9..332ceaaaab 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest { private OpChain getChain(MultiStageOperator operator) { VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1); - OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null, true); + OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, true); return new OpChain(context, operator, ImmutableList.of()); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java index 6589fe079f..798eb534ba 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java @@ -180,6 +180,6 @@ public class RoundRobinSchedulerTest { private 
OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) { return new OpChainExecutionContext(null, requestId, stageId, - new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null, true); + new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, true); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index bcec439843..fed5ce422e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -74,40 +74,21 @@ public class MailboxReceiveOperatorTest { when(_mailboxService.getPort()).thenReturn(123); VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0); VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1); - _stageMetadataBoth = new StageMetadata.Builder() - .setWorkerMetadataList(Stream.of(server1, server2).map( - s -> new WorkerMetadata.Builder() - .setVirtualServerAddress(s) - .addMailBoxInfoMap(0, - new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())) - .addMailBoxInfoMap(1, - new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())) - .build()) - .collect(Collectors.toList())) - .build(); + _stageMetadataBoth = new 
StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map( + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); // sending stage is 0, receiving stage is 1 - _stageMetadata1 = new StageMetadata.Builder() - .setWorkerMetadataList(Stream.of(server1).map( - s -> new WorkerMetadata.Builder() - .setVirtualServerAddress(s) - .addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())) - .addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())) - .build()).collect(Collectors.toList())) - .build(); + _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map( + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), 
ImmutableMap.of())).build()).collect(Collectors.toList())).build(); } @AfterMethod @@ -120,13 +101,11 @@ public class MailboxReceiveOperatorTest { public void shouldThrowSingletonNoMatchMailboxServer() { VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0); VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1); - StageMetadata stageMetadata = new StageMetadata.Builder() - .setWorkerMetadataList(Stream.of(server1, server2).map( - s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList())) - .build(); + StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList( + Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()) + .collect(Collectors.toList())).build(); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - stageMetadata, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false); //noinspection resource new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1); } @@ -134,8 +113,7 @@ public class MailboxReceiveOperatorTest { @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") public void shouldThrowRangeDistributionNotSupported() { OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - null, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false); //noinspection resource new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1); } @@ -147,7 +125,7 @@ public class MailboxReceiveOperatorTest { // Short timeoutMs should result in timeout OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 
10L, System.currentTimeMillis() + 10L, + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, _stageMetadata1, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { Thread.sleep(100L); @@ -158,8 +136,8 @@ public class MailboxReceiveOperatorTest { } // Longer timeout or default timeout (10s) doesn't result in timeout - context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L, - System.currentTimeMillis() + 10_000L, _stageMetadata1, false); + context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L, + _stageMetadata1, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { Thread.sleep(100L); TransferableBlock mailbox = receiveOp.nextBlock(); @@ -172,8 +150,7 @@ public class MailboxReceiveOperatorTest { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { assertTrue(receiveOp.nextBlock().isNoOpBlock()); } @@ -185,8 +162,7 @@ public class MailboxReceiveOperatorTest { when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { assertTrue(receiveOp.nextBlock().isEndOfStreamBlock()); } @@ -200,8 +176,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { List<Object[]> actualRows = receiveOp.nextBlock().getContainer(); assertEquals(actualRows.size(), 1); @@ -218,8 +193,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage))); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) { TransferableBlock block = receiveOp.nextBlock(); assertTrue(block.isErrorBlock()); @@ -237,8 +211,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, 1)) { List<Object[]> actualRows = receiveOp.nextBlock().getContainer(); @@ -261,8 +234,7 @@ public class 
MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, 1)) { // Receive first block from server1 @@ -287,8 +259,7 @@ public class MailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, 1)) { TransferableBlock block = receiveOp.nextBlock(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index ade517f1a1..2b74bb727a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -183,7 +183,7 @@ public class MailboxSendOperatorTest { .setWorkerMetadataList(Collections.singletonList( new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build(); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, Long.MAX_VALUE, + new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, 
stageMetadata, false); return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index 4af33067fa..910cc38523 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -91,21 +91,14 @@ public class OpChainTest { public void setUp() { _mocks = MockitoAnnotations.openMocks(this); _serverAddress = new VirtualServerAddress("localhost", 123, 0); - _receivingStageMetadata = new StageMetadata.Builder() - .setWorkerMetadataList(Stream.of(_serverAddress).map( - s -> new WorkerMetadata.Builder() - .setVirtualServerAddress(s) - .addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), - ImmutableList.of(s), ImmutableMap.of())) - .addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), - ImmutableList.of(s), ImmutableMap.of())) - .addMailBoxInfoMap(2, new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), - ImmutableList.of(s), ImmutableMap.of())) - .build()).collect(Collectors.toList())) - .build(); + _receivingStageMetadata = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_serverAddress).map( + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, + new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s), + ImmutableMap.of())).addMailBoxInfoMap(1, + new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s), + ImmutableMap.of())).addMailBoxInfoMap(2, + new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s), 
+ ImmutableMap.of())).build()).collect(Collectors.toList())).build(); when(_mailboxService1.getReceivingMailbox(any())).thenReturn(_mailbox1); when(_mailboxService2.getReceivingMailbox(any())).thenReturn(_mailbox2); @@ -198,9 +191,8 @@ public class OpChainTest { int receivedStageId = 2; int senderStageId = 1; - OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000, - System.currentTimeMillis() + 1000, _receivingStageMetadata, true); + OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, + System.currentTimeMillis() + 1000, _receivingStageMetadata, true); Stack<MultiStageOperator> operators = getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); @@ -214,7 +206,7 @@ public class OpChainTest { OpChainExecutionContext secondStageContext = new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, - 1000, System.currentTimeMillis() + 1000, _receivingStageMetadata, true); + System.currentTimeMillis() + 1000, _receivingStageMetadata, true); MailboxReceiveOperator secondStageReceiveOp = new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1); @@ -238,9 +230,8 @@ public class OpChainTest { int receivedStageId = 2; int senderStageId = 1; - OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000, - System.currentTimeMillis() + 1000, _receivingStageMetadata, false); + OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, + System.currentTimeMillis() + 1000, _receivingStageMetadata, false); Stack<MultiStageOperator> operators = getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); @@ -251,7 +242,7 @@ public class OpChainTest { opChain.getStats().queued(); OpChainExecutionContext secondStageContext = - 
new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000, + new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, System.currentTimeMillis() + 1000, _receivingStageMetadata, false); MailboxReceiveOperator secondStageReceiveOp = new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId); @@ -290,8 +281,8 @@ public class OpChainTest { List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext)); LeafStageTransferableBlockOperator leafOp = new LeafStageTransferableBlockOperator(context, - LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList), - Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema); + LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList), + Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema); //Transform operator RexExpression.InputRef ref0 = new RexExpression.InputRef(0); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index b5196e812f..7ecd344ab3 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -27,6 +27,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory; + public class OperatorTestUtil { // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java. 
private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS = @@ -63,19 +64,16 @@ public class OperatorTestUtil { public static OpChainExecutionContext getDefaultContext() { VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); - return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE, - null, true); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, true); } public static OpChainExecutionContext getDefaultContextWithTracingDisabled() { VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); - return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE, - null, false); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, false); } public static OpChainExecutionContext getContext(long requestId, int stageId, VirtualServerAddress virtualServerAddress) { - return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE, - null, true); + return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, true); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java index 5bd973f4c2..25ef09f279 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java @@ -84,40 +84,21 @@ public class SortedMailboxReceiveOperatorTest { when(_mailboxService.getPort()).thenReturn(123); VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0); VirtualServerAddress server2 = new 
VirtualServerAddress("localhost", 123, 1); - _stageMetadataBoth = new StageMetadata.Builder() - .setWorkerMetadataList(Stream.of(server1, server2).map( - s -> new WorkerMetadata.Builder() - .setVirtualServerAddress(s) - .addMailBoxInfoMap(0, - new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())) - .addMailBoxInfoMap(1, - new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2), ImmutableMap.of())) - .build()) - .collect(Collectors.toList())) - .build(); + _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map( + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), + org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), + ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); // sending stage is 0, receiving stage is 1 - _stageMetadata1 = new StageMetadata.Builder() - .setWorkerMetadataList(Stream.of(server1).map( - s -> new WorkerMetadata.Builder() - .setVirtualServerAddress(s) - .addMailBoxInfoMap(0, new MailboxMetadata( - ImmutableList.of( - 
org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())) - .addMailBoxInfoMap(1, new MailboxMetadata( - ImmutableList.of( - org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), - ImmutableList.of(server1), ImmutableMap.of())) - .build()).collect(Collectors.toList())) - .build(); + _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map( + s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata( + ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), + ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build(); } @AfterMethod @@ -134,8 +115,7 @@ public class SortedMailboxReceiveOperatorTest { Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()) .collect(Collectors.toList())).build(); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - stageMetadata, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1); @@ -144,8 +124,7 @@ public class SortedMailboxReceiveOperatorTest { @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") public void shouldThrowRangeDistributionNotSupported() { OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, 
RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, null, - false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1); @@ -155,7 +134,7 @@ public class SortedMailboxReceiveOperatorTest { public void shouldThrowOnEmptyCollationKey() { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L, + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, _stageMetadata1, false); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(), @@ -168,7 +147,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); // Short timeoutMs should result in timeout OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L, + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, _stageMetadata1, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, @@ -181,8 +160,8 @@ public class SortedMailboxReceiveOperatorTest { } // Longer timeout or default timeout (10s) doesn't result in timeout - context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L, - System.currentTimeMillis() + 10_000L, _stageMetadata1, false); + context = new OpChainExecutionContext(_mailboxService, 0, 0, 
RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L, + _stageMetadata1, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -196,8 +175,7 @@ public class SortedMailboxReceiveOperatorTest { public void shouldReceiveSingletonNullMailbox() { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -210,8 +188,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -226,8 +203,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, 
RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -245,8 +221,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox1.poll()).thenReturn( TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage))); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadata1, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -265,8 +240,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -289,8 +263,7 @@ public class SortedMailboxReceiveOperatorTest { when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row), TransferableBlockUtils.getEndOfStreamTransferableBlock()); 
OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -315,8 +288,7 @@ public class SortedMailboxReceiveOperatorTest { OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5), TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1)) { @@ -347,8 +319,7 @@ public class SortedMailboxReceiveOperatorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, - _stageMetadataBoth, false); + new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false); try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections, false, 1)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: 
commits-h...@pinot.apache.org