This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8ece2147eb [multistage] fix timeout & excessive data block exception (#9116) 8ece2147eb is described below commit 8ece2147eb8dfd916774d814416b5b6c23432de0 Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Mon Aug 1 10:16:53 2022 -0700 [multistage] fix timeout & excessive data block exception (#9116) * adding in global timeout * adding in query config for data block size Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/query/mailbox/channel/ChannelManager.java | 14 ++++++++++---- .../pinot/query/mailbox/channel/GrpcMailboxServer.java | 4 +++- .../query/runtime/operator/MailboxReceiveOperator.java | 6 ++++-- .../java/org/apache/pinot/query/service/QueryConfig.java | 3 +++ .../org/apache/pinot/query/service/QueryDispatcher.java | 16 ++++++++++++++-- 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java index 54ff4bcfb7..72d90c07ac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java @@ -22,6 +22,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.concurrent.ConcurrentHashMap; import org.apache.pinot.query.mailbox.GrpcMailboxService; +import org.apache.pinot.query.service.QueryConfig; /** @@ -33,7 +34,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService; * <p>the channelId should be in the format of: <code>"senderHost:senderPort:receiverHost:receiverPort"</code> */ public class ChannelManager { - private static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024; private final GrpcMailboxService _mailboxService; private final GrpcMailboxServer _grpcMailboxServer; @@ -54,9 +54,15 @@ public class ChannelManager { } public ManagedChannel getChannel(String channelId) { - String[] channelParts = channelId.split(":"); return _channelMap.computeIfAbsent(channelId, - (id) -> ManagedChannelBuilder.forAddress(channelParts[0], Integer.parseInt(channelParts[1])) - .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE).usePlaintext().build()); + (id) -> constructChannel(id.split(":"))); + } + + private static ManagedChannel constructChannel(String[] channelParts) { + ManagedChannelBuilder<?> managedChannelBuilder = ManagedChannelBuilder + .forAddress(channelParts[0], Integer.parseInt(channelParts[1])) + .maxInboundMessageSize(QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE) + .usePlaintext(); + return managedChannelBuilder.build(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java index a260bbaeef..2a6706b709 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.proto.PinotMailboxGrpc; import org.apache.pinot.query.mailbox.GrpcMailboxService; +import org.apache.pinot.query.service.QueryConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,8 @@ public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase { public GrpcMailboxServer(GrpcMailboxService mailboxService, int port) { _mailboxService = mailboxService; - _server = ServerBuilder.forPort(port).addService(this).build(); + _server = ServerBuilder.forPort(port).addService(this) + .maxInboundMessageSize(QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE).build(); LOGGER.info("Initialized GrpcMailboxServer on port: {}", port); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 014cf49bed..92309f946b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -35,6 +35,7 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.mailbox.StringMailboxIdentifier; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.service.QueryConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory; public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class); private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE"; - private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L; private final MailboxService<Mailbox.MailboxContent> _mailboxService; private final RelDistribution.Type _exchangeType; @@ -56,6 +56,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { private final int _port; private final long _jobId; private final int _stageId; + private final long _timeout; private TransferableBlock _upstreamErrorBlock; public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema, @@ -69,6 +70,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { _port = port; _jobId = jobId; _stageId = stageId; + _timeout = QueryConfig.DEFAULT_TIMEOUT_NANO; _upstreamErrorBlock = null; } @@ -91,7 +93,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { } // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data. boolean hasOpenedMailbox = true; - long timeoutWatermark = System.nanoTime() + DEFAULT_TIMEOUT_NANO; + long timeoutWatermark = System.nanoTime() + _timeout; while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) { hasOpenedMailbox = false; for (ServerInstance sendingInstance : _sendingStageInstances) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java index c0bbb08174..f2a3d12d14 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java @@ -22,6 +22,9 @@ package org.apache.pinot.query.service; * Configuration for setting up query runtime. */ public class QueryConfig { + public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE = 128 * 1024 * 1024; + public static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L; + public static final String KEY_OF_QUERY_SERVER_PORT = "pinot.query.server.port"; public static final int DEFAULT_QUERY_SERVER_PORT = 0; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index 70989caca0..653d2b216d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -21,16 +21,19 @@ package org.apache.pinot.query.service; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.calcite.rel.RelDistribution; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.datablock.BaseDataBlock; +import org.apache.pinot.core.common.datablock.DataBlockUtils; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.QueryPlan; @@ -67,7 +70,7 @@ public class QueryDispatcher { queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(), mailboxService.getMailboxPort()); - return reduceMailboxReceive(mailboxReceiveOperator); + return reduceMailboxReceive(mailboxReceiveOperator, timeoutNano); } public int submit(long requestId, QueryPlan queryPlan) @@ -114,9 +117,14 @@ public class QueryDispatcher { } public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) { + return reduceMailboxReceive(mailboxReceiveOperator, QueryConfig.DEFAULT_TIMEOUT_NANO); + } + + public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutNano) { List<DataTable> resultDataBlocks = new ArrayList<>(); TransferableBlock transferableBlock; - while (true) { + long timeoutWatermark = System.nanoTime() + timeoutNano; + while (System.nanoTime() < timeoutWatermark) { transferableBlock = mailboxReceiveOperator.nextBlock(); if (TransferableBlockUtils.isEndOfStream(transferableBlock)) { // TODO: we only received bubble up error from the execution stage tree. @@ -132,6 +140,10 @@ public class QueryDispatcher { resultDataBlocks.add(dataTable); } } + if (System.nanoTime() >= timeoutWatermark) { + resultDataBlocks = Collections.singletonList( + DataBlockUtils.getErrorDataBlock(QueryException.EXECUTION_TIMEOUT_ERROR)); + } return resultDataBlocks; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org