This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 08f6155cad [multistage] Minor Changes to Support Physical Optimizer Related Changes (#15562)
08f6155cad is described below

commit 08f6155cadf72a9174dfd536283841b8a5b21706
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Thu Apr 24 06:32:39 2025 +0530

    [multistage] Minor Changes to Support Physical Optimizer Related Changes (#15562)
---
 .../query/planner/logical/RelToPlanNodeConverter.java    |  2 +-
 .../pinot/query/planner/plannode/ExchangeNode.java       | 13 +++++++++++--
 .../apache/pinot/query/planner/plannode/SetOpNode.java   | 12 ++++++------
 .../org/apache/pinot/query/routing/WorkerManager.java    | 16 ++++++++++++++++
 .../runtime/operator/exchange/SingletonExchange.java     |  5 +----
 .../runtime/operator/exchange/SingletonExchangeTest.java | 10 ----------
 6 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index d42f903ab9..06a3ea731a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -186,7 +186,7 @@ public final class RelToPlanNodeConverter {
       }
     }
     return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
convertInputs(node.getInputs()),
-        exchangeType, distributionType, keys, prePartitioned, collations, 
sortOnSender, sortOnReceiver, null);
+        exchangeType, distributionType, keys, prePartitioned, collations, 
sortOnSender, sortOnReceiver, null, null);
   }
 
   private SetOpNode convertLogicalSetOp(SetOp node) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 79b78d12ee..ea02ca6b7e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.PlanFragmenter;
+import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
 
 
 /**
@@ -45,11 +46,13 @@ public class ExchangeNode extends BasePlanNode {
   private final boolean _sortOnReceiver;
   // Table names should be set for SUB_PLAN exchange type.
   private final Set<String> _tableNames;
+  @Nullable
+  private final ExchangeStrategy _exchangeStrategy;
 
   public ExchangeNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs, PinotRelExchangeType exchangeType,
       RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
       @Nullable List<RelFieldCollation> collations, boolean sortOnSender, 
boolean sortOnReceiver,
-      @Nullable Set<String> tableNames) {
+      @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy) {
     super(stageId, dataSchema, null, inputs);
     _exchangeType = exchangeType;
     _distributionType = distributionType;
@@ -59,6 +62,7 @@ public class ExchangeNode extends BasePlanNode {
     _sortOnSender = sortOnSender;
     _sortOnReceiver = sortOnReceiver;
     _tableNames = tableNames;
+    _exchangeStrategy = exchangeStrategy;
   }
 
   public PinotRelExchangeType getExchangeType() {
@@ -96,6 +100,11 @@ public class ExchangeNode extends BasePlanNode {
     return _tableNames;
   }
 
+  @Nullable
+  public ExchangeStrategy getExchangeStrategy() {
+    return _exchangeStrategy;
+  }
+
   @Override
   public String explain() {
     return "EXCHANGE";
@@ -109,7 +118,7 @@ public class ExchangeNode extends BasePlanNode {
   @Override
   public PlanNode withInputs(List<PlanNode> inputs) {
     return new ExchangeNode(_stageId, _dataSchema, inputs, _exchangeType, 
_distributionType, _keys, _prePartitioned,
-        _collations, _sortOnSender, _sortOnReceiver, _tableNames);
+        _collations, _sortOnSender, _sortOnReceiver, _tableNames, null);
   }
 
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
index 7c60731c3e..5a5d3e1656 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
@@ -20,10 +20,10 @@ package org.apache.pinot.query.planner.plannode;
 
 import java.util.List;
 import java.util.Objects;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-import org.apache.calcite.rel.logical.LogicalMinus;
-import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.core.Union;
 import org.apache.pinot.common.utils.DataSchema;
 
 
@@ -88,13 +88,13 @@ public class SetOpNode extends BasePlanNode {
     UNION, INTERSECT, MINUS;
 
     public static SetOpType fromObject(SetOp setOp) {
-      if (setOp instanceof LogicalUnion) {
+      if (setOp instanceof Union) {
         return UNION;
       }
-      if (setOp instanceof LogicalIntersect) {
+      if (setOp instanceof Intersect) {
         return INTERSECT;
       }
-      if (setOp instanceof LogicalMinus) {
+      if (setOp instanceof Minus) {
         return MINUS;
       }
       throw new IllegalArgumentException("Unsupported set operation: " + 
setOp.getClass());
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 6963b6f907..5adfadcffc 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -81,6 +81,22 @@ public class WorkerManager {
     _routingManager = routingManager;
   }
 
+  public String getInstanceId() {
+    return _instanceId;
+  }
+
+  public String getHostName() {
+    return _hostName;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  public RoutingManager getRoutingManager() {
+    return _routingManager;
+  }
+
   public void assignWorkers(PlanFragment rootFragment, DispatchablePlanContext 
context) {
     // ROOT stage doesn't have a QueryServer as it is strictly only reducing 
results, so here we simply assign the
     // worker instance with identical server/mailbox port number.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index 16867b4f2c..f6fd85d954 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
-import org.apache.pinot.query.mailbox.InMemorySendingMailbox;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
@@ -38,9 +37,7 @@ class SingletonExchange extends BlockExchange {
   SingletonExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter 
splitter,
       Function<List<SendingMailbox>, Integer> statsIndexChooser) {
     super(sendingMailboxes, splitter, statsIndexChooser);
-    Preconditions.checkArgument(
-        sendingMailboxes.size() == 1 && sendingMailboxes.get(0) instanceof 
InMemorySendingMailbox,
-        "Expect single InMemorySendingMailbox for SingletonExchange");
+    Preconditions.checkArgument(sendingMailboxes.size() == 1, "Expect single 
mailbox in Singleton Exchange");
   }
 
   SingletonExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter 
splitter) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 2e06461e1f..6c1cfcb101 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -77,16 +77,6 @@ public class SingletonExchangeTest {
     Assert.assertEquals(captor.getValue(), _block);
   }
 
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void shouldThrowWhenSingletonWithNonLocalMailbox()
-      throws Exception {
-    // Given:
-    ImmutableList<SendingMailbox> destinations = ImmutableList.of(_mailbox2);
-
-    // When:
-    new SingletonExchange(destinations, 
BlockSplitter.NO_OP).route(destinations, _block);
-  }
-
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void shouldThrowWhenSingletonWithMultipleMailboxes()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to