raghavyadav01 commented on code in PR #14676: URL: https://github.com/apache/pinot/pull/14676#discussion_r1898063275
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesBrokerPlanVisitor.java: ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.timeseries; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; + + +public class PhysicalTimeSeriesBrokerPlanVisitor { + // Warning: Don't use singleton access pattern, since Quickstarts run in a single JVM and spawn multiple broker/server + public PhysicalTimeSeriesBrokerPlanVisitor() { + } + + public void init() { + } + + public BaseTimeSeriesOperator compile(BaseTimeSeriesPlanNode rootNode, TimeSeriesExecutionContext context, + Map<String, Integer> numInputServersByExchangeNode) { + // Step-1: Replace time series exchange node with its Physical Plan Node. 
+ rootNode = initExchangeReceivePlanNode(rootNode, context, numInputServersByExchangeNode); + // Step-2: Trigger recursive operator generation + return rootNode.run(); + } + + public BaseTimeSeriesPlanNode initExchangeReceivePlanNode(BaseTimeSeriesPlanNode planNode, + TimeSeriesExecutionContext context, Map<String, Integer> numInputServersByExchangeNode) { + if (planNode instanceof LeafTimeSeriesPlanNode) { + throw new IllegalStateException("Found leaf time series plan node in broker"); + } else if (planNode instanceof TimeSeriesExchangeNode) { + int numInputServers = numInputServersByExchangeNode.get(planNode.getId()); Review Comment: How will the input servers be computed? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java: ########## @@ -513,6 +469,58 @@ public void shutdown() { _executorService.shutdown(); } + public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext context, TimeSeriesDispatchablePlan plan, + long timeoutMs, Map<String, String> queryOptions) { + long requestId = context.getRequestId(); + try { + TimeSeriesBlock result = submitAndGet(requestId, plan, timeoutMs, queryOptions, context); + return PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(result); + } catch (Throwable t) { + return PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), t.getMessage()); + } + } + + TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, Review Comment: Can you share a short blurb about this method? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java: ########## @@ -30,37 +35,57 @@ * engine integration.
*/ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSeriesResponse> { - private final QueryServerInstance _serverInstance; - private final Consumer<AsyncQueryTimeSeriesDispatchResponse> _callback; + /** + * Each server should send data for each leaf node once. This capacity controls the size of the queue we use to + * buffer the data sent by the sender. This is set large enough that we should never hit this for any practical + * use-case, while guarding us against bugs. + */ + public static final int MAX_QUEUE_CAPACITY = 4096; Review Comment: Is this a per-server queue? ########## pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java: ########## @@ -92,55 +85,44 @@ public TimeSeriesLogicalPlanResult buildLogicalPlan(RangeTimeSeriesRequest reque public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeSeriesRequest, RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) { - // Step-1: Find tables in the query. - final Set<String> tableNames = new HashSet<>(); - findTableNames(logicalPlan.getPlanNode(), tableNames::add); - Preconditions.checkState(tableNames.size() == 1, - "Expected exactly one table name in the logical plan, got: %s", - tableNames); - String tableName = tableNames.iterator().next(); - // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables - // that span across multiple servers. - RoutingTable routingTable = _routingManager.getRoutingTable(compileBrokerRequest(tableName), - requestContext.getRequestId()); - Preconditions.checkState(routingTable != null, - "Failed to get routing table for table: %s", tableName); - Preconditions.checkState(routingTable.getServerInstanceToSegmentsMap().size() == 1, - "Only support routing to a single server.
Computed: %s", - routingTable.getServerInstanceToSegmentsMap().size()); - var entry = routingTable.getServerInstanceToSegmentsMap().entrySet().iterator().next(); - ServerInstance serverInstance = entry.getKey(); - // Step-3: Assign segments to the leaf plan nodes. + // Step-1: Assign segments to servers for each leaf node. TableScanVisitor.Context scanVisitorContext = TableScanVisitor.createContext(requestContext.getRequestId()); TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), logicalPlan.getTimeBuckets(), scanVisitorContext); - return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(), - new TimeSeriesQueryServerInstance(serverInstance), - TimeSeriesPlanSerde.serialize(logicalPlan.getPlanNode()), logicalPlan.getTimeBuckets(), - scanVisitorContext.getPlanIdToSegmentMap()); + List<TimeSeriesQueryServerInstance> serverInstances = scanVisitorContext.getQueryServers(); + // Step-2: Create plan fragments. + List<BaseTimeSeriesPlanNode> fragments = TimeSeriesPlanFragmenter.getFragments( + logicalPlan.getPlanNode(), serverInstances.size() == 1); + // Step-3: Compute number of servers each exchange node will receive data from. 
+ Map<String, Integer> numServersForExchangePlanNode = computeNumServersForExchangePlanNode(serverInstances, + fragments, scanVisitorContext.getLeafIdToSegmentsByInstanceId()); + return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(), serverInstances, fragments.get(0), + fragments.subList(1, fragments.size()), logicalPlan.getTimeBuckets(), + scanVisitorContext.getLeafIdToSegmentsByInstanceId(), numServersForExchangePlanNode); } - public static void findTableNames(BaseTimeSeriesPlanNode planNode, Consumer<String> tableNameConsumer) { - if (planNode instanceof LeafTimeSeriesPlanNode) { - LeafTimeSeriesPlanNode scanNode = (LeafTimeSeriesPlanNode) planNode; - tableNameConsumer.accept(scanNode.getTableName()); - return; + private Map<String, Integer> computeNumServersForExchangePlanNode(List<TimeSeriesQueryServerInstance> serverInstances, + List<BaseTimeSeriesPlanNode> planNodes, Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId) { + // TODO(timeseries): Handle this gracefully and return an empty block. + Preconditions.checkState(!serverInstances.isEmpty(), "No servers selected for the query"); + if (serverInstances.size() == 1) { Review Comment: Why do we need to differentiate between the single-server and multi-server cases? Shouldn't this be transparent? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java: ########## @@ -30,37 +35,57 @@ * engine integration. */ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSeriesResponse> { Review Comment: Now that the broker is doing the reduce work, can the broker become a bottleneck? Did you perform any analysis of the broker in your cluster testing?
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -258,45 +259,65 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map * TODO: This design is at odds with MSE because MSE runs even the leaf stage via OpChainSchedulerService. * However, both OpChain scheduler and this method use the same ExecutorService. */ - public void processTimeSeriesQuery(String serializedPlan, Map<String, String> metadata, + public void processTimeSeriesQuery(List<String> serializedPlanFragments, Map<String, String> metadata, StreamObserver<Worker.TimeSeriesResponse> responseObserver) { // Define a common way to handle errors. - final Consumer<Throwable> handleErrors = (t) -> { - Map<String, String> errorMetadata = new HashMap<>(); - errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName()); - errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage() == null - ? "Unknown error: no message" : t.getMessage()); - responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build()); - responseObserver.onCompleted(); + final Consumer<Pair<Throwable, String>> handleErrors = (pair) -> { + Throwable t = pair.getLeft(); + try { + String planId = pair.getRight(); + Map<String, String> errorMetadata = new HashMap<>(); + errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName()); + errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage() == null + ? "Unknown error: no message" : t.getMessage()); + errorMetadata.put(WorkerResponseMetadataKeys.PLAN_ID, planId); + // TODO(timeseries): remove logging for failed queries. + LOGGER.warn("time-series query failed:", t); + responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build()); + responseObserver.onCompleted(); + } catch (Throwable t2) { + LOGGER.warn("Unable to send error to broker. 
Original error: {}", t.getMessage(), t2); + } }; + if (serializedPlanFragments.isEmpty()) { + handleErrors.accept(Pair.of(new IllegalStateException("No plan fragments received in server"), "")); + return; + } try { final long deadlineMs = extractDeadlineMs(metadata); Preconditions.checkState(System.currentTimeMillis() < deadlineMs, - "Query timed out before getting processed in server. Remaining time: %s", deadlineMs); - // Deserialize plan, and compile to create a tree of operators. - BaseTimeSeriesPlanNode rootNode = TimeSeriesPlanSerde.deserialize(serializedPlan); + "Query timed out before getting processed in server. Exceeded time by (ms): %s", + System.currentTimeMillis() - deadlineMs); + List<BaseTimeSeriesPlanNode> fragmentRoots = serializedPlanFragments.stream() + .map(TimeSeriesPlanSerde::deserialize).collect(Collectors.toList()); TimeSeriesExecutionContext context = new TimeSeriesExecutionContext( - metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata), - extractPlanToSegmentMap(metadata), deadlineMs, metadata); - BaseTimeSeriesOperator operator = _timeSeriesPhysicalPlanVisitor.compile(rootNode, context); + metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata), deadlineMs, metadata, + extractPlanToSegmentMap(metadata), Collections.emptyMap()); + final List<BaseTimeSeriesOperator> fragmentOpChains = fragmentRoots.stream().map(x -> { + return _timeSeriesPhysicalPlanVisitor.compile(x, context); + }).collect(Collectors.toList()); // Run the operator using the same executor service as OpChainSchedulerService _executorService.submit(() -> { + String currentPlanId = ""; try { - TimeSeriesBlock seriesBlock = operator.nextBlock(); - Worker.TimeSeriesResponse response = Worker.TimeSeriesResponse.newBuilder() - .setPayload(ByteString.copyFrom( - PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(seriesBlock).serialize(), - StandardCharsets.UTF_8)) - .build(); - responseObserver.onNext(response); + for (int index = 0; 
index < fragmentOpChains.size(); index++) { Review Comment: Will this block until all the fragments are executed? What happens when the query times out? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org