Jackie-Jiang commented on code in PR #16899:
URL: https://github.com/apache/pinot/pull/16899#discussion_r2393153507


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -123,7 +125,13 @@ private void cancelStream() {
 
   @Override
   public void onError(Throwable t) {
-    LOGGER.warn("Error on receiver side", t);
+    if (t instanceof StatusRuntimeException

Review Comment:
   If we follow the same approach as `cancel()`, we don't need special handling in `onError()`.
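A minimal sketch of the `cancel()`-style approach, assuming cancellation is delivered as an in-band error block followed by `onCompleted()` rather than through `onError()`. `ContentObserver` is a local stand-in for gRPC's `StreamObserver`. `RecordingReceiver`, `CancelStyleSender`, and the `"ERROR_BLOCK: "` string encoding are hypothetical and only illustrate why the receiver's `onError()` would then need no `StatusRuntimeException` check.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for io.grpc.stub.StreamObserver so the sketch compiles without gRPC.
interface ContentObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// Hypothetical receiver. Because cancellation arrives as an in-band error block,
// onError() only has to deal with genuine transport failures: no status check needed.
class RecordingReceiver implements ContentObserver<String> {
  final List<String> received = new ArrayList<>();
  boolean completed;
  Throwable error;

  @Override
  public void onNext(String value) {
    received.add(value);
  }

  @Override
  public void onError(Throwable t) {
    error = t; // e.g. log and fail the query; no special case for CANCELLED
  }

  @Override
  public void onCompleted() {
    completed = true;
  }
}

// Hypothetical sender whose cancel() sends an error block and then calls onCompleted(),
// instead of signalling the cancellation through onError().
class CancelStyleSender {
  private final ContentObserver<String> observer;
  private boolean terminated;

  CancelStyleSender(ContentObserver<String> observer) {
    this.observer = observer;
  }

  void cancel(String reason) {
    if (terminated) {
      return; // terminate the stream at most once
    }
    terminated = true;
    observer.onNext("ERROR_BLOCK: " + reason); // the error travels as data
    observer.onCompleted();                    // the stream still ends normally
  }
}
```

With this shape the receiver can treat every `onError()` as a real failure, since a deliberate cancel never reaches it.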



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -135,4 +136,13 @@ public boolean isTerminated() {
   public String toString() {
     return "m" + _id;
   }
+
+  @Override
+  public void close() {

Review Comment:
   Same here: should we follow the same approach as `cancel()`?
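A minimal sketch of what following `cancel()` could look like for an in-memory mailbox: `close()` delegates to `cancel()` only when the mailbox was never terminated, so there is a single termination path. `InMemoryMailboxSketch`, its string blocks, and its queue are hypothetical simplifications, not Pinot's `InMemorySendingMailbox`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified in-memory mailbox; not Pinot's InMemorySendingMailbox.
class InMemoryMailboxSketch implements AutoCloseable {
  // Stands in for the receiver's queue that blocks are handed to directly.
  final Deque<String> receivingQueue = new ArrayDeque<>();
  private boolean terminated;

  void send(String block) {
    if (terminated) {
      throw new IllegalStateException("mailbox already terminated");
    }
    receivingQueue.add(block);
  }

  // Normal termination: end-of-stream marker, then mark as terminated.
  void complete() {
    send("EOS");
    terminated = true;
  }

  // Abnormal termination: enqueue an error block at most once.
  void cancel(Throwable t) {
    if (terminated) {
      return;
    }
    terminated = true;
    receivingQueue.add("ERROR_BLOCK: " + t.getMessage());
  }

  boolean isTerminated() {
    return terminated;
  }

  // close() reuses cancel() instead of introducing a separate termination path.
  @Override
  public void close() {
    if (!isTerminated()) {
      cancel(new RuntimeException("mailbox closed without EOS"));
    }
  }
}
```

Used with try-with-resources, a mailbox that was completed is left untouched by `close()`, while one that was not ends with an error block.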



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -341,15 +341,14 @@ private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock errorBlock, W
     }
     long deadlineMs = QueryThreadContext.getPassiveDeadlineMs();
     for (RoutingInfo routingInfo : routingInfos) {
-      try {
-        StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class);
-        SendingMailbox sendingMailbox =
-            _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(),
-                routingInfo.getMailboxId(), deadlineMs, statMap);
+      StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class);
+      try (SendingMailbox sendingMailbox = _mailboxService.getSendingMailbox(routingInfo.getHostname(),
+          routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, statMap)) {
         // TODO: Here we are breaking the stats invariants, sending errors without including the stats of the
         //  current stage. We will need to fix this in future, but for now, we are sending the error block without
         //  the stats.
         sendingMailbox.send(errorBlock, Collections.emptyList());
+        sendingMailbox.complete();

Review Comment:
   Another way to fix this is to catch any `Throwable` and invoke `cancel()` in the catch clause.
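A rough sketch of the catch-`Throwable` alternative, assuming a trimmed-down mailbox contract with `send()`, `complete()`, and `cancel(Throwable)`; the `cancel(Throwable)` signature is an assumption here. `SimpleSendingMailbox`, `ErrorNotifier`, and `FakeMailbox` are hypothetical names, and the second argument the real `send()` takes (here `Collections.emptyList()`) is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down mailbox contract (not Pinot's SendingMailbox).
interface SimpleSendingMailbox {
  void send(String block) throws Exception;
  void complete() throws Exception;
  void cancel(Throwable t);
}

class ErrorNotifier {
  // Sends the error block to every mailbox; if anything fails midway, the mailbox is
  // cancelled so its stream is still terminated instead of being left open.
  static void notifyError(List<? extends SimpleSendingMailbox> mailboxes, String errorBlock) {
    for (SimpleSendingMailbox mailbox : mailboxes) {
      try {
        mailbox.send(errorBlock);
        mailbox.complete();
      } catch (Throwable t) {
        // A real implementation would probably also log the failure here.
        mailbox.cancel(t);
      }
    }
  }
}

// Test double that records calls and can be told to fail on send().
class FakeMailbox implements SimpleSendingMailbox {
  final List<String> events = new ArrayList<>();
  private final boolean failOnSend;

  FakeMailbox(boolean failOnSend) {
    this.failOnSend = failOnSend;
  }

  @Override
  public void send(String block) throws Exception {
    if (failOnSend) {
      throw new Exception("send failed");
    }
    events.add("send:" + block);
  }

  @Override
  public void complete() {
    events.add("complete");
  }

  @Override
  public void cancel(Throwable t) {
    events.add("cancel:" + t.getMessage());
  }
}
```

Catching `Throwable` also covers `Error`s, which is what makes the stream termination unconditional in this sketch.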



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -305,6 +313,16 @@ static List<ByteString> toByteStrings(List<ByteBuffer> bytes, int maxByteStringS
     return result;
   }
 
+  @Override
+  public void close()
+      throws Exception {
+    if (!isTerminated()) {
+      LOGGER.debug("Closing gRPC mailbox without proper EOS message");
+      _closeAttempted = true;
+      _contentObserver.onError(Status.CANCELLED.asException());

Review Comment:
   I see. So essentially we ensure `GrpcSendingMailbox` always calls `onCompleted()` or `onError()` exactly once.
   
   When will this happen? Given that this shouldn't happen, should we at least log a warning so that we can debug it?
   Should we follow the same approach as `cancel()`, where we send an error block and then call `onCompleted()`? There is a note explaining why we don't use `onError()` when invoking `cancel()`.
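A sketch of the two suggestions combined, under stated assumptions: `close()` logs a warning when the mailbox was never terminated, then reuses the `cancel()` path (error block, then `onCompleted()`), so the observer sees exactly one terminal callback. `MailboxStreamObserver` stands in for gRPC's `StreamObserver`, `java.util.logging` stands in for the project's logger, and all other names are hypothetical; concurrency is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Local stand-in for io.grpc.stub.StreamObserver, so no gRPC dependency is needed.
interface MailboxStreamObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// Hypothetical, simplified sending mailbox; not Pinot's actual GrpcSendingMailbox.
class GrpcMailboxSketch implements AutoCloseable {
  // java.util.logging keeps the sketch dependency-free.
  private static final Logger LOGGER = Logger.getLogger(GrpcMailboxSketch.class.getName());

  private final MailboxStreamObserver<String> _contentObserver;
  private boolean _terminated;

  GrpcMailboxSketch(MailboxStreamObserver<String> contentObserver) {
    _contentObserver = contentObserver;
  }

  boolean isTerminated() {
    return _terminated;
  }

  void complete() {
    if (_terminated) {
      return;
    }
    _terminated = true;
    _contentObserver.onNext("EOS");
    _contentObserver.onCompleted();
  }

  // Cancel as the review describes for cancel(): error block first, then onCompleted().
  void cancel(Throwable t) {
    if (_terminated) {
      return;
    }
    _terminated = true;
    _contentObserver.onNext("ERROR_BLOCK: " + t.getMessage());
    _contentObserver.onCompleted();
  }

  @Override
  public void close() {
    if (!isTerminated()) {
      // Not expected in normal operation, so warn (rather than debug) to aid debugging.
      LOGGER.warning("Closing gRPC mailbox without proper EOS message");
      cancel(new IllegalStateException("mailbox closed without EOS"));
    }
  }
}

// Test double that counts terminal callbacks.
class CountingObserver implements MailboxStreamObserver<String> {
  final List<String> blocks = new ArrayList<>();
  int terminalCalls;

  @Override
  public void onNext(String value) {
    blocks.add(value);
  }

  @Override
  public void onError(Throwable t) {
    terminalCalls++;
  }

  @Override
  public void onCompleted() {
    terminalCalls++;
  }
}
```

Repeated `close()` calls, or a late `complete()`, are no-ops once the mailbox is terminated, which is what keeps the terminal callback count at one.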



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -341,15 +341,14 @@ private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock errorBlock, W
     }
     long deadlineMs = QueryThreadContext.getPassiveDeadlineMs();
     for (RoutingInfo routingInfo : routingInfos) {
-      try {
-        StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class);
-        SendingMailbox sendingMailbox =
-            _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(),
-                routingInfo.getMailboxId(), deadlineMs, statMap);
+      StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class);
+      try (SendingMailbox sendingMailbox = _mailboxService.getSendingMailbox(routingInfo.getHostname(),
+          routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, statMap)) {
         // TODO: Here we are breaking the stats invariants, sending errors without including the stats of the
         //  current stage. We will need to fix this in future, but for now, we are sending the error block without
         //  the stats.
         sendingMailbox.send(errorBlock, Collections.emptyList());
+        sendingMailbox.complete();

Review Comment:
   Could this be the root cause? It seems `OpChainSchedulerService` always cancels the stream when it encounters an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.