gortiz commented on code in PR #16899:
URL: https://github.com/apache/pinot/pull/16899#discussion_r2402007761
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -305,6 +313,16 @@ static List<ByteString> toByteStrings(List<ByteBuffer>
bytes, int maxByteStringS
return result;
}
+ @Override
+ public void close()
+ throws Exception {
+ if (!isTerminated()) {
+ LOGGER.debug("Closing gPRC mailbox without proper EOS message");
+ _closeAttempted = true;
+ _contentObserver.onError(Status.CANCELLED.asException());
Review Comment:
Changed
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -123,7 +125,13 @@ private void cancelStream() {
@Override
public void onError(Throwable t) {
- LOGGER.warn("Error on receiver side", t);
+ if (t instanceof StatusRuntimeException
Review Comment:
This is still the way we should implement our endpoints, given that this is
how gRPC implements cancellation. However, I'm going to remove it because we
will continue using our own mode.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -174,9 +174,15 @@ protected void sendBlock(SendingMailbox sendingMailbox,
MseBlock.Data block)
protected abstract void route(List<SendingMailbox> destinations,
MseBlock.Data block)
throws IOException, TimeoutException;
- // Called when the OpChain gracefully returns.
- // TODO: This is a no-op right now.
+ @Override
public void close() {
+ for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+ try {
+ sendingMailbox.close();
+ } catch (Exception e) {
+ LOGGER.debug("Exception while cancelling mailbox: {}", sendingMailbox,
e);
Review Comment:
I've rewritten this code. It wasn't a good idea to suppress the exception.
Instead we should try to close all sending mailboxes and then throw one of the
exceptions.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -135,4 +136,13 @@ public boolean isTerminated() {
public String toString() {
return "m" + _id;
}
+
+ @Override
+ public void close() {
Review Comment:
We are actually calling cancel here, right?
I'm changing the log to warn, as in gRPC
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]