This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 08f6155cad [multistage] Minor Changes to Support Physical Optimizer Related Changes (#15562) 08f6155cad is described below commit 08f6155cadf72a9174dfd536283841b8a5b21706 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Thu Apr 24 06:32:39 2025 +0530 [multistage] Minor Changes to Support Physical Optimizer Related Changes (#15562) --- .../query/planner/logical/RelToPlanNodeConverter.java | 2 +- .../pinot/query/planner/plannode/ExchangeNode.java | 13 +++++++++++-- .../apache/pinot/query/planner/plannode/SetOpNode.java | 12 ++++++------ .../org/apache/pinot/query/routing/WorkerManager.java | 16 ++++++++++++++++ .../runtime/operator/exchange/SingletonExchange.java | 5 +---- .../runtime/operator/exchange/SingletonExchangeTest.java | 10 ---------- 6 files changed, 35 insertions(+), 23 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index d42f903ab9..06a3ea731a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -186,7 +186,7 @@ public final class RelToPlanNodeConverter { } } return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), convertInputs(node.getInputs()), - exchangeType, distributionType, keys, prePartitioned, collations, sortOnSender, sortOnReceiver, null); + exchangeType, distributionType, keys, prePartitioned, collations, sortOnSender, sortOnReceiver, null, null); } private SetOpNode convertLogicalSetOp(SetOp node) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java 
index 79b78d12ee..ea02ca6b7e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java @@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelFieldCollation; import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.PlanFragmenter; +import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy; /** @@ -45,11 +46,13 @@ public class ExchangeNode extends BasePlanNode { private final boolean _sortOnReceiver; // Table names should be set for SUB_PLAN exchange type. private final Set<String> _tableNames; + @Nullable + private final ExchangeStrategy _exchangeStrategy; public ExchangeNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs, PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List<Integer> keys, boolean prePartitioned, @Nullable List<RelFieldCollation> collations, boolean sortOnSender, boolean sortOnReceiver, - @Nullable Set<String> tableNames) { + @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy) { super(stageId, dataSchema, null, inputs); _exchangeType = exchangeType; _distributionType = distributionType; @@ -59,6 +62,7 @@ public class ExchangeNode extends BasePlanNode { _sortOnSender = sortOnSender; _sortOnReceiver = sortOnReceiver; _tableNames = tableNames; + _exchangeStrategy = exchangeStrategy; } public PinotRelExchangeType getExchangeType() { @@ -96,6 +100,11 @@ public class ExchangeNode extends BasePlanNode { return _tableNames; } + @Nullable + public ExchangeStrategy getExchangeStrategy() { + return _exchangeStrategy; + } + @Override public String explain() { return "EXCHANGE"; @@ -109,7 +118,7 @@ public class ExchangeNode extends BasePlanNode { @Override public PlanNode withInputs(List<PlanNode> inputs) { return new ExchangeNode(_stageId, 
_dataSchema, inputs, _exchangeType, _distributionType, _keys, _prePartitioned, - _collations, _sortOnSender, _sortOnReceiver, _tableNames); + _collations, _sortOnSender, _sortOnReceiver, _tableNames, null); } @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java index 7c60731c3e..5a5d3e1656 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java @@ -20,10 +20,10 @@ package org.apache.pinot.query.planner.plannode; import java.util.List; import java.util.Objects; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.Minus; import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.logical.LogicalIntersect; -import org.apache.calcite.rel.logical.LogicalMinus; -import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rel.core.Union; import org.apache.pinot.common.utils.DataSchema; @@ -88,13 +88,13 @@ public class SetOpNode extends BasePlanNode { UNION, INTERSECT, MINUS; public static SetOpType fromObject(SetOp setOp) { - if (setOp instanceof LogicalUnion) { + if (setOp instanceof Union) { return UNION; } - if (setOp instanceof LogicalIntersect) { + if (setOp instanceof Intersect) { return INTERSECT; } - if (setOp instanceof LogicalMinus) { + if (setOp instanceof Minus) { return MINUS; } throw new IllegalArgumentException("Unsupported set operation: " + setOp.getClass()); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 6963b6f907..5adfadcffc 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -81,6 +81,22 @@ public class WorkerManager { _routingManager = routingManager; } + public String getInstanceId() { + return _instanceId; + } + + public String getHostName() { + return _hostName; + } + + public int getPort() { + return _port; + } + + public RoutingManager getRoutingManager() { + return _routingManager; + } + public void assignWorkers(PlanFragment rootFragment, DispatchablePlanContext context) { // ROOT stage doesn't have a QueryServer as it is strictly only reducing results, so here we simply assign the // worker instance with identical server/mailbox port number. diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java index 16867b4f2c..f6fd85d954 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeoutException; import java.util.function.Function; -import org.apache.pinot.query.mailbox.InMemorySendingMailbox; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.MseBlock; @@ -38,9 +37,7 @@ class SingletonExchange extends BlockExchange { SingletonExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter splitter, Function<List<SendingMailbox>, Integer> statsIndexChooser) { super(sendingMailboxes, splitter, statsIndexChooser); - Preconditions.checkArgument( - sendingMailboxes.size() == 1 && sendingMailboxes.get(0) instanceof InMemorySendingMailbox, - "Expect single InMemorySendingMailbox for SingletonExchange"); + 
Preconditions.checkArgument(sendingMailboxes.size() == 1, "Expect single mailbox in Singleton Exchange"); } SingletonExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter splitter) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java index 2e06461e1f..6c1cfcb101 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java @@ -77,16 +77,6 @@ public class SingletonExchangeTest { Assert.assertEquals(captor.getValue(), _block); } - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldThrowWhenSingletonWithNonLocalMailbox() - throws Exception { - // Given: - ImmutableList<SendingMailbox> destinations = ImmutableList.of(_mailbox2); - - // When: - new SingletonExchange(destinations, BlockSplitter.NO_OP).route(destinations, _block); - } - @Test(expectedExceptions = IllegalArgumentException.class) public void shouldThrowWhenSingletonWithMultipleMailboxes() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org