This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 78b76641f06 Add extra JSON fields (schemas and tableConfigs) in the validateMultiStageQuery API - MSE (#16626)
78b76641f06 is described below
commit 78b76641f06b79def40d22217cf006ff766a8ded
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Thu Aug 28 16:20:12 2025 -0700
Add extra JSON fields (schemas and tableConfigs) in the validateMultiStageQuery API - MSE (#16626)
---
.../common/config/provider/StaticTableCache.java | 223 +++++++++++++++++++
.../pinot/common/config/provider/TableCache.java | 91 +++++++-
.../pinot/common/config/provider/ZkTableCache.java | 97 +--------
.../api/resources/PinotQueryResource.java | 93 ++++++--
.../PinotQueryResourceStaticValidationTest.java | 72 ++++++
.../tests/MultiStageEngineIntegrationTest.java | 242 ++++++++++++++++++++-
6 files changed, 716 insertions(+), 102 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/StaticTableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/StaticTableCache.java
new file mode 100644
index 00000000000..1332b93496b
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/StaticTableCache.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config.provider;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
+import org.apache.pinot.spi.config.provider.SchemaChangeListener;
+import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A static implementation that works with pre-loaded table configs and schemas.
+ * This is useful for validation scenarios where you want to test query compilation against a specific
+ * set of table configs and schemas without needing a live cluster.
+ */
+public class StaticTableCache implements TableCache {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StaticTableCache.class);
+
+ private final boolean _ignoreCase;
+ private final Map<String, TableConfig> _tableConfigMap = new HashMap<>();
+ private final Map<String, Schema> _schemaMap = new HashMap<>();
+ private final Map<String, LogicalTableConfig> _logicalTableConfigMap = new HashMap<>();
+ private final Map<String, String> _tableNameMap = new HashMap<>();
+ private final Map<String, TableConfigInfo> _tableConfigInfoMap = new ConcurrentHashMap<>();
+ private final Map<String, SchemaInfo> _schemaInfoMap = new ConcurrentHashMap<>();
+ private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap = new ConcurrentHashMap<>();
+ private final Map<String, String> _logicalTableNameMap = new HashMap<>();
+
+ public StaticTableCache(List<TableConfig> tableConfigs, List<Schema> schemas,
+ List<LogicalTableConfig> logicalTableConfigs, boolean ignoreCase) {
+ _ignoreCase = ignoreCase;
+
+ for (TableConfig tableConfig : tableConfigs) {
+ String tableNameWithType = tableConfig.getTableName();
+ String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+
+ _tableConfigMap.put(tableNameWithType, tableConfig);
+ _tableConfigInfoMap.put(tableNameWithType, new TableConfigInfo(tableConfig));
+ if (_ignoreCase) {
+ _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+ _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+ } else {
+ _tableNameMap.put(tableNameWithType, tableNameWithType);
+ _tableNameMap.put(rawTableName, rawTableName);
+ }
+ }
+
+ for (Schema schema : schemas) {
+ addBuiltInVirtualColumns(schema);
+ String schemaName = schema.getSchemaName();
+ _schemaMap.put(schemaName, schema);
+ Map<String, String> columnNameMap = new HashMap<>();
+ if (_ignoreCase) {
+ for (String columnName : schema.getColumnNames()) {
+ columnNameMap.put(columnName.toLowerCase(), columnName);
+ }
+ } else {
+ for (String columnName : schema.getColumnNames()) {
+ columnNameMap.put(columnName, columnName);
+ }
+ }
+ _schemaInfoMap.put(schemaName, new SchemaInfo(schema, columnNameMap));
+ }
+
+ if (logicalTableConfigs != null) {
+ for (LogicalTableConfig logicalTableConfig : logicalTableConfigs) {
+ String logicalTableName = logicalTableConfig.getTableName();
+ _logicalTableConfigMap.put(logicalTableName, logicalTableConfig);
+ _logicalTableConfigInfoMap.put(logicalTableName, new LogicalTableConfigInfo(logicalTableConfig));
+ if (_ignoreCase) {
+ _logicalTableNameMap.put(logicalTableName.toLowerCase(), logicalTableName);
+ } else {
+ _logicalTableNameMap.put(logicalTableName, logicalTableName);
+ }
+ }
+ }
+
+ LOGGER.info(
+ "Initialized StaticTableCache with {} table configs, {} schemas, {} logical table configs (ignoreCase: {})",
+ _tableConfigMap.size(), _schemaMap.size(), _logicalTableNameMap.size(), ignoreCase);
+ }
+
+ @Override
+ public boolean isIgnoreCase() {
+ return _ignoreCase;
+ }
+
+ @Nullable
+ @Override
+ public String getActualTableName(String tableName) {
+ if (_ignoreCase) {
+ return _tableNameMap.get(tableName.toLowerCase());
+ } else {
+ return _tableNameMap.get(tableName);
+ }
+ }
+
+ @Nullable
+ @Override
+ public String getActualLogicalTableName(String logicalTableName) {
+ return _ignoreCase ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
+ : _logicalTableNameMap.get(logicalTableName);
+ }
+
+ @Override
+ public Map<String, String> getTableNameMap() {
+ return Collections.unmodifiableMap(_tableNameMap);
+ }
+
+ @Override
+ public Map<String, String> getLogicalTableNameMap() {
+ return Collections.unmodifiableMap(_logicalTableNameMap);
+ }
+
+ @Override
+ public List<String> getAllDimensionTables() {
+ List<String> dimensionTables = new ArrayList<>();
+ for (TableConfig tableConfig : _tableConfigMap.values()) {
+ if (tableConfig.isDimTable()) {
+ dimensionTables.add(tableConfig.getTableName());
+ }
+ }
+ return dimensionTables;
+ }
+
+ @Override
+ public Map<String, String> getColumnNameMap(String rawTableName) {
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ return schemaInfo != null ? schemaInfo._columnNameMap : Collections.emptyMap();
+ }
+
+ @Nullable
+ @Override
+ public Map<Expression, Expression> getExpressionOverrideMap(String physicalOrLogicalTableName) {
+ TableConfigInfo tableConfigInfo = _tableConfigInfoMap.get(physicalOrLogicalTableName);
+ if (tableConfigInfo != null) {
+ return tableConfigInfo._expressionOverrideMap;
+ }
+ LogicalTableConfigInfo logicalTableConfigInfo = _logicalTableConfigInfoMap.get(physicalOrLogicalTableName);
+ return logicalTableConfigInfo != null ? logicalTableConfigInfo._expressionOverrideMap : null;
+ }
+
+ @Nullable
+ @Override
+ public Set<String> getTimestampIndexColumns(String tableNameWithType) {
+ TableConfigInfo tableConfigInfo = _tableConfigInfoMap.get(tableNameWithType);
+ return tableConfigInfo != null ? tableConfigInfo._timestampIndexColumns : null;
+ }
+
+ @Nullable
+ @Override
+ public TableConfig getTableConfig(String tableNameWithType) {
+ return _tableConfigMap.get(tableNameWithType);
+ }
+
+ @Nullable
+ @Override
+ public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
+ return _logicalTableConfigMap.get(logicalTableName);
+ }
+
+ @Override
+ public boolean registerTableConfigChangeListener(TableConfigChangeListener tableConfigChangeListener) {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public Schema getSchema(String rawTableName) {
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ return schemaInfo != null ? schemaInfo._schema : null;
+ }
+
+ @Override
+ public boolean registerSchemaChangeListener(SchemaChangeListener schemaChangeListener) {
+ return false;
+ }
+
+ @Override
+ public boolean registerLogicalTableConfigChangeListener(
+ LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
+ return false;
+ }
+
+ @Override
+ public List<LogicalTableConfig> getLogicalTableConfigs() {
+ return new ArrayList<>(_logicalTableConfigMap.values());
+ }
+
+ @Override
+ public boolean isLogicalTable(String logicalTableName) {
+ return _logicalTableConfigMap.containsKey(logicalTableName);
+ }
+}
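For orientation, here is a minimal sketch of how the new StaticTableCache can be exercised on its own. The class, constructor and builder signatures are taken from this commit; the table name and columns are illustrative:

    import java.util.Collections;
    import org.apache.pinot.common.config.provider.StaticTableCache;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class StaticTableCacheSketch {
      public static void main(String[] args) {
        // One offline table config plus a matching schema (illustrative names).
        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("orders").build();
        Schema schema = new Schema.SchemaBuilder().setSchemaName("orders")
            .addSingleValueDimension("customerId", FieldSpec.DataType.STRING)
            .addMetric("amount", FieldSpec.DataType.LONG).build();

        // Seeded entirely from the supplied lists; no ZooKeeper connection involved.
        StaticTableCache cache = new StaticTableCache(Collections.singletonList(tableConfig),
            Collections.singletonList(schema), /* logicalTableConfigs */ null, /* ignoreCase */ false);

        System.out.println(cache.getActualTableName("orders_OFFLINE")); // orders_OFFLINE
        System.out.println(cache.getColumnNameMap("orders").size());    // 2 columns + 3 built-in virtual columns
      }
    }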
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 6d789e685c6..57fa9502e5a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -18,18 +18,29 @@
*/
package org.apache.pinot.common.config.provider;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import javax.annotation.Nullable;
+import org.apache.commons.collections4.MapUtils;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
import org.apache.pinot.spi.config.provider.PinotConfigProvider;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
+import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -38,6 +49,8 @@ import org.apache.pinot.spi.data.Schema;
* them in sync. It also maintains the table name map and the column name map for case-insensitive queries.
*/
public interface TableCache extends PinotConfigProvider {
+ Logger LOGGER = LoggerFactory.getLogger(TableCache.class);
+
/**
* Returns {@code true} if the TableCache is case-insensitive, {@code false} otherwise.
*/
@@ -126,6 +139,80 @@ public interface TableCache extends PinotConfigProvider {
boolean isLogicalTable(String logicalTableName);
@Override
- boolean registerLogicalTableConfigChangeListener(
- LogicalTableConfigChangeListener logicalTableConfigChangeListener);
+ boolean registerLogicalTableConfigChangeListener(LogicalTableConfigChangeListener logicalTableConfigChangeListener);
+
+ /**
+ * Adds the built-in virtual columns to the schema.
+ * NOTE: The virtual column provider class is not added.
+ */
+ default void addBuiltInVirtualColumns(Schema schema) {
+ if (!schema.hasColumn(BuiltInVirtualColumn.DOCID)) {
+ schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.DOCID, FieldSpec.DataType.INT, true));
+ }
+ if (!schema.hasColumn(BuiltInVirtualColumn.HOSTNAME)) {
+ schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.HOSTNAME, FieldSpec.DataType.STRING, true));
+ }
+ if (!schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)) {
+ schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME, FieldSpec.DataType.STRING, true));
+ }
+ }
+
+ static Map<Expression, Expression> createExpressionOverrideMap(String physicalOrLogicalTableName,
+ QueryConfig queryConfig) {
+ Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
+ if (queryConfig != null && MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
+ for (Map.Entry<String, String> entry : queryConfig.getExpressionOverrideMap().entrySet()) {
+ try {
+ Expression srcExp = CalciteSqlParser.compileToExpression(entry.getKey());
+ Expression destExp = CalciteSqlParser.compileToExpression(entry.getValue());
+ expressionOverrideMap.put(srcExp, destExp);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while compiling expression override: {} -> {} for table: {}, skipping it",
+ entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
+ }
+ }
+ int mapSize = expressionOverrideMap.size();
+ if (mapSize == 1) {
+ Map.Entry<Expression, Expression> entry = expressionOverrideMap.entrySet().iterator().next();
+ return Collections.singletonMap(entry.getKey(), entry.getValue());
+ } else if (mapSize > 1) {
+ return expressionOverrideMap;
+ }
+ }
+ return null;
+ }
+
+ class TableConfigInfo {
+ final TableConfig _tableConfig;
+ final Map<Expression, Expression> _expressionOverrideMap;
+ // All the timestamp with granularity column names
+ final Set<String> _timestampIndexColumns;
+
+ public TableConfigInfo(TableConfig tableConfig) {
+ _tableConfig = tableConfig;
+ _expressionOverrideMap = createExpressionOverrideMap(tableConfig.getTableName(), tableConfig.getQueryConfig());
+ _timestampIndexColumns = TimestampIndexUtils.extractColumnsWithGranularity(tableConfig);
+ }
+ }
+
+ class LogicalTableConfigInfo {
+ final LogicalTableConfig _logicalTableConfig;
+ final Map<Expression, Expression> _expressionOverrideMap;
+
+ LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
+ _logicalTableConfig = logicalTableConfig;
+ _expressionOverrideMap =
+ createExpressionOverrideMap(logicalTableConfig.getTableName(), logicalTableConfig.getQueryConfig());
+ }
+ }
+
+ class SchemaInfo {
+ final Schema _schema;
+ final Map<String, String> _columnNameMap;
+
+ SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
+ _schema = schema;
+ _columnNameMap = columnNameMap;
+ }
+ }
}
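Since addBuiltInVirtualColumns is now a default method on the interface, every TableCache implementation augments schemas identically. A small sketch of the observable effect, using an empty StaticTableCache as a convenient concrete instance (constants come from CommonConstants as imported above; schema names are illustrative):

    import java.util.Collections;
    import org.apache.pinot.common.config.provider.StaticTableCache;
    import org.apache.pinot.common.config.provider.TableCache;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;

    public class VirtualColumnSketch {
      public static void main(String[] args) {
        TableCache cache = new StaticTableCache(Collections.emptyList(), Collections.emptyList(), null, false);
        Schema schema = new Schema.SchemaBuilder().setSchemaName("t")
            .addSingleValueDimension("col", FieldSpec.DataType.STRING).build();

        // Adds the doc-id, hostname and segment-name virtual columns unless already present (idempotent).
        cache.addBuiltInVirtualColumns(schema);

        System.out.println(schema.hasColumn(BuiltInVirtualColumn.DOCID));       // true
        System.out.println(schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)); // true
      }
    }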
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
index 8e688468b19..12b8a69382f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
@@ -20,18 +20,15 @@ package org.apache.pinot.common.config.provider;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -45,17 +42,11 @@ import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
import org.apache.pinot.spi.config.provider.PinotConfigProvider;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
-import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
-import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,6 +145,7 @@ public class ZkTableCache implements TableCache {
/**
* Returns {@code true} if the TableCache is case-insensitive, {@code false} otherwise.
*/
+ @Override
public boolean isIgnoreCase() {
return _ignoreCase;
}
@@ -163,6 +155,7 @@ public class ZkTableCache implements TableCache {
* does not exist.
*/
@Nullable
+ @Override
public String getActualTableName(String tableName) {
if (_ignoreCase) {
return _tableNameMap.get(tableName.toLowerCase());
@@ -177,6 +170,7 @@ public class ZkTableCache implements TableCache {
* @return Actual logical table name
*/
@Nullable
+ @Override
public String getActualLogicalTableName(String logicalTableName) {
return _ignoreCase
? _logicalTableNameMap.get(logicalTableName.toLowerCase())
@@ -187,6 +181,7 @@ public class ZkTableCache implements TableCache {
* Returns a map from table name to actual table name. For case-insensitive case, the keys of the map are in lower
* case.
*/
+ @Override
public Map<String, String> getTableNameMap() {
return _tableNameMap;
}
@@ -196,6 +191,7 @@ public class ZkTableCache implements TableCache {
* are in lower case.
* @return Map from logical table name to actual logical table name
*/
+ @Override
public Map<String, String> getLogicalTableNameMap() {
return _logicalTableNameMap;
}
@@ -204,6 +200,7 @@ public class ZkTableCache implements TableCache {
* Get all dimension table names.
* @return List of dimension table names
*/
+ @Override
public List<String> getAllDimensionTables() {
List<String> dimensionTables = new ArrayList<>();
for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
@@ -219,6 +216,7 @@ public class ZkTableCache implements TableCache {
* not exist. For case-insensitive case, the keys of the map are in lower case.
*/
@Nullable
+ @Override
public Map<String, String> getColumnNameMap(String rawTableName) {
SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
return schemaInfo != null ? schemaInfo._columnNameMap : null;
@@ -229,6 +227,7 @@ public class ZkTableCache implements TableCache {
* configured.
*/
@Nullable
+ @Override
public Map<Expression, Expression> getExpressionOverrideMap(String physicalOrLogicalTableName) {
TableConfigInfo tableConfigInfo = _tableConfigInfoMap.get(physicalOrLogicalTableName);
if (tableConfigInfo != null) {
@@ -242,6 +241,7 @@ public class ZkTableCache implements TableCache {
* Returns the timestamp index columns for the given table, or {@code null} if table does not exist.
*/
@Nullable
+ @Override
public Set<String> getTimestampIndexColumns(String tableNameWithType) {
TableConfigInfo tableConfigInfo = _tableConfigInfoMap.get(tableNameWithType);
return tableConfigInfo != null ? tableConfigInfo._timestampIndexColumns : null;
@@ -414,7 +414,7 @@ public class ZkTableCache implements TableCache {
}
private void putSchema(ZNRecord znRecord)
- throws IOException {
+ throws IOException {
Schema schema = SchemaSerDeUtils.fromZNRecord(znRecord);
addBuiltInVirtualColumns(schema);
String schemaName = schema.getSchemaName();
@@ -431,22 +431,6 @@ public class ZkTableCache implements TableCache {
_schemaInfoMap.put(schemaName, new SchemaInfo(schema, columnNameMap));
}
- /**
- * Adds the built-in virtual columns to the schema.
- * NOTE: The virtual column provider class is not added.
- */
- private static void addBuiltInVirtualColumns(Schema schema) {
- if (!schema.hasColumn(BuiltInVirtualColumn.DOCID)) {
- schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.DOCID, FieldSpec.DataType.INT, true));
- }
- if (!schema.hasColumn(BuiltInVirtualColumn.HOSTNAME)) {
- schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.HOSTNAME, FieldSpec.DataType.STRING, true));
- }
- if (!schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)) {
- schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME, FieldSpec.DataType.STRING, true));
- }
- }
-
private void removeSchema(String path) {
_propertyStore.unsubscribeDataChanges(path, _zkSchemaChangeListener);
String schemaName = path.substring(SCHEMA_PATH_PREFIX.length());
@@ -479,6 +463,7 @@ public class ZkTableCache implements TableCache {
return tableConfigs;
}
+ @Override
public List<LogicalTableConfig> getLogicalTableConfigs() {
return _logicalTableConfigInfoMap.values().stream().map(o -> o._logicalTableConfig).collect(Collectors.toList());
}
@@ -500,6 +485,7 @@ public class ZkTableCache implements TableCache {
return schemas;
}
+ @Override
public boolean isLogicalTable(String logicalTableName) {
logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName;
return _logicalTableConfigInfoMap.containsKey(logicalTableName);
@@ -645,63 +631,4 @@ public class ZkTableCache implements TableCache {
notifyLogicalTableConfigChangeListeners();
}
}
-
- private static Map<Expression, Expression> createExpressionOverrideMap(String physicalOrLogicalTableName,
- QueryConfig queryConfig) {
- Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
- if (queryConfig != null && MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
- for (Map.Entry<String, String> entry : queryConfig.getExpressionOverrideMap().entrySet()) {
- try {
- Expression srcExp = CalciteSqlParser.compileToExpression(entry.getKey());
- Expression destExp = CalciteSqlParser.compileToExpression(entry.getValue());
- expressionOverrideMap.put(srcExp, destExp);
- } catch (Exception e) {
- LOGGER.warn("Caught exception while compiling expression override: {} -> {} for table: {}, skipping it",
- entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
- }
- }
- int mapSize = expressionOverrideMap.size();
- if (mapSize == 1) {
- Map.Entry<Expression, Expression> entry = expressionOverrideMap.entrySet().iterator().next();
- return Collections.singletonMap(entry.getKey(), entry.getValue());
- } else if (mapSize > 1) {
- return expressionOverrideMap;
- }
- }
- return null;
- }
-
- private static class TableConfigInfo {
- final TableConfig _tableConfig;
- final Map<Expression, Expression> _expressionOverrideMap;
- // All the timestamp with granularity column names
- final Set<String> _timestampIndexColumns;
-
- private TableConfigInfo(TableConfig tableConfig) {
- _tableConfig = tableConfig;
- _expressionOverrideMap = createExpressionOverrideMap(tableConfig.getTableName(), tableConfig.getQueryConfig());
- _timestampIndexColumns = TimestampIndexUtils.extractColumnsWithGranularity(tableConfig);
- }
- }
-
- private static class SchemaInfo {
- final Schema _schema;
- final Map<String, String> _columnNameMap;
-
- private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
- _schema = schema;
- _columnNameMap = columnNameMap;
- }
- }
-
- private static class LogicalTableConfigInfo {
- final LogicalTableConfig _logicalTableConfig;
- final Map<Expression, Expression> _expressionOverrideMap;
-
- private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
- _logicalTableConfig = logicalTableConfig;
- _expressionOverrideMap = createExpressionOverrideMap(logicalTableConfig.getTableName(),
- logicalTableConfig.getQueryConfig());
- }
- }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index bed6e19b32d..a2278d12412 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.controller.api.resources;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.annotations.ApiOperation;
@@ -51,10 +53,13 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.provider.StaticTableCache;
+import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DatabaseUtils;
@@ -70,7 +75,9 @@ import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.parser.utils.ParserUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
@@ -153,27 +160,41 @@ public class PinotQueryResource {
@POST
@Path("validateMultiStageQuery")
- public MultiStageQueryValidationResponse validateMultiStageQuery(String requestJsonStr,
+ public MultiStageQueryValidationResponse validateMultiStageQuery(MultiStageQueryValidationRequest request,
@Context HttpHeaders httpHeaders) {
- JsonNode requestJson;
- try {
- requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
- } catch (Exception e) {
- LOGGER.warn("Caught exception while parsing request {}", e.getMessage());
- return new MultiStageQueryValidationResponse(false, "Failed to parse
request JSON: " + e.getMessage(), null);
- }
- if (!requestJson.has("sql")) {
- return new MultiStageQueryValidationResponse(false, "JSON Payload is
missing the query string field 'sql'", null);
+
+ String sqlQuery = request.getSql() == null ? null : request.getSql().trim();
+ if (sqlQuery == null || sqlQuery.isEmpty()) {
+ return new MultiStageQueryValidationResponse(false, "Request is missing the query string field 'sql'", null);
}
- String sqlQuery = requestJson.get("sql").asText();
+
Map<String, String> queryOptionsMap = RequestUtils.parseQuery(sqlQuery).getOptions();
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptionsMap, httpHeaders);
- try (QueryEnvironment.CompiledQuery compiledQuery = new QueryEnvironment(database,
- _pinotHelixResourceManager.getTableCache(), null).compile(sqlQuery)) {
- return new MultiStageQueryValidationResponse(true, null, null);
+
+ try {
+ TableCache tableCache;
+ if (CollectionUtils.isNotEmpty(request.getTableConfigs()) && CollectionUtils.isNotEmpty(request.getSchemas())) {
+ tableCache = new StaticTableCache(request.getTableConfigs(), request.getSchemas(),
+ request.getLogicalTableConfigs(), request.isIgnoreCase());
+ LOGGER.info("Validating multi-stage query compilation using static table cache for query: {}", request.getSql());
+ } else {
+ // Use TableCache from environment if static fields are not specified
+ tableCache = _pinotHelixResourceManager.getTableCache();
+ LOGGER.info("Validating multi-stage query compilation using Zk table
cache for query: {}", request.getSql());
+ }
+ try (QueryEnvironment.CompiledQuery compiledQuery =
+ new QueryEnvironment(database, tableCache, null).compile(sqlQuery)) {
+ return new MultiStageQueryValidationResponse(true, null, null);
+ }
} catch (QueryException e) {
LOGGER.info("Caught exception while compiling multi-stage query: {}",
e.getMessage());
return new MultiStageQueryValidationResponse(false, e.getMessage(),
e.getErrorCode());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while validating multi-stage query: {}",
e.getMessage());
+ return new MultiStageQueryValidationResponse(false, "Unexpected error: "
+ e.getMessage(),
+ QueryErrorCode.UNKNOWN);
}
}
@@ -204,6 +225,50 @@ public class PinotQueryResource {
}
}
+ public static class MultiStageQueryValidationRequest {
+ private final String _sql;
+ private final List<TableConfig> _tableConfigs;
+ private final List<Schema> _schemas;
+ private final List<LogicalTableConfig> _logicalTableConfigs;
+ private final boolean _ignoreCase;
+
+ @JsonCreator
+ public MultiStageQueryValidationRequest(@JsonProperty("sql") String sql,
+ @JsonProperty("tableConfigs") @Nullable List<TableConfig> tableConfigs,
+ @JsonProperty("schemas") @Nullable List<Schema> schemas,
+ @JsonProperty("logicalTableConfigs") @Nullable
List<LogicalTableConfig> logicalTableConfigs,
+ @JsonProperty("ignoreCase") boolean ignoreCase) {
+ _sql = sql;
+ _tableConfigs = tableConfigs;
+ _schemas = schemas;
+ _logicalTableConfigs = logicalTableConfigs;
+ _ignoreCase = ignoreCase;
+ }
+
+ public String getSql() {
+ return _sql;
+ }
+
+ @Nullable
+ public List<TableConfig> getTableConfigs() {
+ return _tableConfigs;
+ }
+
+ @Nullable
+ public List<Schema> getSchemas() {
+ return _schemas;
+ }
+
+ @Nullable
+ public List<LogicalTableConfig> getLogicalTableConfigs() {
+ return _logicalTableConfigs;
+ }
+
+ public boolean isIgnoreCase() {
+ return _ignoreCase;
+ }
+ }
+
private StreamingOutput executeSqlQueryCatching(HttpHeaders httpHeaders, String sqlQuery, String traceEnabled,
String queryOptions) {
try {
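Since the endpoint now accepts a typed JSON body, a client-side sketch with the JDK HTTP client may help. The payload field names follow the @JsonProperty annotations above; the controller address is an assumption. Note that empty tableConfigs/schemas lists fall back to the live table cache, per the CollectionUtils check above:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ValidateQuerySketch {
      public static void main(String[] args) throws Exception {
        // Field names mirror the @JsonProperty annotations on MultiStageQueryValidationRequest.
        String payload = "{\"sql\": \"SELECT COUNT(*) FROM mytable\","
            + " \"tableConfigs\": [], \"schemas\": [], \"ignoreCase\": false}";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:9000/validateMultiStageQuery")) // assumed controller address and path
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The response carries compiledSuccessfully, errorMessage and errorCode.
        System.out.println(response.body());
      }
    }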
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
new file mode 100644
index 00000000000..ab715485d54
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.config.provider.StaticTableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for the static table cache functionality in PinotQueryResource.
+ */
+public class PinotQueryResourceStaticValidationTest {
+
+ private ObjectMapper _objectMapper;
+
+ @BeforeClass
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ _objectMapper = new ObjectMapper();
+ }
+
+ @Test
+ public void testStaticTableCacheProvider() {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+ .addSingleValueDimension("dimensionCol", FieldSpec.DataType.STRING)
+ .addMetric("metricCol", FieldSpec.DataType.LONG).build();
+
+ List<TableConfig> tableConfigs = Arrays.asList(tableConfig);
+ List<Schema> schemas = Arrays.asList(schema);
+
+ StaticTableCache provider = new StaticTableCache(tableConfigs, schemas, null, false);
+
+ Assert.assertFalse(provider.isIgnoreCase());
+ Assert.assertEquals(provider.getActualTableName("testTable_OFFLINE"),
"testTable_OFFLINE");
+ Assert.assertEquals(provider.getActualTableName("testTable"), "testTable");
+ Assert.assertNotNull(provider.getTableConfig("testTable_OFFLINE"));
+ Assert.assertNotNull(provider.getSchema("testTable"));
+ Assert.assertNotNull(provider.getColumnNameMap("testTable"));
+ Assert.assertEquals(provider.getColumnNameMap("testTable").size(), 5); // 2 columns + 3 built-in virtual columns
+
+ Assert.assertTrue(provider.getTableNameMap().containsKey("testTable_OFFLINE"));
+ Assert.assertTrue(provider.getTableNameMap().containsKey("testTable"));
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 6b0ac607400..b45a47b153d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -47,8 +47,13 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.controller.api.resources.PinotQueryResource.MultiStageQueryValidationRequest;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -57,7 +62,9 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
@@ -75,7 +82,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-
public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestSet {
private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
private static final String DEFAULT_DATABASE_NAME = CommonConstants.DEFAULT_DATABASE;
@@ -1736,6 +1742,240 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestSet
assertFalse(result.get("errorMessage").isNull());
}
+ @Test
+ public void testValidateQueryApiSuccessfulQueries() throws Exception {
+ JsonNode tableConfigsNode = JsonUtils.stringToJsonNode(
+ sendGetRequest(getControllerBaseApiUrl() + "/tables/mytable"));
+ JsonNode schemaNode = JsonUtils.stringToJsonNode(
+ sendGetRequest(getControllerBaseApiUrl() + "/schemas/mytable"));
+
+ String[] successfulQueries = {
+ "SELECT COUNT(*) FROM mytable",
+ "SELECT DivAirportSeqIDs, COUNT(*) FROM mytable GROUP BY
DivAirportSeqIDs",
+ "SELECT DivAirportSeqIDs FROM mytable WHERE
arrayToMV(DivAirportSeqIDs) > 0 LIMIT 10",
+ "SELECT DivAirportSeqIDs, AirlineID FROM mytable ORDER BY
DivAirportSeqIDs LIMIT 5",
+ "SELECT SUM(arrayToMV(DivAirportSeqIDs)) AS total FROM mytable",
+ "SELECT AVG(arrayToMV(DivAirportSeqIDs)) FROM mytable WHERE AirlineID
IS NOT NULL"
+ };
+
+ List<TableConfig> tableConfigs = new ArrayList<>();
+ JsonNode offlineConfig = tableConfigsNode.get("OFFLINE");
+ if (offlineConfig != null && !offlineConfig.isMissingNode() && !offlineConfig.isEmpty()) {
+ tableConfigs.add(JsonUtils.jsonNodeToObject(offlineConfig, TableConfig.class));
+ }
+ JsonNode realtimeConfig = tableConfigsNode.get("REALTIME");
+ if (realtimeConfig != null && !realtimeConfig.isMissingNode() && !realtimeConfig.isEmpty()) {
+ tableConfigs.add(JsonUtils.jsonNodeToObject(realtimeConfig, TableConfig.class));
+ }
+
+ Schema schema = JsonUtils.jsonNodeToObject(schemaNode, Schema.class);
+ List<Schema> schemas = Collections.singletonList(schema);
+
+ for (String query : successfulQueries) {
+ MultiStageQueryValidationRequest request = new MultiStageQueryValidationRequest(
+ query, tableConfigs, schemas, null, false);
+
+ String requestJson = JsonUtils.objectToString(request);
+ JsonNode result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+
+ assertTrue(result.get("compiledSuccessfully").asBoolean(),
+ "Query should compile successfully: " + query);
+ assertTrue(result.get("errorCode").isNull());
+ assertTrue(result.get("errorMessage").isNull());
+ }
+ }
+
+ @Test
+ public void testValidateQueryApiUnsuccessfulQueries() throws Exception {
+ JsonNode tableConfigsNode =
+ JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() + "/tables/mytable"));
+ JsonNode schemaNode =
+ JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() + "/schemas/mytable"));
+
+ List<TableConfig> tableConfigs = new ArrayList<>();
+ JsonNode offlineConfig = tableConfigsNode.get("OFFLINE");
+ if (offlineConfig != null && !offlineConfig.isMissingNode() && !offlineConfig.isEmpty()) {
+ tableConfigs.add(JsonUtils.jsonNodeToObject(offlineConfig, TableConfig.class));
+ }
+ JsonNode realtimeConfig = tableConfigsNode.get("REALTIME");
+ if (realtimeConfig != null && !realtimeConfig.isMissingNode() && !realtimeConfig.isEmpty()) {
+ tableConfigs.add(JsonUtils.jsonNodeToObject(realtimeConfig, TableConfig.class));
+ }
+
+ Schema schema = JsonUtils.jsonNodeToObject(schemaNode, Schema.class);
+ List<Schema> schemas = Collections.singletonList(schema);
+
+ // Invalid column in the query
+ MultiStageQueryValidationRequest request = new MultiStageQueryValidationRequest(
+ "SELECT nonExistentColumn FROM mytable", tableConfigs, schemas, null, true);
+
+ String requestJson = JsonUtils.objectToString(request);
+ JsonNode result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+ assertFalse(result.get("compiledSuccessfully").asBoolean());
+ assertEquals(result.get("errorCode").asText(),
QueryErrorCode.QUERY_VALIDATION.name());
+
+ // Cannot apply '>' to arguments of type '<INTEGER ARRAY>' and '<INTEGER>'
+ String query = "SELECT DivAirportSeqIDs FROM mytable WHERE
DivAirportSeqIDs > 0 LIMIT 10";
+ request = new MultiStageQueryValidationRequest(query, tableConfigs,
schemas, null, false);
+
+ requestJson = JsonUtils.objectToString(request);
+ result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+
+ assertFalse(result.get("compiledSuccessfully").asBoolean(), "Query should
not compile successfully: " + query);
+ assertEquals(result.get("errorCode").asText(),
QueryErrorCode.QUERY_VALIDATION.name());
+ assertFalse(result.get("errorMessage").isNull(), "Error message should not
be null for: " + query);
+
+ // Non-existent table
+ query = "SELECT count(*) FROM nonExistentTable";
+ request = new MultiStageQueryValidationRequest(query, tableConfigs, schemas, null, false);
+
+ requestJson = JsonUtils.objectToString(request);
+ result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+
+ assertFalse(result.get("compiledSuccessfully").asBoolean(), "Query should
not compile successfully: " + query);
+ assertEquals(result.get("errorCode").asText(),
QueryErrorCode.TABLE_DOES_NOT_EXIST.name());
+ assertFalse(result.get("errorMessage").isNull(), "Error message should not
be null for: " + query);
+ }
+
+ @Test
+ public void testValidateQueryApiWithStaticTable()
+ throws Exception {
+
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("staticTableTest").setEnableColumnBasedNullHandling(false)
+ .addSingleValueDimension("event_id", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("dummy_realtime", FieldSpec.DataType.STRING)
+ .addDateTime("mtime", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .setPrimaryKeyColumns(Collections.singletonList("event_id")).build();
+
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "fake");
+ streamConfigs.put("stream.fake.num.partitions", "2");
+ streamConfigs.put("stream.fake.topic.name", "fake_topic");
+ streamConfigs.put("stream.fake.consumer.factory.class.name",
+ "ai.startree.pinot.plugin.fakestream.FakeStreamConsumerFactory");
+ streamConfigs.put("stream.fake.decoder.class.name",
"ai.startree.pinot.plugin.fakestream.FakeStreamMessageDecoder");
+ streamConfigs.put("stream.fake.decoder.prop.colval.event_id",
"$partitionLongRange,1,100000000");
+ streamConfigs.put("stream.fake.decoder.prop.colval.mtime", "$timestamp");
+ streamConfigs.put("stream.fake.decoder.prop.partition.specific.colvals",
"event_id");
+ streamConfigs.put("stream.fake.decoder.prop.pad.colvals", "event_id");
+ streamConfigs.put("stream.fake.decoder.prop.pad.content",
"dummy_realtime");
+ streamConfigs.put("stream.fake.decoder.prop.pad.times", "1");
+ streamConfigs.put("stream.fake.consumer.fetch.interval.ms", "1");
+ streamConfigs.put("realtime.segment.flush.threshold.rows", "500000");
+
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setSnapshot(Enablement.ENABLE);
+ upsertConfig.setPreload(Enablement.ENABLE);
+ upsertConfig.setHashFunction(HashFunction.NONE);
+ upsertConfig.setComparisonColumns(Collections.singletonList("mtime"));
+ upsertConfig.setDeleteRecordColumn("event_id");
+ upsertConfig.setMetadataTTL(0);
+ upsertConfig.setDeletedKeysTTL(0);
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+ upsertConfig.setEnableSnapshot(true);
+ upsertConfig.setEnablePreload(true);
+ upsertConfig.setDropOutOfOrderRecord(false);
+ upsertConfig.setNewSegmentTrackingTimeMs(10000L);
+ upsertConfig.setMetadataManagerClass("ai.startree.pinot.upsert.rocksdb.RocksDBTableUpsertMetadataManager");
+
+ Map<String, String> metadataManagerConfigs = new HashMap<>();
+ metadataManagerConfigs.put("rocksdb.preload.num_partition_overwrite", "2");
+ upsertConfig.setMetadataManagerConfigs(metadataManagerConfigs);
+
+ upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
+ upsertConfig.setUpsertViewRefreshIntervalMs(3000L);
+
+ RoutingConfig routingConfig =
+ new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE),
+ RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, true);
+
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName("staticTableTest").setTimeColumnName("mtime")
+ .setTimeType("MILLISECONDS").setRetentionTimeUnit("DAYS").setRetentionTimeValue("5000")
+ .setDeletedSegmentsRetentionPeriod("7d").setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy")
+ .setNumReplicas(1).setSegmentPushType("APPEND").setBrokerTenant("DefaultTenant")
+ .setServerTenant("DefaultTenant").setLoadMode("MMAP").setAggregateMetrics(false)
+ .setOptimizeDictionary(false).setOptimizeDictionaryForMetrics(false).setNoDictionarySizeRatioThreshold(0.85)
+ .setNullHandlingEnabled(false).setSkipSegmentPreprocess(false).setOptimizeDictionaryType(false)
+ .setCreateInvertedIndexDuringSegmentGeneration(false).setColumnMajorSegmentBuilderEnabled(false)
+ .setStreamConfigs(streamConfigs).setRoutingConfig(routingConfig).setUpsertConfig(upsertConfig)
+ .setIsDimTable(false).build();
+
+ List<TableConfig> tableConfigs = Collections.singletonList(tableConfig);
+ List<Schema> schemas = Collections.singletonList(schema);
+
+ String query = "SELECT nonExistentColumn FROM staticTableTest";
+ // Invalid column in the static table query
+ MultiStageQueryValidationRequest request =
+ new MultiStageQueryValidationRequest(query, tableConfigs, schemas, null, true);
+
+ String requestJson = JsonUtils.objectToString(request);
+ JsonNode result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+ assertFalse(result.get("compiledSuccessfully").asBoolean());
+ assertEquals(result.get("errorCode").asText(),
QueryErrorCode.QUERY_VALIDATION.name());
+
+ // Successful query with existing column
+ query = "SELECT event_id FROM staticTableTest";
+ request = new MultiStageQueryValidationRequest(query, tableConfigs, schemas, null, false);
+
+ requestJson = JsonUtils.objectToString(request);
+ result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+
+ assertTrue(result.get("compiledSuccessfully").asBoolean(), "Query should
compile successfully: " + query);
+ assertTrue(result.get("errorCode").isNull());
+ assertTrue(result.get("errorMessage").isNull());
+ }
+
+ @Test
+ public void testValidateQueryApiWithIgnoreCaseOption()
+ throws Exception {
+ JsonNode tableConfigsNode =
+ JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() + "/tables/mytable"));
+ JsonNode schemaNode = JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() + "/schemas/mytable"));
+
+ List<TableConfig> tableConfigs = new ArrayList<>();
+ JsonNode offlineConfig = tableConfigsNode.get("OFFLINE");
+ if (offlineConfig != null && !offlineConfig.isMissingNode() && !offlineConfig.isEmpty()) {
+ tableConfigs.add(JsonUtils.jsonNodeToObject(offlineConfig, TableConfig.class));
+ }
+ JsonNode realtimeConfig = tableConfigsNode.get("REALTIME");
+ if (realtimeConfig != null && !realtimeConfig.isMissingNode() && !realtimeConfig.isEmpty()) {
+ tableConfigs.add(JsonUtils.jsonNodeToObject(realtimeConfig, TableConfig.class));
+ }
+ Schema schema = JsonUtils.jsonNodeToObject(schemaNode, Schema.class);
+ List<Schema> schemas = Collections.singletonList(schema);
+
+ // Test case-sensitive mode
+ MultiStageQueryValidationRequest request = new MultiStageQueryValidationRequest(
+ "SELECT divairportseqids FROM mytable", tableConfigs, schemas, null, false);
+
+ String requestJson = JsonUtils.objectToString(request);
+ JsonNode result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+ assertTrue(result.get("compiledSuccessfully").asBoolean(),
+ "Query should compile successfully in case-sensitive mode");
+ assertTrue(result.get("errorCode").isNull(), "Error code should be null in
case-sensitive mode");
+ assertTrue(result.get("errorMessage").isNull(), "Error message should be
null in case-sensitive mode");
+
+ // Test case-insensitive mode
+ request = new MultiStageQueryValidationRequest(
+ "SELECT divairportseqids FROM mytable", tableConfigs, schemas, null,
true);
+
+ requestJson = JsonUtils.objectToString(request);
+ result = JsonUtils.stringToJsonNode(
+ sendPostRequest(getControllerBaseApiUrl() + "/validateMultiStageQuery", requestJson, null));
+ assertTrue(result.get("compiledSuccessfully").asBoolean(),
+ "Query should compile successfully in case-insensitive mode");
+ assertTrue(result.get("errorCode").isNull(), "Error code should be null in
case-insensitive mode");
+ assertTrue(result.get("errorMessage").isNull(), "Error message should be
null in case-insensitive mode");
+ }
+
private void checkQueryResultForDBTest(String column, String tableName)
throws Exception {
checkQueryResultForDBTest(column, tableName, null, null);