This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b2f7045d7e Make stats system more resilient in MSE (#15312) b2f7045d7e is described below commit b2f7045d7ebbdd879f4636b174bcffd866c2e512 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Tue Mar 25 11:54:54 2025 +0100 Make stats system more resilient in MSE (#15312) --- .../broker/broker/helix/BaseBrokerStarter.java | 1 + .../pinot/common/utils/helix/HelixHelper.java | 12 ++ .../apache/pinot/common/version/PinotVersion.java | 5 +- .../pinot/controller/BaseControllerStarter.java | 2 + .../MultiStageWithoutStatsIntegrationTest.java | 171 +++++++++++++++++++ .../org/apache/pinot/minion/BaseMinionStarter.java | 1 + .../apache/pinot/query/runtime/QueryRunner.java | 12 +- .../runtime/operator/MailboxSendOperator.java | 18 +- .../utils/BlockingMultiStreamConsumer.java | 111 +++++++++++-- .../query/runtime/plan/MultiStageQueryStats.java | 44 ++++- .../runtime/plan/OpChainExecutionContext.java | 9 +- .../plan/pipeline/PipelineBreakerExecutor.java | 12 +- .../query/service/dispatch/QueryDispatcher.java | 2 +- .../apache/pinot/query/QueryServerEnclosure.java | 2 +- .../executor/OpChainSchedulerServiceTest.java | 3 +- .../operator/MailboxReceiveOperatorTest.java | 77 ++++++++- .../runtime/operator/MailboxSendOperatorTest.java | 2 +- .../query/runtime/operator/OperatorTestUtil.java | 4 +- .../plan/pipeline/PipelineBreakerExecutorTest.java | 22 ++- .../pinot/server/starter/ServerInstance.java | 5 +- .../server/starter/helix/BaseServerStarter.java | 12 +- .../server/starter/helix/SendStatsPredicate.java | 185 +++++++++++++++++++++ .../pinot/server/worker/WorkerQueryServer.java | 7 +- .../apache/pinot/spi/utils/CommonConstants.java | 18 ++ 24 files changed, 676 insertions(+), 61 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java 
index e136ade820..8ba28333cc 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -549,6 +549,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { instanceTags = instanceConfig.getTags(); updated = true; } + updated |= HelixHelper.updatePinotVersion(instanceConfig); if (updated) { HelixHelper.updateInstanceConfig(_participantHelixManager, instanceConfig); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 37c36cd567..c5071e4939 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -45,6 +45,7 @@ import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.pinot.common.helix.ExtraInstanceConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.version.PinotVersion; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; @@ -642,4 +643,15 @@ public class HelixHelper { boolean mapUpdated = record.getMapFields().remove(disabledPartitionsKey) != null; return listUpdated | mapUpdated; } + + public static boolean updatePinotVersion(InstanceConfig instanceConfig) { + ZNRecord record = instanceConfig.getRecord(); + String currentVer = PinotVersion.VERSION; + String oldVer = record.getSimpleField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY); + if (!currentVer.equals(oldVer)) { + record.setSimpleField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, currentVer); + return true; + } + return false; + } } diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java b/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java index ad52a015b4..1058cc0f29 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java @@ -40,6 +40,7 @@ public class PinotVersion { * in a local, non-maven build this will resolve to {@code UNKNOWN}. */ public static final String VERSION; + public static final String UNKNOWN = "UNKNOWN"; /** * A sanitized version string with all dots replaced with underscores, which is necessary @@ -61,13 +62,13 @@ public class PinotVersion { version = String.valueOf(properties.get("pinot.version")); } catch (IOException e) { LOGGER.error("Could not load version properties; setting version to UNKNOWN.", e); - version = "UNKNOWN"; + version = UNKNOWN; } // if building this via some non-maven environment (e.g. IntelliJ) it is possible that // the properties file will not be properly filtered, in which case just return UNKNOWN if (version.equals("${project.version}")) { - VERSION = "UNKNOWN"; + VERSION = UNKNOWN; LOGGER.warn("Using UNKNOWN version properties because project.version was not resolved during build."); } else { VERSION = version; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index e0a20b0798..c45edbf7b5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -838,6 +838,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { updated |= HelixHelper.addDefaultTags(instanceConfig, () -> Collections.singletonList(CommonConstants.Helix.CONTROLLER_INSTANCE)); updated |= 
HelixHelper.removeDisabledPartitions(instanceConfig); + updated |= HelixHelper.updatePinotVersion(instanceConfig); + if (updated) { HelixHelper.updateInstanceConfig(_helixParticipantManager, instanceConfig); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java new file mode 100644 index 0000000000..69ef1e27c8 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.server.starter.helix.SendStatsPredicate; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class MultiStageWithoutStatsIntegrationTest extends BaseClusterIntegrationTestSet { + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + HelixConfigScope scope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()) + .build(); + startBrokers(1); + startServers(1); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name + // and validate correctness with parallel push protection enabled. 
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + // Create a copy of _tarDir to create multiple segments with the same name. + File tarDir2 = new File(_tempDir, "tarDir2"); + FileUtils.copyDirectory(_tarDir, tarDir2); + + List<File> tarDirs = new ArrayList<>(); + tarDirs.add(_tarDir); + tarDirs.add(tarDir2); + try { + uploadSegments(getTableName(), TableType.OFFLINE, tarDirs); + } catch (Exception e) { + // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one + // of the three exception: + // - 409 conflict of the second call enters ProcessExistingSegment + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment + // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently + // In such cases we upload all the segments again to ensure that the data is set up correctly. + assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() + .contains("Failed to create ZK metadata for segment") || e.getMessage() + .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); + uploadSegments(getTableName(), _tarDir); + } + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + serverConf.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, + SendStatsPredicate.Mode.NEVER.name()); + super.overrideServerConf(serverConf); + } + + @AfterClass + public void tearDown() + throws Exception { + // Brokers and servers has been stopped + stopController(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + public boolean useMultiStageQueryEngine() { + return true; + } + + @Override + protected List<FieldConfig> getFieldConfigs() { + return Collections.singletonList( + new FieldConfig("DivAirports", 
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), + FieldConfig.CompressionCodec.MV_ENTRY_DICT, null)); + } + + @Test + public void testIntersection() + throws Exception { + @Language("sql") + String query = "SELECT *\n" + + "FROM (\n" + + " SELECT CarrierDelay\n" + + " FROM mytable\n" + + " WHERE DaysSinceEpoch > 0\n" + + " )\n" + + "INTERSECT\n" + + "(\n" + + " SELECT ArrDelay\n" + + " FROM mytable\n" + + " WHERE DaysSinceEpoch > 0\n" + + ")"; + JsonNode node = postQuery(query); + + // Expected: + // "stageStats" : { + // "type" : "MAILBOX_RECEIVE", + // "executionTimeMs" : whatever, + // "fanIn" : whatever, + // "rawMessages" : whatever, + // "deserializedBytes" : whatever, + // "upstreamWaitMs" : whatever, + // "children" : [ { + // "type" : "EMPTY_MAILBOX_SEND", + // "stage" : 1, + // "description" : "No stats available for this stage. It may have been pruned." + // } ] + // } + + JsonNode stageStats = node.get("stageStats"); + assertNotNull(stageStats, "Stage stats should not be null"); + + assertEquals(stageStats.get("type").asText(), "MAILBOX_RECEIVE"); + + JsonNode children = stageStats.get("children"); + assertNotNull(children, "Children should not be null"); + assertEquals(children.size(), 1); + + JsonNode child = children.get(0); + assertEquals(child.get("type").asText(), "EMPTY_MAILBOX_SEND"); + assertEquals(child.get("stage").asInt(), 1); + assertEquals(child.get("description").asText(), "No stats available for this stage. 
It may have been pruned."); + } +} diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java index c55d9c1981..9daad84106 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java @@ -142,6 +142,7 @@ public abstract class BaseMinionStarter implements ServiceStartable { updated |= HelixHelper.addDefaultTags(instanceConfig, () -> Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE)); updated |= HelixHelper.removeDisabledPartitions(instanceConfig); + updated |= HelixHelper.updatePinotVersion(instanceConfig); if (updated) { HelixHelper.updateInstanceConfig(_helixManager, instanceConfig); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 1ba7fda9ac..7320061c5e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -132,13 +133,14 @@ public class QueryRunner { private JoinOverFlowMode _joinOverflowMode; @Nullable private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor; + private BooleanSupplier _sendStats; /** * Initializes the query executor. * <p>Should be called only once and before calling any other method. 
*/ public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager, - ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig) { + ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats) { String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME); if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) { hostname = hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH); @@ -213,6 +215,8 @@ public class QueryRunner { TimeSeriesBuilderFactoryProvider.init(config); } + _sendStats = sendStats; + LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port); } @@ -250,7 +254,7 @@ public class QueryRunner { // run pre-stage execution for all pipeline breakers PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan, - opChainMetadata, requestId, deadlineMs, parentContext); + opChainMetadata, requestId, deadlineMs, parentContext, _sendStats.getAsBoolean()); // Send error block to all the receivers if pipeline breaker fails if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { @@ -287,7 +291,7 @@ public class QueryRunner { // run OpChain OpChainExecutionContext executionContext = new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata, - workerMetadata, pipelineBreakerResult, parentContext); + workerMetadata, pipelineBreakerResult, parentContext, _sendStats.getAsBoolean()); OpChain opChain; if (workerMetadata.isLeafStageWorker()) { opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics, @@ -470,7 +474,7 @@ public class QueryRunner { }; // compile OpChain OpChainExecutionContext executionContext = new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, - 
opChainMetadata, stageMetadata, workerMetadata, null, null); + opChainMetadata, stageMetadata, workerMetadata, null, null, false); OpChain opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics, _leafQueryExecutor, _executorService, leafNodesConsumer, true); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 5e0a1938ed..ad91e32dd2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -202,13 +202,17 @@ public class MailboxSendOperator extends MultiStageOperator { try { TransferableBlock block = _input.nextBlock(); if (block.isSuccessfulEndOfStreamBlock()) { - updateEosBlock(block, _statMap); - // no need to check early terminate signal b/c the current block is already EOS - sendTransferableBlock(block); - // After sending its own stats, the sending operator of the stage 1 has the complete view of all stats - // Therefore this is the only place we can update some of the metrics like total seen rows or time spent. - if (_context.getStageId() == 1) { - updateMetrics(block); + if (_context.isSendStats()) { + block = updateEosBlock(block, _statMap); + // no need to check early terminate signal b/c the current block is already EOS + sendTransferableBlock(block); + // After sending its own stats, the sending operator of the stage 1 has the complete view of all stats + // Therefore this is the only place we can update some of the metrics like total seen rows or time spent. 
+ if (_context.getStageId() == 1) { + updateMetrics(block); + } + } else { + sendTransferableBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock()); } } else { if (sendTransferableBlock(block)) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java index 9b0c923f89..dd8356b45f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java @@ -32,6 +32,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * This class is a utility class that helps to consume multiple mailboxes in a blocking manner by a single thread. + * + * The reader entry point is {@link #readBlockBlocking()} which will block until some of the mailboxes is ready to be + * read. The method is blocking and will return the next block to be consumed. This method is designed to be called by + * a single thread we call the consumer thread. + * + * All other methods but the ones specifically specified can only be called by the consumer thread. + * @param <E> + */ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(BlockingMultiStreamConsumer.class); private final Object _id; @@ -55,44 +65,97 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { _lastRead = _mailboxes.size() - 1; } + /** + * Returns whether the element is considered an error element or not. + * + * This method is called by the consumer thread. + */ protected abstract boolean isError(E element); + /** + * Returns whether the element is considered a successful end of stream element or not. 
+ * + * This method is called by the consumer thread. + */ protected abstract boolean isEos(E element); /** - * This method is called whenever one of the consumer sends a EOS. It is guaranteed that the received element is - * an EOS as defined by {@link #isEos(Object)} + * This method is called whenever a {@link #isEos(Object) successful EOS} is read from one of the mailboxes. + * + * It is guaranteed that the received element is an EOS as defined by {@link #isEos(Object)}. + * + * This method is called by the consumer thread. */ - protected abstract void onConsumerFinish(E element); + protected abstract void onMailboxEnd(E element); + /** + * This method is called whenever a timeout is reached while reading an element. + * + * This method is called by the consumer thread. + */ protected abstract E onTimeout(); + /** + * This method is called whenever an exception is thrown while reading an element. + * + * This method is called by the consumer thread. + */ protected abstract E onException(Exception e); + /** + * This method is called whenever all mailboxes emitted EOS. + * + * This method is called by the consumer thread. + */ protected abstract E onEos(); + /** + * This method must be called when the consumer is not going to read anymore from the mailboxes. + * + * <strong>This method can be called from any thread</strong>. + */ @Override public void close() { cancelRemainingMailboxes(); } + /** + * This method is called whenever the consumer is cancelled. + * + * <strong>This method can be called from any thread</strong>. + */ public void cancel(Throwable t) { cancelRemainingMailboxes(); } + /** + * This method is called whenever the consumer is early terminated. + * + * This method is called by the consumer thread. + */ public void earlyTerminate() { for (AsyncStream<E> mailbox : _mailboxes) { mailbox.earlyTerminate(); } } + /** + * This method is called whenever the consumer is early terminated. 
+ * + * <strong>This method can be called from any thread</strong>. + */ protected void cancelRemainingMailboxes() { for (AsyncStream<E> mailbox : _mailboxes) { mailbox.cancel(); } } - public void onData() { + /** + * This method is called whenever the consumer is early terminated. + * + * <strong>This method can be called by any thread</strong>, although it is expected to be called by producer threads. + */ + protected void onData() { if (_newDataReady.offer(Boolean.TRUE)) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("New data notification delivered on " + _id + ". " + System.identityHashCode(_newDataReady)); @@ -116,7 +179,9 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { * Right now the implementation tries to be fair. If one call returned the block from mailbox {@code i}, then next * call will look for mailbox {@code i+1}, {@code i+2}... in a circular manner. * - * In order to unblock a thread blocked here, {@link #onData()} should be called. * + * In order to unblock a thread blocked here, {@link #onData()} should be called. + * + * This method is called by the consumer thread. */ public E readBlockBlocking() { if (LOGGER.isTraceEnabled()) { @@ -192,7 +257,7 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { LOGGER.debug("==[RECEIVE]== EOS received : " + _id + " in mailbox: " + removed.getId() + " (mailboxes alive: " + ids + ")"); } - onConsumerFinish(block); + onMailboxEnd(block); block = readBlockOrNull(); } @@ -244,11 +309,14 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { public static class OfTransferableBlock extends BlockingMultiStreamConsumer<TransferableBlock> { - private final MultiStageQueryStats _stats; + private final int _stageId; + @Nullable + private MultiStageQueryStats _stats; public OfTransferableBlock(OpChainExecutionContext context, List<? 
extends AsyncStream<TransferableBlock>> asyncProducers) { super(context.getId(), context.getDeadlineMs(), asyncProducers); + _stageId = context.getStageId(); _stats = MultiStageQueryStats.emptyStats(context.getStageId()); } @@ -263,18 +331,27 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { } @Override - protected void onConsumerFinish(TransferableBlock element) { - if (element.getQueryStats() != null) { - _stats.mergeUpstream(element.getQueryStats()); - } else { - _stats.mergeUpstream(element.getSerializedStatsByStage()); + protected void onMailboxEnd(TransferableBlock element) { + try { + MultiStageQueryStats stats = _stats; + if (stats != null) { + if (element.getQueryStats() != null) { + stats.mergeUpstream(element.getQueryStats(), true); + } else { + stats.mergeUpstream(element.getSerializedStatsByStage(), true); + } + } + } catch (Exception e) { + // If there is any error merging stats, continue without them + LOGGER.warn("Error merging stats", e); + _stats = null; } } @Override protected TransferableBlock onTimeout() { // TODO: Add the sender stage id to the error message - String errMsg = "Timed out on stage " + _stats.getCurrentStageId() + " waiting for data sent by a child stage"; + String errMsg = "Timed out on stage " + _stageId + " waiting for data sent by a child stage"; // We log this case as debug because: // - The opchain will already log a stackless message once the opchain fail // - The trace is not useful (the log message is good enough to find where we failed) @@ -287,7 +364,7 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { @Override protected TransferableBlock onException(Exception e) { // TODO: Add the sender stage id to the error message - String errMsg = "Found an error on stage " + _stats.getCurrentStageId() + " while reading from a child stage"; + String errMsg = "Found an error on stage " + _stageId + " while reading from a child stage"; // We log this case as warn 
because contrary to the timeout case, it should be rare to finish an execution // with an exception and the stack trace may be useful to find the root cause. LOGGER.warn(errMsg, e); @@ -296,7 +373,11 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { @Override protected TransferableBlock onEos() { - return TransferableBlockUtils.getEndOfStreamTransferableBlock(_stats); + MultiStageQueryStats stats = _stats; + if (stats == null) { // possible in case of error + stats = MultiStageQueryStats.emptyStats(_stageId); + } + return TransferableBlockUtils.getEndOfStreamTransferableBlock(stats); } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java index 42f924d8fd..3cf898f981 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java @@ -213,6 +213,16 @@ public class MultiStageQueryStats { } } + /** + * Merge upstream stats from another MultiStageQueryStats object into this one. + * + * This method is equivalent to calling {@link #mergeUpstream(MultiStageQueryStats, boolean)} with bubbleUp set to + * false. + */ + public void mergeUpstream(MultiStageQueryStats otherStats) { + mergeUpstream(otherStats, false); + } + /** * Merge upstream stats from another MultiStageQueryStats object into this one. * <p> @@ -221,8 +231,10 @@ public class MultiStageQueryStats { * <p> * For example set operations may need to merge the stats from all its upstreams before concatenating stats of the * current stage. + * + * @param bubbleUp true if and only if runtime exceptions should be thrown when merging stats. 
*/ - public void mergeUpstream(MultiStageQueryStats otherStats) { + public void mergeUpstream(MultiStageQueryStats otherStats, boolean bubbleUp) { Preconditions.checkArgument(_currentStageId <= otherStats._currentStageId, "Cannot merge stats from early stage %s into stats of later stage %s", otherStats._currentStageId, _currentStageId); @@ -256,12 +268,28 @@ public class MultiStageQueryStats { myStats.merge(otherStatsForStage); } } catch (IllegalArgumentException | IllegalStateException ex) { + if (bubbleUp) { + throw ex; + } LOGGER.warn("Error merging stats on stage {}. Ignoring the new stats", i, ex); } } } + /** + * Merge upstream stats from a list of DataBuffer objects into this one. + * + * This method is equivalent to calling {@link #mergeUpstream(List, boolean)} with bubbleUp set to false. + */ public void mergeUpstream(List<DataBuffer> otherStats) { + mergeUpstream(otherStats, false); + } + + /** + * Merge upstream stats from a list of DataBuffer objects into this one. + * @param bubbleUp true if and only if runtime exceptions should be thrown when merging stats. + */ + public void mergeUpstream(List<DataBuffer> otherStats, boolean bubbleUp) { for (int i = 0; i <= _currentStageId && i < otherStats.size(); i++) { if (otherStats.get(i) != null) { throw new IllegalArgumentException("Cannot merge stats from early stage " + i + " into stats of " @@ -283,8 +311,14 @@ public class MultiStageQueryStats { myStats.merge(dis); } } catch (IOException ex) { + if (bubbleUp) { + throw new RuntimeException("Error merging stats on stage " + i, ex); + } LOGGER.warn("Error deserializing stats on stage " + i + ". Considering the new stats empty", ex); } catch (IllegalArgumentException | IllegalStateException ex) { + if (bubbleUp) { + throw ex; + } LOGGER.warn("Error merging stats on stage " + i + ". 
Ignoring the new stats", ex); } } @@ -421,6 +455,10 @@ public class MultiStageQueryStats { return _operatorStats.size() - 1; } + public boolean isEmpty() { + return _operatorTypes.isEmpty(); + } + public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>> consumer) { Iterator<MultiStageOperator.Type> typeIterator = _operatorTypes.iterator(); Iterator<StatMap<?>> statIterator = _operatorStats.iterator(); @@ -522,13 +560,13 @@ public class MultiStageQueryStats { if (numOperators != _operatorTypes.size()) { try { Closed deserialized = deserialize(input, numOperators); - throw new RuntimeException("Cannot merge stats from stages with different operators. Expected " + throw new IllegalStateException("Cannot merge stats from stages with different operators. Expected " + _operatorTypes + " operators, got " + numOperators + ". Deserialized stats: " + deserialized); } catch (IOException e) { throw new IOException("Cannot merge stats from stages with different operators. Expected " + _operatorTypes + " operators, got " + numOperators, e); } catch (RuntimeException e) { - throw new RuntimeException("Cannot merge stats from stages with different operators. Expected " + throw new IllegalStateException("Cannot merge stats from stages with different operators. 
Expected " + _operatorTypes + " operators, got " + numOperators, e); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index 0b4e502762..5c5153a7d1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -51,18 +51,21 @@ public class OpChainExecutionContext { private final PipelineBreakerResult _pipelineBreakerResult; private final boolean _traceEnabled; private final ThreadExecutionContext _parentContext; + private final boolean _sendStats; private ServerPlanRequestContext _leafStageContext; public OpChainExecutionContext(MailboxService mailboxService, long requestId, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata, - @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable ThreadExecutionContext parentContext) { + @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable ThreadExecutionContext parentContext, + boolean sendStats) { _mailboxService = mailboxService; _requestId = requestId; _deadlineMs = deadlineMs; _opChainMetadata = Collections.unmodifiableMap(opChainMetadata); _stageMetadata = stageMetadata; _workerMetadata = workerMetadata; + _sendStats = sendStats; _server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), workerMetadata.getWorkerId()); _id = new OpChainId(requestId, workerMetadata.getWorkerId(), stageMetadata.getStageId()); @@ -132,4 +135,8 @@ public class OpChainExecutionContext { public ThreadExecutionContext getParentContext() { return _parentContext; } + + public boolean isSendStats() { + return _sendStats; + } } diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index 841e0e9ad8..c531d652bf 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -51,14 +51,6 @@ public class PipelineBreakerExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutor.class); - @Nullable - public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, - MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, - Map<String, String> opChainMetadata, long requestId, long deadlineMs) { - return executePipelineBreakers(scheduler, mailboxService, workerMetadata, stagePlan, opChainMetadata, requestId, - deadlineMs, null); - } - /** * Execute a pipeline breaker and collect the results (synchronously). Currently, pipeline breaker executor can only * execute mailbox receive pipeline breaker. 
@@ -77,7 +69,7 @@ public class PipelineBreakerExecutor { public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> opChainMetadata, long requestId, long deadlineMs, - @Nullable ThreadExecutionContext parentContext) { + @Nullable ThreadExecutionContext parentContext, boolean sendStats) { PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(); PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), pipelineBreakerContext); if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) { @@ -87,7 +79,7 @@ public class PipelineBreakerExecutor { // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(mailboxService, requestId, deadlineMs, opChainMetadata, - stagePlan.getStageMetadata(), workerMetadata, null, parentContext); + stagePlan.getStageMetadata(), workerMetadata, null, parentContext, sendStats); return execute(scheduler, pipelineBreakerContext, opChainExecutionContext); } catch (Exception e) { LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index f3710866eb..0571141866 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -497,7 +497,7 @@ public class QueryDispatcher { ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext(); OpChainExecutionContext executionContext = new OpChainExecutionContext(mailboxService, requestId, deadlineMs, queryOptions, 
stageMetadata, - workerMetadata.get(0), null, parentContext); + workerMetadata.get(0), null, parentContext, true); PairList<Integer, String> resultFields = subPlan.getQueryResultFields(); DataSchema sourceSchema = receiveNode.getDataSchema(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 9de563af9b..f13f7f9e22 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -79,7 +79,7 @@ public class QueryServerEnclosure { HelixManager helixManager = mockHelixManager(factory.buildSchemaMap()); _queryRunner = new QueryRunner(); _queryRunner.init(new PinotConfiguration(runnerConfig), instanceDataManager, helixManager, ServerMetrics.get(), - null); + null, () -> true); } private HelixManager mockHelixManager(Map<String, Schema> schemaMap) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index 74e2e55af0..c702f3c47c 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -80,7 +80,8 @@ public class OpChainSchedulerServiceTest { WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of()); OpChainExecutionContext context = new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(), - new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null, null); + new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null, null, + true); 
return new OpChain(context, operator); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index b47068661d..efdc4e775a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -18,11 +18,13 @@ */ package org.apache.pinot.query.runtime.operator; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.rel.RelDistribution; +import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.datatable.StatMap; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.mailbox.MailboxService; @@ -37,7 +39,10 @@ import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.operator.MultiStageOperator.Type; +import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.segment.spi.memory.DataBuffer; import org.apache.pinot.spi.exception.QueryErrorCode; import org.mockito.Mock; import org.testng.annotations.AfterMethod; @@ -51,8 +56,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.openMocks; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class 
MailboxReceiveOperatorTest { @@ -229,6 +233,75 @@ public class MailboxReceiveOperatorTest { } } + @Test + public void differentUpstreamHeapStatsProduceEmptyStats() { + when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1); + MultiStageQueryStats stats1 = new MultiStageQueryStats.Builder(1) + .addLast(open -> + open.addLastOperator(Type.MAILBOX_SEND, new StatMap<>(MailboxSendOperator.StatKey.class)) + .addLastOperator(Type.LEAF, new StatMap<>(LeafStageTransferableBlockOperator.StatKey.class)) + .close()) + .build(); + TransferableBlock block1 = TransferableBlockUtils.getEndOfStreamTransferableBlock(stats1); + when(_mailbox1.poll()).thenReturn(block1); + + when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2); + MultiStageQueryStats stats2 = new MultiStageQueryStats.Builder(1) + .addLast(open -> + open.addLastOperator(Type.MAILBOX_SEND, new StatMap<>(MailboxSendOperator.StatKey.class)) + .addLastOperator(Type.FILTER, new StatMap<>(FilterOperator.StatKey.class)) + .addLastOperator(Type.LEAF, new StatMap<>(LeafStageTransferableBlockOperator.StatKey.class)) + .close()) + .build(); + TransferableBlock block2 = TransferableBlockUtils.getEndOfStreamTransferableBlock(stats2); + when(_mailbox2.poll()).thenReturn(block2); + + try (MailboxReceiveOperator operator = getOperator(_stageMetadataBoth, RelDistribution.Type.SINGLETON)) { + TransferableBlock block = operator.nextBlock(); + assertTrue(block.isSuccessfulEndOfStreamBlock(), "Block should be successful EOS"); + assertNotNull(block.getQueryStats(), "Query stats should not be null"); + MultiStageQueryStats.StageStats.Closed upstreamStats = block.getQueryStats().getUpstreamStageStats(1); + assertNull(upstreamStats, "Upstream stats should be null in case of error merging stats"); + } + } + + @Test + public void differentSerializedUpstreamStatsProduceEmptyStats() + throws IOException { + when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1); + 
List<DataBuffer> stats1 = new MultiStageQueryStats.Builder(1) + .addLast(open -> + open.addLastOperator(Type.MAILBOX_SEND, new StatMap<>(MailboxSendOperator.StatKey.class)) + .addLastOperator(Type.LEAF, new StatMap<>(LeafStageTransferableBlockOperator.StatKey.class)) + .close()) + .build() + .serialize(); + MetadataBlock metadataBlock1 = new MetadataBlock(stats1); + TransferableBlock block1 = TransferableBlockUtils.wrap(metadataBlock1); + when(_mailbox1.poll()).thenReturn(block1); + + when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2); + List<DataBuffer> stats2 = new MultiStageQueryStats.Builder(1) + .addLast(open -> + open.addLastOperator(Type.MAILBOX_SEND, new StatMap<>(MailboxSendOperator.StatKey.class)) + .addLastOperator(Type.FILTER, new StatMap<>(FilterOperator.StatKey.class)) + .addLastOperator(Type.LEAF, new StatMap<>(LeafStageTransferableBlockOperator.StatKey.class)) + .close()) + .build() + .serialize(); + MetadataBlock metadataBlock2 = new MetadataBlock(stats2); + TransferableBlock block2 = TransferableBlockUtils.wrap(metadataBlock2); + when(_mailbox2.poll()).thenReturn(block2); + + try (MailboxReceiveOperator operator = getOperator(_stageMetadataBoth, RelDistribution.Type.SINGLETON)) { + TransferableBlock block = operator.nextBlock(); + assertTrue(block.isSuccessfulEndOfStreamBlock(), "Block should be successful EOS"); + assertNotNull(block.getQueryStats(), "Query stats should not be null"); + MultiStageQueryStats.StageStats.Closed upstreamStats = block.getQueryStats().getUpstreamStageStats(1); + assertNull(upstreamStats, "Upstream stats should be null in case of error merging stats"); + } + } + private MailboxReceiveOperator getOperator(StageMetadata stageMetadata, RelDistribution.Type distributionType, long deadlineMs) { OpChainExecutionContext context = OperatorTestUtil.getOpChainContext(_mailboxService, deadlineMs, stageMetadata); diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index a54dc182a9..aa384e7b97 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -197,7 +197,7 @@ public class MailboxSendOperatorTest { StageMetadata stageMetadata = new StageMetadata(SENDER_STAGE_ID, List.of(workerMetadata), Map.of()); OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE, Map.of(), stageMetadata, workerMetadata, - null, null); + null, null, true); return new MailboxSendOperator(context, _input, statMap -> _exchange); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 0d6317ab2d..5a824f51e2 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -83,7 +83,7 @@ public class OperatorTestUtil { public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, long deadlineMs, StageMetadata stageMetadata) { return new OpChainExecutionContext(mailboxService, 0, deadlineMs, ImmutableMap.of(), stageMetadata, - stageMetadata.getWorkerMetadataList().get(0), null, null); + stageMetadata.getWorkerMetadataList().get(0), null, null, true); } public static OpChainExecutionContext getTracingContext() { @@ -105,7 +105,7 @@ public class OperatorTestUtil { WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of()); StageMetadata stageMetadata = new StageMetadata(0, 
ImmutableList.of(workerMetadata), ImmutableMap.of()); OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, - opChainMetadata, stageMetadata, workerMetadata, null, null); + opChainMetadata, stageMetadata, workerMetadata, null, null, true); StagePlan stagePlan = new StagePlan(null, stageMetadata); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index 51e766e8e0..3991505a7f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -22,9 +22,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.core.JoinRelType; import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; @@ -101,6 +103,14 @@ public class PipelineBreakerExecutorTest { _mocks.close(); } + @Nullable + public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, + MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, + Map<String, String> opChainMetadata, long requestId, long deadlineMs) { + return PipelineBreakerExecutor.executePipelineBreakers(scheduler, mailboxService, workerMetadata, stagePlan, + opChainMetadata, requestId, deadlineMs, null, true); + } + @AfterClass public void tearDown() { ExecutorServiceUtils.close(_executor); @@ -120,7 +130,7 @@ 
public class PipelineBreakerExecutorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(1))); PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, + executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, ImmutableMap.of(), 0, Long.MAX_VALUE); // then @@ -156,7 +166,7 @@ public class PipelineBreakerExecutorTest { TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(2))); PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, + executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, ImmutableMap.of(), 0, Long.MAX_VALUE); // then @@ -184,7 +194,7 @@ public class PipelineBreakerExecutorTest { // when PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, + executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, ImmutableMap.of(), 0, Long.MAX_VALUE); // then @@ -212,7 +222,7 @@ public class PipelineBreakerExecutorTest { }); PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, + executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, ImmutableMap.of(), 0, System.currentTimeMillis() + 100); // then @@ -245,7 +255,7 @@ public class PipelineBreakerExecutorTest { TransferableBlockTestUtils.getEndOfStreamTransferableBlock(1)); PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, + executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, ImmutableMap.of(), 0, Long.MAX_VALUE); // then @@ -278,7 
+288,7 @@ public class PipelineBreakerExecutorTest { TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, + executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan, ImmutableMap.of(), 0, Long.MAX_VALUE); // then diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 8d20db0335..2b4ac80523 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -49,6 +49,7 @@ import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.server.access.AccessControlFactory; import org.apache.pinot.server.access.AllowAllAccessFactory; import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.server.starter.helix.SendStatsPredicate; import org.apache.pinot.server.worker.WorkerQueryServer; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricUtils; @@ -84,7 +85,7 @@ public class ServerInstance { private boolean _queryServerStarted = false; public ServerInstance(ServerConf serverConf, HelixManager helixManager, AccessControlFactory accessControlFactory, - @Nullable SegmentOperationsThrottler segmentOperationsThrottler) + @Nullable SegmentOperationsThrottler segmentOperationsThrottler, SendStatsPredicate sendStatsPredicate) throws Exception { LOGGER.info("Initializing server instance"); _helixManager = helixManager; @@ -131,7 +132,7 @@ public class ServerInstance { LOGGER.info("Initializing Multi-stage query engine"); _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, helixManager, _serverMetrics, - serverConf.isMultiStageEngineTlsEnabled() ? 
tlsConfig : null); + serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null, sendStatsPredicate); } else { _workerQueryServer = null; } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 5c0a7e74f0..ed5277d63d 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -454,6 +454,7 @@ public abstract class BaseServerStarter implements ServiceStartable { updated |= updatePortIfNeeded(simpleFields, Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, serverConf.getMultiStageMailboxPort()); } + updated |= HelixHelper.updatePinotVersion(instanceConfig); // Update environment properties if (_pinotEnvironmentProvider != null) { @@ -663,8 +664,10 @@ public abstract class BaseServerStarter implements ServiceStartable { new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler, segmentDownloadThrottler); + SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf); ServerConf serverConf = new ServerConf(_serverConf); - _serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentOperationsThrottler); + _serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentOperationsThrottler, + sendStatsPredicate); ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); @@ -694,6 +697,13 @@ public abstract class BaseServerStarter implements ServiceStartable { } _clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler); + LOGGER.info("Initializing and registering the SendStatsPredicate"); + try { + 
_helixManager.addInstanceConfigChangeListener(sendStatsPredicate); + } catch (Exception e) { + LOGGER.error("Failed to register SendStatsPredicate as the Helix InstanceConfigChangeListener", e); + } + // Start restlet server for admin API endpoint LOGGER.info("Starting server admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs)); _adminApiApplication = createServerAdminApp(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java new file mode 100644 index 0000000000..833f7d4504 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.starter.helix; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.apache.helix.NotificationContext; +import org.apache.helix.api.listeners.InstanceConfigChangeListener; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.version.PinotVersion; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/// A class used to determine whether MSE should send stats or not. +/// +/// The stat mechanism used in MSE is very efficient, so contrary to what we do in SSE, we decided to always collect and +/// send stats in MSE. However, there are some versions of Pinot that have known issues with the stats mechanism, so we +/// created this class as a mechanism to disable stats sending in case of problematic versions. +/// +/// Specifically, Pinot 1.3.0 and lower have known issues when they receive unexpected stats from upstream stages, but +/// even these versions are prepared to receive empty stats from upstream stages. +/// Therefore the cleanest and safest solution is to not send stats when we know a problematic version is in the cluster. +/// +/// We support three modes: +/// - SAFE: This is the default mode. In this mode, we will send stats unless we detect a problematic version in the +/// cluster. This doesn't require human intervention and is the recommended mode. +/// - ALWAYS: In this mode, we will always send stats, regardless of the version of the cluster. This mimics the +/// behavior in 1.3.0 and lower versions. +/// - NEVER: In this mode, we will never send stats, regardless of the version of the cluster. 
This is useful for +/// testing purposes or if for whatever reason you want to disable stats. +public abstract class SendStatsPredicate implements InstanceConfigChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(SendStatsPredicate.class); + + public abstract boolean getSendStats(); + + public static SendStatsPredicate create(PinotConfiguration configuration) { + String modeStr = configuration.getProperty( + CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, + CommonConstants.MultiStageQueryRunner.DEFAULT_SEND_STATS_MODE).toUpperCase(Locale.ENGLISH); + Mode mode; + try { + mode = Mode.valueOf(modeStr.trim().toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid value " + modeStr + " for " + + CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, e); + } + return mode.create(); + } + + public enum Mode { + SAFE { + @Override + public SendStatsPredicate create() { + return new Safe(); + } + }, + ALWAYS { + @Override + public SendStatsPredicate create() { + return new SendStatsPredicate() { + @Override + public boolean getSendStats() { + return true; + } + + @Override + public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { + // Nothing to do + } + }; + } + }, + NEVER { + @Override + public SendStatsPredicate create() { + return new SendStatsPredicate() { + @Override + public boolean getSendStats() { + return false; + } + + @Override + public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { + // Nothing to do + } + }; + } + }; + + public abstract SendStatsPredicate create(); + } + + private static class Safe extends SendStatsPredicate { + private final AtomicBoolean _sendStats = new AtomicBoolean(true); + + @Override + public boolean getSendStats() { + return _sendStats.get(); + } + + @Override + public void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) { + Map<String, String> problematicVersionsById = new HashMap<>(); + for (InstanceConfig instanceConfig : instanceConfigs) { + switch (InstanceTypeUtils.getInstanceType(instanceConfig.getInstanceName())) { + case BROKER: + case SERVER: + String otherVersion = instanceConfig.getRecord() + .getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, null); + if (isProblematicVersion(otherVersion)) { + problematicVersionsById.put(instanceConfig.getInstanceName(), otherVersion); + } + break; + default: + continue; + } + } + boolean sendStats = problematicVersionsById.isEmpty(); + if (_sendStats.getAndSet(sendStats) != sendStats) { + if (sendStats) { + LOGGER.warn("Send MSE stats is now enabled"); + } else { + LOGGER.warn("Send MSE stats is now disabled (problematic versions: {})", problematicVersionsById); + } + } + } + + /// Returns true if the version is problematic + /// + /// Ideally [PinotVersion] should have a way to extract versions in comparable format, but given it doesn't we + /// need to parse the string here. 
If the version doesn't match `1\.x\..*`, we treat it as a problematic version + private boolean isProblematicVersion(@Nullable String versionStr) { + if (versionStr == null) { + return true; + } + if (versionStr.equals(PinotVersion.UNKNOWN)) { + return true; + } + if (versionStr.equals(PinotVersion.VERSION)) { + return false; + } + // Let's try to parse 1.x versions + String[] splits = versionStr.trim().split("\\."); + if (splits.length < 2) { + return true; + } + // Versions less than 1.x are problematic for sure + if (!splits[0].equals("1")) { + return true; + } + try { + // Versions less than 1.4 are problematic + if (Integer.parseInt(splits[1]) < 4) { + return true; + } + } catch (NumberFormatException e) { + return true; + } + return false; + } + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java index 0d177d7fa0..26041b8fcb 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java @@ -25,6 +25,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.query.runtime.QueryRunner; import org.apache.pinot.query.service.server.QueryServer; +import org.apache.pinot.server.starter.helix.SendStatsPredicate; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; @@ -37,12 +38,14 @@ public class WorkerQueryServer { private final QueryServer _queryWorkerService; public WorkerQueryServer(PinotConfiguration configuration, InstanceDataManager instanceDataManager, - HelixManager helixManager, ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig) { + HelixManager helixManager, ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig, + 
SendStatsPredicate sendStats) { _configuration = toWorkerQueryConfig(configuration); _queryServicePort = _configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT, CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT); QueryRunner queryRunner = new QueryRunner(); - queryRunner.init(_configuration, instanceDataManager, helixManager, serverMetrics, tlsConfig); + queryRunner.init(_configuration, instanceDataManager, helixManager, serverMetrics, tlsConfig, + sendStats::getSendStats); _queryWorkerService = new QueryServer(_queryServicePort, queryRunner, tlsConfig, serverMetrics, configuration); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index d4c5cacb2f..9acbda4f69 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -205,6 +205,7 @@ public class CommonConstants { public static final String MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY = "queryMailboxPort"; public static final String SYSTEM_RESOURCE_INFO_KEY = "SYSTEM_RESOURCE_INFO"; + public static final String PINOT_VERSION_KEY = "pinotVersion"; } public static final String SET_INSTANCE_ID_TO_HOSTNAME_KEY = "pinot.set.instance.id.to.hostname"; @@ -1473,6 +1474,23 @@ public class CommonConstants { public static final String KEY_OF_MAX_ROWS_IN_JOIN = "pinot.query.join.max.rows"; public static final String KEY_OF_JOIN_OVERFLOW_MODE = "pinot.query.join.overflow.mode"; + /// Specifies the send stats mode used in MSE. + /// + /// Valid values are (in lower or upper case): + /// - "SAFE": MSE will only send stats if all instances in the cluster are running 1.4.0 or later. + /// - "ALWAYS": MSE will always send stats, regardless of the version of the instances in the cluster. + /// - "NEVER": MSE will never send stats. 
+ /// + /// The reason for this flag is that versions 1.3.0 and lower have two undesired behaviors: + /// 1. Some queries using intersection generate incorrect stats + /// 2. When stats from other nodes are sent but are different from expected, the query fails. + /// + /// In 1.4.0 the first issue is solved and instead of failing when unexpected stats are received, the query + /// continues without children stats. But if a query involves servers in versions 1.3.0 and 1.4.0, the one + /// running 1.3.0 may fail, which breaks backward compatibility. + public static final String KEY_OF_SEND_STATS_MODE = "pinot.query.mse.stats.mode"; + public static final String DEFAULT_SEND_STATS_MODE = "SAFE"; + + public enum JoinOverFlowMode { THROW, BREAK } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org