This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f22322  Adding column name rewrite for the identifiers in the format of [table_name].[column_name] (#5734)
9f22322 is described below

commit 9f22322132fccd8ca1c777a8568962952daa27a3
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Sat Jul 25 11:51:11 2020 -0700

    Adding column name rewrite for the identifiers in the format of [table_name].[column_name] (#5734)
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 90 +++++++++++++---------
 .../tests/OfflineClusterIntegrationTest.java       | 73 ++++++++++++++----
 2 files changed, 115 insertions(+), 48 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 9654554..e442b5d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -202,12 +202,10 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       }
     }
     updateQuerySource(brokerRequest);
-    if (_enableCaseInsensitive) {
-      try {
-        handleCaseSensitivity(brokerRequest);
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while rewriting PQL to make it 
case-insensitive {}: {}, {}", requestId, query, e);
-      }
+    try {
+      updateColumnNames(brokerRequest);
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while updating Column names in Query {}: 
{}, {}", requestId, query, e);
     }
     if (_defaultHllLog2m > 0) {
       handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
@@ -450,18 +448,22 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   private void updateQuerySource(BrokerRequest brokerRequest) {
     String tableName = brokerRequest.getQuerySource().getTableName();
     // Check if table is in the format of [database_name].[table_name]
-    String[] tableNameSplits = StringUtils.split(tableName, '.');
-    if (tableNameSplits.length != 2) {
-      return;
-    }
+    String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
     // Update table name if there is no existing table in the format of 
[database_name].[table_name] but only [table_name]
     if (_enableCaseInsensitive) {
+      if (tableNameSplits.length < 2) {
+        
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName));
+        return;
+      }
       if (_tableCache.containsTable(tableNameSplits[1]) && 
!_tableCache.containsTable(tableName)) {
         // Use TableCache to check case insensitive table name.
-        brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+        
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1]));
       }
       return;
     }
+    if (tableNameSplits.length < 2) {
+      return;
+    }
     // Use RoutingManager to check case sensitive table name.
     if (TableNameBuilder.isTableResource(tableName)) {
       if (_routingManager.routingExists(tableNameSplits[1]) && 
!_routingManager.routingExists(tableName)) {
@@ -667,19 +669,17 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   }
 
   /**
-   * Fixes the case-insensitive column names to the actual column names in the 
given broker request.
+   * Fixes the column names to the actual column names in the given broker 
request.
    */
-  private void handleCaseSensitivity(BrokerRequest brokerRequest) {
-    String inputTableName = brokerRequest.getQuerySource().getTableName();
-    String actualTableName = _tableCache.getActualTableName(inputTableName);
-    brokerRequest.getQuerySource().setTableName(actualTableName);
+  private void updateColumnNames(BrokerRequest brokerRequest) {
+    String tableName = brokerRequest.getQuerySource().getTableName();
     //fix columns
     if (brokerRequest.getFilterSubQueryMap() != null) {
       Collection<FilterQuery> values = 
brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
       for (FilterQuery filterQuery : values) {
         if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
           String expression = filterQuery.getColumn();
-          filterQuery.setColumn(fixColumnNameCase(actualTableName, 
expression));
+          filterQuery.setColumn(fixColumnName(tableName, expression));
         }
       }
     }
@@ -688,14 +688,14 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         if 
(!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName()))
 {
           // Always read from backward compatible api in 
AggregationFunctionUtils.
           List<String> arguments = AggregationFunctionUtils.getArguments(info);
-          arguments.replaceAll(e -> fixColumnNameCase(actualTableName, e));
+          arguments.replaceAll(e -> fixColumnName(tableName, e));
           info.setExpressions(arguments);
         }
       }
       if (brokerRequest.isSetGroupBy()) {
         List<String> expressions = brokerRequest.getGroupBy().getExpressions();
         for (int i = 0; i < expressions.size(); i++) {
-          expressions.set(i, fixColumnNameCase(actualTableName, 
expressions.get(i)));
+          expressions.set(i, fixColumnName(tableName, expressions.get(i)));
         }
       }
     } else {
@@ -704,7 +704,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       for (int i = 0; i < selectionColumns.size(); i++) {
         String expression = selectionColumns.get(i);
         if (!expression.equals("*")) {
-          selectionColumns.set(i, fixColumnNameCase(actualTableName, 
expression));
+          selectionColumns.set(i, fixColumnName(tableName, expression));
         }
       }
     }
@@ -712,66 +712,86 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       List<SelectionSort> orderBy = brokerRequest.getOrderBy();
       for (SelectionSort selectionSort : orderBy) {
         String expression = selectionSort.getColumn();
-        selectionSort.setColumn(fixColumnNameCase(actualTableName, 
expression));
+        selectionSort.setColumn(fixColumnName(tableName, expression));
       }
     }
 
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
     if (pinotQuery != null) {
-      pinotQuery.getDataSource().setTableName(actualTableName);
+      pinotQuery.getDataSource().setTableName(tableName);
       for (Expression expression : pinotQuery.getSelectList()) {
-        fixColumnNameCase(actualTableName, expression);
+        fixColumnName(tableName, expression);
       }
       Expression filterExpression = pinotQuery.getFilterExpression();
       if (filterExpression != null) {
-        fixColumnNameCase(actualTableName, filterExpression);
+        fixColumnName(tableName, filterExpression);
       }
       List<Expression> groupByList = pinotQuery.getGroupByList();
       if (groupByList != null) {
         for (Expression expression : groupByList) {
-          fixColumnNameCase(actualTableName, expression);
+          fixColumnName(tableName, expression);
         }
       }
       List<Expression> orderByList = pinotQuery.getOrderByList();
       if (orderByList != null) {
         for (Expression expression : orderByList) {
-          fixColumnNameCase(actualTableName, expression);
+          fixColumnName(tableName, expression);
         }
       }
       Expression havingExpression = pinotQuery.getHavingExpression();
       if (havingExpression != null) {
-        fixColumnNameCase(actualTableName, havingExpression);
+        fixColumnName(tableName, havingExpression);
       }
     }
   }
 
-  private String fixColumnNameCase(String tableNameWithType, String 
expression) {
+  private String fixColumnName(String tableNameWithType, String expression) {
     TransformExpressionTree expressionTree = 
TransformExpressionTree.compileToExpressionTree(expression);
-    fixColumnNameCase(tableNameWithType, expressionTree);
+    fixColumnName(tableNameWithType, expressionTree);
     return expressionTree.toString();
   }
 
-  private void fixColumnNameCase(String tableNameWithType, 
TransformExpressionTree expression) {
+  private void fixColumnName(String tableNameWithType, TransformExpressionTree 
expression) {
     TransformExpressionTree.ExpressionType expressionType = 
expression.getExpressionType();
     if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-      expression.setValue(_tableCache.getActualColumnName(tableNameWithType, 
expression.getValue()));
+      String identifier = expression.getValue();
+      expression.setValue(getActualColumnName(tableNameWithType, identifier));
     } else if (expressionType == 
TransformExpressionTree.ExpressionType.FUNCTION) {
       for (TransformExpressionTree child : expression.getChildren()) {
-        fixColumnNameCase(tableNameWithType, child);
+        fixColumnName(tableNameWithType, child);
       }
     }
   }
 
-  private void fixColumnNameCase(String tableNameWithType, Expression 
expression) {
+  private void fixColumnName(String tableNameWithType, Expression expression) {
     ExpressionType expressionType = expression.getType();
     if (expressionType == ExpressionType.IDENTIFIER) {
       Identifier identifier = expression.getIdentifier();
-      identifier.setName(_tableCache.getActualColumnName(tableNameWithType, 
identifier.getName()));
+      identifier.setName(getActualColumnName(tableNameWithType, 
identifier.getName()));
     } else if (expressionType == ExpressionType.FUNCTION) {
       for (Expression operand : expression.getFunctionCall().getOperands()) {
-        fixColumnNameCase(tableNameWithType, operand);
+        fixColumnName(tableNameWithType, operand);
+      }
+    }
+  }
+
+  private String getActualColumnName(String tableNameWithType, String 
columnName) {
+    String[] splits = StringUtils.split(columnName, ".", 2);
+    if (_enableCaseInsensitive) {
+      if (splits.length == 2) {
+        if 
(TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(splits[0]))
 {
+          return _tableCache.getActualColumnName(tableNameWithType, splits[1]);
+        }
+      }
+      return _tableCache.getActualColumnName(tableNameWithType, columnName);
+    } else {
+      if (splits.length == 2) {
+        if 
(TableNameBuilder.extractRawTableName(tableNameWithType).equals(splits[0])) {
+          return splits[1];
+        }
       }
     }
+    return columnName;
   }
 
   private static Map<String, String> getOptionsFromJson(JsonNode request, 
String optionsKey) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9a7e49d..9827001 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1159,24 +1159,71 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     List<String> queries = new ArrayList<>();
-    baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
-    baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
 
-    // Wait for at most 10 seconds for broker to get the ZK callback of the 
schema change
-    TestUtils.waitForCondition(aVoid -> {
+    for (String query : queries) {
       try {
-        for (String query : queries) {
-          JsonNode response = postQuery(query);
-          // NOTE: When table does not exist, we will get 
'BrokerResourceMissingError'.
-          //       When column does not exist, all segments will be pruned and 
'numSegmentsProcessed' will be 0.
-          return response.get("exceptions").size() == 0 && 
response.get("numSegmentsProcessed").asInt() > 0;
-        }
+        postQuery(query);
       } catch (Exception e) {
         // Fail the test when exception caught
-        throw new RuntimeException(e);
+        throw new RuntimeException("Got Exceptions from query - " + query);
+      }
+    }
+  }
+
+  @Test
+  public void testColumnNameContainsTableName() {
+    int daysSinceEpoch = 16138;
+    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by DaysSinceEpoch limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 
10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + 
daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
+        "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
+    List<String> queries = new ArrayList<>();
+    baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", 
"mytable.DAYSSinceEpOch")));
+    baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", 
"mytable.DAYSSinceEpOch")));
+
+    for (String query : queries) {
+      try {
+        postQuery(query);
+      } catch (Exception e) {
+        // Fail the test when exception caught
+        throw new RuntimeException("Got Exceptions from query - " + query);
       }
-      return true;
-    }, 10_000L, "Failed to get results for case-insensitive queries");
+    }
+  }
+
+  @Test
+  public void testCaseInsensitivityWithColumnNameContainsTableName() {
+    int daysSinceEpoch = 16138;
+    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by DaysSinceEpoch limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 
10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + 
daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
+        "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
+    List<String> queries = new ArrayList<>();
+    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+
+    for (String query : queries) {
+      try {
+        postQuery(query);
+      } catch (Exception e) {
+        // Fail the test when exception caught
+        throw new RuntimeException("Got Exceptions from query - " + query);
+      }
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to