raghavyadav01 commented on code in PR #14676:
URL: https://github.com/apache/pinot/pull/14676#discussion_r1898063275


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesBrokerPlanVisitor.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+public class PhysicalTimeSeriesBrokerPlanVisitor {
+  // Warning: Don't use singleton access pattern, since Quickstarts run in a single JVM and spawn multiple broker/server
+  public PhysicalTimeSeriesBrokerPlanVisitor() {
+  }
+
+  public void init() {
+  }
+
+  public BaseTimeSeriesOperator compile(BaseTimeSeriesPlanNode rootNode, TimeSeriesExecutionContext context,
+      Map<String, Integer> numInputServersByExchangeNode) {
+    // Step-1: Replace time series exchange node with its Physical Plan Node.
+    rootNode = initExchangeReceivePlanNode(rootNode, context, numInputServersByExchangeNode);
+    // Step-2: Trigger recursive operator generation
+    return rootNode.run();
+  }
+
+  public BaseTimeSeriesPlanNode initExchangeReceivePlanNode(BaseTimeSeriesPlanNode planNode,
+      TimeSeriesExecutionContext context, Map<String, Integer> numInputServersByExchangeNode) {
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
+      throw new IllegalStateException("Found leaf time series plan node in broker");
+    } else if (planNode instanceof TimeSeriesExchangeNode) {
+      int numInputServers = numInputServersByExchangeNode.get(planNode.getId());

Review Comment:
   How will the input servers be computed?
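   
   For readers following along, here is a minimal, self-contained sketch of what this rewrite step appears to do, using simplified stand-in types (PlanNode, ExchangeNode, ReceiveNode are illustrative, not the actual Pinot classes): the broker walks the plan tree and swaps each exchange node for a physical receive node that records its fan-in.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   
   // Stand-in types; the real classes are BaseTimeSeriesPlanNode, TimeSeriesExchangeNode, etc.
   interface PlanNode {
     String getId();
     List<PlanNode> getInputs();
     PlanNode withInputs(List<PlanNode> inputs);
   }
   
   record ExchangeNode(String id, List<PlanNode> inputs) implements PlanNode {
     public String getId() { return id; }
     public List<PlanNode> getInputs() { return inputs; }
     public PlanNode withInputs(List<PlanNode> in) { return new ExchangeNode(id, in); }
   }
   
   record ReceiveNode(String id, int numInputServers, List<PlanNode> inputs) implements PlanNode {
     public String getId() { return id; }
     public List<PlanNode> getInputs() { return inputs; }
     public PlanNode withInputs(List<PlanNode> in) { return new ReceiveNode(id, numInputServers, in); }
   }
   
   final class BrokerPlanRewriterSketch {
     // Recursively replace every exchange node with a receive node that records
     // how many servers will send it data, so the operator it compiles to knows
     // how many blocks to wait for.
     static PlanNode rewrite(PlanNode node, Map<String, Integer> numInputServersByExchangeNode) {
       if (node instanceof ExchangeNode) {
         int numInputServers = numInputServersByExchangeNode.get(node.getId());
         return new ReceiveNode(node.getId(), numInputServers, node.getInputs());
       }
       List<PlanNode> rewritten = new ArrayList<>();
       for (PlanNode input : node.getInputs()) {
         rewritten.add(rewrite(input, numInputServersByExchangeNode));
       }
       return node.withInputs(rewritten);
     }
   }
   ```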



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -513,6 +469,58 @@ public void shutdown() {
     _executorService.shutdown();
   }
 
+  public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext context, TimeSeriesDispatchablePlan plan,
+      long timeoutMs, Map<String, String> queryOptions) {
+    long requestId = context.getRequestId();
+    try {
+      TimeSeriesBlock result = submitAndGet(requestId, plan, timeoutMs, queryOptions, context);
+      return PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(result);
+    } catch (Throwable t) {
+      return PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), t.getMessage());
+    }
+  }
+
+  TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs,

Review Comment:
   Can you share a short blurb about this method?
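   
   While the blurb lands, here is a sketch of the dispatch-and-wait shape this overload appears to have (sendAsync and the Object payloads are placeholders, not the real QueryDispatcher API): fragments go out to every server asynchronously, and the caller blocks on a queue until each server responds or the deadline passes.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.function.Consumer;
   
   final class DispatchAndWaitSketch {
     static List<Object> submitAndGet(List<String> servers, long timeoutMs) throws Exception {
       BlockingQueue<Object> responses = new ArrayBlockingQueue<>(Math.max(1, servers.size()));
       for (String server : servers) {
         sendAsync(server, responses::offer); // fire-and-forget dispatch per server
       }
       long deadlineMs = System.currentTimeMillis() + timeoutMs;
       List<Object> blocks = new ArrayList<>();
       for (int i = 0; i < servers.size(); i++) {
         long remainingMs = deadlineMs - System.currentTimeMillis();
         Object block = responses.poll(Math.max(0, remainingMs), TimeUnit.MILLISECONDS);
         if (block == null) {
           throw new TimeoutException("Did not hear back from all servers in time");
         }
         blocks.add(block);
       }
       return blocks; // the real method reduces these into a single TimeSeriesBlock
     }
   
     // Placeholder for the async gRPC call; responds immediately so the sketch runs.
     static void sendAsync(String server, Consumer<Object> onResponse) {
       onResponse.accept(server + ":block");
     }
   }
   ```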



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java:
##########
@@ -30,37 +35,57 @@
  *   engine integration.
  */
 public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSeriesResponse> {
-  private final QueryServerInstance _serverInstance;
-  private final Consumer<AsyncQueryTimeSeriesDispatchResponse> _callback;
+  /**
+   * Each server should send data for each leaf node once. This capacity controls the size of the queue we use to
+   * buffer the data sent by the sender. This is set large enough that we should never hit this for any practical
+   * use-case, while guarding us against bugs.
+   */
+  public static final int MAX_QUEUE_CAPACITY = 4096;

Review Comment:
   Is this a per-server queue?
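   
   To make the question concrete, here is a sketch of the buffering shape this capacity seems to bound, with Object standing in for the gRPC response type (one possible reading of the diff, not the actual implementation):
   
   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   final class BoundedResponseBufferSketch {
     static final int MAX_QUEUE_CAPACITY = 4096;
     private final BlockingQueue<Object> _buffer = new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY);
   
     // Called from the gRPC thread for every response a server streams back.
     void onNext(Object response) {
       // offer() instead of put(): if a buggy sender floods past capacity,
       // fail fast rather than block the gRPC thread forever.
       if (!_buffer.offer(response)) {
         throw new IllegalStateException("Response buffer full (capacity=" + MAX_QUEUE_CAPACITY + ")");
       }
     }
   
     // Called by the broker-side reduce to drain buffered responses.
     Object poll(long timeoutMs) throws InterruptedException {
       return _buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
     }
   }
   ```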



##########
pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java:
##########
@@ -92,55 +85,44 @@ public TimeSeriesLogicalPlanResult buildLogicalPlan(RangeTimeSeriesRequest reque
 
   public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeSeriesRequest,
       RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) {
-    // Step-1: Find tables in the query.
-    final Set<String> tableNames = new HashSet<>();
-    findTableNames(logicalPlan.getPlanNode(), tableNames::add);
-    Preconditions.checkState(tableNames.size() == 1,
-        "Expected exactly one table name in the logical plan, got: %s",
-        tableNames);
-    String tableName = tableNames.iterator().next();
-    // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables
-    //         that span across multiple servers.
-    RoutingTable routingTable = _routingManager.getRoutingTable(compileBrokerRequest(tableName),
-        requestContext.getRequestId());
-    Preconditions.checkState(routingTable != null,
-        "Failed to get routing table for table: %s", tableName);
-    Preconditions.checkState(routingTable.getServerInstanceToSegmentsMap().size() == 1,
-        "Only support routing to a single server. Computed: %s",
-        routingTable.getServerInstanceToSegmentsMap().size());
-    var entry = routingTable.getServerInstanceToSegmentsMap().entrySet().iterator().next();
-    ServerInstance serverInstance = entry.getKey();
-    // Step-3: Assign segments to the leaf plan nodes.
+    // Step-1: Assign segments to servers for each leaf node.
     TableScanVisitor.Context scanVisitorContext = TableScanVisitor.createContext(requestContext.getRequestId());
     TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), logicalPlan.getTimeBuckets(),
         scanVisitorContext);
-    return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(),
-        new TimeSeriesQueryServerInstance(serverInstance),
-        TimeSeriesPlanSerde.serialize(logicalPlan.getPlanNode()), logicalPlan.getTimeBuckets(),
-        scanVisitorContext.getPlanIdToSegmentMap());
+    List<TimeSeriesQueryServerInstance> serverInstances = scanVisitorContext.getQueryServers();
+    // Step-2: Create plan fragments.
+    List<BaseTimeSeriesPlanNode> fragments = TimeSeriesPlanFragmenter.getFragments(
+        logicalPlan.getPlanNode(), serverInstances.size() == 1);
+    // Step-3: Compute number of servers each exchange node will receive data from.
+    Map<String, Integer> numServersForExchangePlanNode = computeNumServersForExchangePlanNode(serverInstances,
+        fragments, scanVisitorContext.getLeafIdToSegmentsByInstanceId());
+    return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(), serverInstances, fragments.get(0),
+        fragments.subList(1, fragments.size()), logicalPlan.getTimeBuckets(),
+        scanVisitorContext.getLeafIdToSegmentsByInstanceId(), numServersForExchangePlanNode);
   }
 
-  public static void findTableNames(BaseTimeSeriesPlanNode planNode, Consumer<String> tableNameConsumer) {
-    if (planNode instanceof LeafTimeSeriesPlanNode) {
-      LeafTimeSeriesPlanNode scanNode = (LeafTimeSeriesPlanNode) planNode;
-      tableNameConsumer.accept(scanNode.getTableName());
-      return;
+  private Map<String, Integer> computeNumServersForExchangePlanNode(List<TimeSeriesQueryServerInstance> serverInstances,
+      List<BaseTimeSeriesPlanNode> planNodes, Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId) {
+    // TODO(timeseries): Handle this gracefully and return an empty block.
+    Preconditions.checkState(!serverInstances.isEmpty(), "No servers selected for the query");
+    if (serverInstances.size() == 1) {

Review Comment:
   Why do we need to differentiate between single-server and multi-server? Shouldn't it be transparent?
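   
   To anchor the question, here is a sketch of how the fan-in count could fall out of the segment assignment, assuming each exchange's input count equals the number of server instances holding segments for the matching leaf (my reading of leafIdToSegmentsByInstanceId, not the actual implementation):
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class ExchangeFanInSketch {
     // leafId -> (serverInstanceId -> segments). Every instance holding segments
     // for a leaf sends one block, so the exchange's fan-in is the inner map size.
     static Map<String, Integer> computeFanIn(Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId) {
       Map<String, Integer> fanIn = new HashMap<>();
       leafIdToSegmentsByInstanceId.forEach((leafId, segmentsByInstance) ->
           fanIn.put(leafId, segmentsByInstance.size()));
       return fanIn;
     }
   }
   ```
   
   On that reading, the single-server branch would just be the degenerate case where every fan-in is 1 and the plan can ship as one unfragmented tree.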



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java:
##########
@@ -30,37 +35,57 @@
  *   engine integration.
  */
 public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSeriesResponse> {

Review Comment:
   Now that the broker is doing the reduce work, can the broker become a bottleneck? Did you perform any analysis of the broker in your cluster testing?
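   
   For the sake of discussion, the reduce in question presumably looks something like merging per-server results keyed by series id, in which case broker load scales with series cardinality rather than raw data scanned. A simplified stand-in (not the real TimeSeriesBlock API):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class BrokerReduceSketch {
     // Merge per-server blocks keyed by series id; broker memory/CPU grows with
     // the number of distinct series, not with the segments scanned on servers.
     static Map<String, List<double[]>> reduce(List<Map<String, List<double[]>>> serverBlocks) {
       Map<String, List<double[]>> merged = new HashMap<>();
       for (Map<String, List<double[]>> block : serverBlocks) {
         block.forEach((seriesId, values) ->
             merged.computeIfAbsent(seriesId, k -> new ArrayList<>()).addAll(values));
       }
       return merged; // a real combine would also apply the exchange's aggregation function
     }
   }
   ```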



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -258,45 +259,65 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
    * TODO: This design is at odds with MSE because MSE runs even the leaf stage via OpChainSchedulerService.
    *   However, both OpChain scheduler and this method use the same ExecutorService.
    */
-  public void processTimeSeriesQuery(String serializedPlan, Map<String, String> metadata,
+  public void processTimeSeriesQuery(List<String> serializedPlanFragments, Map<String, String> metadata,
       StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
     // Define a common way to handle errors.
-    final Consumer<Throwable> handleErrors = (t) -> {
-      Map<String, String> errorMetadata = new HashMap<>();
-      errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName());
-      errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage() == null
-          ? "Unknown error: no message" : t.getMessage());
-      responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
-      responseObserver.onCompleted();
+    final Consumer<Pair<Throwable, String>> handleErrors = (pair) -> {
+      Throwable t = pair.getLeft();
+      try {
+        String planId = pair.getRight();
+        Map<String, String> errorMetadata = new HashMap<>();
+        errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName());
+        errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage() == null
+            ? "Unknown error: no message" : t.getMessage());
+        errorMetadata.put(WorkerResponseMetadataKeys.PLAN_ID, planId);
+        // TODO(timeseries): remove logging for failed queries.
+        LOGGER.warn("time-series query failed:", t);
+        responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
+        responseObserver.onCompleted();
+      } catch (Throwable t2) {
+        LOGGER.warn("Unable to send error to broker. Original error: {}", t.getMessage(), t2);
+      }
     };
+    if (serializedPlanFragments.isEmpty()) {
+      handleErrors.accept(Pair.of(new IllegalStateException("No plan fragments received in server"), ""));
+      return;
+    }
     try {
       final long deadlineMs = extractDeadlineMs(metadata);
       Preconditions.checkState(System.currentTimeMillis() < deadlineMs,
-          "Query timed out before getting processed in server. Remaining time: %s", deadlineMs);
-      // Deserialize plan, and compile to create a tree of operators.
-      BaseTimeSeriesPlanNode rootNode = TimeSeriesPlanSerde.deserialize(serializedPlan);
+          "Query timed out before getting processed in server. Exceeded time by (ms): %s",
+          System.currentTimeMillis() - deadlineMs);
+      List<BaseTimeSeriesPlanNode> fragmentRoots = serializedPlanFragments.stream()
+          .map(TimeSeriesPlanSerde::deserialize).collect(Collectors.toList());
       TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
-          metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata),
-          extractPlanToSegmentMap(metadata), deadlineMs, metadata);
-      BaseTimeSeriesOperator operator = _timeSeriesPhysicalPlanVisitor.compile(rootNode, context);
+          metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata), deadlineMs, metadata,
+          extractPlanToSegmentMap(metadata), Collections.emptyMap());
+      final List<BaseTimeSeriesOperator> fragmentOpChains = fragmentRoots.stream().map(x -> {
+        return _timeSeriesPhysicalPlanVisitor.compile(x, context);
+      }).collect(Collectors.toList());
       // Run the operator using the same executor service as OpChainSchedulerService
       _executorService.submit(() -> {
+        String currentPlanId = "";
         try {
-          TimeSeriesBlock seriesBlock = operator.nextBlock();
-          Worker.TimeSeriesResponse response = Worker.TimeSeriesResponse.newBuilder()
-              .setPayload(ByteString.copyFrom(
-                  PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(seriesBlock).serialize(),
-                  StandardCharsets.UTF_8))
-              .build();
-          responseObserver.onNext(response);
+          for (int index = 0; index < fragmentOpChains.size(); index++) {

Review Comment:
   Will this block until all the fragments are executed? What happens when the query times out?
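   
   One way to frame the timeout question: if each fragment only ever waits for the time remaining until the shared deadline, the loop stays bounded even though fragments run sequentially. A sketch of that deadline-per-fragment pattern (a hypothetical helper, not the QueryRunner code):
   
   ```java
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   final class DeadlineBoundedFragmentsSketch {
     // Run fragments one by one, but give each only the time remaining until the
     // shared deadline, so a slow fragment surfaces as a TimeoutException instead
     // of blocking the worker thread indefinitely.
     static void runAll(ExecutorService executor, List<Callable<Object>> fragments, long deadlineMs)
         throws Exception {
       for (Callable<Object> fragment : fragments) {
         long remainingMs = deadlineMs - System.currentTimeMillis();
         if (remainingMs <= 0) {
           throw new TimeoutException("Deadline passed before all fragments executed");
         }
         Future<Object> future = executor.submit(fragment);
         Object block = future.get(remainingMs, TimeUnit.MILLISECONDS); // bounded wait
         // in the real code each block is streamed back via responseObserver.onNext(...)
       }
     }
   }
   ```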



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

