This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 98374e87cf7 Change SendingMailbox to be closeable. (#16899)
98374e87cf7 is described below
commit 98374e87cf73cce1a6aa136706d41df611a61f4d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Fri Oct 17 16:55:20 2025 +0200
Change SendingMailbox to be closeable. (#16899)
---
.../pinot/query/mailbox/GrpcSendingMailbox.java | 41 ++++++++++++++++++----
.../query/mailbox/InMemorySendingMailbox.java | 16 +++++++++
.../apache/pinot/query/mailbox/SendingMailbox.java | 2 +-
.../mailbox/channel/MailboxContentObserver.java | 2 +-
.../mailbox/channel/MailboxStatusObserver.java | 23 +++++++++++-
.../apache/pinot/query/runtime/QueryRunner.java | 8 ++---
.../runtime/operator/exchange/BlockExchange.java | 29 +++++++++++++--
.../runtime/queries/ResourceBasedQueriesTest.java | 1 +
8 files changed, 105 insertions(+), 17 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index bdc9f5253a4..7bb8840c14d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -68,6 +68,8 @@ public class GrpcSendingMailbox implements SendingMailbox {
private final StatMap<MailboxSendOperator.StatKey> _statMap;
private final MailboxStatusObserver _statusObserver = new
MailboxStatusObserver();
private final int _maxByteStringSize;
+ /// Indicates whether the sending side has attempted to close the mailbox
(either via complete() or cancel()).
+ private volatile boolean _senderSideClosed;
private StreamObserver<MailboxContent> _contentObserver;
@@ -97,16 +99,21 @@ public class GrpcSendingMailbox implements SendingMailbox {
@Override
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
- sendInternal(block, serializedStats);
- LOGGER.debug("Completing mailbox: {}", _id);
- _contentObserver.onCompleted();
+ if (sendInternal(block, serializedStats)) {
+ LOGGER.debug("Completing mailbox: {}", _id);
+ _contentObserver.onCompleted();
+ _senderSideClosed = true;
+ } else {
+ LOGGER.warn("Trying to send EOS to the already terminated mailbox: {}",
_id);
+ }
}
- private void sendInternal(MseBlock block, List<DataBuffer> serializedStats) {
+ /// Tries to send the block to the receiver. Returns true if the block is
sent, false otherwise.
+ private boolean sendInternal(MseBlock block, List<DataBuffer>
serializedStats) {
if (isTerminated() || (isEarlyTerminated() && block.isData())) {
LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox.
Skipping sending message {} to: {}",
block, _id);
- return;
+ return false;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[GRPC SEND]== sending message " + block + " to: " + _id);
@@ -122,6 +129,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[GRPC SEND]== message " + block + " sent to: " + _id);
}
+ return true;
}
private void processAndSend(MseBlock block, List<DataBuffer> serializedStats)
@@ -166,6 +174,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
LOGGER.debug("Already terminated mailbox: {}", _id);
return;
}
+ _senderSideClosed = true;
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_contentObserver == null) {
_contentObserver = getContentObserver();
@@ -190,7 +199,10 @@ public class GrpcSendingMailbox implements SendingMailbox {
@Override
public boolean isTerminated() {
- return _statusObserver.isFinished();
+ // _senderSideClosed is set when the sending side has attempted to close
the mailbox (either via complete() or
+ // cancel()). But we also need to return true if the gRPC status observer has
observed that the connection is closed
+ // (i.e. due to timeout)
+ return _senderSideClosed || _statusObserver.isFinished();
}
private StreamObserver<MailboxContent> getContentObserver() {
@@ -314,4 +326,21 @@ public class GrpcSendingMailbox implements SendingMailbox {
return result;
}
+
+ /// Closes the sending side of this mailbox.
+ ///
+ /// If isTerminated() is still false when this is called, the mailbox is being closed without a proper EOS. That is
+ /// logged as an error (including the caller's stack trace), the mailbox is marked as closed by the sender and, when
+ /// a content observer has already been created, an internal error block is sent through it.
+ @Override
+ public void close()
+ throws Exception {
+ if (!isTerminated()) {
+ String errorMsg = "Closing gRPC mailbox without proper EOS message";
+ // Only created to log the caller's stack trace; the constructor already captures it.
+ RuntimeException ex = new RuntimeException(errorMsg);
+ LOGGER.error(errorMsg, ex);
+ _senderSideClosed = true;
+
+ MseBlock errorBlock = ErrorMseBlock.fromError(QueryErrorCode.INTERNAL, errorMsg);
+ if (_contentObserver != null) {
+ processAndSend(errorBlock, List.of());
+ }
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 6d8233568fa..c7602564574 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -105,6 +105,7 @@ public class InMemorySendingMailbox implements
SendingMailbox {
if (_isTerminated) {
return;
}
+ _isTerminated = true;
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
@@ -128,4 +129,19 @@ public class InMemorySendingMailbox implements
SendingMailbox {
public String toString() {
return "m" + _id;
}
+
+ /// Closes the sending side of this mailbox.
+ ///
+ /// If isTerminated() is still false when this is called, the mailbox is being closed without a proper EOS. That is
+ /// logged as an error (including the caller's stack trace), the mailbox is marked as terminated so that closing it
+ /// again is a no-op, and an error block is set on the receiving mailbox.
+ @Override
+ public void close() {
+ if (!isTerminated()) {
+ String msg = "Closing in-memory mailbox without proper EOS message";
+ // Only created to carry the caller's stack trace; the constructor already captures it.
+ RuntimeException exception = new RuntimeException(msg);
+
+ LOGGER.error(msg, exception);
+ // Mark as terminated before notifying the receiver, as cancel() does, so close() is idempotent.
+ _isTerminated = true;
+ if (_receivingMailbox == null) {
+ _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+ }
+ _receivingMailbox.setErrorBlock(ErrorMseBlock.fromException(exception), Collections.emptyList());
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 65a0d69831c..2f5161cd15f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -39,7 +39,7 @@ import org.apache.pinot.segment.spi.memory.DataBuffer;
* </li>
* </ol>
*/
-public interface SendingMailbox {
+public interface SendingMailbox extends AutoCloseable {
/**
* Returns whether the mailbox is sending data to a local receiver, where
blocks can be directly passed to the
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index 0efcd22020a..99400c0dd2d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -124,7 +124,7 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
@Override
public void onError(Throwable t) {
- LOGGER.warn("Error on receiver side", t);
+ LOGGER.warn("Receiving mailbox received an error from sender side", t);
_mailboxBuffers.clear();
if (_mailbox != null) {
String msg = t != null ? t.getMessage() : "Unknown";
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
index 4288f4a087a..259109d3d92 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.mailbox.channel;
+import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -67,7 +68,27 @@ public class MailboxStatusObserver implements
StreamObserver<MailboxStatus> {
@Override
public void onError(Throwable t) {
- LOGGER.warn("Error on sender side", t);
+ boolean skipLog = false;
+ if (t instanceof StatusRuntimeException) {
+ switch (((StatusRuntimeException) t).getStatus().getCode()) {
+ case CANCELLED:
+ // If the receiver cancelled the stream, we should not treat it as
an error.
+ LOGGER.trace("Sending mailbox stream cancelled by receiving side");
+ skipLog = true;
+ break;
+ case DEADLINE_EXCEEDED:
+ // If the request timed out, we should not treat it as an error.
+ LOGGER.trace("Sending mailbox stream deadline exceeded");
+ skipLog = true;
+ break;
+ default:
+ // Other gRPC errors are treated as errors
+ break;
+ }
+ }
+ if (!skipLog) {
+ LOGGER.warn("Sending mailbox received an error from receiving side", t);
+ }
_finished.set(true);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 5626cd4d3a0..f6f598bd48a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -325,11 +325,9 @@ public class QueryRunner {
long deadlineMs = executionContext.getPassiveDeadlineMs();
for (RoutingInfo routingInfo : routingInfos) {
String mailboxId = routingInfo.getMailboxId();
- try {
- StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
- SendingMailbox sendingMailbox =
- _mailboxService.getSendingMailbox(routingInfo.getHostname(),
routingInfo.getPort(),
- mailboxId, deadlineMs, statMap);
+ StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
+ try (SendingMailbox sendingMailbox =
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
+ routingInfo.getPort(), mailboxId, deadlineMs, statMap)) {
// TODO: Here we are breaking the stats invariants, sending errors
without including the stats of the
// current stage. We will need to fix this in future, but for now, we
are sending the error block without
// the stats.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index c99c469a90a..02292b180e6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
/**
* This class contains the shared logic across all different exchange types
for exchanging data across servers.
*/
-public abstract class BlockExchange {
+public abstract class BlockExchange implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(BlockExchange.class);
// TODO: Deduct this value via grpc config maximum byte size; and make it
configurable with override.
// TODO: Max block size is a soft limit. only counts fixedSize datatable
byte buffer
@@ -176,9 +176,27 @@ public abstract class BlockExchange {
protected abstract void route(List<SendingMailbox> destinations,
MseBlock.Data block);
- // Called when the OpChain gracefully returns.
- // TODO: This is a no-op right now.
+ /// Closes every sending mailbox of this exchange.
+ ///
+ /// All mailboxes are closed even if closing one of them fails. The first failure is rethrown after the loop: as-is
+ /// when it is a RuntimeException, wrapped in a RuntimeException otherwise. Later failures are attached to it as
+ /// suppressed exceptions.
+ @Override
public void close() {
+ RuntimeException firstException = null;
+ for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+ try {
+ sendingMailbox.close();
+ } catch (Exception e) {
+ if (firstException == null) {
+ // Inspect the caught exception (firstException is null here) so unchecked exceptions are not re-wrapped.
+ if (e instanceof RuntimeException) {
+ firstException = (RuntimeException) e;
+ } else {
+ firstException = new RuntimeException(e);
+ }
+ } else {
+ firstException.addSuppressed(e);
+ }
+ }
+ }
+ if (firstException != null) {
+ throw firstException;
+ }
}
public void cancel(Throwable t) {
@@ -253,5 +271,10 @@ public abstract class BlockExchange {
public String toString() {
return "e" + _id;
}
+
+ /// Closes the enclosing BlockExchange by delegating to its close() method.
+ @Override
+ public void close() {
+ BlockExchange.this.close();
+ }
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 6edba101848..28e81f440b8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -274,6 +274,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
// query pinot
runQuery(sql, expectErrorMsg, false).ifPresent(queryResult -> {
try {
+ Assert.assertNull(queryResult.getProcessingException(), "Expected no
exception");
compareRowEquals(queryResult.getResultTable(), queryH2(h2Sql),
keepOutputRowOrder);
} catch (Exception e) {
Assert.fail(e.getMessage(), e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]