This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b2f7045d7e Make stats system more resilient in MSE (#15312)
b2f7045d7e is described below

commit b2f7045d7ebbdd879f4636b174bcffd866c2e512
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Tue Mar 25 11:54:54 2025 +0100

    Make stats system more resilient in MSE (#15312)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   1 +
 .../pinot/common/utils/helix/HelixHelper.java      |  12 ++
 .../apache/pinot/common/version/PinotVersion.java  |   5 +-
 .../pinot/controller/BaseControllerStarter.java    |   2 +
 .../MultiStageWithoutStatsIntegrationTest.java     | 171 +++++++++++++++++++
 .../org/apache/pinot/minion/BaseMinionStarter.java |   1 +
 .../apache/pinot/query/runtime/QueryRunner.java    |  12 +-
 .../runtime/operator/MailboxSendOperator.java      |  18 +-
 .../utils/BlockingMultiStreamConsumer.java         | 111 +++++++++++--
 .../query/runtime/plan/MultiStageQueryStats.java   |  44 ++++-
 .../runtime/plan/OpChainExecutionContext.java      |   9 +-
 .../plan/pipeline/PipelineBreakerExecutor.java     |  12 +-
 .../query/service/dispatch/QueryDispatcher.java    |   2 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   2 +-
 .../executor/OpChainSchedulerServiceTest.java      |   3 +-
 .../operator/MailboxReceiveOperatorTest.java       |  77 ++++++++-
 .../runtime/operator/MailboxSendOperatorTest.java  |   2 +-
 .../query/runtime/operator/OperatorTestUtil.java   |   4 +-
 .../plan/pipeline/PipelineBreakerExecutorTest.java |  22 ++-
 .../pinot/server/starter/ServerInstance.java       |   5 +-
 .../server/starter/helix/BaseServerStarter.java    |  12 +-
 .../server/starter/helix/SendStatsPredicate.java   | 185 +++++++++++++++++++++
 .../pinot/server/worker/WorkerQueryServer.java     |   7 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  18 ++
 24 files changed, 676 insertions(+), 61 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index e136ade820..8ba28333cc 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -549,6 +549,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       instanceTags = instanceConfig.getTags();
       updated = true;
     }
+    updated |= HelixHelper.updatePinotVersion(instanceConfig);
     if (updated) {
       HelixHelper.updateInstanceConfig(_participantHelixManager, 
instanceConfig);
     }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 37c36cd567..c5071e4939 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -45,6 +45,7 @@ import 
org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.pinot.common.helix.ExtraInstanceConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.version.PinotVersion;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -642,4 +643,15 @@ public class HelixHelper {
     boolean mapUpdated = record.getMapFields().remove(disabledPartitionsKey) 
!= null;
     return listUpdated | mapUpdated;
   }
+
+  public static boolean updatePinotVersion(InstanceConfig instanceConfig) {
+    ZNRecord record = instanceConfig.getRecord();
+    String currentVer = PinotVersion.VERSION;
+    String oldVer = 
record.getSimpleField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY);
+    if (!currentVer.equals(oldVer)) {
+      record.setSimpleField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, 
currentVer);
+      return true;
+    }
+    return false;
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java 
b/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java
index ad52a015b4..1058cc0f29 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/version/PinotVersion.java
@@ -40,6 +40,7 @@ public class PinotVersion {
    * in a local, non-maven build this will resolve to {@code UNKNOWN}.
    */
   public static final String VERSION;
+  public static final String UNKNOWN = "UNKNOWN";
 
   /**
    * A sanitized version string with all dots replaced with underscores, which 
is necessary
@@ -61,13 +62,13 @@ public class PinotVersion {
       version = String.valueOf(properties.get("pinot.version"));
     } catch (IOException e) {
       LOGGER.error("Could not load version properties; setting version to 
UNKNOWN.", e);
-      version = "UNKNOWN";
+      version = UNKNOWN;
     }
 
     // if building this via some non-maven environment (e.g. IntelliJ) it is 
possible that
     // the properties file will not be properly filtered, in which case just 
return UNKNOWN
     if (version.equals("${project.version}")) {
-      VERSION = "UNKNOWN";
+      VERSION = UNKNOWN;
       LOGGER.warn("Using UNKNOWN version properties because project.version 
was not resolved during build.");
     } else {
       VERSION = version;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index e0a20b0798..c45edbf7b5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -838,6 +838,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     updated |= HelixHelper.addDefaultTags(instanceConfig,
         () -> 
Collections.singletonList(CommonConstants.Helix.CONTROLLER_INSTANCE));
     updated |= HelixHelper.removeDisabledPartitions(instanceConfig);
+    updated |= HelixHelper.updatePinotVersion(instanceConfig);
+
     if (updated) {
       HelixHelper.updateInstanceConfig(_helixParticipantManager, 
instanceConfig);
     }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
new file mode 100644
index 0000000000..69ef1e27c8
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.server.starter.helix.SendStatsPredicate;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class MultiStageWithoutStatsIntegrationTest extends 
BaseClusterIntegrationTestSet {
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    startBrokers(1);
+    startServers(1);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments. For exhaustive testing, concurrently upload 
multiple segments with the same name
+    // and validate correctness with parallel push protection enabled.
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
+    // Create a copy of _tarDir to create multiple segments with the same name.
+    File tarDir2 = new File(_tempDir, "tarDir2");
+    FileUtils.copyDirectory(_tarDir, tarDir2);
+
+    List<File> tarDirs = new ArrayList<>();
+    tarDirs.add(_tarDir);
+    tarDirs.add(tarDir2);
+    try {
+      uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
+    } catch (Exception e) {
+      // If enableParallelPushProtection is enabled and the same segment is 
uploaded concurrently, we could get one
+      // of the three exceptions:
+      //   - 409 conflict of the second call enters ProcessExistingSegment
+      //   - segmentZkMetadata creation failure if both calls entered 
ProcessNewSegment
+      //   - Failed to copy segment tar file to final location due to the same 
segment pushed twice concurrently
+      // In such cases we upload all the segments again to ensure that the 
data is set up correctly.
+      assertTrue(e.getMessage().contains("Another segment upload is in 
progress for segment") || e.getMessage()
+          .contains("Failed to create ZK metadata for segment") || 
e.getMessage()
+          .contains("java.nio.file.FileAlreadyExistsException"), 
e.getMessage());
+      uploadSegments(getTableName(), _tarDir);
+    }
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    
serverConf.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE,
+        SendStatsPredicate.Mode.NEVER.name());
+    super.overrideServerConf(serverConf);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    // Brokers and servers have been stopped
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  public boolean useMultiStageQueryEngine() {
+    return true;
+  }
+
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    return Collections.singletonList(
+        new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, 
Collections.emptyList(),
+            FieldConfig.CompressionCodec.MV_ENTRY_DICT, null));
+  }
+
+  @Test
+  public void testIntersection()
+      throws Exception {
+    @Language("sql")
+    String query = "SELECT *\n"
+        + "FROM (\n"
+        + "    SELECT CarrierDelay\n"
+        + "    FROM mytable\n"
+        + "    WHERE DaysSinceEpoch > 0\n"
+        + "  )\n"
+        + "INTERSECT\n"
+        + "(\n"
+        + "  SELECT ArrDelay\n"
+        + "  FROM mytable\n"
+        + "  WHERE DaysSinceEpoch > 0\n"
+        + ")";
+    JsonNode node = postQuery(query);
+
+    // Expected:
+    // "stageStats" : {
+    //    "type" : "MAILBOX_RECEIVE",
+    //    "executionTimeMs" : whatever,
+    //    "fanIn" : whatever,
+    //    "rawMessages" : whatever,
+    //    "deserializedBytes" : whatever,
+    //    "upstreamWaitMs" : whatever,
+    //    "children" : [ {
+    //      "type" : "EMPTY_MAILBOX_SEND",
+    //      "stage" : 1,
+    //      "description" : "No stats available for this stage. It may have 
been pruned."
+    //    } ]
+    //  }
+
+    JsonNode stageStats = node.get("stageStats");
+    assertNotNull(stageStats, "Stage stats should not be null");
+
+    assertEquals(stageStats.get("type").asText(), "MAILBOX_RECEIVE");
+
+    JsonNode children = stageStats.get("children");
+    assertNotNull(children, "Children should not be null");
+    assertEquals(children.size(), 1);
+
+    JsonNode child = children.get(0);
+    assertEquals(child.get("type").asText(), "EMPTY_MAILBOX_SEND");
+    assertEquals(child.get("stage").asInt(), 1);
+    assertEquals(child.get("description").asText(), "No stats available for 
this stage. It may have been pruned.");
+  }
+}
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index c55d9c1981..9daad84106 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -142,6 +142,7 @@ public abstract class BaseMinionStarter implements 
ServiceStartable {
     updated |= HelixHelper.addDefaultTags(instanceConfig,
         () -> 
Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE));
     updated |= HelixHelper.removeDisabledPartitions(instanceConfig);
+    updated |= HelixHelper.updatePinotVersion(instanceConfig);
     if (updated) {
       HelixHelper.updateInstanceConfig(_helixManager, instanceConfig);
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1ba7fda9ac..7320061c5e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -132,13 +133,14 @@ public class QueryRunner {
   private JoinOverFlowMode _joinOverflowMode;
   @Nullable
   private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor;
+  private BooleanSupplier _sendStats;
 
   /**
    * Initializes the query executor.
    * <p>Should be called only once and before calling any other method.
    */
   public void init(PinotConfiguration config, InstanceDataManager 
instanceDataManager, HelixManager helixManager,
-      ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig) {
+      ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig, 
BooleanSupplier sendStats) {
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
       hostname = 
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -213,6 +215,8 @@ public class QueryRunner {
       TimeSeriesBuilderFactoryProvider.init(config);
     }
 
+    _sendStats = sendStats;
+
     LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", 
hostname, port);
   }
 
@@ -250,7 +254,7 @@ public class QueryRunner {
     // run pre-stage execution for all pipeline breakers
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, 
_mailboxService, workerMetadata, stagePlan,
-            opChainMetadata, requestId, deadlineMs, parentContext);
+            opChainMetadata, requestId, deadlineMs, parentContext, 
_sendStats.getAsBoolean());
 
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() 
!= null) {
@@ -287,7 +291,7 @@ public class QueryRunner {
     // run OpChain
     OpChainExecutionContext executionContext =
         new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, 
opChainMetadata, stageMetadata,
-            workerMetadata, pipelineBreakerResult, parentContext);
+            workerMetadata, pipelineBreakerResult, parentContext, 
_sendStats.getAsBoolean());
     OpChain opChain;
     if (workerMetadata.isLeafStageWorker()) {
       opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, 
stagePlan, _helixManager, _serverMetrics,
@@ -470,7 +474,7 @@ public class QueryRunner {
     };
     // compile OpChain
     OpChainExecutionContext executionContext = new 
OpChainExecutionContext(_mailboxService, requestId, deadlineMs,
-        opChainMetadata, stageMetadata, workerMetadata, null, null);
+        opChainMetadata, stageMetadata, workerMetadata, null, null, false);
 
     OpChain opChain = 
ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, 
_helixManager,
         _serverMetrics, _leafQueryExecutor, _executorService, 
leafNodesConsumer, true);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 5e0a1938ed..ad91e32dd2 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -202,13 +202,17 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     try {
       TransferableBlock block = _input.nextBlock();
       if (block.isSuccessfulEndOfStreamBlock()) {
-        updateEosBlock(block, _statMap);
-        // no need to check early terminate signal b/c the current block is 
already EOS
-        sendTransferableBlock(block);
-        // After sending its own stats, the sending operator of the stage 1 
has the complete view of all stats
-        // Therefore this is the only place we can update some of the metrics 
like total seen rows or time spent.
-        if (_context.getStageId() == 1) {
-          updateMetrics(block);
+        if (_context.isSendStats()) {
+          block = updateEosBlock(block, _statMap);
+          // no need to check early terminate signal b/c the current block is 
already EOS
+          sendTransferableBlock(block);
+          // After sending its own stats, the sending operator of the stage 1 
has the complete view of all stats
+          // Therefore this is the only place we can update some of the 
metrics like total seen rows or time spent.
+          if (_context.getStageId() == 1) {
+            updateMetrics(block);
+          }
+        } else {
+          
sendTransferableBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock());
         }
       } else {
         if (sendTransferableBlock(block)) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index 9b0c923f89..dd8356b45f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -32,6 +32,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * This class is a utility class that helps to consume multiple mailboxes in a 
blocking manner by a single thread.
+ *
+ * The reader entry point is {@link #readBlockBlocking()} which will block 
until one of the mailboxes is ready to be
+ * read. The method is blocking and will return the next block to be consumed. 
This method is designed to be called by
+ * a single thread we call the consumer thread.
+ *
+ * Unless a method is explicitly documented otherwise, it can only be called by 
the consumer thread.
+ * @param <E>
+ */
 public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BlockingMultiStreamConsumer.class);
   private final Object _id;
@@ -55,44 +65,97 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
     _lastRead = _mailboxes.size() - 1;
   }
 
+  /**
+   * Returns whether the element is considered an error element or not.
+   *
+   * This method is called by the consumer thread.
+   */
   protected abstract boolean isError(E element);
 
+  /**
+   * Returns whether the element is considered a successful end of stream 
element or not.
+   *
+   * This method is called by the consumer thread.
+   */
   protected abstract boolean isEos(E element);
 
   /**
-   * This method is called whenever one of the consumer sends a EOS. It is 
guaranteed that the received element is
-   * an EOS as defined by {@link #isEos(Object)}
+   * This method is called whenever a {@link #isEos(Object) successful EOS} is 
read from one of the mailboxes.
+   *
+   * It is guaranteed that the received element is an EOS as defined by {@link 
#isEos(Object)}.
+   *
+   * This method is called by the consumer thread.
    */
-  protected abstract void onConsumerFinish(E element);
+  protected abstract void onMailboxEnd(E element);
 
+  /**
+   * This method is called whenever a timeout is reached while reading an 
element.
+   *
+   * This method is called by the consumer thread.
+   */
   protected abstract E onTimeout();
 
+  /**
+   * This method is called whenever an exception is thrown while reading an 
element.
+   *
+   * This method is called by the consumer thread.
+   */
   protected abstract E onException(Exception e);
 
+  /**
+   * This method is called whenever all mailboxes emitted EOS.
+   *
+   * This method is called by the consumer thread.
+   */
   protected abstract E onEos();
 
+  /**
+   * This method must be called when the consumer is not going to read anymore 
from the mailboxes.
+   *
+   * <strong>This method can be called from any thread</strong>.
+   */
   @Override
   public void close() {
     cancelRemainingMailboxes();
   }
 
+  /**
+   * This method is called whenever the consumer is cancelled.
+   *
+   * <strong>This method can be called from any thread</strong>.
+   */
   public void cancel(Throwable t) {
     cancelRemainingMailboxes();
   }
 
+  /**
+   * This method is called whenever the consumer is early terminated.
+   *
+   * This method is called by the consumer thread.
+   */
   public void earlyTerminate() {
     for (AsyncStream<E> mailbox : _mailboxes) {
       mailbox.earlyTerminate();
     }
   }
 
+  /**
+   * Cancels every mailbox this consumer reads from.
+   *
+   * <strong>This method can be called from any thread</strong>.
+   */
   protected void cancelRemainingMailboxes() {
     for (AsyncStream<E> mailbox : _mailboxes) {
       mailbox.cancel();
     }
   }
 
-  public void onData() {
+  /**
+   * Notifies this consumer that new data is ready, unblocking {@link #readBlockBlocking()}.
+   *
+   * <strong>This method can be called by any thread</strong>, although it is 
expected to be called by producer threads.
+   */
+  protected void onData() {
     if (_newDataReady.offer(Boolean.TRUE)) {
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("New data notification delivered on " + _id + ". " + 
System.identityHashCode(_newDataReady));
@@ -116,7 +179,9 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
    * Right now the implementation tries to be fair. If one call returned the 
block from mailbox {@code i}, then next
    * call will look for mailbox {@code i+1}, {@code i+2}... in a circular 
manner.
    *
-   * In order to unblock a thread blocked here, {@link #onData()} should be 
called.   *
+   * In order to unblock a thread blocked here, {@link #onData()} should be 
called.
+   *
+   * This method is called by the consumer thread.
    */
   public E readBlockBlocking() {
     if (LOGGER.isTraceEnabled()) {
@@ -192,7 +257,7 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
         LOGGER.debug("==[RECEIVE]== EOS received : " + _id + " in mailbox: " + 
removed.getId()
             + " (mailboxes alive: " + ids + ")");
       }
-      onConsumerFinish(block);
+      onMailboxEnd(block);
 
       block = readBlockOrNull();
     }
@@ -244,11 +309,14 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
 
   public static class OfTransferableBlock extends 
BlockingMultiStreamConsumer<TransferableBlock> {
 
-    private final MultiStageQueryStats _stats;
+    private final int _stageId;
+    @Nullable
+    private MultiStageQueryStats _stats;
 
     public OfTransferableBlock(OpChainExecutionContext context,
         List<? extends AsyncStream<TransferableBlock>> asyncProducers) {
       super(context.getId(), context.getDeadlineMs(), asyncProducers);
+      _stageId = context.getStageId();
       _stats = MultiStageQueryStats.emptyStats(context.getStageId());
     }
 
@@ -263,18 +331,27 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
     }
 
     @Override
-    protected void onConsumerFinish(TransferableBlock element) {
-      if (element.getQueryStats() != null) {
-        _stats.mergeUpstream(element.getQueryStats());
-      } else {
-        _stats.mergeUpstream(element.getSerializedStatsByStage());
+    protected void onMailboxEnd(TransferableBlock element) {
+      try {
+        MultiStageQueryStats stats = _stats;
+        if (stats != null) {
+          if (element.getQueryStats() != null) {
+            stats.mergeUpstream(element.getQueryStats(), true);
+          } else {
+            stats.mergeUpstream(element.getSerializedStatsByStage(), true);
+          }
+        }
+      } catch (Exception e) {
+        // If there is any error merging stats, continue without them
+        LOGGER.warn("Error merging stats", e);
+        _stats = null;
       }
     }
 
     @Override
     protected TransferableBlock onTimeout() {
       // TODO: Add the sender stage id to the error message
-      String errMsg = "Timed out on stage " + _stats.getCurrentStageId() + " 
waiting for data sent by a child stage";
+      String errMsg = "Timed out on stage " + _stageId + " waiting for data 
sent by a child stage";
       // We log this case as debug because:
       // - The opchain will already log a stackless message once the opchain 
fail
       // - The trace is not useful (the log message is good enough to find 
where we failed)
@@ -287,7 +364,7 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
     @Override
     protected TransferableBlock onException(Exception e) {
       // TODO: Add the sender stage id to the error message
-      String errMsg = "Found an error on stage " + _stats.getCurrentStageId() 
+ " while reading from a child stage";
+      String errMsg = "Found an error on stage " + _stageId + " while reading 
from a child stage";
       // We log this case as warn because contrary to the timeout case, it 
should be rare to finish an execution
       // with an exception and the stack trace may be useful to find the root 
cause.
       LOGGER.warn(errMsg, e);
@@ -296,7 +373,11 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
 
     @Override
     protected TransferableBlock onEos() {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_stats);
+      MultiStageQueryStats stats = _stats;
+      if (stats == null) { // possible in case of error
+        stats = MultiStageQueryStats.emptyStats(_stageId);
+      }
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock(stats);
     }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
index 42f924d8fd..3cf898f981 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
@@ -213,6 +213,16 @@ public class MultiStageQueryStats {
     }
   }
 
+  /**
+   * Merge upstream stats from another MultiStageQueryStats object into this 
one.
+   *
+   * This method is equivalent to calling {@link 
#mergeUpstream(MultiStageQueryStats, boolean)} with bubbleUp set to
+   * false.
+   */
+  public void mergeUpstream(MultiStageQueryStats otherStats) {
+    mergeUpstream(otherStats, false);
+  }
+
   /**
    * Merge upstream stats from another MultiStageQueryStats object into this 
one.
    * <p>
@@ -221,8 +231,10 @@ public class MultiStageQueryStats {
    * <p>
    * For example set operations may need to merge the stats from all its 
upstreams before concatenating stats of the
    * current stage.
+   *
+   * @param bubbleUp true if and only if runtime exceptions should be thrown 
when merging stats.
    */
-  public void mergeUpstream(MultiStageQueryStats otherStats) {
+  public void mergeUpstream(MultiStageQueryStats otherStats, boolean bubbleUp) 
{
     Preconditions.checkArgument(_currentStageId <= otherStats._currentStageId,
         "Cannot merge stats from early stage %s into stats of later stage %s",
         otherStats._currentStageId, _currentStageId);
@@ -256,12 +268,28 @@ public class MultiStageQueryStats {
           myStats.merge(otherStatsForStage);
         }
       } catch (IllegalArgumentException | IllegalStateException ex) {
+        if (bubbleUp) {
+          throw ex;
+        }
         LOGGER.warn("Error merging stats on stage {}. Ignoring the new stats", 
i, ex);
       }
     }
   }
 
+  /**
+   * Merge upstream stats from a list of DataBuffer objects into this one.
+   *
+   * This method is equivalent to calling {@link #mergeUpstream(List, 
boolean)} with bubbleUp set to false.
+   */
   public void mergeUpstream(List<DataBuffer> otherStats) {
+    mergeUpstream(otherStats, false);
+  }
+
+  /**
+   * Merge upstream stats from a list of DataBuffer objects into this one.
+   * @param bubbleUp true if and only if runtime exceptions should be thrown 
when merging stats.
+   */
+  public void mergeUpstream(List<DataBuffer> otherStats, boolean bubbleUp) {
     for (int i = 0; i <= _currentStageId && i < otherStats.size(); i++) {
       if (otherStats.get(i) != null) {
         throw new IllegalArgumentException("Cannot merge stats from early 
stage " + i + " into stats of "
@@ -283,8 +311,14 @@ public class MultiStageQueryStats {
             myStats.merge(dis);
           }
         } catch (IOException ex) {
+          if (bubbleUp) {
+            throw new RuntimeException("Error merging stats on stage " + i, 
ex);
+          }
           LOGGER.warn("Error deserializing stats on stage " + i + ". 
Considering the new stats empty", ex);
         } catch (IllegalArgumentException | IllegalStateException ex) {
+          if (bubbleUp) {
+            throw ex;
+          }
           LOGGER.warn("Error merging stats on stage " + i + ". Ignoring the 
new stats", ex);
         }
       }
@@ -421,6 +455,10 @@ public class MultiStageQueryStats {
       return _operatorStats.size() - 1;
     }
 
+    public boolean isEmpty() {
+      return _operatorTypes.isEmpty();
+    }
+
     public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>> 
consumer) {
       Iterator<MultiStageOperator.Type> typeIterator = 
_operatorTypes.iterator();
       Iterator<StatMap<?>> statIterator = _operatorStats.iterator();
@@ -522,13 +560,13 @@ public class MultiStageQueryStats {
         if (numOperators != _operatorTypes.size()) {
           try {
             Closed deserialized = deserialize(input, numOperators);
-            throw new RuntimeException("Cannot merge stats from stages with 
different operators. Expected "
+            throw new IllegalStateException("Cannot merge stats from stages 
with different operators. Expected "
                 + _operatorTypes + " operators, got " + numOperators + ". 
Deserialized stats: " + deserialized);
           } catch (IOException e) {
             throw new IOException("Cannot merge stats from stages with 
different operators. Expected "
                 + _operatorTypes + " operators, got " + numOperators, e);
           } catch (RuntimeException e) {
-            throw new RuntimeException("Cannot merge stats from stages with 
different operators. Expected "
+            throw new IllegalStateException("Cannot merge stats from stages 
with different operators. Expected "
                 + _operatorTypes + " operators, got " + numOperators, e);
           }
         }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 0b4e502762..5c5153a7d1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -51,18 +51,21 @@ public class OpChainExecutionContext {
   private final PipelineBreakerResult _pipelineBreakerResult;
   private final boolean _traceEnabled;
   private final ThreadExecutionContext _parentContext;
+  private final boolean _sendStats;
 
   private ServerPlanRequestContext _leafStageContext;
 
   public OpChainExecutionContext(MailboxService mailboxService, long 
requestId, long deadlineMs,
       Map<String, String> opChainMetadata, StageMetadata stageMetadata, 
WorkerMetadata workerMetadata,
-      @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable 
ThreadExecutionContext parentContext) {
+      @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable 
ThreadExecutionContext parentContext,
+      boolean sendStats) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _deadlineMs = deadlineMs;
     _opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
     _stageMetadata = stageMetadata;
     _workerMetadata = workerMetadata;
+    _sendStats = sendStats;
     _server =
         new VirtualServerAddress(mailboxService.getHostname(), 
mailboxService.getPort(), workerMetadata.getWorkerId());
     _id = new OpChainId(requestId, workerMetadata.getWorkerId(), 
stageMetadata.getStageId());
@@ -132,4 +135,8 @@ public class OpChainExecutionContext {
   public ThreadExecutionContext getParentContext() {
     return _parentContext;
   }
+
+  public boolean isSendStats() {
+    return _sendStats;
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 841e0e9ad8..c531d652bf 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -51,14 +51,6 @@ public class PipelineBreakerExecutor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
 
-  @Nullable
-  public static PipelineBreakerResult 
executePipelineBreakers(OpChainSchedulerService scheduler,
-      MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
-      Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
-    return executePipelineBreakers(scheduler, mailboxService, workerMetadata, 
stagePlan, opChainMetadata, requestId,
-        deadlineMs, null);
-  }
-
   /**
    * Execute a pipeline breaker and collect the results (synchronously). 
Currently, pipeline breaker executor can only
    *    execute mailbox receive pipeline breaker.
@@ -77,7 +69,7 @@ public class PipelineBreakerExecutor {
   public static PipelineBreakerResult 
executePipelineBreakers(OpChainSchedulerService scheduler,
       MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
       Map<String, String> opChainMetadata, long requestId, long deadlineMs,
-      @Nullable ThreadExecutionContext parentContext) {
+      @Nullable ThreadExecutionContext parentContext, boolean sendStats) {
     PipelineBreakerContext pipelineBreakerContext = new 
PipelineBreakerContext();
     PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), 
pipelineBreakerContext);
     if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
@@ -87,7 +79,7 @@ public class PipelineBreakerExecutor {
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query 
information
         OpChainExecutionContext opChainExecutionContext =
             new OpChainExecutionContext(mailboxService, requestId, deadlineMs, 
opChainMetadata,
-                stagePlan.getStageMetadata(), workerMetadata, null, 
parentContext);
+                stagePlan.getStageMetadata(), workerMetadata, null, 
parentContext, sendStats);
         return execute(scheduler, pipelineBreakerContext, 
opChainExecutionContext);
       } catch (Exception e) {
         LOGGER.error("Caught exception executing pipeline breaker for request: 
{}, stage: {}", requestId,
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index f3710866eb..0571141866 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -497,7 +497,7 @@ public class QueryDispatcher {
     ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
     OpChainExecutionContext executionContext =
         new OpChainExecutionContext(mailboxService, requestId, deadlineMs, 
queryOptions, stageMetadata,
-            workerMetadata.get(0), null, parentContext);
+            workerMetadata.get(0), null, parentContext, true);
 
     PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
     DataSchema sourceSchema = receiveNode.getDataSchema();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 9de563af9b..f13f7f9e22 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -79,7 +79,7 @@ public class QueryServerEnclosure {
     HelixManager helixManager = mockHelixManager(factory.buildSchemaMap());
     _queryRunner = new QueryRunner();
     _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, helixManager, ServerMetrics.get(),
-        null);
+        null, () -> true);
   }
 
   private HelixManager mockHelixManager(Map<String, Schema> schemaMap) {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 74e2e55af0..c702f3c47c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -80,7 +80,8 @@ public class OpChainSchedulerServiceTest {
     WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), 
ImmutableMap.of());
     OpChainExecutionContext context =
         new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, 
ImmutableMap.of(),
-            new StageMetadata(0, ImmutableList.of(workerMetadata), 
ImmutableMap.of()), workerMetadata, null, null);
+            new StageMetadata(0, ImmutableList.of(workerMetadata), 
ImmutableMap.of()), workerMetadata, null, null,
+            true);
     return new OpChain(context, operator);
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index b47068661d..efdc4e775a 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -37,7 +39,10 @@ import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator.Type;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.mockito.Mock;
 import org.testng.annotations.AfterMethod;
@@ -51,8 +56,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.openMocks;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class MailboxReceiveOperatorTest {
@@ -229,6 +233,75 @@ public class MailboxReceiveOperatorTest {
     }
   }
 
+  @Test
+  public void differentUpstreamHeapStatsProduceEmptyStats() {
+    
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
+    MultiStageQueryStats stats1 = new MultiStageQueryStats.Builder(1)
+        .addLast(open ->
+            open.addLastOperator(Type.MAILBOX_SEND, new 
StatMap<>(MailboxSendOperator.StatKey.class))
+                .addLastOperator(Type.LEAF, new 
StatMap<>(LeafStageTransferableBlockOperator.StatKey.class))
+            .close())
+        .build();
+    TransferableBlock block1 = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(stats1);
+    when(_mailbox1.poll()).thenReturn(block1);
+
+    
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2);
+    MultiStageQueryStats stats2 = new MultiStageQueryStats.Builder(1)
+        .addLast(open ->
+            open.addLastOperator(Type.MAILBOX_SEND, new 
StatMap<>(MailboxSendOperator.StatKey.class))
+                .addLastOperator(Type.FILTER, new 
StatMap<>(FilterOperator.StatKey.class))
+                .addLastOperator(Type.LEAF, new 
StatMap<>(LeafStageTransferableBlockOperator.StatKey.class))
+                .close())
+        .build();
+    TransferableBlock block2 = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(stats2);
+    when(_mailbox2.poll()).thenReturn(block2);
+
+    try (MailboxReceiveOperator operator = getOperator(_stageMetadataBoth, 
RelDistribution.Type.SINGLETON)) {
+      TransferableBlock block = operator.nextBlock();
+      assertTrue(block.isSuccessfulEndOfStreamBlock(), "Block should be 
successful EOS");
+      assertNotNull(block.getQueryStats(), "Query stats should not be null");
+      MultiStageQueryStats.StageStats.Closed upstreamStats = 
block.getQueryStats().getUpstreamStageStats(1);
+      assertNull(upstreamStats, "Upstream stats should be null in case of 
error merging stats");
+    }
+  }
+
+  @Test
+  public void differentSerializedUpstreamStatsProduceEmptyStats()
+      throws IOException {
+    
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
+    List<DataBuffer> stats1 = new MultiStageQueryStats.Builder(1)
+        .addLast(open ->
+            open.addLastOperator(Type.MAILBOX_SEND, new 
StatMap<>(MailboxSendOperator.StatKey.class))
+                .addLastOperator(Type.LEAF, new 
StatMap<>(LeafStageTransferableBlockOperator.StatKey.class))
+                .close())
+        .build()
+        .serialize();
+    MetadataBlock metadataBlock1 = new MetadataBlock(stats1);
+    TransferableBlock block1 = TransferableBlockUtils.wrap(metadataBlock1);
+    when(_mailbox1.poll()).thenReturn(block1);
+
+    
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2);
+    List<DataBuffer> stats2 = new MultiStageQueryStats.Builder(1)
+        .addLast(open ->
+            open.addLastOperator(Type.MAILBOX_SEND, new 
StatMap<>(MailboxSendOperator.StatKey.class))
+                .addLastOperator(Type.FILTER, new 
StatMap<>(FilterOperator.StatKey.class))
+                .addLastOperator(Type.LEAF, new 
StatMap<>(LeafStageTransferableBlockOperator.StatKey.class))
+                .close())
+        .build()
+        .serialize();
+    MetadataBlock metadataBlock2 = new MetadataBlock(stats2);
+    TransferableBlock block2 = TransferableBlockUtils.wrap(metadataBlock2);
+    when(_mailbox2.poll()).thenReturn(block2);
+
+    try (MailboxReceiveOperator operator = getOperator(_stageMetadataBoth, 
RelDistribution.Type.SINGLETON)) {
+      TransferableBlock block = operator.nextBlock();
+      assertTrue(block.isSuccessfulEndOfStreamBlock(), "Block should be 
successful EOS");
+      assertNotNull(block.getQueryStats(), "Query stats should not be null");
+      MultiStageQueryStats.StageStats.Closed upstreamStats = 
block.getQueryStats().getUpstreamStageStats(1);
+      assertNull(upstreamStats, "Upstream stats should be null in case of 
error merging stats");
+    }
+  }
+
   private MailboxReceiveOperator getOperator(StageMetadata stageMetadata, 
RelDistribution.Type distributionType,
       long deadlineMs) {
     OpChainExecutionContext context = 
OperatorTestUtil.getOpChainContext(_mailboxService, deadlineMs, stageMetadata);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index a54dc182a9..aa384e7b97 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -197,7 +197,7 @@ public class MailboxSendOperatorTest {
     StageMetadata stageMetadata = new StageMetadata(SENDER_STAGE_ID, 
List.of(workerMetadata), Map.of());
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE, 
Map.of(), stageMetadata, workerMetadata,
-            null, null);
+            null, null, true);
     return new MailboxSendOperator(context, _input, statMap -> _exchange);
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 0d6317ab2d..5a824f51e2 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -83,7 +83,7 @@ public class OperatorTestUtil {
   public static OpChainExecutionContext getOpChainContext(MailboxService 
mailboxService, long deadlineMs,
       StageMetadata stageMetadata) {
     return new OpChainExecutionContext(mailboxService, 0, deadlineMs, 
ImmutableMap.of(), stageMetadata,
-        stageMetadata.getWorkerMetadataList().get(0), null, null);
+        stageMetadata.getWorkerMetadataList().get(0), null, null, true);
   }
 
   public static OpChainExecutionContext getTracingContext() {
@@ -105,7 +105,7 @@ public class OperatorTestUtil {
     WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), 
ImmutableMap.of());
     StageMetadata stageMetadata = new StageMetadata(0, 
ImmutableList.of(workerMetadata), ImmutableMap.of());
     OpChainExecutionContext opChainExecutionContext = new 
OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE,
-        opChainMetadata, stageMetadata, workerMetadata, null, null);
+        opChainMetadata, stageMetadata, workerMetadata, null, null, true);
 
     StagePlan stagePlan = new StagePlan(null, stageMetadata);
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 51e766e8e0..3991505a7f 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -22,9 +22,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
@@ -101,6 +103,14 @@ public class PipelineBreakerExecutorTest {
     _mocks.close();
   }
 
+  @Nullable
+  public static PipelineBreakerResult 
executePipelineBreakers(OpChainSchedulerService scheduler,
+      MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
+      Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
+    return PipelineBreakerExecutor.executePipelineBreakers(scheduler, 
mailboxService, workerMetadata, stagePlan,
+        opChainMetadata, requestId, deadlineMs, null, true);
+  }
+
   @AfterClass
   public void tearDown() {
     ExecutorServiceUtils.close(_executor);
@@ -120,7 +130,7 @@ public class PipelineBreakerExecutorTest {
         
TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(1)));
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, _workerMetadata, stagePlan,
+        executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, 
stagePlan,
             ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
@@ -156,7 +166,7 @@ public class PipelineBreakerExecutorTest {
         
TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(2)));
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, _workerMetadata, stagePlan,
+        executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, 
stagePlan,
             ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
@@ -184,7 +194,7 @@ public class PipelineBreakerExecutorTest {
 
     // when
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, _workerMetadata, stagePlan,
+        executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, 
stagePlan,
             ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
@@ -212,7 +222,7 @@ public class PipelineBreakerExecutorTest {
     });
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, _workerMetadata, stagePlan,
+        executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, 
stagePlan,
             ImmutableMap.of(), 0, System.currentTimeMillis() + 100);
 
     // then
@@ -245,7 +255,7 @@ public class PipelineBreakerExecutorTest {
         TransferableBlockTestUtils.getEndOfStreamTransferableBlock(1));
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, _workerMetadata, stagePlan,
+        executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, 
stagePlan,
             ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
@@ -278,7 +288,7 @@ public class PipelineBreakerExecutorTest {
         TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, _workerMetadata, stagePlan,
+        executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, 
stagePlan,
             ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 8d20db0335..2b4ac80523 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -49,6 +49,7 @@ import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
 import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.server.worker.WorkerQueryServer;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -84,7 +85,7 @@ public class ServerInstance {
   private boolean _queryServerStarted = false;
 
   public ServerInstance(ServerConf serverConf, HelixManager helixManager, 
AccessControlFactory accessControlFactory,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, 
SendStatsPredicate sendStatsPredicate)
       throws Exception {
     LOGGER.info("Initializing server instance");
     _helixManager = helixManager;
@@ -131,7 +132,7 @@ public class ServerInstance {
       LOGGER.info("Initializing Multi-stage query engine");
       _workerQueryServer =
           new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, helixManager, _serverMetrics,
-              serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null);
+              serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null, 
sendStatsPredicate);
     } else {
       _workerQueryServer = null;
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 5c0a7e74f0..ed5277d63d 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -454,6 +454,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       updated |= updatePortIfNeeded(simpleFields, 
Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
           serverConf.getMultiStageMailboxPort());
     }
+    updated |= HelixHelper.updatePinotVersion(instanceConfig);
 
     // Update environment properties
     if (_pinotEnvironmentProvider != null) {
@@ -663,8 +664,10 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, 
segmentStarTreePreprocessThrottler,
             segmentDownloadThrottler);
 
+    SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf);
     ServerConf serverConf = new ServerConf(_serverConf);
-    _serverInstance = new ServerInstance(serverConf, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler);
+    _serverInstance = new ServerInstance(serverConf, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler,
+        sendStatsPredicate);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
 
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
@@ -694,6 +697,13 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     }
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);
 
+    LOGGER.info("Initializing and registering the SendStatsPredicate");
+    try {
+      _helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
+    } catch (Exception e) {
+      LOGGER.error("Failed to register SendStatsPredicate as the Helix 
InstanceConfigChangeListener", e);
+    }
+
     // Start restlet server for admin API endpoint
     LOGGER.info("Starting server admin application on: {}", 
ListenerConfigUtil.toString(_listenerConfigs));
     _adminApiApplication = createServerAdminApp();
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
new file mode 100644
index 0000000000..833f7d4504
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.version.PinotVersion;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// A class used to determine whether MSE should send stats or not.
+///
+/// The stat mechanism used in MSE is very efficient, so contrary to what we 
do in SSE, we decided to always collect and
+/// send stats in MSE. However, there are some versions of Pinot that have 
known issues with the stats mechanism, so we
+/// created this class as a mechanism to disable stats sending in case of 
problematic versions.
+///
+/// Specifically, Pinot 1.3.0 and lower have known issues when they receive 
unexpected stats from upstream stages, but
+/// even these versions are prepared to receive empty stats from upstream 
stages.
+/// Therefore the cleanest and safest solution is to not send stats when we 
know a problematic version is in the cluster.
+///
+/// We support three modes:
+/// - SAFE: This is the default mode. In this mode, we will send stats unless 
we detect a problematic version in the
+///  cluster. This doesn't require human intervention and is the recommended 
mode.
+/// - ALWAYS: In this mode, we will always send stats, regardless of the 
version of the cluster. This mimics the
+///  behavior in 1.3.0 and lower versions.
+/// - NEVER: In this mode, we will never send stats, regardless of the version 
of the cluster. This is useful for
+/// testing purposes or if for whatever reason you want to disable stats.
+public abstract class SendStatsPredicate implements 
InstanceConfigChangeListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SendStatsPredicate.class);
+
+  public abstract boolean getSendStats();
+
+  public static SendStatsPredicate create(PinotConfiguration configuration) {
+    String modeStr = configuration.getProperty(
+        CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_SEND_STATS_MODE).toUpperCase(Locale.ENGLISH);
+    Mode mode;
+    try {
+      mode = Mode.valueOf(modeStr.trim().toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Invalid value " + modeStr + " for "
+          + CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, e);
+    }
+    return mode.create();
+  }
+
+  public enum Mode {
+    SAFE {
+      @Override
+      public SendStatsPredicate create() {
+        return new Safe();
+      }
+    },
+    ALWAYS {
+      @Override
+      public SendStatsPredicate create() {
+        return new SendStatsPredicate() {
+          @Override
+          public boolean getSendStats() {
+            return true;
+          }
+
+          @Override
+          public void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) {
+            // Nothing to do
+          }
+        };
+      }
+    },
+    NEVER {
+      @Override
+      public SendStatsPredicate create() {
+        return new SendStatsPredicate() {
+          @Override
+          public boolean getSendStats() {
+            return false;
+          }
+
+          @Override
+          public void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) {
+            // Nothing to do
+          }
+        };
+      }
+    };
+
+    public abstract SendStatsPredicate create();
+  }
+
+  private static class Safe extends SendStatsPredicate {
+    private final AtomicBoolean _sendStats = new AtomicBoolean(true);
+
+    @Override
+    public boolean getSendStats() {
+      return _sendStats.get();
+    }
+
+    @Override
+    public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, 
NotificationContext context) {
+      Map<String, String> problematicVersionsById = new HashMap<>();
+      for (InstanceConfig instanceConfig : instanceConfigs) {
+        switch 
(InstanceTypeUtils.getInstanceType(instanceConfig.getInstanceName())) {
+          case BROKER:
+          case SERVER:
+            String otherVersion = instanceConfig.getRecord()
+                
.getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, null);
+            if (isProblematicVersion(otherVersion)) {
+              problematicVersionsById.put(instanceConfig.getInstanceName(), 
otherVersion);
+            }
+            break;
+          default:
+            continue;
+        }
+      }
+      boolean sendStats = problematicVersionsById.isEmpty();
+      if (_sendStats.getAndSet(sendStats) != sendStats) {
+        if (sendStats) {
+          LOGGER.warn("Send MSE stats is now enabled");
+        } else {
+          LOGGER.warn("Send MSE stats is now disabled (problematic versions: 
{})", problematicVersionsById);
+        }
+      }
+    }
+
+    /// Returns true if the version is problematic
+    ///
+    /// Ideally [PinotVersion] should have a way to extract versions in 
comparable format, but given it doesn't we
+    /// need to parse the string here. In case version doesn't match 
`1\.x\..*`, we treat it as a problematic version
+    private boolean isProblematicVersion(@Nullable String versionStr) {
+      if (versionStr == null) {
+        return true;
+      }
+      if (versionStr.equals(PinotVersion.UNKNOWN)) {
+        return true;
+      }
+      if (versionStr.equals(PinotVersion.VERSION)) {
+        return false;
+      }
+      // Let's try to parse 1.x versions
+      String[] splits = versionStr.trim().split("\\.");
+      if (splits.length < 2) {
+        return true;
+      }
+      // Any major version other than 1 is treated as problematic, including pre-1.x releases
+      if (!splits[0].equals("1")) {
+        return true;
+      }
+      try {
+        // Versions less than 1.4 are problematic
+        if (Integer.parseInt(splits[1]) < 4) {
+          return true;
+        }
+      } catch (NumberFormatException e) {
+        return true;
+      }
+      return false;
+    }
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 0d177d7fa0..26041b8fcb 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.service.server.QueryServer;
+import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -37,12 +38,14 @@ public class WorkerQueryServer {
   private final QueryServer _queryWorkerService;
 
   public WorkerQueryServer(PinotConfiguration configuration, 
InstanceDataManager instanceDataManager,
-      HelixManager helixManager, ServerMetrics serverMetrics, @Nullable 
TlsConfig tlsConfig) {
+      HelixManager helixManager, ServerMetrics serverMetrics, @Nullable 
TlsConfig tlsConfig,
+      SendStatsPredicate sendStats) {
     _configuration = toWorkerQueryConfig(configuration);
     _queryServicePort = 
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     QueryRunner queryRunner = new QueryRunner();
-    queryRunner.init(_configuration, instanceDataManager, helixManager, 
serverMetrics, tlsConfig);
+    queryRunner.init(_configuration, instanceDataManager, helixManager, 
serverMetrics, tlsConfig,
+        sendStats::getSendStats);
     _queryWorkerService = new QueryServer(_queryServicePort, queryRunner, 
tlsConfig, serverMetrics, configuration);
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d4c5cacb2f..9acbda4f69 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -205,6 +205,7 @@ public class CommonConstants {
       public static final String MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY = 
"queryMailboxPort";
 
       public static final String SYSTEM_RESOURCE_INFO_KEY = 
"SYSTEM_RESOURCE_INFO";
+      public static final String PINOT_VERSION_KEY = "pinotVersion";
     }
 
     public static final String SET_INSTANCE_ID_TO_HOSTNAME_KEY = 
"pinot.set.instance.id.to.hostname";
@@ -1473,6 +1474,23 @@ public class CommonConstants {
     public static final String KEY_OF_MAX_ROWS_IN_JOIN = 
"pinot.query.join.max.rows";
     public static final String KEY_OF_JOIN_OVERFLOW_MODE = 
"pinot.query.join.overflow.mode";
 
+    /// Specifies the send stats mode used in MSE.
+    ///
+    /// Valid values are (in lower or upper case):
+    /// - "SAFE": MSE will only send stats if all instances in the cluster are 
running 1.4.0 or later.
+    /// - "ALWAYS": MSE will always send stats, regardless of the version of 
the instances in the cluster.
+    /// - "NEVER": MSE will never send stats.
+    ///
+    /// The reason for this flag is that versions 1.3.0 and lower have two 
undesired behaviors:
+    /// 1. Some queries using intersection generate incorrect stats
+    /// 2. When stats from other nodes are sent but are different from 
expected, the query fails.
+    ///
+    /// In 1.4.0 the first issue is solved and instead of failing when 
unexpected stats are received, the query
+    /// continues without children stats. But if a query involves servers in 
versions 1.3.0 and 1.4.0, the one
+    /// running 1.3.0 may fail, which breaks backward compatibility.
+    public static final String KEY_OF_SEND_STATS_MODE = 
"pinot.query.mse.stats.mode";
+    public static final String DEFAULT_SEND_STATS_MODE = "SAFE";
+
     public enum JoinOverFlowMode {
       THROW, BREAK
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to