This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new be496bc763 Multi stage int tests (#11404)
be496bc763 is described below

commit be496bc763ac94660098c3a9e1f4a75167731789
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Thu Sep 7 02:44:12 2023 +0200

    Multi stage int tests (#11404)
    
    * Run OfflineClusterIntegrationTest test with both query engines.
    
    * Improved some tests in V2
    
    * Improve some tests in OfflineClusterIntegrationTest
    
    * Run more integration tests with the multi-stage engine
    
    * Run OfflineClusterIntegrationTest test with both query engines.
    
    * Improved some tests in V2
    
    * Fix MultiStageEngineIntegrationTest
    
    * Apply some changes in BaseClusterIntegrationTestSet
    
    * Extra improvements in OfflineClusterIntegrationTest
    
    * Adapted error code expected on a specific query
    
    * Fix or skip all tests in OfflineClusterIntegrationTest
    
    * Skip all tests that fail in BaseRealtimeClusterIntegrationTest
    
    * Skip tests that don't work in V2 using a specific function
    
    * Fix MultiStageEngineIntegrationTest to run with the correct transport
    
    * Add a TODO
    
    * Fix several error codes
    
    * Enable one now-supported V2 test
    
    ---------
    
    Co-authored-by: Xiang Fu <xiangfu.1...@gmail.com>
---
 .../org/apache/pinot/client/ConnectionFactory.java |   1 +
 .../tests/BaseClusterIntegrationTest.java          |  31 +-
 .../pinot/integration/tests/ClusterTest.java       |   7 +
 .../AggregateMetricsClusterIntegrationTest.java    |   5 +-
 .../tests/BaseClusterIntegrationTestSet.java       |   6 +
 .../tests/BaseRealtimeClusterIntegrationTest.java  |  34 +-
 .../tests/GrpcBrokerClusterIntegrationTest.java    |   5 +-
 .../tests/HybridClusterIntegrationTest.java        |  56 +-
 .../IngestionConfigHybridIntegrationTest.java      |   5 +-
 .../tests/LLCRealtimeClusterIntegrationTest.java   |   6 +-
 .../MultiNodesOfflineClusterIntegrationTest.java   |   9 +-
 .../tests/MultiStageEngineIntegrationTest.java     |  49 +-
 .../tests/NullHandlingIntegrationTest.java         |  76 +--
 .../tests/OfflineClusterIntegrationTest.java       | 576 ++++++++++++++++-----
 ...flineClusterMemBasedServerQueryKillingTest.java |  18 +-
 ...fflineClusterServerCPUTimeQueryKillingTest.java |   6 +-
 ...OfflineGRPCServerMultiStageIntegrationTest.java |  29 ++
 .../tests/StarTreeClusterIntegrationTest.java      |  10 +-
 .../integration/tests/custom/TextIndicesTest.java  |  10 +-
 19 files changed, 691 insertions(+), 248 deletions(-)

diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
index 1ed65dd2eb..45a92be48b 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
@@ -226,6 +226,7 @@ public class ConnectionFactory {
   }
 
   private static PinotClientTransport getDefault(Properties connectionProperties) {
+    // TODO: This code incorrectly assumes that connection properties are always the same
     if (_defaultTransport == null) {
       synchronized (ConnectionFactory.class) {
         if (_defaultTransport == null) {
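
[Editor's note, not part of the commit: the TODO added above points out that the cached default transport is built only once, from whichever connection properties reach getDefault() first. A minimal, self-contained Java sketch of that pitfall follows; the class, field stand-in, and "scheme" property below are hypothetical and only illustrate the caching behavior.]

import java.util.Properties;

public final class CachedTransportSketch {
  // Stand-in for ConnectionFactory._defaultTransport (a PinotClientTransport in the real code).
  private static Object _defaultTransport;

  static Object getDefault(Properties connectionProperties) {
    // Double-checked locking caches the first result; different properties passed later are ignored.
    if (_defaultTransport == null) {
      synchronized (CachedTransportSketch.class) {
        if (_defaultTransport == null) {
          _defaultTransport = "transport built with " + connectionProperties;
        }
      }
    }
    return _defaultTransport;
  }

  public static void main(String[] args) {
    Properties plain = new Properties();
    Properties tls = new Properties();
    tls.setProperty("scheme", "https");      // hypothetical property, for illustration only
    System.out.println(getDefault(plain));   // transport built with {}
    System.out.println(getDefault(tls));     // same cached instance; the second Properties never took effect
  }
}
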
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index fc50b2d931..6b6cc9202c 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -32,9 +33,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -509,17 +512,24 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
    * @return Pinot connection
    */
   protected org.apache.pinot.client.Connection getPinotConnection() {
+    // TODO: This code is assuming getPinotConnectionProperties() will always return the same values
     if (useMultiStageQueryEngine()) {
       if (_pinotConnectionV2 == null) {
         Properties properties = getPinotConnectionProperties();
         properties.put("useMultiStageEngine", "true");
-        _pinotConnectionV2 = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName());
+        _pinotConnectionV2 = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
+            new JsonAsyncHttpPinotClientTransportFactory()
+                .withConnectionProperties(properties)
+                .buildTransport());
       }
       return _pinotConnectionV2;
     }
     if (_pinotConnection == null) {
       _pinotConnection =
-          ConnectionFactory.fromZookeeper(getPinotConnectionProperties(), getZkUrl() + "/" + getHelixClusterName());
+          ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
+              new JsonAsyncHttpPinotClientTransportFactory()
+                  .withConnectionProperties(getPinotConnectionProperties())
+                  .buildTransport());
     }
     return _pinotConnection;
   }
@@ -789,4 +799,21 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, getBrokerBaseApiUrl(), getPinotConnection(),
         h2Query, getH2Connection(), null, getExtraQueryProperties(), useMultiStageQueryEngine());
   }
+
+  protected String getType(JsonNode jsonNode, int colIndex) {
+    return jsonNode.get("resultTable")
+        .get("dataSchema")
+        .get("columnDataTypes")
+        .get(colIndex)
+        .asText();
+  }
+
+  protected <T> T getCellValue(JsonNode jsonNode, int colIndex, int rowIndex, Function<JsonNode, T> extract) {
+    JsonNode cellResult = jsonNode.get("resultTable").get("rows").get(rowIndex).get(colIndex);
+    return extract.apply(cellResult);
+  }
+
+  protected long getLongCellValue(JsonNode jsonNode, int colIndex, int rowIndex) {
+    return getCellValue(jsonNode, colIndex, rowIndex, JsonNode::asLong).longValue();
+  }
 }
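
[Editor's note, not part of the commit: the three helpers added above (getType, getCellValue, getLongCellValue) only navigate the broker's JSON response. A hedged usage sketch follows, written as a hypothetical test method inside a subclass of BaseClusterIntegrationTest; the query and the expected "LONG" type are assumptions, while postQuery() and the resultTable layout are the ones used throughout this diff.]

  @Test
  public void testJsonHelpersSketch()
      throws Exception {
    JsonNode response = postQuery("SELECT COUNT(*), AirlineID FROM mytable GROUP BY AirlineID LIMIT 1");
    // Column data type of column 0, read from resultTable.dataSchema.columnDataTypes.
    assertEquals(getType(response, 0), "LONG");
    // Row 0, column 0 of resultTable.rows, extracted as a long.
    assertTrue(getLongCellValue(response, 0, 0) > 0);
    // The generic variant accepts any JsonNode extractor.
    String airlineId = getCellValue(response, 1, 0, JsonNode::asText);
    assertNotNull(airlineId);
  }
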
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 9845318f30..9b41232f9b 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -78,6 +78,7 @@ import org.apache.pinot.spi.utils.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.SkipException;
 import org.testng.annotations.DataProvider;
 
 import static org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
@@ -572,4 +573,10 @@ public abstract class ClusterTest extends ControllerTest {
         {true}
     };
   }
+
+  protected void notSupportedInV2() {
+    if (useMultiStageQueryEngine()) {
+      throw new SkipException("Some queries fail when using multi-stage engine");
+    }
+  }
 }
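
[Editor's note, not part of the commit: the ClusterTest additions above are the two building blocks the rest of this diff applies to individual tests: the "useBothQueryEngines" data provider runs a method once per engine, and notSupportedInV2() skips the multi-stage (V2) run. A sketch of the resulting pattern follows; the method name and query are hypothetical, the shape matches the tests changed below.]

  @Test(dataProvider = "useBothQueryEngines")
  public void testSomeQuery(boolean useMultiStageQueryEngine)
      throws Exception {
    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
    // Only for queries the multi-stage engine cannot run yet; otherwise omit this line.
    notSupportedInV2();
    testQuery("SELECT COUNT(*) FROM mytable");
  }
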
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
index 5fd41d9d95..7ad116a8c7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
@@ -99,9 +99,10 @@ public class AggregateMetricsClusterIntegrationTest extends BaseClusterIntegrati
     }, 100L, timeoutMs, "Failed to load all documents");
   }
 
-  @Test
-  public void testQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
     testQuery(query);
     query = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY 
DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 5b07d1cc37..f757ac54de 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -67,6 +68,11 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
       "On_Time_On_Time_Performance_2014_100k_subset.test_queries_200.sql";
   private static final int DEFAULT_NUM_QUERIES_TO_GENERATE = 100;
 
+  @BeforeMethod
+  public void resetMultiStage() {
+    setUseMultiStageQueryEngine(false);
+  }
+
   /**
    * Can be overridden to change default setting
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index 83adb4dfe9..af39f3280e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -106,9 +106,10 @@ public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterInte
    * to ensure the right result is computed, wherein dictionary is not read if it is mutable
    * @throws Exception
    */
-  @Test
-  public void testDictionaryBasedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
 
     // Dictionary columns
     // int
@@ -142,30 +143,35 @@ public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterInte
         String.format("SELECT MAX(%s)-MIN(%s) FROM %s", column, column, getTableName()));
   }
 
-  @Test
-  public void testHardcodedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testHardcodedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testHardcodedQueries();
   }
 
-  @Test
-  @Override
-  public void testQueriesFromQueryFile()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testQueriesFromQueryFile();
   }
 
-  @Test
-  @Override
-  public void testGeneratedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testGeneratedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
-    testGeneratedQueries(true, false);
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
+    testGeneratedQueries(true, useMultiStageQueryEngine);
   }
 
-  @Test
-  @Override
-  public void testQueryExceptions()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryExceptions(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testQueryExceptions();
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
index a0dc29c61c..55cccc064b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
@@ -119,9 +119,10 @@ public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest
     createServerTenant(TENANT_NAME, 1, 1);
   }
 
-  @Test
-  public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT * FROM mytable LIMIT 1000000";
     testQuery(query);
     query = "SELECT * FROM mytable WHERE DaysSinceEpoch > 16312 LIMIT 
10000000";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index f366a994eb..2b5c36bc59 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -185,9 +185,10 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertNotNull(getDebugInfo("debug/routingTable/" + TableNameBuilder.REALTIME.tableNameWithType(tableName)));
   }
 
-  @Test
-  public void testBrokerDebugRoutingTableSQL()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testBrokerDebugRoutingTableSQL(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String tableName = getTableName();
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
     String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
@@ -198,9 +199,11 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertNotNull(getDebugInfo("debug/routingTable/sql?query=" + encodedSQL));
   }
 
-  @Test
-  public void testQueryTracing()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryTracing(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     JsonNode jsonNode = postQuery("SET trace = true; SELECT COUNT(*) FROM " + 
getTableName());
     
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
 getCountStarResult());
     Assert.assertTrue(jsonNode.get("exceptions").isEmpty());
@@ -210,9 +213,11 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertTrue(traceInfo.has("localhost_R"));
   }
 
-  @Test
-  public void testQueryTracingWithLiteral()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryTracingWithLiteral(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     JsonNode jsonNode =
         postQuery("SET trace = true; SELECT 1, \'test\', ArrDelay FROM " + 
getTableName() + " LIMIT 10");
     long countStarResult = 10;
@@ -228,9 +233,10 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertTrue(traceInfo.has("localhost_R"));
   }
 
-  @Test
-  public void testDropResults()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDropResults(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     final String query = String.format("SELECT * FROM %s limit 10", getTableName());
     final String resultTag = "resultTable";
 
@@ -244,31 +250,35 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertTrue(postQueryWithOptions(query, "dropResults=truee").has(resultTag));
   }
 
-  @Test
-  @Override
-  public void testHardcodedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testHardcodedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testHardcodedQueries();
   }
 
-  @Test
-  @Override
-  public void testQueriesFromQueryFile()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testQueriesFromQueryFile();
   }
 
-  @Test
-  @Override
-  public void testGeneratedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testGeneratedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testGeneratedQueries();
   }
 
-  @Test
-  @Override
-  public void testQueryExceptions()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryExceptions(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     super.testQueryExceptions();
   }
 
@@ -286,9 +296,9 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     super.testBrokerResponseMetadata();
   }
 
-  @Test
-  @Override
-  public void testVirtualColumnQueries() {
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testVirtualColumnQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
     super.testVirtualColumnQueries();
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
index 918b15cae7..7495be1e0f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
@@ -151,9 +151,10 @@ public class IngestionConfigHybridIntegrationTest extends BaseClusterIntegration
     waitForAllDocsLoaded(600_000L);
   }
 
-  @Test
-  public void testQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // Select column created with transform function
     String sqlQuery = "Select millisSinceEpoch from " + DEFAULT_TABLE_NAME;
     JsonNode response = postQuery(sqlQuery);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 7068656865..88fe46e4e1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -262,9 +262,11 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
     testReload(false);
   }
 
-  @Test
-  public void testAddRemoveDictionaryAndInvertedIndex()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testAddRemoveDictionaryAndInvertedIndex(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     String query = "SELECT COUNT(*) FROM myTable WHERE ActualElapsedTime = 
-9999";
     long numTotalDocs = getCountStarResult();
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index 7c76eaaacb..f2b345a690 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -194,21 +194,19 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
 
   // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
   @Test(enabled = false)
-  @Override
-  public void testStarTreeTriggering() {
+  public void testStarTreeTriggering(boolean useMultiStageQueryEngine) {
     // Ignored
   }
 
   // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
   @Test(enabled = false)
   @Override
-  public void testDefaultColumns() {
+  public void testDefaultColumns(boolean useMultiStageQueryEngineg) {
     // Ignored
   }
 
   // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
   @Test(enabled = false)
-  @Override
   public void testBloomFilterTriggering() {
     // Ignored
   }
@@ -216,7 +214,8 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
   // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
   @Test(enabled = false)
   @Override
-  public void testRangeIndexTriggering() {
+  public void testRangeIndexTriggering(boolean useMultiStageQueryEngine)
+      throws Exception {
     // Ignored
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 4099b744c2..c18dd026ff 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.function.scalar.StringFunctions.*;
@@ -91,9 +92,15 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
       throws IOException {
   }
 
+//  @Override
+//  protected boolean useMultiStageQueryEngine() {
+//    return true;
+//  }
+
+  @BeforeMethod
   @Override
-  protected boolean useMultiStageQueryEngine() {
-    return true;
+  public void resetMultiStage() {
+    setUseMultiStageQueryEngine(true);
   }
 
   @Test
@@ -157,12 +164,21 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     double[] expectedNumericResults = new double[]{
         364, 364, 355, 364, 364, 364, 5915969, 16252.662087912087
     };
+    double[] expectedNumericResultsV1 = new double[]{
+        364, 364, 357, 364, 364, 364, 5915969, 16252.662087912087
+    };
     Assert.assertEquals(numericResultFunctions.length, expectedNumericResults.length);
 
     for (int i = 0; i < numericResultFunctions.length; i++) {
       String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM 
mytable", numericResultFunctions[i]);
       JsonNode jsonNode = postQuery(pinotQuery);
-      
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(),
 expectedNumericResults[i]);
+      if (useMultiStageQueryEngine) {
+        
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(),
+            expectedNumericResults[i]);
+      } else {
+        
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(),
+            expectedNumericResultsV1[i]);
+      }
     }
 
     String[] binaryResultFunctions = new String[]{
@@ -172,14 +188,21 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
         360,
         3904
     };
+    int[] expectedBinarySizeResultsV1 = new int[]{
+        5480,
+        3904
+    };
     for (int i = 0; i < binaryResultFunctions.length; i++) {
       String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM 
mytable", binaryResultFunctions[i]);
       JsonNode jsonNode = postQuery(pinotQuery);
-      
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
-          expectedBinarySizeResults[i]);
+      if (useMultiStageQueryEngine) {
+        
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
+            expectedBinarySizeResults[i]);
+      } else {
+        
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
+            expectedBinarySizeResultsV1[i]);
+      }
     }
-
-    setUseMultiStageQueryEngine(true);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
@@ -195,13 +218,21 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
         -5.421344202E9, 577725, -9999.0, 16271.0, -9383.95292223809, 26270.0, 312, 312, 328, 3954484.0,
         12674.628205128205
     };
+    double[] expectedResultsV1 = new double[]{
+        -5.421344202E9, 577725, -9999.0, 16271.0, -9383.95292223809, 26270.0, 312, 312, 312, 3954484.0,
+        12674.628205128205
+    };
 
     Assert.assertEquals(multiValueFunctions.length, expectedResults.length);
 
     for (int i = 0; i < multiValueFunctions.length; i++) {
       String pinotQuery = String.format("SELECT %s(DivAirportIDs) FROM 
mytable", multiValueFunctions[i]);
       JsonNode jsonNode = postQuery(pinotQuery);
-      
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(),
 expectedResults[i]);
+      if (useMultiStageQueryEngine) {
+        
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(),
 expectedResults[i]);
+      } else {
+        
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(),
 expectedResultsV1[i]);
+      }
     }
 
     String pinotQuery = "SELECT percentileMV(DivAirportIDs, 99) FROM mytable";
@@ -227,8 +258,6 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     jsonNode = postQuery(pinotQuery);
     Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
     Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
-
-    setUseMultiStageQueryEngine(true);
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index cb2163b632..3fc9e3e063 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -139,62 +139,72 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
     return 100;
   }
 
-  @Test
-  public void testTotalCount()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testTotalCount(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT COUNT(*) FROM " + getTableName();
     testQuery(query);
   }
 
-  @Test
-  public void testCountWithNullDescription()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCountWithNullDescription(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE 
description IS NOT NULL";
     testQuery(query);
   }
 
-  @Test
-  public void testCountWithNullDescriptionAndSalary()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCountWithNullDescriptionAndSalary(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE 
description IS NOT NULL AND salary IS NOT NULL";
     testQuery(query);
   }
 
-  @Test
-  public void testCaseWithNullSalary()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseWithNullSalary(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT CASE WHEN salary IS NULL THEN 1 ELSE 0 END FROM " + 
getTableName();
     testQuery(query);
   }
 
-  @Test
-  public void testCaseWithNotNullDescription()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseWithNotNullDescription(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT CASE WHEN description IS NOT NULL THEN 1 ELSE 0 END 
FROM " + getTableName();
     testQuery(query);
   }
 
-  @Test
-  public void testCaseWithIsDistinctFrom()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseWithIsDistinctFrom(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT salary IS DISTINCT FROM salary FROM " + 
getTableName();
     testQuery(query);
     query = "SELECT salary FROM " + getTableName() + " where salary IS 
DISTINCT FROM salary";
     testQuery(query);
   }
 
-  @Test
-  public void testCaseWithIsNotDistinctFrom()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseWithIsNotDistinctFrom(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = "SELECT description IS NOT DISTINCT FROM description FROM " 
+ getTableName();
     testQuery(query);
     query = "SELECT description FROM " + getTableName() + " where description 
IS NOT DISTINCT FROM description";
     testQuery(query);
   }
 
-  @Test
-  public void testTotalCountWithNullHandlingQueryOptionEnabled()
-          throws Exception {
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testTotalCountWithNullHandlingQueryOptionEnabled(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String pinotQuery = "SELECT COUNT(*) FROM " + getTableName() + " 
option(enableNullHandling=true)";
     String h2Query = "SELECT COUNT(*) FROM " + getTableName();
     testQuery(pinotQuery, h2Query);
@@ -205,9 +215,11 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
     DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
-  @Test
-  public void testNullLiteralSelectionOnlyBroker()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNullLiteralSelectionOnlyBroker(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     // Null literal only
     String sqlQuery = "SELECT null FROM mytable 
OPTION(enableNullHandling=true)";
     JsonNode response = postQuery(sqlQuery);
@@ -300,35 +312,40 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet {
     assertEquals(rows.get(0).get(0).asText(), "null");
   }
 
-  @Test
-  public void testOrderByNullsFirst()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testOrderByNullsFirst(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String h2Query = "SELECT salary FROM " + getTableName() + " ORDER BY 
salary NULLS FIRST";
     String pinotQuery = h2Query + " option(enableNullHandling=true)";
 
     testQuery(pinotQuery, h2Query);
   }
 
-  @Test
-  public void testOrderByNullsLast()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testOrderByNullsLast(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String h2Query = "SELECT salary FROM " + getTableName() + " ORDER BY 
salary DESC NULLS LAST";
     String pinotQuery = h2Query + " option(enableNullHandling=true)";
 
     testQuery(pinotQuery, h2Query);
   }
 
-  @Test
-  public void testDistinctOrderByNullsLast()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctOrderByNullsLast(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String h2Query = "SELECT distinct salary FROM " + getTableName() + " ORDER 
BY salary DESC NULLS LAST";
     String pinotQuery = h2Query + " option(enableNullHandling=true)";
 
     testQuery(pinotQuery, h2Query);
   }
 
-  @Test
-  public void testSelectNullLiteral() throws Exception {
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testSelectNullLiteral(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // Need to also select an identifier column to skip the all literal query optimization which returns without
     // querying the segment.
     String sqlQuery = "SELECT NULL, salary FROM mytable OPTION(enableNullHandling=true)";
@@ -339,9 +356,10 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
     assertEquals(rows.get(0).get(0).asText(), "null");
   }
 
-  @Test
-  public void testCaseWhenAllLiteral()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseWhenAllLiteral(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String sqlQuery =
         "SELECT CASE WHEN true THEN 1 WHEN NOT true THEN 0 ELSE NULL END FROM 
mytable OPTION(enableNullHandling=true)";
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 8c0d6602b7..0be7e109c6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -39,6 +39,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
@@ -273,6 +274,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     super.testQuery(pinotQuery, h2Query);
   }
 
+  private void testQueryError(String query, int errorCode)
+      throws Exception {
+    JsonNode response = postQuery(query);
+    JsonNode exceptions = response.get("exceptions");
+    assertFalse(exceptions.isEmpty(), "At least one exception was expected");
+    JsonNode firstException = exceptions.get(0);
+    assertEquals(firstException.get("errorCode").asInt(), errorCode);
+  }
+
   @Test
   public void testInstancesStarted() {
     assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers());
@@ -573,9 +583,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
   }
 
-  @Test
-  public void testTimeFunc()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testTimeFunc(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String sqlQuery = "SELECT toDateTime(now(), 'yyyy-MM-dd z'), 
toDateTime(ago('PT1H'), 'yyyy-MM-dd z') FROM mytable";
     JsonNode response = postQuery(sqlQuery);
     String todayStr = 
response.get("resultTable").get("rows").get(0).get(0).asText();
@@ -589,9 +600,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(oneHourAgoTodayStr, expectedOneHourAgoTodayStr);
   }
 
-  @Test
-  public void testRegexpReplace()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testRegexpReplace(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // Correctness tests of regexpReplace.
 
     // Test replace all.
@@ -678,7 +690,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(result, "healthy, wealthy, stealthy and wise");
 
     // Test in select clause with column values
-    sqlQuery = "SELECT regexpReplace(DestCityName, ' ', '', 0, -1, 'i') from 
myTable where OriginState = 'CA'";
+    sqlQuery = "SELECT regexpReplace(DestCityName, ' ', '', 0, -1, 'i') from 
mytable where OriginState = 'CA'";
     response = postQuery(sqlQuery);
     JsonNode rows = response.get("resultTable").get("rows");
     for (int i = 0; i < rows.size(); i++) {
@@ -687,20 +699,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
 
     // Test in where clause
-    sqlQuery = "SELECT count(*) from myTable where regexpReplace(originState, 
'[VC]A', 'TEST') = 'TEST'";
+    sqlQuery = "SELECT count(*) from mytable where regexpReplace(OriginState, 
'[VC]A', 'TEST') = 'TEST'";
     response = postQuery(sqlQuery);
     int count1 = response.get("resultTable").get("rows").get(0).get(0).asInt();
-    sqlQuery = "SELECT count(*) from myTable where originState='CA' or 
originState='VA'";
+    sqlQuery = "SELECT count(*) from mytable where OriginState='CA' or 
OriginState='VA'";
     response = postQuery(sqlQuery);
     int count2 = response.get("resultTable").get("rows").get(0).get(0).asInt();
     assertEquals(count1, count2);
 
     // Test nested transform
     sqlQuery =
-        "SELECT count(*) from myTable where 
contains(regexpReplace(originState, '(C)(A)', '$1TEST$2'), 'CTESTA')";
+        "SELECT count(*) from mytable where 
contains(regexpReplace(OriginState, '(C)(A)', '$1TEST$2'), 'CTESTA')";
     response = postQuery(sqlQuery);
     count1 = response.get("resultTable").get("rows").get(0).get(0).asInt();
-    sqlQuery = "SELECT count(*) from myTable where originState='CA'";
+    sqlQuery = "SELECT count(*) from mytable where OriginState='CA'";
     response = postQuery(sqlQuery);
     count2 = response.get("resultTable").get("rows").get(0).get(0).asInt();
     assertEquals(count1, count2);
@@ -709,9 +721,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testCastMV()
       throws Exception {
-
     // simple cast
-    String sqlQuery = "SELECT DivLongestGTimes, CAST(DivLongestGTimes as 
DOUBLE) from myTable LIMIT 100";
+    String sqlQuery = "SELECT DivLongestGTimes, CAST(DivLongestGTimes as 
DOUBLE) from mytable LIMIT 100";
     JsonNode response = postQuery(sqlQuery);
     JsonNode resultTable = response.get("resultTable");
     JsonNode dataSchema = resultTable.get("dataSchema");
@@ -735,7 +746,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // nested cast
     sqlQuery = "SELECT DivAirportIDs, CAST(CAST(CAST(DivAirportIDs AS FLOAT) 
as INT) as STRING),"
-        + " DivTotalGTimes, CAST(CAST(DivTotalGTimes AS STRING) AS LONG) from 
myTable ORDER BY CARRIER LIMIT 100";
+        + " DivTotalGTimes, CAST(CAST(DivTotalGTimes AS STRING) AS LONG) from 
mytable ORDER BY CARRIER LIMIT 100";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
@@ -770,11 +781,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
   }
 
-  @Test
-  public void testUrlFunc()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testUrlFunc(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String sqlQuery = "SELECT encodeUrl('key1=value 
1&key2=value@!$2&key3=value%3'), "
-        + 
"decodeUrl('key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253') FROM 
myTable";
+        + 
"decodeUrl('key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253') FROM 
mytable";
     JsonNode response = postQuery(sqlQuery);
     String encodedString = 
response.get("resultTable").get("rows").get(0).get(0).asText();
     String expectedUrlStr = encodeUrl("key1=value 
1&key2=value@!$2&key3=value%3");
@@ -785,12 +797,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(decodedString, expectedUrlStr);
   }
 
-  @Test
-  public void testBase64Func()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testBase64Func(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
 
     // string literal
-    String sqlQuery = "SELECT toBase64(toUtf8('hello!')), " + 
"fromUtf8(fromBase64('aGVsbG8h')) FROM myTable";
+    String sqlQuery = "SELECT toBase64(toUtf8('hello!')), " + 
"fromUtf8(fromBase64('aGVsbG8h')) FROM mytable";
     JsonNode response = postQuery(sqlQuery);
     JsonNode resultTable = response.get("resultTable");
     JsonNode dataSchema = resultTable.get("dataSchema");
@@ -807,7 +820,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // long string literal encode
     sqlQuery =
         "SELECT toBase64(toUtf8('this is a long string that will encode to more than 76 characters using base64')) "
-            + "FROM myTable";
+            + "FROM mytable";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
@@ -818,7 +831,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // long string literal decode
     sqlQuery = "SELECT fromUtf8(fromBase64"
         + "('dGhpcyBpcyBhIGxvbmcgc3RyaW5nIHRoYXQgd2lsbCBlbmNvZGUgdG8gbW9yZSB0aGFuIDc2IGNoYXJhY3RlcnMgdXNpbmcgYmFzZTY0"
-        + "')) FROM myTable";
+        + "')) FROM mytable";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
@@ -827,7 +840,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         "dGhpcyBpcyBhIGxvbmcgc3RyaW5nIHRoYXQgd2lsbCBlbmNvZGUgdG8gbW9yZSB0aGFuIDc2IGNoYXJhY3RlcnMgdXNpbmcgYmFzZTY0")));
 
     // non-string literal
-    sqlQuery = "SELECT toBase64(toUtf8(123)), 
fromUtf8(fromBase64(toBase64(toUtf8(123)))), 123 FROM myTable";
+    sqlQuery = "SELECT toBase64(toUtf8(123)), 
fromUtf8(fromBase64(toBase64(toUtf8(123)))), 123 FROM mytable";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
@@ -839,7 +852,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // identifier
     sqlQuery = "SELECT Carrier, toBase64(toUtf8(Carrier)), 
fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))), "
-        + "fromBase64(toBase64(toUtf8(Carrier))) FROM myTable LIMIT 100";
+        + "fromBase64(toBase64(toUtf8(Carrier))) FROM mytable LIMIT 100";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
@@ -856,27 +869,43 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
 
     // invalid argument
-    sqlQuery = "SELECT toBase64() FROM myTable";
-    response = postQuery(sqlQuery);
-    assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
+    sqlQuery = "SELECT toBase64() FROM mytable";
+    if (useMultiStageQueryEngine) {
+      testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
+    } else {
+      response = postQuery(sqlQuery);
+      assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
+    }
 
     // invalid argument
-    sqlQuery = "SELECT fromBase64() FROM myTable";
-    response = postQuery(sqlQuery);
-    assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
+    sqlQuery = "SELECT fromBase64() FROM mytable";
+    if (useMultiStageQueryEngine) {
+      testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
+    } else {
+      response = postQuery(sqlQuery);
+      assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
+    }
 
     // invalid argument
-    sqlQuery = "SELECT toBase64('hello!') FROM myTable";
-    response = postQuery(sqlQuery);
-    assertTrue(response.get("exceptions").get(0).get("message").toString().contains("SqlCompilationException"));
+    sqlQuery = "SELECT toBase64('hello!') FROM mytable";
+    if (useMultiStageQueryEngine) {
+      testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
+    } else {
+      response = postQuery(sqlQuery);
+      assertTrue(response.get("exceptions").get(0).get("message").toString().contains("SqlCompilationException"));
+    }
 
     // invalid argument
-    sqlQuery = "SELECT fromBase64('hello!') FROM myTable";
-    response = postQuery(sqlQuery);
-    assertTrue(response.get("exceptions").get(0).get("message").toString().contains("IllegalArgumentException"));
+    sqlQuery = "SELECT fromBase64('hello!') FROM mytable";
+    if (useMultiStageQueryEngine) {
+      testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
+    } else {
+      response = postQuery(sqlQuery);
+      assertTrue(response.get("exceptions").get(0).get("message").toString().contains("IllegalArgumentException"));
+    }
 
     // string literal used in a filter
-    sqlQuery = "SELECT * FROM myTable WHERE fromUtf8(fromBase64('aGVsbG8h')) 
!= Carrier AND "
+    sqlQuery = "SELECT * FROM mytable WHERE fromUtf8(fromBase64('aGVsbG8h')) 
!= Carrier AND "
         + "toBase64(toUtf8('hello!')) != Carrier LIMIT 10";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
@@ -884,21 +913,21 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(rows.size(), 10);
 
     // non-string literal used in a filter
-    sqlQuery = "SELECT * FROM myTable WHERE 
fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) != Carrier LIMIT 10";
+    sqlQuery = "SELECT * FROM mytable WHERE 
fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) != Carrier LIMIT 10";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
     assertEquals(rows.size(), 10);
 
     // string identifier used in a filter
-    sqlQuery = "SELECT * FROM myTable WHERE 
fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) = Carrier LIMIT 10";
+    sqlQuery = "SELECT * FROM mytable WHERE 
fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) = Carrier LIMIT 10";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
     assertEquals(rows.size(), 10);
 
     // non-string identifier used in a filter
-    sqlQuery = "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), 
AirlineID FROM myTable WHERE "
+    sqlQuery = "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), 
AirlineID FROM mytable WHERE "
         + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) = AirlineID LIMIT 
10";
     response = postQuery(sqlQuery);
     resultTable = response.get("resultTable");
@@ -909,7 +938,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // string identifier used in group by order by
     sqlQuery = "SELECT Carrier as originalCol, toBase64(toUtf8(Carrier)) as 
encoded, "
-        + "fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) as decoded FROM 
myTable "
+        + "fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) as decoded FROM 
mytable "
         + "GROUP BY Carrier, toBase64(toUtf8(Carrier)), 
fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) "
         + "ORDER BY toBase64(toUtf8(Carrier)) LIMIT 10";
     response = postQuery(sqlQuery);
@@ -929,7 +958,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // non-string identifier used in group by order by
     sqlQuery = "SELECT AirlineID as originalCol, toBase64(toUtf8(AirlineID)) 
as encoded, "
-        + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) as decoded FROM 
myTable "
+        + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) as decoded FROM 
mytable "
         + "GROUP BY AirlineID, toBase64(toUtf8(AirlineID)), 
fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) "
         + "ORDER BY fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) LIMIT 
10";
     response = postQuery(sqlQuery);
@@ -949,7 +978,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testLiteralOnlyFunc()
+  public void testLiteralOnlyFuncV1()
       throws Exception {
     long queryStartTimeMs = System.currentTimeMillis();
     String sqlQuery =
@@ -1011,9 +1040,75 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(results.get(10).asText(), "hello!");
   }
 
-  @Test(dependsOnMethods = "testBloomFilterTriggering")
-  public void testRangeIndexTriggering()
+  @Test
+  public void testLiteralOnlyFuncV2()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+    long queryStartTimeMs = System.currentTimeMillis();
+    String sqlQuery =
+        "SELECT 1, now() as currentTs, ago('PT1H') as oneHourAgoTs, 'abc', 
toDateTime(now(), 'yyyy-MM-dd z') as "
+            + "today, now(), ago('PT1H'), encodeUrl('key1=value 
1&key2=value@!$2&key3=value%3') as encodedUrl, "
+            + 
"decodeUrl('key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253') as 
decodedUrl, toBase64"
+            + "(toUtf8('hello!')) as toBase64, 
fromUtf8(fromBase64('aGVsbG8h')) as fromBase64";
+    JsonNode response = postQuery(sqlQuery);
+    long queryEndTimeMs = System.currentTimeMillis();
+
+    JsonNode resultTable = response.get("resultTable");
+    JsonNode dataSchema = resultTable.get("dataSchema");
+    JsonNode columnNames = dataSchema.get("columnNames");
+    assertEquals(columnNames.get(0).asText(), "EXPR$0");
+    assertEquals(columnNames.get(1).asText(), "currentTs");
+    assertEquals(columnNames.get(2).asText(), "oneHourAgoTs");
+    assertEquals(columnNames.get(3).asText(), "EXPR$3");
+    assertEquals(columnNames.get(4).asText(), "today");
+    String nowColumnName = columnNames.get(5).asText();
+    String oneHourAgoColumnName = columnNames.get(6).asText();
+    assertEquals(columnNames.get(7).asText(), "encodedUrl");
+    assertEquals(columnNames.get(8).asText(), "decodedUrl");
+    assertEquals(columnNames.get(9).asText(), "toBase64");
+    assertEquals(columnNames.get(10).asText(), "fromBase64");
+
+    JsonNode columnDataTypes = dataSchema.get("columnDataTypes");
+    assertEquals(columnDataTypes.get(0).asText(), "INT");
+    assertEquals(columnDataTypes.get(1).asText(), "LONG");
+    assertEquals(columnDataTypes.get(2).asText(), "LONG");
+    assertEquals(columnDataTypes.get(3).asText(), "STRING");
+    assertEquals(columnDataTypes.get(4).asText(), "STRING");
+    assertEquals(columnDataTypes.get(5).asText(), "LONG");
+    assertEquals(columnDataTypes.get(6).asText(), "LONG");
+    assertEquals(columnDataTypes.get(7).asText(), "STRING");
+    assertEquals(columnDataTypes.get(8).asText(), "STRING");
+    assertEquals(columnDataTypes.get(9).asText(), "STRING");
+    assertEquals(columnDataTypes.get(10).asText(), "STRING");
+
+    JsonNode results = resultTable.get("rows").get(0);
+    assertEquals(results.get(0).asInt(), 1);
+    long nowResult = results.get(1).asLong();
+    assertTrue(nowResult >= queryStartTimeMs);
+    assertTrue(nowResult <= queryEndTimeMs);
+    long oneHourAgoResult = results.get(2).asLong();
+    assertTrue(oneHourAgoResult >= queryStartTimeMs - TimeUnit.HOURS.toMillis(1));
+    assertTrue(oneHourAgoResult <= queryEndTimeMs - TimeUnit.HOURS.toMillis(1));
+    assertEquals(results.get(3).asText(), "abc");
+    String queryStartTimeDay = Instant.ofEpochMilli(queryStartTimeMs).atZone(ZoneId.of("UTC"))
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd z"));
+    String queryEndTimeDay = Instant.ofEpochMilli(queryEndTimeMs).atZone(ZoneId.of("UTC"))
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd z"));
+    String dateTimeResult = results.get(4).asText();
+    assertTrue(dateTimeResult.equals(queryStartTimeDay) || dateTimeResult.equals(queryEndTimeDay));
+    // In V2 column names and values are not related
+//    assertEquals(results.get(5).asText(), nowColumnName);
+//    assertEquals(results.get(6).asText(), oneHourAgoColumnName);
+    assertEquals(results.get(7).asText(), "key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253");
+    assertEquals(results.get(8).asText(), "key1=value 1&key2=value@!$2&key3=value%3");
+    assertEquals(results.get(9).asText(), "aGVsbG8h");
+    assertEquals(results.get(10).asText(), "hello!");
+  }
+
+  @Test(dependsOnMethods = "testBloomFilterTriggering", dataProvider = 
"useBothQueryEngines")
+  public void testRangeIndexTriggering(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     long numTotalDocs = getCountStarResult();
     assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
 
@@ -1095,9 +1190,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   /**
    * Check if server returns error response quickly without timing out Broker.
    */
-  @Test
-  public void testServerErrorWithBrokerTimeout()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testServerErrorWithBrokerTimeout(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     long startTimeMs = System.currentTimeMillis();
     // The query below will fail execution due to JSON_MATCH on column without json index
     JsonNode queryResponse = postQuery("SELECT count(*) FROM mytable WHERE JSON_MATCH(Dest, '$=123')");
@@ -1275,9 +1371,11 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
    *   <li>"NewAddedDerivedMVStringDimension", DATE_TIME, STRING, 
multi-value</li>
    * </ul>
    */
-  @Test(dependsOnMethods = "testAggregateMetadataAPI")
-  public void testDefaultColumns()
+  @Test(dependsOnMethods = "testAggregateMetadataAPI", dataProvider = 
"useBothQueryEngines")
+  public void testDefaultColumns(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     long numTotalDocs = getCountStarResult();
 
     reloadWithExtraColumns();
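
Many tests in this patch now take a boolean from the "useBothQueryEngines" data provider and call setUseMultiStageQueryEngine(...) plus, where needed, notSupportedInV2(); the helpers themselves live in the shared test base classes (ClusterTest / BaseClusterIntegrationTest in the diffstat) and are not shown here. A hypothetical sketch of what such a provider and guard could look like, purely for orientation:

    import org.testng.SkipException;
    import org.testng.annotations.DataProvider;

    // Illustrative only; the real Pinot helpers may differ in naming and detail.
    public abstract class EngineAwareTestSupport {

      protected abstract boolean useMultiStageQueryEngine();

      // Runs each annotated test twice: once against the V1 engine, once against V2.
      @DataProvider(name = "useBothQueryEngines")
      public Object[][] useBothQueryEngines() {
        return new Object[][]{{false}, {true}};
      }

      // Marks the V2 run of a test as skipped instead of failed.
      protected void notSupportedInV2() {
        if (useMultiStageQueryEngine()) {
          throw new SkipException("Not yet supported by the multi-stage (V2) query engine");
        }
      }
    }
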
@@ -1305,7 +1403,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   public void testDisableGroovyQueryTableConfigOverride()
       throws Exception {
     String groovyQuery = "SELECT 
GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
-        + "'arg0 + arg1', FlightNum, Origin) FROM myTable";
+        + "'arg0 + arg1', FlightNum, Origin) FROM mytable";
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null));
     updateTableConfig(tableConfig);
@@ -1618,24 +1716,24 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }, 60_000L, "Failed to remove expression override");
   }
 
-  @Test
-  @Override
-  public void testBrokerResponseMetadata()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testBrokerResponseMetadata(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     super.testBrokerResponseMetadata();
   }
 
-  @Test
-  public void testInBuiltVirtualColumns()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testInBuiltVirtualColumns(boolean useMultiStageQueryEngine)
       throws Exception {
-    String query = "SELECT $docId, $HOSTNAME, $segmentname FROM mytable";
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = "SELECT $docId, $hostName, $segmentName FROM mytable";
     JsonNode response = postQuery(query);
     JsonNode resultTable = response.get("resultTable");
     JsonNode dataSchema = resultTable.get("dataSchema");
     assertEquals(dataSchema.get("columnNames").toString(), 
"[\"$docId\",\"$hostName\",\"$segmentName\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"INT\",\"STRING\",\"STRING\"]");
     JsonNode rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
     String expectedHostName = NetUtils.getHostnameOrAddress();
     String expectedSegmentNamePrefix = "mytable_";
     for (int i = 0; i < 10; i++) {
@@ -1646,19 +1744,19 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
   }
 
-  @Test
-  public void testGroupByUDF()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testGroupByUDF(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     String query = "SELECT timeConvert(DaysSinceEpoch,'DAYS','SECONDS'), 
COUNT(*) FROM mytable "
         + "GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS') ORDER BY 
COUNT(*) DESC";
     JsonNode response = postQuery(query);
     JsonNode resultTable = response.get("resultTable");
     JsonNode dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        "[\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"LONG\",\"LONG\"]");
     JsonNode rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
+    assertFalse(rows.isEmpty());
     JsonNode row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asLong(), 16138 * 24 * 3600);
@@ -1669,26 +1767,27 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        
"[\"datetimeconvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"LONG\",\"LONG\"]");
     rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
+    assertFalse(rows.isEmpty());
     row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asLong(), 16138 * 24);
     assertEquals(row.get(1).asLong(), 605);
 
-    query = "SELECT add(DaysSinceEpoch,DaysSinceEpoch,15), COUNT(*) FROM 
mytable "
-        + "GROUP BY add(DaysSinceEpoch,DaysSinceEpoch,15) ORDER BY COUNT(*) 
DESC";
+    if (useMultiStageQueryEngine) {
+      query = "SELECT add(DaysSinceEpoch,add(DaysSinceEpoch,15)), COUNT(*) 
FROM mytable "
+          + "GROUP BY add(DaysSinceEpoch,add(DaysSinceEpoch,15)) ORDER BY 
COUNT(*) DESC";
+    } else {
+      query = "SELECT add(DaysSinceEpoch,DaysSinceEpoch,15), COUNT(*) FROM 
mytable "
+          + "GROUP BY add(DaysSinceEpoch,DaysSinceEpoch,15) ORDER BY COUNT(*) 
DESC";
+    }
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        "[\"add(DaysSinceEpoch,DaysSinceEpoch,'15')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"DOUBLE\",\"LONG\"]");
     rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
+    assertFalse(rows.isEmpty());
     row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asDouble(), 16138.0 + 16138 + 15);
@@ -1699,24 +1798,27 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(), 
"[\"sub(DaysSinceEpoch,'25')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"DOUBLE\",\"LONG\"]");
     rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
+    assertFalse(rows.isEmpty());
     row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asDouble(), 16138.0 - 25);
     assertEquals(row.get(1).asLong(), 605);
 
-    query = "SELECT mult(DaysSinceEpoch,24,3600), COUNT(*) FROM mytable "
-        + "GROUP BY mult(DaysSinceEpoch,24,3600) ORDER BY COUNT(*) DESC";
+    if (useMultiStageQueryEngine) {
+      query = "SELECT mult(DaysSinceEpoch,mult(24,3600)), COUNT(*) FROM 
mytable "
+          + "GROUP BY mult(DaysSinceEpoch,mult(24,3600)) ORDER BY COUNT(*) 
DESC";
+    } else {
+      query = "SELECT mult(DaysSinceEpoch,24,3600), COUNT(*) FROM mytable "
+          + "GROUP BY mult(DaysSinceEpoch,24,3600) ORDER BY COUNT(*) DESC";
+    }
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(), 
"[\"mult(DaysSinceEpoch,'24','3600')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"DOUBLE\",\"LONG\"]");
     rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
+    assertFalse(rows.isEmpty());
     row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asDouble(), 16138.0 * 24 * 3600);
@@ -1727,10 +1829,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(), 
"[\"div(DaysSinceEpoch,'2')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"DOUBLE\",\"LONG\"]");
     rows = resultTable.get("rows");
-    assertEquals(rows.size(), 10);
+    assertFalse(rows.isEmpty());
     row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asDouble(), 16138.0 / 2);
@@ -1741,10 +1842,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(), 
"[\"arraylength(DivAirports)\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"INT\",\"LONG\"]");
     rows = resultTable.get("rows");
-    assertEquals(rows.size(), 1);
+    assertFalse(rows.isEmpty());
     row = rows.get(0);
     assertEquals(row.size(), 2);
     assertEquals(row.get(0).asInt(), 5);
@@ -1755,8 +1855,6 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        "[\"arraylength(valuein(DivAirports,'DFW','ORD'))\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"INT\",\"LONG\"]");
     rows = resultTable.get("rows");
     assertEquals(rows.size(), 3);
@@ -1778,7 +1876,6 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(), 
"[\"valuein(DivAirports,'DFW','ORD')\",\"count(*)\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"STRING\",\"LONG\"]");
     rows = resultTable.get("rows");
     assertEquals(rows.size(), 2);
@@ -1793,7 +1890,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testAggregationUDF()
+  public void testAggregationUDFV1()
       throws Exception {
     String query = "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) 
FROM mytable";
     JsonNode response = postQuery(query);
@@ -1821,14 +1918,40 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testSelectionUDF()
+  public void testAggregationUDFV2()
       throws Exception {
-    String query = "SELECT DaysSinceEpoch, 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable";
+    setUseMultiStageQueryEngine(true);
+    String query = "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) 
FROM mytable";
+    JsonNode response = postQuery(query);
+    JsonNode resultTable = response.get("resultTable");
+    JsonNode dataSchema = resultTable.get("dataSchema");
+    assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"LONG\"]");
+    JsonNode rows = resultTable.get("rows");
+    assertEquals(rows.size(), 1);
+    JsonNode row = rows.get(0);
+    assertEquals(row.size(), 1);
+    assertEquals(row.get(0).asDouble(), 16435.0 * 24 * 3600);
+
+    query = "SELECT MIN(div(DaysSinceEpoch,2)) FROM mytable";
+    response = postQuery(query);
+    resultTable = response.get("resultTable");
+    dataSchema = resultTable.get("dataSchema");
+    assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\"]");
+    rows = resultTable.get("rows");
+    assertEquals(rows.size(), 1);
+    row = rows.get(0);
+    assertEquals(row.size(), 1);
+    assertEquals(row.get(0).asDouble(), 16071.0 / 2);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testSelectionUDF(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = "SELECT DaysSinceEpoch, 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable limit 10";
     JsonNode response = postQuery(query);
     JsonNode resultTable = response.get("resultTable");
     JsonNode dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        
"[\"DaysSinceEpoch\",\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"INT\",\"LONG\"]");
     JsonNode rows = response.get("resultTable").get("rows");
     assertEquals(rows.size(), 10);
@@ -1843,8 +1966,6 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        
"[\"DaysSinceEpoch\",\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"INT\",\"LONG\"]");
     rows = response.get("resultTable").get("rows");
     assertEquals(rows.size(), 10000);
@@ -1862,8 +1983,6 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     response = postQuery(query);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
-    assertEquals(dataSchema.get("columnNames").toString(),
-        
"[\"DaysSinceEpoch\",\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\"]");
     assertEquals(dataSchema.get("columnDataTypes").toString(), 
"[\"INT\",\"LONG\"]");
     rows = response.get("resultTable").get("rows");
     assertEquals(rows.size(), 10000);
@@ -1877,9 +1996,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
   }
 
-  @Test
-  public void testFilterUDF()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFilterUDF(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     int daysSinceEpoch = 16138;
     long secondsSinceEpoch = 16138 * 24 * 60 * 60;
 
@@ -1912,22 +2032,23 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedResult);
   }
 
-  @Test
-  public void testCaseStatementInSelection()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseStatementInSelection(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     List<String> origins =
         Arrays.asList("ATL", "ORD", "DFW", "DEN", "LAX", "IAH", "SFO", "PHX", 
"LAS", "EWR", "MCO", "BOS", "SLC", "SEA",
             "MSP", "CLT", "LGA", "DTW", "JFK", "BWI");
     StringBuilder caseStatementBuilder = new StringBuilder("CASE ");
     for (int i = 0; i < origins.size(); i++) {
-      // WHEN origin = 'ATL' THEN 1
-      // WHEN origin = 'ORD' THEN 2
-      // WHEN origin = 'DFW' THEN 3
+      // WHEN Origin = 'ATL' THEN 1
+      // WHEN Origin = 'ORD' THEN 2
+      // WHEN Origin = 'DFW' THEN 3
       // ....
-      caseStatementBuilder.append(String.format("WHEN origin = '%s' THEN %d ", 
origins.get(i), i + 1));
+      caseStatementBuilder.append(String.format("WHEN Origin = '%s' THEN %d ", 
origins.get(i), i + 1));
     }
     caseStatementBuilder.append("ELSE 0 END");
-    String sqlQuery = "SELECT origin, " + caseStatementBuilder + " AS 
origin_code FROM mytable LIMIT 1000";
+    String sqlQuery = "SELECT Origin, " + caseStatementBuilder + " AS 
origin_code FROM mytable LIMIT 1000";
     JsonNode response = postQuery(sqlQuery);
     JsonNode rows = response.get("resultTable").get("rows");
     assertTrue(response.get("exceptions").isEmpty());
@@ -1942,9 +2063,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
   }
 
-  @Test
-  public void testCaseStatementInSelectionWithTransformFunctionInThen()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseStatementInSelectionWithTransformFunctionInThen(boolean 
useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String sqlQuery =
         "SELECT ArrDelay, CASE WHEN ArrDelay > 0 THEN ArrDelay WHEN ArrDelay < 
0 THEN ArrDelay * -1 ELSE 0 END AS "
             + "ArrTimeDiff FROM mytable LIMIT 1000";
@@ -1988,11 +2110,12 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
   }
 
-  @Test
-  public void testCaseStatementWithInAggregation()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCaseStatementWithInAggregation(boolean 
useMultiStageQueryEngine)
       throws Exception {
-    testCountVsCaseQuery("origin = 'ATL'");
-    testCountVsCaseQuery("origin <> 'ATL'");
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    testCountVsCaseQuery("Origin = 'ATL'");
+    testCountVsCaseQuery("Origin <> 'ATL'");
 
     testCountVsCaseQuery("DaysSinceEpoch > 16312");
     testCountVsCaseQuery("DaysSinceEpoch >= 16312");
@@ -2013,9 +2136,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(caseSum, countValue);
   }
 
-  @Test
-  public void testFilterWithInvertedIndexUDF()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFilterWithInvertedIndexUDF(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     int daysSinceEpoch = 16138;
     long secondsSinceEpoch = 16138 * 24 * 60 * 60;
 
@@ -2039,7 +2163,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testQueryWithRepeatedColumns()
+  public void testQueryWithRepeatedColumnsV1()
       throws Exception {
     //test repeated columns in selection query
     String query = "SELECT ArrTime, ArrTime FROM mytable WHERE DaysSinceEpoch 
<= 16312 AND Carrier = 'DL'";
@@ -2059,9 +2183,34 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     testQuery(query);
   }
 
+  // This test actually checks a Calcite limitation.
+  // Once it is fixed in Calcite, we should merge this test with testQueryWithRepeatedColumnsV1.
   @Test
-  public void testQueryWithOrderby()
+  public void testQueryWithRepeatedColumnsV2()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    //test repeated columns in selection query
+    String query = "SELECT ArrTime, ArrTime FROM mytable WHERE DaysSinceEpoch 
<= 16312 AND Carrier = 'DL'";
+    testQuery(query);
+
+    //test repeated columns in selection query with order by
+    query = "SELECT ArrTime, ArrTime FROM mytable WHERE DaysSinceEpoch <= 
16312 AND Carrier = 'DL' order by ArrTime";
+    testQueryError(query, QueryException.QUERY_PLANNING_ERROR_CODE);
+
+    //test repeated columns in agg query
+    query = "SELECT COUNT(*), COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 
16312 AND Carrier = 'DL'";
+    testQuery(query);
+
+    //test repeated columns in agg group by query
+    query = "SELECT ArrTime, ArrTime, COUNT(*), COUNT(*) FROM mytable WHERE 
DaysSinceEpoch <= 16312 AND Carrier = 'DL' "
+        + "GROUP BY ArrTime, ArrTime";
+    testQueryError(query, QueryException.QUERY_PLANNING_ERROR_CODE);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryWithOrderby(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     //test repeated columns in selection query
     String query = "SELECT ArrTime, Carrier, DaysSinceEpoch FROM mytable ORDER 
BY DaysSinceEpoch DESC";
     testQuery(query);
@@ -2075,9 +2224,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     testQuery(query);
   }
 
-  @Test
-  public void testQueryWithAlias()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryWithAlias(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     {
       //test same alias name with column name
       String query = "SELECT ArrTime AS ArrTime, Carrier AS Carrier, 
DaysSinceEpoch AS DaysSinceEpoch FROM mytable "
@@ -2223,9 +2373,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertFalse(_propertyStore.exists(configPath, 0));
   }
 
-  @Test
-  public void testDistinctQuery()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctQuery(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // by default 10 rows will be returned, so use high limit
     String pinotQuery = "SELECT DISTINCT Carrier FROM mytable LIMIT 1000000";
     String h2Query = "SELECT DISTINCT Carrier FROM mytable";
@@ -2244,9 +2395,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     testQuery(pinotQuery, h2Query);
   }
 
-  @Test
-  public void testNonAggregationGroupByQuery()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNonAggregationGroupByQuery(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // by default 10 rows will be returned, so use high limit
     String pinotQuery = "SELECT Carrier FROM mytable GROUP BY Carrier LIMIT 
1000000";
     String h2Query = "SELECT Carrier FROM mytable GROUP BY Carrier";
@@ -2276,9 +2428,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     testQuery(pinotQuery, h2Query);
   }
 
-  @Test
-  public void testCaseInsensitivity()
+  public void testCaseInsensitivityV1(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     int daysSinceEpoch = 16138;
     int hoursSinceEpoch = 16138 * 24;
     int secondsSinceEpoch = 16138 * 24 * 60 * 60;
@@ -2305,6 +2457,35 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
   }
 
+  @Test
+  public void testCaseSensitivityV2()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    int daysSinceEpoch = 16138;
+    int hoursSinceEpoch = 16138 * 24;
+    int secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by DaysSinceEpoch "
+            + "limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by timeConvert"
+            + "(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + 
daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
+        "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH',"
+            + "'1:HOURS')");
+    List<String> queries = new ArrayList<>();
+    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+    baseQueries.forEach(
+        q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+
+    for (String query : queries) {
+      testQueryError(query, QueryException.QUERY_PLANNING_ERROR_CODE);
+    }
+  }
+
   @Test
   public void testColumnNameContainsTableName()
       throws Exception {
@@ -2334,7 +2515,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testCaseInsensitivityWithColumnNameContainsTableName()
+  public void testCaseInsensitivityWithColumnNameContainsTableNameV1()
       throws Exception {
     int daysSinceEpoch = 16138;
     int hoursSinceEpoch = 16138 * 24;
@@ -2365,7 +2546,38 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testQuerySourceWithDatabaseName()
+  public void testCaseSensitivityWithColumnNameContainsTableNameV2()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    int daysSinceEpoch = 16138;
+    int hoursSinceEpoch = 16138 * 24;
+    int secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by DaysSinceEpoch "
+            + "limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by timeConvert"
+            + "(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + 
daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
+        "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH',"
+            + "'1:HOURS')");
+    List<String> queries = new ArrayList<>();
+    baseQueries.forEach(
+        q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+    // something like "SELECT MYDB.MYTABLE.DAYSSinceEpOch from MYDB.MYTABLE 
where MYDB.MYTABLE.DAYSSinceEpOch = 16138"
+    baseQueries.forEach(
+        q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+
+    for (String query : queries) {
+      testQueryError(query, QueryException.QUERY_PLANNING_ERROR_CODE);
+    }
+  }
+
+  @Test
+  public void testQuerySourceWithDatabaseNameV1()
       throws Exception {
     // by default 10 rows will be returned, so use high limit
     String pinotQuery = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
@@ -2375,9 +2587,28 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     testQuery(pinotQuery, h2Query);
   }
 
+
   @Test
-  public void testDistinctCountHll()
+  public void testQuerySourceWithDatabaseNameV2()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+    // by default 10 rows will be returned, so use high limit
+    String pinotQuery = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
+    String h2Query = "SELECT DISTINCT Carrier FROM mytable";
+    testQuery(pinotQuery, h2Query);
+
+    pinotQuery = "SELECT DISTINCT Carrier FROM db.mytable LIMIT 1000000";
+    JsonNode response = postQuery(pinotQuery);
+    JsonNode exceptions = response.get("exceptions");
+    assertFalse(exceptions.isEmpty(), "At least one exception was expected");
+    JsonNode firstException = exceptions.get(0);
+    assertEquals(firstException.get("errorCode").asInt(), 
QueryException.QUERY_PLANNING_ERROR_CODE);
+  }
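
testQuerySourceWithDatabaseNameV2 above spells out the manual error-code check that the testQueryError(...) helper used elsewhere in this patch (e.g. testQueryWithRepeatedColumnsV2, testCaseSensitivityV2) presumably wraps; the helper itself is not part of this diff. A hypothetical equivalent over the broker JSON response:

    import static org.testng.Assert.assertEquals;
    import static org.testng.Assert.assertFalse;

    import com.fasterxml.jackson.databind.JsonNode;

    // Illustrative only; the real testQueryError helper is defined in the test base classes.
    final class QueryErrorAssert {
      private QueryErrorAssert() {
      }

      static void assertErrorCode(JsonNode brokerResponse, int expectedErrorCode) {
        JsonNode exceptions = brokerResponse.get("exceptions");
        assertFalse(exceptions.isEmpty(), "Expected at least one query exception");
        assertEquals(exceptions.get(0).get("errorCode").asInt(), expectedErrorCode);
      }
    }
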
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctCountHll(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query;
 
     // The Accurate value is 6538.
@@ -2396,14 +2627,20 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedResults[i - 2]);
     }
 
-    // Default HLL is set as log2m=12
+    // Default log2m for HLL is set to 12 in V1 and 8 in V2
+    long expectedDefault;
     query = "SELECT distinctCountHLL(FlightNum) FROM mytable ";
-    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedResults[10]);
-    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedResults[10]);
+    if (useMultiStageQueryEngine) {
+      expectedDefault = expectedResults[6];
+    } else {
+      expectedDefault = expectedResults[10];
+    }
+    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedDefault);
+    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedDefault);
   }
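
The engine-dependent expectation above follows from the default HLL precision: per the comment, V1 defaults to log2m=12 (4096 registers) while V2 defaults to log2m=8 (256 registers), and HyperLogLog's relative standard error is roughly 1.04 / sqrt(2^log2m), so the V2 default is expected to be noticeably coarser. A quick back-of-the-envelope check:

    // Rough HLL accuracy bound (~1.04 / sqrt(2^log2m)); illustrative only.
    public class HllErrorBound {
      public static void main(String[] args) {
        for (int log2m : new int[]{12, 8}) {
          double relativeError = 1.04 / Math.sqrt(1 << log2m);
          System.out.printf("log2m=%d -> ~%.2f%% standard error%n", log2m, relativeError * 100);
        }
        // Roughly 1.63% for log2m=12 and 6.50% for log2m=8, which is why the test keys the
        // expected estimate (expectedResults[10] vs expectedResults[6]) off the engine in use.
      }
    }
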
 
   @Test
-  public void testAggregationFunctionsWithUnderscore()
+  public void testAggregationFunctionsWithUnderscoreV1()
       throws Exception {
     String query;
 
@@ -2417,7 +2654,22 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testExplainPlanQuery()
+  public void testAggregationFunctionsWithUnderscoreV2()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    String query;
+
+    // The Accurate value is 6538.
+    query = "SELECT distinct_count(FlightNum) FROM mytable";
+    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asInt(),
 6538);
+
+    // This is not supported in V2.
+    query = "SELECT c_o_u_n_t(FlightNum) FROM mytable";
+    testQueryError(query, QueryException.QUERY_PLANNING_ERROR_CODE);
+  }
+
+  @Test
+  public void testExplainPlanQueryV1()
       throws Exception {
     String query1 = "EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS 
name FROM mytable GROUP BY name ORDER BY 1";
     String response1 = postQuery(query1).get("resultTable").toString();
@@ -2444,35 +2696,77 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         + 
"[\"PLAN_START(numSegmentsForThisPlan:12)\",-1,-1],[\"ALL_SEGMENTS_PRUNED_ON_SERVER\",2,1]]}");
   }
 
-  /** Test to make sure we are properly handling string comparisons in 
predicates. */
   @Test
-  public void testStringComparisonInFilter()
+  public void testExplainPlanQueryV2()
       throws Exception {
-    // compare two string columns.
-    String query1 = "SELECT count(*) FROM mytable WHERE OriginState = 
DestState";
+    setUseMultiStageQueryEngine(true);
+    String query1 = "EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS 
name FROM mytable GROUP BY name ORDER BY 1";
     String response1 = postQuery(query1).get("resultTable").toString();
-    assertEquals(response1,
-        
"{\"dataSchema\":{\"columnNames\":[\"count(*)\"],\"columnDataTypes\":[\"LONG\"]},"
 + "\"rows\":[[14011]]}");
 
-    // compare string function with string column.
-    String query2 = "SELECT count(*) FROM mytable WHERE trim(OriginState) = 
DestState";
+    // Replace "docs:[0-9]+" with "docs:*" so that the test doesn't fail when the number of documents changes.
+    // This is needed because both OfflineClusterIntegrationTest and MultiNodesOfflineClusterIntegrationTest run
+    // this test case with a different number of documents in the segment.
+    response1 = response1.replaceAll("docs:[0-9]+", "docs:*");
+
+    assertEquals(response1, 
"{\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\","
+        + "\"STRING\"]},\"rows\":[[\"EXPLAIN PLAN FOR SELECT count(*) AS 
count, Carrier AS name FROM mytable "
+        + "GROUP BY name ORDER BY 1\",\"Execution Plan\\n"
+        + "LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])\\n"
+        + "  PinotLogicalSortExchange("
+        + "distribution=[hash], collation=[[0]], isSortOnSender=[false], 
isSortOnReceiver=[true])\\n"
+        + "    LogicalSort(sort0=[$0], dir0=[ASC])\\n"
+        + "      LogicalProject(count=[$1], name=[$0])\\n"
+        + "        LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])\\n"
+        + "          PinotLogicalExchange(distribution=[hash[0]])\\n"
+        + "            LogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n"
+        + "              LogicalTableScan(table=[[mytable]])\\n"
+        + "\"]]}");
+
+    // In the query below, FlightNum column has an inverted index and there is 
no data satisfying the predicate
+    // "FlightNum < 0". Hence, all segments are pruned out before query 
execution on the server side.
+    String query2 = "EXPLAIN PLAN FOR SELECT * FROM mytable WHERE FlightNum < 
0";
     String response2 = postQuery(query2).get("resultTable").toString();
-    assertEquals(response2,
-        
"{\"dataSchema\":{\"columnNames\":[\"count(*)\"],\"columnDataTypes\":[\"LONG\"]},"
 + "\"rows\":[[14011]]}");
+
+    Pattern pattern = 
Pattern.compile("\\{\"dataSchema\":\\{\"columnNames\":\\[\"SQL\",\"PLAN\"],"
+        + "\"columnDataTypes\":\\[\"STRING\",\"STRING\"]},"
+        + "\"rows\":\\[\\[\"EXPLAIN PLAN FOR SELECT \\* FROM mytable WHERE 
FlightNum < 0\","
+        + "\"Execution Plan.."
+        + "LogicalProject\\(.*\\).."
+        + "  LogicalFilter\\(condition=\\[<\\(.*, 0\\)]\\).."
+        + "    LogicalTableScan\\(table=\\[\\[mytable]]\\)..\""
+        + "]]}");
+    boolean found = pattern.matcher(response2).find();
+    assertTrue(found, "Pattern " + pattern + " not found in " + response2);
+  }
+
+  /** Test to make sure we are properly handling string comparisons in 
predicates. */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testStringComparisonInFilter(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // compare two string columns.
+    JsonNode jsonNode = postQuery("SELECT count(*) FROM mytable WHERE 
OriginState = DestState");
+    assertEquals(getType(jsonNode, 0), "LONG");
+    assertEquals(getLongCellValue(jsonNode, 0, 0), 14011);
+
+    // compare string function with string column.
+    jsonNode = postQuery("SELECT count(*) FROM mytable WHERE trim(OriginState) 
= DestState");
+    assertEquals(getType(jsonNode, 0), "LONG");
+    assertEquals(getLongCellValue(jsonNode, 0, 0), 14011);
 
     // compare string function with string function.
-    String query3 = "SELECT count(*) FROM mytable WHERE substr(OriginState, 0, 
1) = substr(DestState, 0, 1)";
-    String response3 = postQuery(query3).get("resultTable").toString();
-    assertEquals(response3,
-        
"{\"dataSchema\":{\"columnNames\":[\"count(*)\"],\"columnDataTypes\":[\"LONG\"]},"
 + "\"rows\":[[19755]]}");
+    jsonNode = postQuery("SELECT count(*) FROM mytable WHERE 
substr(OriginState, 0, 1) = substr(DestState, 0, 1)");
+    assertEquals(getType(jsonNode, 0), "LONG");
+    assertEquals(getLongCellValue(jsonNode, 0, 0), 19755);
   }
 
   /**
    * Test queries that can be solved with {@link 
NonScanBasedAggregationOperator}.
    */
-  @Test
-  public void testNonScanAggregationQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNonScanAggregationQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String tableName = getTableName();
 
     // Test queries with COUNT, MIN, MAX, MIN_MAX_RANGE
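
testStringComparisonInFilter above switches from comparing the whole serialized resultTable string to the getType(...) and getLongCellValue(...) accessors, which are not defined in this diff. A hypothetical sketch of what such accessors over the broker response could look like, based on the resultTable/dataSchema layout exercised throughout these tests:

    import com.fasterxml.jackson.databind.JsonNode;

    // Illustrative only; the real helpers live in the test base classes and may differ.
    final class ResultTableJson {
      private ResultTableJson() {
      }

      // Column data type at the given index, e.g. "LONG".
      static String getType(JsonNode brokerResponse, int columnIndex) {
        return brokerResponse.get("resultTable").get("dataSchema").get("columnDataTypes").get(columnIndex).asText();
      }

      // Long value of the cell at (rowIndex, columnIndex) in the result table.
      static long getLongCellValue(JsonNode brokerResponse, int rowIndex, int columnIndex) {
        return brokerResponse.get("resultTable").get("rows").get(rowIndex).get(columnIndex).asLong();
      }
    }
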
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 7cc9a60fe6..7c762fa4a4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -211,9 +211,11 @@ public class OfflineClusterMemBasedServerQueryKillingTest 
extends BaseClusterInt
         .build();
   }
 
-  @Test
-  public void testDigestOOM()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDigestOOM(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     JsonNode queryResponse = postQuery(OOM_QUERY);
     
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":"
         + QueryException.QUERY_CANCELLATION_ERROR_CODE));
@@ -221,17 +223,21 @@ public class OfflineClusterMemBasedServerQueryKillingTest 
extends BaseClusterInt
     Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got 
killed because"));
   }
 
-  @Test
-  public void testDigestOOM2()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDigestOOM2(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     JsonNode queryResponse = postQuery(OOM_QUERY_2);
     
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
     Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got 
killed because"));
   }
 
-  @Test
-  public void testDigestOOMMultipleQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDigestOOMMultipleQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     AtomicReference<JsonNode> queryResponse1 = new AtomicReference<>();
     AtomicReference<JsonNode> queryResponse2 = new AtomicReference<>();
     AtomicReference<JsonNode> queryResponse3 = new AtomicReference<>();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
index ce8a12a478..35d8ea3b5e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
@@ -214,9 +214,11 @@ public class OfflineClusterServerCPUTimeQueryKillingTest 
extends BaseClusterInte
         .build();
   }
 
-  @Test
-  public void testDigestTimeoutMultipleQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDigestTimeoutMultipleQueries(boolean 
useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
     AtomicReference<JsonNode> queryResponse1 = new AtomicReference<>();
     AtomicReference<JsonNode> queryResponse2 = new AtomicReference<>();
     AtomicReference<JsonNode> queryResponse3 = new AtomicReference<>();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerMultiStageIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerMultiStageIntegrationTest.java
new file mode 100644
index 0000000000..9786388156
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerMultiStageIntegrationTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import org.testng.annotations.BeforeTest;
+
+
+public class OfflineGRPCServerMultiStageIntegrationTest extends 
OfflineGRPCServerIntegrationTest {
+  @BeforeTest
+  void enableMultiStage() {
+    setUseMultiStageQueryEngine(true);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index e36f3d6aac..6f91ad4af0 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -155,18 +155,20 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     return new StarTreeIndexConfig(dimensions, null, functionColumnPairs, 
maxLeafRecords);
   }
 
-  @Test
-  public void testGeneratedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testGeneratedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     for (int i = 0; i < NUM_QUERIES_TO_GENERATE; i += 2) {
       testStarQuery(_starTree1QueryGenerator.nextQuery());
       testStarQuery(_starTree2QueryGenerator.nextQuery());
     }
   }
 
-  @Test
-  public void testHardCodedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testHardCodedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // This query can test the case of one predicate matches all the child 
nodes but star-node cannot be used because
     // the predicate is included as remaining predicate from another branch
     String starQuery = "SELECT DepTimeBlk, COUNT(*) FROM mytable "
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
index a8535ea798..9d963eab75 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
@@ -178,9 +178,10 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
         
.setQueryConfig(getQueryConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
   }
 
-  @Test
-  public void testTextSearchCountQuery()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testTextSearchCountQuery(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // Keep posting queries until all records are consumed
     long previousResult = 0;
     while (getCurrentCountStarResult() < NUM_RECORDS) {
@@ -201,9 +202,10 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
     }, 10_000L, "Failed to reach expected number of matching records");
   }
 
-  @Test
-  public void testTextSearchCountQueryNative()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testTextSearchCountQueryNative(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     // Keep posting queries until all records are consumed
     long previousResult = 0;
     while (getCurrentCountStarResult() < NUM_RECORDS) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
