This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 68685dc431 Add backward compatibility regression test suite for 
multi-stage query engine (#13193)
68685dc431 is described below

commit 68685dc431678d306b03f1f39e21eafe4b6306bc
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Mon Jun 3 16:12:27 2024 +0530

    Add backward compatibility regression test suite for multi-stage query 
engine (#13193)
---
 ...ulti_stage_query_engine_compatibility_tests.yml | 84 +++++++++++++++++++
 .github/workflows/pinot_tests.yml                  | 56 +++++++++++++
 .gitignore                                         |  1 +
 .../config/BrokerConfig.properties                 | 23 ++++++
 .../config/ControllerConfig.properties             | 25 ++++++
 .../config/FeatureTest1-schema.json                | 94 ++++++++++++++++++++++
 .../config/FeatureTest2-schema.json                | 94 ++++++++++++++++++++++
 .../config/ServerConfig.properties                 | 25 ++++++
 .../config/data/FeatureTest1-data-00.csv           | 12 +++
 .../config/data/FeatureTest2-data-realtime-00.csv  | 11 +++
 .../config/data/recordReaderConfig.json            |  5 ++
 .../config/feature-test-1.json                     | 46 +++++++++++
 .../feature-test-2-realtime-stream-config.json     |  8 ++
 .../config/feature-test-2-realtime.json            | 61 ++++++++++++++
 .../queries/feature-test-multi-stage.queries       | 31 +++++++
 .../query-results/feature-test-multi-stage.results | 31 +++++++
 .../post-broker-rollback.yaml                      | 43 ++++++++++
 .../post-controller-rollback.yaml                  | 53 ++++++++++++
 .../post-server-rollback.yaml                      | 43 ++++++++++
 .../post-server-upgrade.yaml                       | 43 ++++++++++
 .../pre-broker-upgrade.yaml                        | 43 ++++++++++
 .../pre-controller-upgrade.yaml                    | 58 +++++++++++++
 .../pre-server-upgrade.yaml                        | 43 ++++++++++
 .../pinot/common/utils/SqlResultComparator.java    | 44 ++++++++++
 .../main/java/org/apache/pinot/compat/QueryOp.java | 32 ++++++--
 .../main/java/org/apache/pinot/compat/Utils.java   |  9 +++
 .../apache/pinot/tools/utils/ExplainPlanUtils.java |  5 ++
 27 files changed, 1017 insertions(+), 6 deletions(-)

diff --git 
a/.github/workflows/pinot_multi_stage_query_engine_compatibility_tests.yml 
b/.github/workflows/pinot_multi_stage_query_engine_compatibility_tests.yml
new file mode 100644
index 0000000000..52b7773033
--- /dev/null
+++ b/.github/workflows/pinot_multi_stage_query_engine_compatibility_tests.yml
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: Pinot Multi-Stage Query Engine Compatibility Regression Test
+
+on:
+  workflow_dispatch:
+    inputs:
+      oldCommit:
+        description: "Git hash (or tag) for old commit. (required)"
+        required: true
+      newCommit:
+        description: "Git hash (or tag) for new commit. (required)"
+        required: true
+
+jobs:
+  compatibility-verifier:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        test_suite: [ 
"compatibility-verifier/multi-stage-query-engine-test-suite" ]
+    name: Pinot Multi-Stage Query Engine Compatibility Regression Testing 
against ${{ github.event.inputs.oldCommit }} and ${{ 
github.event.inputs.newCommit }} on ${{ matrix.test_suite }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK 11
+        uses: actions/setup-java@v4
+        with:
+          java-version: 11
+          distribution: 'temurin'
+          cache: 'maven'
+      - name: Setup node
+        uses: actions/setup-node@v4
+        with:
+          node-version: v16.15.0
+          cache: 'npm'
+          cache-dependency-path: 
pinot-controller/src/main/resources/package-lock.json
+      - name: Install npm
+        run: |
+          npm install -g npm@8.5.5
+          npm --version
+      - name: Pinot Multi-Stage Query Engine Compatibility Regression Testing
+        if : ${{github.event_name == 'workflow_dispatch'}}
+        env:
+          OLD_COMMIT: ${{ github.event.inputs.oldCommit }}
+          NEW_COMMIT: ${{ github.event.inputs.newCommit }}
+          WORKING_DIR: /tmp/multi-stage-compatibility-verifier
+          TEST_SUITE: ${{ matrix.test_suite }}
+          MAVEN_OPTS: >
+            -Xmx2G -DskipShade -DfailIfNoTests=false 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=25
+            -Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false 
-Dmaven.wagon.http.pool=false
+            -XX:+IgnoreUnrecognizedVMOptions
+            --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
+        run: .github/workflows/scripts/.pinot_compatibility_verifier.sh
+      - name: Archive artifacts into zip
+        if: always()
+        run: |
+          zip -1 -r artifacts.zip /tmp/multi-stage-compatibility-verifier/*
+      - uses: actions/upload-artifact@v4
+        name: Store multi-stage compatibility verifier work directory
+        if: always()
+        with:
+          ## TODO: currently matrix.test_suite cannot be used as part of name 
due to invalid path character.
+          name: multi_stage_compatibility_verifier_work_dir
+          retention-days: 3
+          path: artifacts.zip
diff --git a/.github/workflows/pinot_tests.yml 
b/.github/workflows/pinot_tests.yml
index 01e77cdcb0..ef2828d549 100644
--- a/.github/workflows/pinot_tests.yml
+++ b/.github/workflows/pinot_tests.yml
@@ -314,6 +314,62 @@ jobs:
             --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
         run: .github/workflows/scripts/.pinot_compatibility_verifier.sh
 
+  multi-stage-compatibility-verifier:
+    if: github.repository == 'apache/pinot'
+    runs-on: ubuntu-latest
+    strategy:
+      # Changed to false in order to improve coverage using unsafe buffers
+      fail-fast: false
+      matrix:
+        test_suite: [ 
"compatibility-verifier/multi-stage-query-engine-test-suite" ]
+        old_commit: [
+          "master"
+        ]
+    name: Pinot Multi-Stage Query Engine Compatibility Regression Testing 
against ${{ matrix.old_commit }} on ${{ matrix.test_suite }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK 11
+        uses: actions/setup-java@v4
+        with:
+          java-version: 11
+          distribution: 'temurin'
+          cache: 'maven'
+      - name: Setup node
+        uses: actions/setup-node@v4
+        with:
+          node-version: v16.15.0
+          cache: 'npm'
+          cache-dependency-path: 
pinot-controller/src/main/resources/package-lock.json
+      - name: Install npm
+        run: |
+          npm install -g npm@8.5.5
+          npm --version
+      # Step that does that actual cache save and restore
+      - uses: actions/cache@v4
+        env:
+          SEGMENT_DOWNLOAD_TIMEOUT_MINS: 10
+        with:
+          path: ~/.m2/repository
+          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+          restore-keys: |
+            ${{ runner.os }}-maven-
+      - name: Pinot Multi-Stage Query Engine Compatibility Regression Testing
+        env:
+          OLD_COMMIT: ${{ matrix.old_commit }}
+          WORKING_DIR: /tmp/multi-stage-compatibility-verifier
+          TEST_SUITE: ${{ matrix.test_suite }}
+          MAVEN_OPTS: >
+            -Xmx2G -DskipShade -DfailIfNoTests=false 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=25
+            -Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false 
-Dmaven.wagon.http.pool=false
+            -B -ntp
+            -XX:+IgnoreUnrecognizedVMOptions
+            --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
+            --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
+        run: .github/workflows/scripts/.pinot_compatibility_verifier.sh
+
   quickstarts:
     if: github.repository == 'apache/pinot'
     runs-on: ubuntu-latest
diff --git a/.gitignore b/.gitignore
index 25c9ab8541..c3b52d6e04 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ cscope.*
 .classpath
 .project
 .svn
+.java-version
 .externalToolBuilders/
 maven-eclipse.xml
 target/
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/BrokerConfig.properties
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/BrokerConfig.properties
new file mode 100644
index 0000000000..e56f23c287
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/BrokerConfig.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pinot.broker.client.queryPort = 8099
+pinot.zk.server = localhost:2181
+pinot.cluster.name = PinotCluster
+pinot.broker.disable.query.groovy=false
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/ControllerConfig.properties
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/ControllerConfig.properties
new file mode 100644
index 0000000000..9949b2519e
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/ControllerConfig.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+controller.host = localhost
+controller.port = 9000
+controller.zk.str = localhost:2181
+controller.data.dir = /tmp/PinotController
+controller.helix.cluster.name = PinotCluster
+controller.disable.ingestion.groovy = false
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/FeatureTest1-schema.json
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/FeatureTest1-schema.json
new file mode 100644
index 0000000000..85378dd3a0
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/FeatureTest1-schema.json
@@ -0,0 +1,94 @@
+{
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "generationNumber"
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimSV1"
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimSV2"
+    },
+    {
+      "dataType": "LONG",
+      "name": "longDimSV1"
+    },
+    {
+      "dataType": "LONG",
+      "name": "longDimSV2"
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimMV1",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimMV2",
+      "singleValueField": false
+    },
+    {
+      "dataType": "INT",
+      "name": "intDimMV1",
+      "singleValueField": false
+    },
+    {
+      "dataType": "INT",
+      "name": "intDimMV2",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "maxLength": 1000,
+      "name": "textDim1"
+    },
+    {
+      "dataType": "BYTES",
+      "name": "bytesDimSV1"
+    },
+    {
+      "dataType": "STRING",
+      "name": "mapDim1__KEYS",
+      "singleValueField": false
+    },
+    {
+      "dataType": "INT",
+      "name": "mapDim1__VALUES",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "name": "mapDim2json"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "intMetric1"
+    },
+    {
+      "dataType": "LONG",
+      "name": "longMetric1"
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "floatMetric1"
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "doubleMetric1"
+    }
+  ],
+  "dateTimeFieldSpecs" : [
+    {
+      "name" : "HoursSinceEpoch",
+      "dataType" : "INT",
+      "format" : "1:HOURS:EPOCH",
+      "granularity": "1:HOURS"
+    }
+  ],
+  "schemaName": "FeatureTest1"
+}
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/FeatureTest2-schema.json
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/FeatureTest2-schema.json
new file mode 100644
index 0000000000..f53c5a2c86
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/FeatureTest2-schema.json
@@ -0,0 +1,94 @@
+{
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "generationNumber"
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimSV1"
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimSV2"
+    },
+    {
+      "dataType": "LONG",
+      "name": "longDimSV1"
+    },
+    {
+      "dataType": "LONG",
+      "name": "longDimSV2"
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimMV1",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "name": "stringDimMV2",
+      "singleValueField": false
+    },
+    {
+      "dataType": "INT",
+      "name": "intDimMV1",
+      "singleValueField": false
+    },
+    {
+      "dataType": "INT",
+      "name": "intDimMV2",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "maxLength": 1000,
+      "name": "textDim1"
+    },
+    {
+      "dataType": "BYTES",
+      "name": "bytesDimSV1"
+    },
+    {
+      "dataType": "STRING",
+      "name": "mapDim1__KEYS",
+      "singleValueField": false
+    },
+    {
+      "dataType": "INT",
+      "name": "mapDim1__VALUES",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "name": "mapDim2json"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "intMetric1"
+    },
+    {
+      "dataType": "LONG",
+      "name": "longMetric1"
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "floatMetric1"
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "doubleMetric1"
+    }
+  ],
+  "dateTimeFieldSpecs" : [
+    {
+      "name" : "HoursSinceEpoch",
+      "dataType" : "INT",
+      "format" : "1:HOURS:EPOCH",
+      "granularity": "1:HOURS"
+    }
+  ],
+  "schemaName": "FeatureTest2"
+}
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/ServerConfig.properties
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/ServerConfig.properties
new file mode 100644
index 0000000000..0bf3a9b47b
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/ServerConfig.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pinot.server.adminapi.port = 8097
+pinot.server.netty.port = 8098
+pinot.zk.server = localhost:2181
+pinot.cluster.name = PinotCluster
+pinot.server.instance.dataDir = /tmp/PinotServer/data
+pinot.server.instance.segmentTarDir = /tmp/PinotServer/segments
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/FeatureTest1-data-00.csv
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/FeatureTest1-data-00.csv
new file mode 100644
index 0000000000..bb537d9ba3
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/FeatureTest1-data-00.csv
@@ -0,0 +1,12 @@
+# HoursSinceEpoch generationNumber stringDimSV1 stringDimSV2 longDimSV1 
longDimSV2 stringDimMV1 stringDimMV2 intDimMV1 intDimMV2 textDim1 mapDim1__KEYS 
mapDim1__VALUES mapDim2json intMetric1 longMetric1 floatMetric1 doubleMetric1
+# Add some common rows from first segment, and some new rows as well
+123456,__GENERATION_NUMBER__,"s1-0",s2-0,1,2,m1-0-0;m1-0-1,m2-0-0;m2-0-1,3;4,6;7,Java
 C++ 
Python,01a0bc,k1;k2;k3;k4;k5,1;1;2;2;2,"{""k1"":1,""k2"":1,""k3"":2,""k4"":2,""k5"":2}",10,11,12.1,13.1
+123456,__GENERATION_NUMBER__,"s1-0",s2-0,1,2,m1-0-0;m1-0-1,m2-0-0;m2-0-1,3;4,6;7,Java
 C++ 
Python,4877625602,k1;k2;k3;k4;k5,3;3;3;3;3,"{""k1"":3,""k2"":3,""k3"":3,""k4"":3,""k5"":3}",10,11,12.1,13.1,
 # Dupliate of row 0 1
+123456,__GENERATION_NUMBER__,s1-2,s2-2,11,21,m1-2-0;m1-2-1,m2-2-0;m2-2-1,32;42,62;72,Java
 C++ 
golang,13225573e3f5,k1;k2;k3;k4;k5,4;5;6;7;7,"{""k1"":4,""k2"":5,""k3"":6,""k4"":7,""k5"":7}",10,21,22.1,23.10
+123456,__GENERATION_NUMBER__,s1-2,s2-2,11,21,m1-3-0;m1-3-1,m2-3-0;m2-3-1,32;42,62;72,Java
 C++ 
golang,deadbeef,k1;k2;k3;k4;k5,7;7;7;7;7,"{""k1"":7,""k2"":7,""k3"":7,""k4"":7,""k5"":7}",10,21,22.1,23.10,
 # All sv cols same as prev
+123456,__GENERATION_NUMBER__,s1-4,s2-4,41,22,m1-2-0;m1-2-1,m2-2-0;m2-2-1,42;52,72;82,Java
 C++ 
golang,deed0507,k1;k2;k3;k4;k5,7;7;8;8;8,"{""k1"":7,""k2"":7,""k3"":8,""k4"":8,""k5"":8}",14,24,24.1,24.10,
 # All mv cols same as row 2
+123456,__GENERATION_NUMBER__,s1-5,,,32,m1-5-0,m2-2-0,,92;22,golang shell 
bash,,k1;k2;k3;k4;k5,7;7;7;7;7,"{""k1"":7,""k2"":7,""k3"":7,""k4"":7,""k5"":7}",,24,,24.10,
 # Default values for some columns
+123456,__GENERATION_NUMBER__,s1-6,s2-6,7611,7621,m1-5-0;m1-5-1;m1-5-2,m2-2-0,392;462,6662;782,C++
 golang 
python,deed0507,k1;k2;k3;k4;k5,7;8;9;10;20,"{""k1"":7,""k2"":8,""k3"":9,""k4"":10,""k5"":20}",101,251,262.1,263.10,
 # 3 values in MV
+123456,__GENERATION_NUMBER__,s1-6,s2-6,7611,7621,m1-5-0;m1-5-1;m1-5-2,m2-2-0,392;462,6662;782,C++
 golang 
python,deed0507,k1;k2;k3;k4;k5,7;8;9;10;20,"{""k1"":7,""k2"":8,""k3"":9,""k4"":10,""k5"":20}",2147483647,251,262.1,263.10,
 # MAX_INT in int metric
+123456,__GENERATION_NUMBER__,s1-6,s2-6,7611,7621,m1-5-0;m1-5-1;m1-5-2,m2-2-0,392;462,6662;782,C++
 golang 
python,deed0507,k1;k2;k3;k4;k5,7;8;9;10;20,"{""k1"":7,""k2"":8,""k3"":9,""k4"":10,""k5"":20}",2147483647,251,262.1,263.10,
 # MAX_INT in int metric
+123456,__GENERATION_NUMBER__,s1-7,s2-7,6766,6777,m1-6-0;m1-6-1;m1-6-2;m1-6-3,m2-6-0;m2-6-1,392;462,6662;782,golang
 
Java,d54d0507,k1;k2;k3;k4;k5,31;31;32;32;32,"{""k1"":31,""k2"":31,""k3"":32,""k4"":32,""k5"":32}",87,251,262.10,263.10
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/FeatureTest2-data-realtime-00.csv
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/FeatureTest2-data-realtime-00.csv
new file mode 100644
index 0000000000..e4e36e4705
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/FeatureTest2-data-realtime-00.csv
@@ -0,0 +1,11 @@
+# HoursSinceEpoch generationNumber stringDimSV1 stringDimSV2 longDimSV1 
longDimSV2 stringDimMV1 stringDimMV2 intDimMV1 intDimMV2 textDim1 mapDim1__KEYS 
mapDim1__VALUES mapDim2json intMetric1 longMetric1 floatMetric1 doubleMetric1
+123456,__GENERATION_NUMBER__,"s1-0",s2-0,1,2,m1-0-0;m1-0-1,m2-0-0;m2-0-1,3;4,6;7,Java
 C++ 
Python,01a0bc,k1;k2;k3;k4;k5,1;1;2;2;2,"{""k1"":1,""k2"":1,""k3"":2,""k4"":2,""k5"":2}",10,11,12.1,13.1
+123456,__GENERATION_NUMBER__,"s1-0",s2-0,1,2,m1-0-0;m1-0-1,m2-0-0;m2-0-1,3;4,6;7,Java
 C++ 
Python,4877625602,k1;k2;k3;k4;k5,3;3;3;3;3,"{""k1"":3,""k2"":3,""k3"":3,""k4"":3,""k5"":3}",10,11,12.1,13.1,
 # Dupliate of row 0 1
+123456,__GENERATION_NUMBER__,s1-2,s2-2,11,21,m1-2-0;m1-2-1,m2-2-0;m2-2-1,32;42,62;72,Java
 C++ 
golang,13225573e3f5,k1;k2;k3;k4;k5,4;5;6;7;7,"{""k1"":4,""k2"":5,""k3"":6,""k4"":7,""k5"":7}",10,21,22.1,23.10
+123456,__GENERATION_NUMBER__,s1-2,s2-2,11,21,m1-3-0;m1-3-1,m2-3-0;m2-3-1,32;42,62;72,Java
 C++ 
golang,deadbeef,k1;k2;k3;k4;k5,7;7;7;7;7,"{""k1"":7,""k2"":7,""k3"":7,""k4"":7,""k5"":7}",10,21,22.1,23.10,
 # All sv cols same as prev
+123456,__GENERATION_NUMBER__,s1-4,s2-4,41,22,m1-2-0;m1-2-1,m2-2-0;m2-2-1,42;52,72;82,Java
 C++ 
golang,deed0507,k1;k2;k3;k4;k5,7;7;8;8;8,"{""k1"":7,""k2"":7,""k3"":8,""k4"":8,""k5"":8}",14,24,24.1,24.10,
 # All mv cols same as row 2
+123456,__GENERATION_NUMBER__,s1-5,,,32,m1-5-0,m2-2-0,,92;22,golang shell 
bash,,k1;k2;k3;k4;k5,7;7;7;7;7,"{""k1"":7,""k2"":7,""k3"":7,""k4"":7,""k5"":7}",,24,,24.10,
 # Default values for some columns
+123456,__GENERATION_NUMBER__,s1-6,s2-6,7611,7621,m1-5-0;m1-5-1;m1-5-2,m2-2-0,392;462,6662;782,C++
 golang 
python,deed0507,k1;k2;k3;k4;k5,7;8;9;10;20,"{""k1"":7,""k2"":8,""k3"":9,""k4"":10,""k5"":20}",101,251,262.1,263.10,
 # 3 values in MV
+123456,__GENERATION_NUMBER__,s1-6,s2-6,7611,7621,m1-5-0;m1-5-1;m1-5-2,m2-2-0,392;462,6662;782,C++
 golang 
python,deed0507,k1;k2;k3;k4;k5,7;8;9;10;20,"{""k1"":7,""k2"":8,""k3"":9,""k4"":10,""k5"":20}",2147483647,251,262.1,263.10,
 # MAX_INT in int metric
+123456,__GENERATION_NUMBER__,s1-6,s2-6,7611,7621,m1-5-0;m1-5-1;m1-5-2,m2-2-0,392;462,6662;782,C++
 golang 
python,deed0507,k1;k2;k3;k4;k5,7;8;9;10;20,"{""k1"":7,""k2"":8,""k3"":9,""k4"":10,""k5"":20}",2147483647,251,262.1,263.10,
 # MAX_INT in int metric
+123456,__GENERATION_NUMBER__,s1-7,s2-7,6766,6777,m1-6-0;m1-6-1;m1-6-2;m1-6-3,m2-6-0;m2-6-1,392;462,6662;782,golang
 
Java,d54d0507,k1;k2;k3;k4;k5,31;31;32;32;32,"{""k1"":31,""k2"":31,""k3"":32,""k4"":32,""k5"":32}",87,251,262.10,263.10
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/recordReaderConfig.json
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/recordReaderConfig.json
new file mode 100644
index 0000000000..ebaa04e104
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/data/recordReaderConfig.json
@@ -0,0 +1,5 @@
+{
+    "commentMarker" : "#",
+    "header" :
+            
"HoursSinceEpoch,generationNumber,stringDimSV1,stringDimSV2,longDimSV1,longDimSV2,stringDimMV1,stringDimMV2,intDimMV1,intDimMV2,textDim1,bytesDimSV1,mapDim1__KEYS,mapDim1__VALUES,mapDim2json,intMetric1,longMetric1,floatMetric1,doubleMetric1"
+}
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-1.json
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-1.json
new file mode 100644
index 0000000000..381e6885d1
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-1.json
@@ -0,0 +1,46 @@
+{
+  "fieldConfigList": [
+    {
+      "encodingType": "RAW",
+      "indexType": "TEXT",
+      "name": "textDim1",
+      "properties": {
+        "deriveNumDocsPerChunkForRawIndex": "true",
+        "rawIndexWriterVersion": "3"
+      }
+    }
+  ],
+  "metadata": {
+    "customConfigs": {
+      "d2Name": ""
+    }
+  },
+  "segmentsConfig": {
+    "replication": "1",
+    "retentionTimeUnit": "",
+    "retentionTimeValue": "",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "segmentPushFrequency": "daily",
+    "segmentPushType": "REFRESH",
+    "timeColumnName": "HoursSinceEpoch",
+    "timeType": "HOURS"
+  },
+  "tableIndexConfig": {
+    "aggregateMetrics": false,
+    "autoGeneratedInvertedIndex": false,
+    "createInvertedIndexDuringSegmentGeneration": false,
+    "enableDefaultStarTree": false,
+    "enableDynamicStarTreeCreation": false,
+    "loadMode": "MMAP",
+    "noDictionaryColumns": ["textDim1"],
+    "nullHandlingEnabled": false,
+    "sortedColumn": [],
+    "streamConfigs": {}
+  },
+  "tableName": "FeatureTest1_OFFLINE",
+  "tableType": "OFFLINE",
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  }
+}
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-2-realtime-stream-config.json
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-2-realtime-stream-config.json
new file mode 100644
index 0000000000..441ce201a9
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-2-realtime-stream-config.json
@@ -0,0 +1,8 @@
+{
+  "streamType": "kafka",
+  "stream.kafka.consumer.type": "simple",
+  "topicName": "PinotRealtimeFeatureTest2Event",
+  "partitionColumn": "longDimSV1",
+  "numPartitions": "1",
+  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
+}
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-2-realtime.json
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-2-realtime.json
new file mode 100644
index 0000000000..6df335e488
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/feature-test-2-realtime.json
@@ -0,0 +1,61 @@
+{
+  "fieldConfigList": [
+    {
+      "encodingType": "RAW",
+      "indexType": "TEXT",
+      "name": "textDim1",
+      "properties": {
+        "deriveNumDocsPerChunkForRawIndex": "true",
+        "rawIndexWriterVersion": "3"
+      }
+    }
+  ],
+  "metadata": {
+    "customConfigs": {
+      "d2Name": ""
+    }
+  },
+  "segmentsConfig": {
+    "replicasPerPartition": "1",
+    "replication": "1",
+    "retentionTimeUnit": "",
+    "retentionTimeValue": "",
+    "schemaName": "FeatureTest2",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "segmentPushFrequency": "daily",
+    "segmentPushType": "APPEND",
+    "timeColumnName": "HoursSinceEpoch",
+    "timeType": "HOURS"
+  },
+  "tableIndexConfig": {
+    "aggregateMetrics": false,
+    "autoGeneratedInvertedIndex": false,
+    "bloomFilterColumns": [],
+    "createInvertedIndexDuringSegmentGeneration": false,
+    "enableDefaultStarTree": false,
+    "enableDynamicStarTreeCreation": false,
+    "loadMode": "MMAP",
+    "noDictionaryColumns": [],
+    "nullHandlingEnabled": false,
+    "segmentFormatVersion": "v3",
+    "sortedColumn": [],
+    "streamConfigs": {
+      "realtime.segment.flush.threshold.size": "63",
+      "realtime.segment.flush.threshold.time": "1h",
+      "streamType": "kafka",
+      "stream.kafka.topic.name": "PinotRealtimeFeatureTest2Event",
+      "stream.kafka.consumer.type": "simple",
+      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+      "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+      "stream.kafka.broker.list": "localhost:19092",
+      "stream.kafka.zk.broker.url": "localhost:2181/kafka",
+      "stream.kafka.consumer.prop.auto.offset.reset": "largest"
+    }
+  },
+  "tableName": "FeatureTest2",
+  "tableType": "REALTIME",
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  }
+}
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/queries/feature-test-multi-stage.queries
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/queries/feature-test-multi-stage.queries
new file mode 100644
index 0000000000..00d0c6cb8a
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/queries/feature-test-multi-stage.queries
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Joins
+SELECT COUNT(*) FROM FeatureTest1 ft1 INNER JOIN FeatureTest2 ft2 ON 
ft1.stringDimSV1 = ft2.stringDimSV1 WHERE ft1.generationNumber = 
__GENERATION_NUMBER__ AND ft2.generationNumber = __GENERATION_NUMBER__
+SELECT ft1.stringDimSV1, COUNT(ft1.stringDimSV1) FROM FeatureTest1 ft1 INNER 
JOIN FeatureTest2 ft2 ON ft1.stringDimSV1 = ft2.stringDimSV1 WHERE 
ft1.generationNumber = __GENERATION_NUMBER__ AND ft2.generationNumber = 
__GENERATION_NUMBER__ GROUP BY ft1.stringDimSV1
+SELECT ft1.stringDimSV2, SUM(ft1.floatMetric1) FROM FeatureTest1 ft1 INNER 
JOIN FeatureTest2 ft2 ON ft1.stringDimSV1 = ft2.stringDimSV1 WHERE 
ft1.generationNumber = __GENERATION_NUMBER__ AND ft2.generationNumber = 
__GENERATION_NUMBER__ GROUP BY ft1.stringDimSV2
+SELECT ft1.stringDimSV1 FROM FeatureTest1 ft1 WHERE ft1.generationNumber = 
__GENERATION_NUMBER__ AND EXISTS (SELECT 1 FROM FeatureTest2 ft2 WHERE 
ft2.generationNumber = __GENERATION_NUMBER__ AND ft2.stringDimSV2 = 
ft1.stringDimSV1)
+
+# Set operations
+SELECT * FROM (SELECT stringDimSV1 FROM FeatureTest1 WHERE generationNumber = 
__GENERATION_NUMBER__) INTERSECT (SELECT stringDimSV1 FROM FeatureTest2 WHERE 
generationNumber = __GENERATION_NUMBER__)
+SELECT * FROM (SELECT stringDimSV1 FROM FeatureTest1 WHERE generationNumber = 
__GENERATION_NUMBER__) UNION (SELECT stringDimSV1 FROM FeatureTest2 WHERE 
generationNumber = __GENERATION_NUMBER__)
+
+# Windows
+SELECT stringDimSV1, longMetric1, SUM(longMetric1) OVER (PARTITION BY 
stringDimSV1) FROM FeatureTest1 WHERE generationNumber = __GENERATION_NUMBER__
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/config/query-results/feature-test-multi-stage.results
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/query-results/feature-test-multi-stage.results
new file mode 100644
index 0000000000..b073dd9e1c
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/config/query-results/feature-test-multi-stage.results
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Joins
+{"resultTable":{"dataSchema":{"columnNames":["EXPR$0"],"columnDataTypes":["LONG"]},"rows":[[130]]},"requestId":"11345778000000000","stageStats":{"type":"MAILBOX_RECEIVE","executionTimeMs":36,"emittedRows":1,"fanIn":1,"rawMessages":2,"deserializedBytes":714,"upstreamWaitMs":40,"children":[{"type":"MAILBOX_SEND","executionTimeMs":36,"emittedRows":1,"stage":1,"parallelism":1,"fanOut":1,"rawMessages":2,"serializedBytes":99,"serializationTimeMs":2,"children":[{"type":"AGGREGATE","executionTim
 [...]
+{"resultTable":{"dataSchema":{"columnNames":["stringDimSV1","EXPR$1"],"columnDataTypes":["STRING","LONG"]},"rows":[["s1-4",7],["s1-6",54],["s1-2",28],["s1-0",28],["s1-5",7],["s1-7",6]]},"requestId":"11345778000000001","stageStats":{"type":"MAILBOX_RECEIVE","executionTimeMs":3,"emittedRows":6,"fanIn":1,"rawMessages":2,"deserializedBytes":824,"upstreamWaitMs":3,"children":[{"type":"MAILBOX_SEND","executionTimeMs":2,"emittedRows":6,"stage":1,"parallelism":1,"fanOut":1,"rawMessages":2,"seria
 [...]
+{"resultTable":{"dataSchema":{"columnNames":["stringDimSV2","EXPR$1"],"columnDataTypes":["STRING","DOUBLE"]},"rows":[["s2-6",14153.4],["s2-2",618.8],["s2-0",338.8],["s2-4",168.7],["s2-7",1572.6],["null",0.0]]},"requestId":"11345778000000002","stageStats":{"type":"MAILBOX_RECEIVE","executionTimeMs":2,"emittedRows":6,"fanIn":1,"rawMessages":2,"deserializedBytes":826,"upstreamWaitMs":3,"children":[{"type":"MAILBOX_SEND","executionTimeMs":4,"emittedRows":6,"stage":1,"parallelism":1,"fanOut":
 [...]
+{"resultTable":{"dataSchema":{"columnNames":["stringDimSV1"],"columnDataTypes":["STRING"]},"rows":[]},"requestId":"11345778000000003","stageStats":{"type":"MAILBOX_RECEIVE","fanIn":1,"rawMessages":1,"deserializedBytes":132,"children":[{"type":"MAILBOX_SEND","stage":1,"parallelism":1,"fanOut":1,"rawMessages":1,"children":[{"type":"LEAF","table":"FeatureTest1","numSegmentsQueried":1,"totalDocs":10,"numSegmentsPrunedByServer":1}]}]},"brokerId":"Broker_192.168.29.25_8099","exceptions":[],"nu
 [...]
+
+# Set operations
+{"resultTable":{"dataSchema":{"columnNames":["stringDimSV1"],"columnDataTypes":["STRING"]},"rows":[["s1-0"],["s1-2"],["s1-4"],["s1-5"],["s1-6"],["s1-7"]]},"requestId":"11345778000000004","stageStats":{"type":"MAILBOX_RECEIVE","executionTimeMs":1,"emittedRows":6,"fanIn":1,"rawMessages":2,"deserializedBytes":628,"upstreamWaitMs":2,"children":[{"type":"MAILBOX_SEND","executionTimeMs":1,"emittedRows":6,"stage":1,"parallelism":1,"fanOut":1,"rawMessages":2,"serializedBytes":174,"children":[{"c
 [...]
+{"resultTable":{"dataSchema":{"columnNames":["stringDimSV1"],"columnDataTypes":["STRING"]},"rows":[["s1-4"],["s1-6"],["s1-2"],["s1-0"],["s1-5"],["s1-7"]]},"requestId":"11345778000000006","stageStats":{"type":"MAILBOX_RECEIVE","emittedRows":6,"fanIn":1,"rawMessages":2,"deserializedBytes":744,"upstreamWaitMs":1,"children":[{"type":"MAILBOX_SEND","executionTimeMs":2,"emittedRows":6,"stage":1,"parallelism":1,"fanOut":1,"rawMessages":2,"serializedBytes":174,"children":[{"type":"AGGREGATE","ex
 [...]
+
+# Windows
+{"resultTable":{"dataSchema":{"columnNames":["stringDimSV1","longMetric1","EXPR$2"],"columnDataTypes":["STRING","LONG","LONG"]},"rows":[["s1-5",24,24],["s1-4",24,24],["s1-7",251,251],["s1-6",251,753],["s1-6",251,753],["s1-6",251,753],["s1-0",11,22],["s1-0",11,22],["s1-2",21,42],["s1-2",21,42]]},"partialResult":false,"exceptions":[],"numGroupsLimitReached":false,"maxRowsInJoinReached":false,"timeUsedMs":34,"stageStats":{"type":"MAILBOX_RECEIVE","executionTimeMs":3,"emittedRows":10,"fanIn"
 [...]
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/post-broker-rollback.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-broker-rollback.yaml
new file mode 100644
index 0000000000..3e927386b1
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-broker-rollback.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run after broker rollback
+operations:
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment6 to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment6
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/post-controller-rollback.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-controller-rollback.yaml
new file mode 100644
index 0000000000..c3bcbf631c
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-controller-rollback.yaml
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run after controller rollback
+operations:
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment7 to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment7
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
+  - type: segmentOp
+    description: Delete segment FeatureTest1_Segment
+    op: DELETE
+    tableConfigFileName: feature-test-1.json
+    segmentName: FeatureTest1_Segment
+  - type: tableOp
+    description: Delete table feature-test-1.json
+    op: DELETE
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/post-server-rollback.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-server-rollback.yaml
new file mode 100644
index 0000000000..c31a41294b
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-server-rollback.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run after server rollback
+operations:
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment5 to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment5
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/post-server-upgrade.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-server-upgrade.yaml
new file mode 100644
index 0000000000..786cb3fb39
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/post-server-upgrade.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run after server upgrade
+operations:
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment4 to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment4
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/pre-broker-upgrade.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/pre-broker-upgrade.yaml
new file mode 100644
index 0000000000..4e255b8cf8
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/pre-broker-upgrade.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run before Broker upgrade
+operations:
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment2 to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment2
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/pre-controller-upgrade.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/pre-controller-upgrade.yaml
new file mode 100644
index 0000000000..39f8bcacdf
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/pre-controller-upgrade.yaml
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run before Controller upgrade
+operations:
+  - type: streamOp
+    description: create Kafka topic PinotRealtimeFeatureTest2Event
+    op: CREATE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+  - type: tableOp
+    description: Create realtime table FeatureTest2
+    op: CREATE
+    schemaFileName: FeatureTest2-schema.json
+    tableConfigFileName: feature-test-2-realtime.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+  - type: tableOp
+    description: Create offline table FeatureTest1
+    op: CREATE
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
diff --git 
a/compatibility-verifier/multi-stage-query-engine-test-suite/pre-server-upgrade.yaml
 
b/compatibility-verifier/multi-stage-query-engine-test-suite/pre-server-upgrade.yaml
new file mode 100644
index 0000000000..167f71fadf
--- /dev/null
+++ 
b/compatibility-verifier/multi-stage-query-engine-test-suite/pre-server-upgrade.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Operations to be done.
+description: Operations to be run before server upgrade
+operations:
+  - type: segmentOp
+    description: Add segment FeatureTest1_Segment3 to table FeatureTest1
+    op: UPLOAD
+    inputDataFileName: data/FeatureTest1-data-00.csv
+    schemaFileName: FeatureTest1-schema.json
+    tableConfigFileName: feature-test-1.json
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    segmentName: FeatureTest1_Segment3
+  - type: streamOp
+    description: publish rows to PinotRealtimeFeatureTest2Event
+    op: PRODUCE
+    streamConfigFileName: feature-test-2-realtime-stream-config.json
+    numRows: 66
+    inputDataFileName: data/FeatureTest2-data-realtime-00.csv
+    recordReaderConfigFileName: data/recordReaderConfig.json
+    tableConfigFileName: feature-test-2-realtime.json
+  - type: queryOp
+    description: Run multi-stage queries on FeatureTest1 and FeatureTest2 
using SQL
+    useMultiStageQueryEngine: true
+    queryFileName: queries/feature-test-multi-stage.queries
+    expectedResultsFileName: query-results/feature-test-multi-stage.results
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SqlResultComparator.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SqlResultComparator.java
index 3120eaea30..fac24a931a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SqlResultComparator.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SqlResultComparator.java
@@ -72,6 +72,9 @@ public class SqlResultComparator {
   private static final String FIELD_NUM_ENTRIES_SCANNED_IN_FILTER = 
"numEntriesScannedInFilter";
   private static final String FIELD_NUM_ENTRIES_SCANNED_POST_FILTER = 
"numEntriesScannedPostFilter";
   private static final String FIELD_NUM_GROUPS_LIMIT_REACHED = 
"numGroupsLimitReached";
+  private static final String FIELD_MULTI_STAGE_STATS = "stageStats";
+  private static final String FIELD_MULTI_STAGE_STATS_TYPE = "type";
+  private static final String FIELD_MULTI_STAGE_STATS_CHILDREN = "children";
 
   private static final String FIELD_TYPE_INT = "INT";
   private static final String FIELD_TYPE_LONG = "LONG";
@@ -163,6 +166,47 @@ public class SqlResultComparator {
     return areResultsEqual;
   }
 
+  public static boolean areMultiStageQueriesEqual(JsonNode actual, JsonNode 
expected, String query)
+      throws IOException {
+    if (hasExceptions(actual)) {
+      return false;
+    }
+
+    if (areEmpty(actual, expected)) {
+      return true;
+    }
+
+    if (!areDataSchemaEqual(actual, expected)) {
+      return false;
+    }
+
+    ArrayNode actualRows = (ArrayNode) 
actual.get(FIELD_RESULT_TABLE).get(FIELD_ROWS);
+    ArrayNode expectedRows = (ArrayNode) 
expected.get(FIELD_RESULT_TABLE).get(FIELD_ROWS);
+    ArrayNode columnDataTypes = (ArrayNode) 
expected.get(FIELD_RESULT_TABLE).get(FIELD_DATA_SCHEMA).
+        get(FIELD_COLUMN_DATA_TYPES);
+
+    convertNumbersToString(expectedRows, columnDataTypes);
+    convertNumbersToString(actualRows, columnDataTypes);
+
+    List<String> actualElementsSerialized = new ArrayList<>();
+    List<String> expectedElementsSerialized = new ArrayList<>();
+    for (int i = 0; i < actualRows.size(); i++) {
+      actualElementsSerialized.add(actualRows.get(i).toString());
+    }
+    for (int i = 0; i < expectedRows.size(); i++) {
+      expectedElementsSerialized.add(expectedRows.get(i).toString());
+    }
+
+    if (!areLengthsEqual(actual, expected)) {
+      return false;
+    }
+
+    // For now, just directly compare elements in result set
+    return areNonOrderByQueryElementsEqual(actualElementsSerialized, 
expectedElementsSerialized);
+
+    // Not verifying stage stats for now since they're still subject to change 
across versions
+  }
+
   private static boolean areOrderByQueryElementsEqual(ArrayNode 
actualElements, ArrayNode expectedElements,
       List<String> actualElementsSerialized, List<String> 
expectedElementsSerialized, String query) {
     // Happy path, the results match exactly.
diff --git 
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/QueryOp.java
 
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/QueryOp.java
index 931ad734d5..4b5836e448 100644
--- 
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/QueryOp.java
+++ 
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/QueryOp.java
@@ -47,6 +47,7 @@ public class QueryOp extends BaseOp {
   private static final String COMMENT_DELIMITER = "#";
   private String _queryFileName;
   private String _expectedResultsFileName;
+  private boolean _useMultiStageQueryEngine = false;
 
   public QueryOp() {
     super(OpType.QUERY_OP);
@@ -73,6 +74,14 @@ public class QueryOp extends BaseOp {
     _expectedResultsFileName = expectedResultsFileName;
   }
 
+  public boolean getUseMultiStageQueryEngine() {
+    return _useMultiStageQueryEngine;
+  }
+
+  public void setUseMultiStageQueryEngine(boolean useMultiStageQueryEngine) {
+    _useMultiStageQueryEngine = useMultiStageQueryEngine;
+  }
+
   @Override
   boolean runOp(int generationNumber) {
     System.out.println("Verifying queries in " + _queryFileName + " against 
results in " + _expectedResultsFileName);
@@ -125,7 +134,9 @@ public class QueryOp extends BaseOp {
         JsonNode actualJson = null;
         if (expectedJson != null) {
           try {
-            actualJson = Utils.postSqlQuery(query, 
ClusterDescriptor.getInstance().getBrokerUrl());
+            actualJson = _useMultiStageQueryEngine
+                ? Utils.postMultiStageSqlQuery(query, 
ClusterDescriptor.getInstance().getBrokerUrl())
+                : Utils.postSqlQuery(query, 
ClusterDescriptor.getInstance().getBrokerUrl());
           } catch (Exception e) {
             LOGGER.error("Comparison FAILED: Line: {} Exception caught while 
running query: '{}', explain plan: {}",
                 queryLineNum, query, getExplainPlan(query), e);
@@ -134,7 +145,9 @@ public class QueryOp extends BaseOp {
 
         if (expectedJson != null && actualJson != null) {
           try {
-            boolean passed = SqlResultComparator.areEqual(actualJson, 
expectedJson, query);
+            boolean passed = _useMultiStageQueryEngine
+                ? SqlResultComparator.areMultiStageQueriesEqual(actualJson, 
expectedJson, query)
+                : SqlResultComparator.areEqual(actualJson, expectedJson, 
query);
             if (passed) {
               succeededQueryCount++;
               LOGGER.debug("Comparison PASSED: Line: {}, query: '{}', actual 
response: {}, expected response: {}",
@@ -163,11 +176,18 @@ public class QueryOp extends BaseOp {
     return testPassed;
   }
 
-  private static String getExplainPlan(String query) {
+  private String getExplainPlan(String query) {
     try {
-      JsonNode explainPlanResponse =
-          Utils.postSqlQuery("explain plan for " + query, 
ClusterDescriptor.getInstance().getBrokerUrl());
-      return ExplainPlanUtils.formatExplainPlan(explainPlanResponse);
+      if (!_useMultiStageQueryEngine) {
+        JsonNode explainPlanResponse =
+            Utils.postSqlQuery("explain plan for " + query, 
ClusterDescriptor.getInstance().getBrokerUrl());
+        return ExplainPlanUtils.formatExplainPlan(explainPlanResponse);
+      } else {
+        JsonNode explainPlanResponse =
+            Utils.postMultiStageSqlQuery("explain plan for " + query,
+                ClusterDescriptor.getInstance().getBrokerUrl());
+        return 
ExplainPlanUtils.formatMultiStageExplainPlan(explainPlanResponse);
+      }
     } catch (Throwable error) {
       return error.getMessage();
     }
diff --git 
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java 
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
index 86d85078ff..f54b8399cb 100644
--- 
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
+++ 
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
@@ -59,4 +59,13 @@ public class Utils {
     return JsonUtils.stringToJsonNode(
         ControllerTest.sendPostRequest(brokerBaseApiUrl + "/query/sql", 
payload.toString()));
   }
+
+  public static JsonNode postMultiStageSqlQuery(String query, String 
brokerBaseApiUrl) throws Exception {
+    ObjectNode payload = JsonUtils.newObjectNode();
+    payload.put("sql", query);
+    payload.put("queryOptions", "useMultistageEngine=true");
+
+    return JsonUtils.stringToJsonNode(
+        ControllerTest.sendPostRequest(brokerBaseApiUrl + "/query/sql", 
payload.toString()));
+  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/ExplainPlanUtils.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/ExplainPlanUtils.java
index 74da8b45a5..03c9f3b400 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/ExplainPlanUtils.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/ExplainPlanUtils.java
@@ -64,4 +64,9 @@ public class ExplainPlanUtils {
     }
     return explainPlan.toString();
   }
+
+  public static String formatMultiStageExplainPlan(JsonNode explainPlanJson) {
+    JsonNode rows = explainPlanJson.get("resultTable").get("rows");
+    return rows.get(0).get(1).asText();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to