This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9f22322 Adding column name rewrite for the identifiers in the format of [table_name].[column_name] (#5734) 9f22322 is described below commit 9f22322132fccd8ca1c777a8568962952daa27a3 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Sat Jul 25 11:51:11 2020 -0700 Adding column name rewrite for the identifiers in the format of [table_name].[column_name] (#5734) --- .../requesthandler/BaseBrokerRequestHandler.java | 90 +++++++++++++--------- .../tests/OfflineClusterIntegrationTest.java | 73 ++++++++++++++---- 2 files changed, 115 insertions(+), 48 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 9654554..e442b5d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -202,12 +202,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } } updateQuerySource(brokerRequest); - if (_enableCaseInsensitive) { - try { - handleCaseSensitivity(brokerRequest); - } catch (Exception e) { - LOGGER.warn("Caught exception while rewriting PQL to make it case-insensitive {}: {}, {}", requestId, query, e); - } + try { + updateColumnNames(brokerRequest); + } catch (Exception e) { + LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e); } if (_defaultHllLog2m > 0) { handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m); @@ -450,18 +448,22 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private void updateQuerySource(BrokerRequest brokerRequest) { String tableName = brokerRequest.getQuerySource().getTableName(); // Check if table is in the format of [database_name].[table_name] - String[] tableNameSplits = StringUtils.split(tableName, '.'); - if (tableNameSplits.length != 2) { - return; - } + String[] tableNameSplits = StringUtils.split(tableName, ".", 2); // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name] if (_enableCaseInsensitive) { + if (tableNameSplits.length < 2) { + brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName)); + return; + } if (_tableCache.containsTable(tableNameSplits[1]) && !_tableCache.containsTable(tableName)) { // Use TableCache to check case insensitive table name. - brokerRequest.getQuerySource().setTableName(tableNameSplits[1]); + brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1])); } return; } + if (tableNameSplits.length < 2) { + return; + } // Use RoutingManager to check case sensitive table name. if (TableNameBuilder.isTableResource(tableName)) { if (_routingManager.routingExists(tableNameSplits[1]) && !_routingManager.routingExists(tableName)) { @@ -667,19 +669,17 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } /** - * Fixes the case-insensitive column names to the actual column names in the given broker request. + * Fixes the column names to the actual column names in the given broker request. */ - private void handleCaseSensitivity(BrokerRequest brokerRequest) { - String inputTableName = brokerRequest.getQuerySource().getTableName(); - String actualTableName = _tableCache.getActualTableName(inputTableName); - brokerRequest.getQuerySource().setTableName(actualTableName); + private void updateColumnNames(BrokerRequest brokerRequest) { + String tableName = brokerRequest.getQuerySource().getTableName(); //fix columns if (brokerRequest.getFilterSubQueryMap() != null) { Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values(); for (FilterQuery filterQuery : values) { if (filterQuery.getNestedFilterQueryIdsSize() == 0) { String expression = filterQuery.getColumn(); - filterQuery.setColumn(fixColumnNameCase(actualTableName, expression)); + filterQuery.setColumn(fixColumnName(tableName, expression)); } } } @@ -688,14 +688,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { // Always read from backward compatible api in AggregationFunctionUtils. List<String> arguments = AggregationFunctionUtils.getArguments(info); - arguments.replaceAll(e -> fixColumnNameCase(actualTableName, e)); + arguments.replaceAll(e -> fixColumnName(tableName, e)); info.setExpressions(arguments); } } if (brokerRequest.isSetGroupBy()) { List<String> expressions = brokerRequest.getGroupBy().getExpressions(); for (int i = 0; i < expressions.size(); i++) { - expressions.set(i, fixColumnNameCase(actualTableName, expressions.get(i))); + expressions.set(i, fixColumnName(tableName, expressions.get(i))); } } } else { @@ -704,7 +704,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { for (int i = 0; i < selectionColumns.size(); i++) { String expression = selectionColumns.get(i); if (!expression.equals("*")) { - selectionColumns.set(i, fixColumnNameCase(actualTableName, expression)); + selectionColumns.set(i, fixColumnName(tableName, expression)); } } } @@ -712,66 +712,86 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { List<SelectionSort> orderBy = brokerRequest.getOrderBy(); for (SelectionSort selectionSort : orderBy) { String expression = selectionSort.getColumn(); - selectionSort.setColumn(fixColumnNameCase(actualTableName, expression)); + selectionSort.setColumn(fixColumnName(tableName, expression)); } } PinotQuery pinotQuery = brokerRequest.getPinotQuery(); if (pinotQuery != null) { - pinotQuery.getDataSource().setTableName(actualTableName); + pinotQuery.getDataSource().setTableName(tableName); for (Expression expression : pinotQuery.getSelectList()) { - fixColumnNameCase(actualTableName, expression); + fixColumnName(tableName, expression); } Expression filterExpression = pinotQuery.getFilterExpression(); if (filterExpression != null) { - fixColumnNameCase(actualTableName, filterExpression); + fixColumnName(tableName, filterExpression); } List<Expression> groupByList = pinotQuery.getGroupByList(); if (groupByList != null) { for (Expression expression : groupByList) { - fixColumnNameCase(actualTableName, expression); + fixColumnName(tableName, expression); } } List<Expression> orderByList = pinotQuery.getOrderByList(); if (orderByList != null) { for (Expression expression : orderByList) { - fixColumnNameCase(actualTableName, expression); + fixColumnName(tableName, expression); } } Expression havingExpression = pinotQuery.getHavingExpression(); if (havingExpression != null) { - fixColumnNameCase(actualTableName, havingExpression); + fixColumnName(tableName, havingExpression); } } } - private String fixColumnNameCase(String tableNameWithType, String expression) { + private String fixColumnName(String tableNameWithType, String expression) { TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression); - fixColumnNameCase(tableNameWithType, expressionTree); + fixColumnName(tableNameWithType, expressionTree); return expressionTree.toString(); } - private void fixColumnNameCase(String tableNameWithType, TransformExpressionTree expression) { + private void fixColumnName(String tableNameWithType, TransformExpressionTree expression) { TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType(); if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) { - expression.setValue(_tableCache.getActualColumnName(tableNameWithType, expression.getValue())); + String identifier = expression.getValue(); + expression.setValue(getActualColumnName(tableNameWithType, identifier)); } else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) { for (TransformExpressionTree child : expression.getChildren()) { - fixColumnNameCase(tableNameWithType, child); + fixColumnName(tableNameWithType, child); } } } - private void fixColumnNameCase(String tableNameWithType, Expression expression) { + private void fixColumnName(String tableNameWithType, Expression expression) { ExpressionType expressionType = expression.getType(); if (expressionType == ExpressionType.IDENTIFIER) { Identifier identifier = expression.getIdentifier(); - identifier.setName(_tableCache.getActualColumnName(tableNameWithType, identifier.getName())); + identifier.setName(getActualColumnName(tableNameWithType, identifier.getName())); } else if (expressionType == ExpressionType.FUNCTION) { for (Expression operand : expression.getFunctionCall().getOperands()) { - fixColumnNameCase(tableNameWithType, operand); + fixColumnName(tableNameWithType, operand); + } + } + } + + private String getActualColumnName(String tableNameWithType, String columnName) { + String[] splits = StringUtils.split(columnName, ".", 2); + if (_enableCaseInsensitive) { + if (splits.length == 2) { + if (TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(splits[0])) { + return _tableCache.getActualColumnName(tableNameWithType, splits[1]); + } + } + return _tableCache.getActualColumnName(tableNameWithType, columnName); + } else { + if (splits.length == 2) { + if (TableNameBuilder.extractRawTableName(tableNameWithType).equals(splits[0])) { + return splits[1]; + } } } + return columnName; } private static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 9a7e49d..9827001 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1159,24 +1159,71 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable", "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); List<String> queries = new ArrayList<>(); - baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); - baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); + baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); + baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); - // Wait for at most 10 seconds for broker to get the ZK callback of the schema change - TestUtils.waitForCondition(aVoid -> { + for (String query : queries) { try { - for (String query : queries) { - JsonNode response = postQuery(query); - // NOTE: When table does not exist, we will get 'BrokerResourceMissingError'. - // When column does not exist, all segments will be pruned and 'numSegmentsProcessed' will be 0. - return response.get("exceptions").size() == 0 && response.get("numSegmentsProcessed").asInt() > 0; - } + postQuery(query); } catch (Exception e) { // Fail the test when exception caught - throw new RuntimeException(e); + throw new RuntimeException("Got Exceptions from query - " + query); + } + } + } + + @Test + public void testColumnNameContainsTableName() { + int daysSinceEpoch = 16138; + long secondsSinceEpoch = 16138 * 24 * 60 * 60; + List<String> baseQueries = Arrays.asList("SELECT * FROM mytable", + "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable", + "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000", + "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000", + "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch, + "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch, + "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch, + "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable", + "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); + List<String> queries = new ArrayList<>(); + baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch"))); + baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch"))); + + for (String query : queries) { + try { + postQuery(query); + } catch (Exception e) { + // Fail the test when exception caught + throw new RuntimeException("Got Exceptions from query - " + query); } - return true; - }, 10_000L, "Failed to get results for case-insensitive queries"); + } + } + + @Test + public void testCaseInsensitivityWithColumnNameContainsTableName() { + int daysSinceEpoch = 16138; + long secondsSinceEpoch = 16138 * 24 * 60 * 60; + List<String> baseQueries = Arrays.asList("SELECT * FROM mytable", + "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable", + "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000", + "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000", + "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch, + "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch, + "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch, + "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable", + "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); + List<String> queries = new ArrayList<>(); + baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch"))); + baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch"))); + + for (String query : queries) { + try { + postQuery(query); + } catch (Exception e) { + // Fail the test when exception caught + throw new RuntimeException("Got Exceptions from query - " + query); + } + } } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org