Jackie-Jiang commented on code in PR #17003:
URL: https://github.com/apache/pinot/pull/17003#discussion_r2426977054
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -143,8 +143,8 @@ public void onError(Throwable t) {
_mailboxBuffers.clear();
if (_mailbox != null) {
String msg = t != null ? t.getMessage() : "Unknown";
- _mailbox.setErrorBlock(ErrorMseBlock.fromError(QueryErrorCode.QUERY_CANCELLATION, "Cancelled by sender with exception: " + msg), List.of());
+ String errorMessage = "GRPC mailbox cancelled by sender with exception: " + msg;
+ _mailbox.setErrorBlock(ErrorMseBlock.fromError(QueryErrorCode.QUERY_CANCELLATION, errorMessage), List.of());
Review Comment:
Not introduced by this PR, but I feel this shouldn't be counted as
`QUERY_CANCELLATION`. It is probably an `INTERNAL` error.
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java:
##########
@@ -80,7 +79,7 @@ public void setUpStats() {
_stats = new StatMap<>(MailboxSendOperator.StatKey.class);
}
- @Test
+ @Test(timeOut = 100000)
Review Comment:
Is it recommended to use `timeOut` here? Is it possible to set it at the
class level so that we don't need to add it to every method?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -68,19 +67,19 @@ public class GrpcSendingMailbox implements SendingMailbox {
private final long _deadlineMs;
private final StatMap<MailboxSendOperator.StatKey> _statMap;
private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
- private final Sender _sender;
+ private final int _maxByteStringSize;
private StreamObserver<MailboxContent> _contentObserver;
public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
- StatMap<MailboxSendOperator.StatKey> statMap, int maxByteStringSize) {
+ StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize) {
_id = id;
_channelManager = channelManager;
_hostname = hostname;
_port = port;
_deadlineMs = deadlineMs;
_statMap = statMap;
- _sender = maxByteStringSize > 0 ? new SplitSender(this, maxByteStringSize) : new NonSplitSender(this);
+ _maxByteStringSize = Math.max(maxInboundMessageSize / 2, 1);
Review Comment:
Consider adding a TODO to revisit this decision. If a gRPC message carries
some fixed overhead (e.g. bytes added to the serialized data), we should
reserve an absolute number of bytes as a buffer instead of only allowing half
of the size.
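To make the trade-off concrete, here is a rough sketch comparing the two sizing rules. It is only an illustration: `ChunkSizing`, `limitMinusReserve`, and the 4096-byte reserve are hypothetical names and values, not taken from the PR, and the real per-message overhead would need to be measured.

```java
// Hypothetical helper, not part of Pinot: compares two ways to derive the
// per-chunk byte limit from the configured max inbound message size.
final class ChunkSizing {
  private ChunkSizing() {
  }

  // Same arithmetic as the PR: allow half of the inbound limit, at least 1 byte.
  static int halfOfLimit(int maxInboundMessageSize) {
    return Math.max(maxInboundMessageSize / 2, 1);
  }

  // Alternative from the review comment: reserve a fixed number of bytes for
  // message overhead. The reserve value is an assumption, to be measured.
  static int limitMinusReserve(int maxInboundMessageSize, int reservedOverheadBytes) {
    return Math.max(maxInboundMessageSize - reservedOverheadBytes, 1);
  }

  public static void main(String[] args) {
    int limit = 16 * 1024 * 1024; // example inbound limit of 16 MiB
    int reserve = 4096;           // hypothetical overhead reserve
    System.out.println("half of limit:       " + halfOfLimit(limit));
    System.out.println("limit minus reserve: " + limitMinusReserve(limit, reserve));
  }
}
```

With a fixed reserve the chunk size stays close to the configured limit, so large blocks are split into roughly half as many messages as with the halving rule.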
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]