This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7baf82cc56 Improve byte-based gRPC block splitting (#15694) 7baf82cc56 is described below commit 7baf82cc5698b6b88958429977ad1e7e8c5de7bb Author: Alberto Bastos <alberto.var...@startree.ai> AuthorDate: Fri Jun 6 08:35:48 2025 +0200 Improve byte-based gRPC block splitting (#15694) --- .../tests/MultiStageEngineSmallBufferTest.java | 180 +++++++++++++++++++++ .../pinot/query/mailbox/GrpcSendingMailbox.java | 127 +++++++++------ .../apache/pinot/query/mailbox/MailboxService.java | 17 +- 3 files changed, 274 insertions(+), 50 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java new file mode 100644 index 0000000000..a1428fd935 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT; +import static org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + + +public class MultiStageEngineSmallBufferTest extends BaseClusterIntegrationTestSet { + + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema"; + private static final int NUM_SERVERS = 4; + private static final int INBOUND_BLOCK_SIZE = 256; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + + // Set the multi-stage max server query threads for the cluster, so that we can test the query queueing logic + // in the MultiStageBrokerRequestHandler + HelixConfigScope scope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()) + .build(); + _helixManager.getConfigAccessor() + .set(scope, CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "30"); + + startBroker(); + startServer(); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Set up the H2 connection + setUpH2Connection(avroFiles); + + // Initialize the query generator + setUpQueryGenerator(avroFiles); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + @Override + protected void startServer() + throws Exception { + startServers(NUM_SERVERS); + } + + @AfterClass + public void tearDown() + throws Exception { + dropOfflineTable(DEFAULT_TABLE_NAME); + + stopServer(); + stopBroker(); + stopController(); + stopZk(); + + FileUtils.deleteDirectory(_tempDir); + } + + @Override + protected String getSchemaFileName() { + return SCHEMA_FILE_NAME; + } + + @BeforeMethod + @Override + public void resetMultiStage() { + setUseMultiStageQueryEngine(true); + } + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + brokerConf.setProperty(KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, INBOUND_BLOCK_SIZE); + brokerConf.setProperty(KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, true); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + serverConf.setProperty(KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, INBOUND_BLOCK_SIZE); + serverConf.setProperty(KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, true); + } + + @Test(invocationCount = 50) + public void testConcurrentSplittedMailboxes() + throws Exception { + int numClients = 32; + + String query = + "SELECT ActualElapsedTime FROM mytable ORDER BY ActualElapsedTime DESC LIMIT 100"; + String expected = "[[678],[668],[662],[658],[651],[650],[647],[629],[625],[625],[621],[617],[610],[610],[607],[605]" + + ",[603],[582],[578],[576],[574],[572],[572],[566],[565],[564],[558],[555],[555],[554],[554],[553],[552],[550]" + + ",[549],[544],[543],[541],[540],[538],[537],[535],[533],[532],[526],[521],[520],[519],[518],[516],[515],[514]" + + ",[508],[508],[507],[505],[505],[502],[502],[499],[499],[498],[495],[494],[494],[493],[490],[487],[487],[484]" + + ",[484],[481],[481],[480],[479],[478],[478],[477],[473],[472],[471],[468],[465],[464],[464],[463],[461],[461]" + + ",[460],[459],[459],[458],[457],[456],[453],[453],[451],[451],[450],[450]]"; + + List<String> results = Collections.synchronizedList(new ArrayList<>(numClients)); + + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numClients); + for (int i = 0; i < numClients; i++) { + new Thread(() -> { + try { + startLatch.await(); + JsonNode jsonNode = postQuery(query); + assertNotNull(jsonNode); + String actual = jsonNode.get("resultTable").get("rows").toString(); + results.add(actual); + } catch (Exception e) { + results.add("Error: " + e.getMessage()); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }).start(); + } + + startLatch.countDown(); + doneLatch.await(); + + assertEquals(numClients, results.size()); + for (String result : results) { + assertEquals(expected, result); + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index 7165fb2ec4..0ef5fed139 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -51,7 +51,6 @@ import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.segment.spi.memory.DataBuffer; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.QueryErrorCode; -import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,28 +69,20 @@ public class GrpcSendingMailbox implements SendingMailbox { private final long _deadlineMs; private final StatMap<MailboxSendOperator.StatKey> _statMap; private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver(); - private final boolean _splitBlocks; - private final int _maxByteStringSize; + private final Sender _sender; private StreamObserver<MailboxContent> _contentObserver; public GrpcSendingMailbox( PinotConfiguration config, String id, ChannelManager channelManager, String hostname, int port, long deadlineMs, - StatMap<MailboxSendOperator.StatKey> statMap) { + StatMap<MailboxSendOperator.StatKey> statMap, int maxByteStringSize) { _id = id; _channelManager = channelManager; _hostname = hostname; _port = port; _deadlineMs = deadlineMs; _statMap = statMap; - _splitBlocks = config.getProperty( - CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, - CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT); - // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later - _maxByteStringSize = Math.max(config.getProperty( - CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, - CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES - ) / 2, 1); + _sender = maxByteStringSize > 0 ? new SplitSender(this, maxByteStringSize) : new NonSplitSender(this); } @Override @@ -124,12 +115,31 @@ public class GrpcSendingMailbox implements SendingMailbox { if (_contentObserver == null) { _contentObserver = getContentObserver(); } - splitAndSend(block, serializedStats); + processAndSend(block, serializedStats); if (LOGGER.isDebugEnabled()) { LOGGER.debug("==[GRPC SEND]== message " + block + " sent to: " + _id); } } + private void processAndSend(MseBlock block, List<DataBuffer> serializedStats) + throws IOException { + _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1); + long start = System.currentTimeMillis(); + try { + DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats); + int sizeInBytes = _sender.processAndSend(dataBlock); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes); + } + _statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, sizeInBytes); + } catch (Throwable t) { + LOGGER.warn("Caught exception while serializing block: {}", block, t); + throw t; + } finally { + _statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - start); + } + } + @Override public void complete() { if (isTerminated()) { @@ -155,7 +165,7 @@ public class GrpcSendingMailbox implements SendingMailbox { // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback MseBlock errorBlock = ErrorMseBlock.fromError( QueryErrorCode.QUERY_CANCELLATION, "Cancelled by sender with exception: " + msg); - splitAndSend(errorBlock, List.of()); + processAndSend(errorBlock, List.of()); _contentObserver.onCompleted(); } catch (Exception e) { // Exception can be thrown if the stream is already closed, so we simply ignore it @@ -183,39 +193,7 @@ public class GrpcSendingMailbox implements SendingMailbox { .open(_statusObserver); } - private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats) - throws IOException { - _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1); - long start = System.currentTimeMillis(); - try { - DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats); - List<ByteString> byteStrings; - if (_splitBlocks) { - byteStrings = toByteStrings(dataBlock, _maxByteStringSize); - } else { - byteStrings = List.of(DataBlockUtils.toByteString(dataBlock)); - } - int sizeInBytes = byteStrings.stream().mapToInt(ByteString::size).reduce(0, Integer::sum); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes); - } - _statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, sizeInBytes); - - Iterator<ByteString> byteStringIt = byteStrings.iterator(); - while (byteStringIt.hasNext()) { - ByteString byteString = byteStringIt.next(); - boolean waitForMore = byteStringIt.hasNext(); - sendContent(byteString, waitForMore); - } - } catch (Throwable t) { - LOGGER.warn("Caught exception while serializing block: {}", block, t); - throw t; - } finally { - _statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - start); - } - } - - private void sendContent(ByteString byteString, boolean waitForMore) { + protected void sendContent(ByteString byteString, boolean waitForMore) { MailboxContent content = MailboxContent.newBuilder() .setMailboxId(_id) .setPayload(byteString) @@ -232,8 +210,7 @@ public class GrpcSendingMailbox implements SendingMailbox { private static class MseBlockSerializer implements MseBlock.Visitor<DataBlock, List<DataBuffer>> { private static final MseBlockSerializer INSTANCE = new MseBlockSerializer(); - public static DataBlock toDataBlock(MseBlock block, List<DataBuffer> serializedStats) - throws IOException { + public static DataBlock toDataBlock(MseBlock block, List<DataBuffer> serializedStats) { return block.accept(INSTANCE, serializedStats); } @@ -326,4 +303,56 @@ public class GrpcSendingMailbox implements SendingMailbox { return result; } + + private static abstract class Sender { + protected final GrpcSendingMailbox _mailbox; + + protected Sender(GrpcSendingMailbox mailbox) { + _mailbox = mailbox; + } + + protected abstract int processAndSend(DataBlock dataBlock) + throws IOException; + } + + private static class SplitSender extends Sender { + private final int _maxByteStringSize; + + public SplitSender(GrpcSendingMailbox mailbox, int maxByteStringSize) { + super(mailbox); + _maxByteStringSize = maxByteStringSize; + } + + @Override + protected int processAndSend(DataBlock dataBlock) + throws IOException { + List<ByteString> byteStrings = toByteStrings(dataBlock, _maxByteStringSize); + int sizeInBytes = 0; + for (ByteString byteString : byteStrings) { + sizeInBytes += byteString.size(); + } + Iterator<ByteString> byteStringIt = byteStrings.iterator(); + while (byteStringIt.hasNext()) { + ByteString byteString = byteStringIt.next(); + boolean waitForMore = byteStringIt.hasNext(); + _mailbox.sendContent(byteString, waitForMore); + } + return sizeInBytes; + } + } + + private static class NonSplitSender extends Sender { + public NonSplitSender(GrpcSendingMailbox mailbox) { + super(mailbox); + } + + @Override + protected int processAndSend(DataBlock dataBlock) + throws IOException { + ByteString byteString = DataBlockUtils.toByteString(dataBlock); + int sizeInBytes = byteString.size(); + _mailbox.sendContent(byteString, false); + return sizeInBytes; + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java index e53aac1d02..e8b778d4ee 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java @@ -30,6 +30,7 @@ import org.apache.pinot.query.mailbox.channel.ChannelManager; import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +65,7 @@ public class MailboxService { private final PinotConfiguration _config; private final ChannelManager _channelManager; @Nullable private final TlsConfig _tlsConfig; + private final int _maxByteStringSize; private GrpcMailboxServer _grpcMailboxServer; @@ -77,6 +79,18 @@ public class MailboxService { _config = config; _tlsConfig = tlsConfig; _channelManager = new ChannelManager(tlsConfig); + boolean splitBlocks = config.getProperty( + CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, + CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT); + if (splitBlocks) { + // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later + _maxByteStringSize = Math.max(config.getProperty( + CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, + CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES + ) / 2, 1); + } else { + _maxByteStringSize = 0; + } LOGGER.info("Initialized MailboxService with hostname: {}, port: {}", hostname, port); } @@ -115,7 +129,8 @@ public class MailboxService { if (_hostname.equals(hostname) && _port == port) { return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap); } else { - return new GrpcSendingMailbox(_config, mailboxId, _channelManager, hostname, port, deadlineMs, statMap); + return new GrpcSendingMailbox( + _config, mailboxId, _channelManager, hostname, port, deadlineMs, statMap, _maxByteStringSize); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org