This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ece2147eb [multistage] fix timeout & excessive data block exception (#9116)
8ece2147eb is described below

commit 8ece2147eb8dfd916774d814416b5b6c23432de0
Author: Rong Rong <walterddr.walter...@gmail.com>
AuthorDate: Mon Aug 1 10:16:53 2022 -0700

    [multistage] fix timeout & excessive data block exception (#9116)
    
    * adding in global timeout
    
    * adding in query config for data block size
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/query/mailbox/channel/ChannelManager.java      | 14 ++++++++++----
 .../pinot/query/mailbox/channel/GrpcMailboxServer.java   |  4 +++-
 .../query/runtime/operator/MailboxReceiveOperator.java   |  6 ++++--
 .../java/org/apache/pinot/query/service/QueryConfig.java |  3 +++
 .../org/apache/pinot/query/service/QueryDispatcher.java  | 16 ++++++++++++++--
 5 files changed, 34 insertions(+), 9 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 54ff4bcfb7..72d90c07ac 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -22,6 +22,7 @@ import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.service.QueryConfig;
 
 
 /**
@@ -33,7 +34,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
  * <p>the channelId should be in the format of: 
<code>"senderHost:senderPort:receiverHost:receiverPort"</code>
  */
 public class ChannelManager {
-  private static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 
* 1024;
 
   private final GrpcMailboxService _mailboxService;
   private final GrpcMailboxServer _grpcMailboxServer;
@@ -54,9 +54,15 @@ public class ChannelManager {
   }
 
   public ManagedChannel getChannel(String channelId) {
-    String[] channelParts = channelId.split(":");
     return _channelMap.computeIfAbsent(channelId,
-        (id) -> ManagedChannelBuilder.forAddress(channelParts[0], 
Integer.parseInt(channelParts[1]))
-            
.maxInboundMessageSize(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE).usePlaintext().build());
+        (id) -> constructChannel(id.split(":")));
+  }
+
+  private static ManagedChannel constructChannel(String[] channelParts) {
+    ManagedChannelBuilder<?> managedChannelBuilder = ManagedChannelBuilder
+        .forAddress(channelParts[0], Integer.parseInt(channelParts[1]))
+        
.maxInboundMessageSize(QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE)
+        .usePlaintext();
+    return managedChannelBuilder.build();
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index a260bbaeef..2a6706b709 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.service.QueryConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,8 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
 
   public GrpcMailboxServer(GrpcMailboxService mailboxService, int port) {
     _mailboxService = mailboxService;
-    _server = ServerBuilder.forPort(port).addService(this).build();
+    _server = ServerBuilder.forPort(port).addService(this)
+        
.maxInboundMessageSize(QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE).build();
     LOGGER.info("Initialized GrpcMailboxServer on port: {}", port);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 014cf49bed..92309f946b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.service.QueryConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory;
 public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
-  private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
 
   private final MailboxService<Mailbox.MailboxContent> _mailboxService;
   private final RelDistribution.Type _exchangeType;
@@ -56,6 +56,7 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
   private final int _port;
   private final long _jobId;
   private final int _stageId;
+  private final long _timeout;
   private TransferableBlock _upstreamErrorBlock;
 
   public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService, DataSchema dataSchema,
@@ -69,6 +70,7 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
     _port = port;
     _jobId = jobId;
     _stageId = stageId;
+    _timeout = QueryConfig.DEFAULT_TIMEOUT_NANO;
     _upstreamErrorBlock = null;
   }
 
@@ -91,7 +93,7 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
     }
     // TODO: do a round robin check against all MailboxContentStreamObservers 
and find which one that has data.
     boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + DEFAULT_TIMEOUT_NANO;
+    long timeoutWatermark = System.nanoTime() + _timeout;
     while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
       hasOpenedMailbox = false;
       for (ServerInstance sendingInstance : _sendingStageInstances) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index c0bbb08174..f2a3d12d14 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -22,6 +22,9 @@ package org.apache.pinot.query.service;
  * Configuration for setting up query runtime.
  */
 public class QueryConfig {
+  public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE = 
128 * 1024 * 1024;
+  public static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
+
   public static final String KEY_OF_QUERY_SERVER_PORT = 
"pinot.query.server.port";
   public static final int DEFAULT_QUERY_SERVER_PORT = 0;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 70989caca0..653d2b216d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -21,16 +21,19 @@ package org.apache.pinot.query.service;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -67,7 +70,7 @@ public class QueryDispatcher {
         
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
         requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
mailboxService.getHostname(),
         mailboxService.getMailboxPort());
-    return reduceMailboxReceive(mailboxReceiveOperator);
+    return reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
   }
 
   public int submit(long requestId, QueryPlan queryPlan)
@@ -114,9 +117,14 @@ public class QueryDispatcher {
   }
 
   public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator 
mailboxReceiveOperator) {
+    return reduceMailboxReceive(mailboxReceiveOperator, 
QueryConfig.DEFAULT_TIMEOUT_NANO);
+  }
+
+  public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator 
mailboxReceiveOperator, long timeoutNano) {
     List<DataTable> resultDataBlocks = new ArrayList<>();
     TransferableBlock transferableBlock;
-    while (true) {
+    long timeoutWatermark = System.nanoTime() + timeoutNano;
+    while (System.nanoTime() < timeoutWatermark) {
       transferableBlock = mailboxReceiveOperator.nextBlock();
       if (TransferableBlockUtils.isEndOfStream(transferableBlock)) {
         // TODO: we only received bubble up error from the execution stage 
tree.
@@ -132,6 +140,10 @@ public class QueryDispatcher {
         resultDataBlocks.add(dataTable);
       }
     }
+    if (System.nanoTime() >= timeoutWatermark) {
+      resultDataBlocks = Collections.singletonList(
+          
DataBlockUtils.getErrorDataBlock(QueryException.EXECUTION_TIMEOUT_ERROR));
+    }
     return resultDataBlocks;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to