This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7baf82cc56 Improve byte-based gRPC block splitting (#15694)
7baf82cc56 is described below

commit 7baf82cc5698b6b88958429977ad1e7e8c5de7bb
Author: Alberto Bastos <alberto.var...@startree.ai>
AuthorDate: Fri Jun 6 08:35:48 2025 +0200

    Improve byte-based gRPC block splitting (#15694)
---
 .../tests/MultiStageEngineSmallBufferTest.java     | 180 +++++++++++++++++++++
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 127 +++++++++------
 .../apache/pinot/query/mailbox/MailboxService.java |  17 +-
 3 files changed, 274 insertions(+), 50 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java
new file mode 100644
index 0000000000..a1428fd935
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT;
+import static org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class MultiStageEngineSmallBufferTest extends BaseClusterIntegrationTestSet {
+
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+  private static final int NUM_SERVERS = 4;
+  private static final int INBOUND_BLOCK_SIZE = 256;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+
+    // Set the multi-stage max server query threads for the cluster, so that we can test the query queueing logic
+    // in the MultiStageBrokerRequestHandler
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    _helixManager.getConfigAccessor()
+        .set(scope, CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "30");
+
+    startBroker();
+    startServer();
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected void startServer()
+      throws Exception {
+    startServers(NUM_SERVERS);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropOfflineTable(DEFAULT_TABLE_NAME);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }
+
+  @BeforeMethod
+  @Override
+  public void resetMultiStage() {
+    setUseMultiStageQueryEngine(true);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    brokerConf.setProperty(KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, INBOUND_BLOCK_SIZE);
+    brokerConf.setProperty(KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, true);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, INBOUND_BLOCK_SIZE);
+    serverConf.setProperty(KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, true);
+  }
+
+  @Test(invocationCount = 50)
+  public void testConcurrentSplittedMailboxes()
+      throws Exception {
+    int numClients = 32;
+
+    String query =
+        "SELECT ActualElapsedTime FROM mytable ORDER BY ActualElapsedTime DESC LIMIT 100";
+    String expected = "[[678],[668],[662],[658],[651],[650],[647],[629],[625],[625],[621],[617],[610],[610],[607],[605]"
+        + ",[603],[582],[578],[576],[574],[572],[572],[566],[565],[564],[558],[555],[555],[554],[554],[553],[552],[550]"
+        + ",[549],[544],[543],[541],[540],[538],[537],[535],[533],[532],[526],[521],[520],[519],[518],[516],[515],[514]"
+        + ",[508],[508],[507],[505],[505],[502],[502],[499],[499],[498],[495],[494],[494],[493],[490],[487],[487],[484]"
+        + ",[484],[481],[481],[480],[479],[478],[478],[477],[473],[472],[471],[468],[465],[464],[464],[463],[461],[461]"
+        + ",[460],[459],[459],[458],[457],[456],[453],[453],[451],[451],[450],[450]]";
+
+    List<String> results = Collections.synchronizedList(new ArrayList<>(numClients));
+
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch doneLatch = new CountDownLatch(numClients);
+      for (int i = 0; i < numClients; i++) {
+        new Thread(() -> {
+          try {
+            startLatch.await();
+            JsonNode jsonNode = postQuery(query);
+            assertNotNull(jsonNode);
+            String actual = jsonNode.get("resultTable").get("rows").toString();
+            results.add(actual);
+          } catch (Exception e) {
+            results.add("Error: " + e.getMessage());
+            e.printStackTrace();
+          } finally {
+            doneLatch.countDown();
+          }
+        }).start();
+      }
+
+      startLatch.countDown();
+      doneLatch.await();
+
+      assertEquals(numClients, results.size());
+      for (String result : results) {
+        assertEquals(expected, result);
+      }
+    }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 7165fb2ec4..0ef5fed139 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -51,7 +51,6 @@ import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,28 +69,20 @@ public class GrpcSendingMailbox implements SendingMailbox {
   private final long _deadlineMs;
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
-  private final boolean _splitBlocks;
-  private final int _maxByteStringSize;
+  private final Sender _sender;
 
   private StreamObserver<MailboxContent> _contentObserver;
 
   public GrpcSendingMailbox(
       PinotConfiguration config, String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
-      StatMap<MailboxSendOperator.StatKey> statMap) {
+      StatMap<MailboxSendOperator.StatKey> statMap, int maxByteStringSize) {
     _id = id;
     _channelManager = channelManager;
     _hostname = hostname;
     _port = port;
     _deadlineMs = deadlineMs;
     _statMap = statMap;
-    _splitBlocks = config.getProperty(
-        CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
-        CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
-    // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
-    _maxByteStringSize = Math.max(config.getProperty(
-        CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-        CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
-    ) / 2, 1);
+    _sender = maxByteStringSize > 0 ? new SplitSender(this, maxByteStringSize) : new NonSplitSender(this);
   }
 
   @Override
@@ -124,12 +115,31 @@ public class GrpcSendingMailbox implements SendingMailbox {
     if (_contentObserver == null) {
       _contentObserver = getContentObserver();
     }
-    splitAndSend(block, serializedStats);
+    processAndSend(block, serializedStats);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[GRPC SEND]== message " + block + " sent to: " + _id);
     }
   }
 
+  private void processAndSend(MseBlock block, List<DataBuffer> serializedStats)
+      throws IOException {
+    _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
+    long start = System.currentTimeMillis();
+    try {
+      DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
+      int sizeInBytes = _sender.processAndSend(dataBlock);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes);
+      }
+      _statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, sizeInBytes);
+    } catch (Throwable t) {
+      LOGGER.warn("Caught exception while serializing block: {}", block, t);
+      throw t;
+    } finally {
+      _statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - start);
+    }
+  }
+
   @Override
   public void complete() {
     if (isTerminated()) {
@@ -155,7 +165,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
       // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
       MseBlock errorBlock = ErrorMseBlock.fromError(
           QueryErrorCode.QUERY_CANCELLATION, "Cancelled by sender with exception: " + msg);
-      splitAndSend(errorBlock, List.of());
+      processAndSend(errorBlock, List.of());
       _contentObserver.onCompleted();
     } catch (Exception e) {
       // Exception can be thrown if the stream is already closed, so we simply ignore it
@@ -183,39 +193,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
         .open(_statusObserver);
   }
 
-  private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats)
-      throws IOException {
-    _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
-    long start = System.currentTimeMillis();
-    try {
-      DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      List<ByteString> byteStrings;
-      if (_splitBlocks) {
-        byteStrings = toByteStrings(dataBlock, _maxByteStringSize);
-      } else {
-        byteStrings = List.of(DataBlockUtils.toByteString(dataBlock));
-      }
-      int sizeInBytes = byteStrings.stream().mapToInt(ByteString::size).reduce(0, Integer::sum);
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes);
-      }
-      _statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, sizeInBytes);
-
-      Iterator<ByteString> byteStringIt = byteStrings.iterator();
-      while (byteStringIt.hasNext()) {
-        ByteString byteString = byteStringIt.next();
-        boolean waitForMore = byteStringIt.hasNext();
-        sendContent(byteString, waitForMore);
-      }
-    } catch (Throwable t) {
-      LOGGER.warn("Caught exception while serializing block: {}", block, t);
-      throw t;
-    } finally {
-      _statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - start);
-    }
-  }
-
-  private void sendContent(ByteString byteString, boolean waitForMore) {
+  protected void sendContent(ByteString byteString, boolean waitForMore) {
     MailboxContent content = MailboxContent.newBuilder()
         .setMailboxId(_id)
         .setPayload(byteString)
@@ -232,8 +210,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
   private static class MseBlockSerializer implements MseBlock.Visitor<DataBlock, List<DataBuffer>> {
     private static final MseBlockSerializer INSTANCE = new MseBlockSerializer();
 
-    public static DataBlock toDataBlock(MseBlock block, List<DataBuffer> serializedStats)
-        throws IOException {
+    public static DataBlock toDataBlock(MseBlock block, List<DataBuffer> serializedStats) {
       return block.accept(INSTANCE, serializedStats);
     }
 
@@ -326,4 +303,56 @@ public class GrpcSendingMailbox implements SendingMailbox {
 
     return result;
   }
+
+  private static abstract class Sender {
+    protected final GrpcSendingMailbox _mailbox;
+
+    protected Sender(GrpcSendingMailbox mailbox) {
+      _mailbox = mailbox;
+    }
+
+    protected abstract int processAndSend(DataBlock dataBlock)
+        throws IOException;
+  }
+
+  private static class SplitSender extends Sender {
+    private final int _maxByteStringSize;
+
+    public SplitSender(GrpcSendingMailbox mailbox, int maxByteStringSize) {
+      super(mailbox);
+      _maxByteStringSize = maxByteStringSize;
+    }
+
+    @Override
+    protected int processAndSend(DataBlock dataBlock)
+        throws IOException {
+      List<ByteString> byteStrings = toByteStrings(dataBlock, _maxByteStringSize);
+      int sizeInBytes = 0;
+      for (ByteString byteString : byteStrings) {
+        sizeInBytes += byteString.size();
+      }
+      Iterator<ByteString> byteStringIt = byteStrings.iterator();
+      while (byteStringIt.hasNext()) {
+        ByteString byteString = byteStringIt.next();
+        boolean waitForMore = byteStringIt.hasNext();
+        _mailbox.sendContent(byteString, waitForMore);
+      }
+      return sizeInBytes;
+    }
+  }
+
+  private static class NonSplitSender extends Sender {
+    public NonSplitSender(GrpcSendingMailbox mailbox) {
+      super(mailbox);
+    }
+
+    @Override
+    protected int processAndSend(DataBlock dataBlock)
+        throws IOException {
+      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
+      int sizeInBytes = byteString.size();
+      _mailbox.sendContent(byteString, false);
+      return sizeInBytes;
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index e53aac1d02..e8b778d4ee 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -30,6 +30,7 @@ import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +65,7 @@ public class MailboxService {
   private final PinotConfiguration _config;
   private final ChannelManager _channelManager;
   @Nullable private final TlsConfig _tlsConfig;
+  private final int _maxByteStringSize;
 
   private GrpcMailboxServer _grpcMailboxServer;
 
@@ -77,6 +79,18 @@ public class MailboxService {
     _config = config;
     _tlsConfig = tlsConfig;
     _channelManager = new ChannelManager(tlsConfig);
+    boolean splitBlocks = config.getProperty(
+        CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
+        CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
+    if (splitBlocks) {
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      _maxByteStringSize = Math.max(config.getProperty(
+          CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+          CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
+      ) / 2, 1);
+    } else {
+      _maxByteStringSize = 0;
+    }
     LOGGER.info("Initialized MailboxService with hostname: {}, port: {}", hostname, port);
   }
 
@@ -115,7 +129,8 @@ public class MailboxService {
     if (_hostname.equals(hostname) && _port == port) {
       return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap);
     } else {
-      return new GrpcSendingMailbox(_config, mailboxId, _channelManager, hostname, port, deadlineMs, statMap);
+      return new GrpcSendingMailbox(
+          _config, mailboxId, _channelManager, hostname, port, deadlineMs, statMap, _maxByteStringSize);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to