weixiangsun commented on a change in pull request #8029:
URL: https://github.com/apache/pinot/pull/8029#discussion_r829536271



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable 
getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  public static boolean isGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return 
GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  private static boolean isGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the gapfill type for queryContext. Also do the validation for gapfill 
request.
+   * @param queryContext
+   */
+  public static void setGapfillType(QueryContext queryContext) {
+    GapfillType gapfillType = null;
+    if (queryContext.getSubQueryContext() == null) {
+      if (isGapfill(queryContext)) {
+        Preconditions.checkArgument(queryContext.getAggregationFunctions() == 
null,
+            "Aggregation and Gapfill can not be in the same sql statement.");
+        gapfillType = GapfillType.GAP_FILL;
+      }
+    } else if (isGapfill(queryContext)) {
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getAggregationFunctions()
 != null,
+          "Select and Gapfill should be in the same sql statement.");
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getSubQueryContext()
 == null,
+          "There is no three levels nesting sql when the outer query is 
gapfill.");
+      gapfillType = GapfillType.AGGREGATE_GAP_FILL;
+    } else if (isGapfill(queryContext.getSubQueryContext())) {
+      if (queryContext.getAggregationFunctions() == null) {
+        gapfillType = GapfillType.GAP_FILL_SELECT;
+      } else if (queryContext.getSubQueryContext().getSubQueryContext() == 
null) {
+        gapfillType = GapfillType.GAP_FILL_AGGREGATE;
+      } else {
+        Preconditions
+            
.checkArgument(queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions()
 != null,
+                "Select cannot happen before gapfill.");
+        gapfillType = GapfillType.AGGREGATE_GAP_FILL_AGGREGATE;
+      }
+    }
+
+    queryContext.setGapfillType(gapfillType);
+    if (gapfillType == null) {
+      return;
+    }
+
+    ExpressionContext gapFillSelection = 
GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(gapFillSelection != null && 
gapFillSelection.getFunction() != null,
+        "Gapfill Expression should be function.");
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not 
have correct number of arguments.");
+    Preconditions.checkArgument(args.get(1).getLiteral() != null,
+        "The second argument of PostAggregateGapFill should be 
TimeFormatter.");
+    Preconditions.checkArgument(args.get(2).getLiteral() != null,
+        "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(args.get(3).getLiteral() != null,
+        "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(args.get(4).getLiteral() != null,
+        "The fifth argument of PostAggregateGapFill should be time bucket 
size.");
+
+    ExpressionContext timeseriesOn = 
GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn 
expressions should be specified.");
+
+    if (queryContext.getAggregationFunctions() == null) {
+      return;
+    }
+
+    List<ExpressionContext> groupbyExpressions = 
queryContext.getGroupByExpressions();
+    Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy 
Clause.");
+    List<ExpressionContext> innerSelections = 
queryContext.getSubQueryContext().getSelectExpressions();
+    String timeBucketCol = null;
+    List<String> strAlias = queryContext.getSubQueryContext().getAliasList();
+    for (int i = 0; i < innerSelections.size(); i++) {
+      ExpressionContext innerSelection = innerSelections.get(i);
+      if (GapfillUtils.isGapfill(innerSelection)) {
+        if (strAlias.get(i) != null) {
+          timeBucketCol = strAlias.get(i);
+        } else {
+          timeBucketCol = 
innerSelection.getFunction().getArguments().get(0).toString();
+        }
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(timeBucketCol != null, "No Group By 
timebucket.");
+
+    boolean findTimeBucket = false;
+    for (ExpressionContext groupbyExp : groupbyExpressions) {
+      if (timeBucketCol.equals(groupbyExp.toString())) {
+        findTimeBucket = true;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(findTimeBucket, "No Group By timebucket.");
+  }
+
+  private static ExpressionContext findGapfillExpressionContext(QueryContext 
queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return expressionContext;
+      }
+    }
+    return null;
+  }
+
+  public static ExpressionContext getGapfillExpressionContext(QueryContext 
queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.AGGREGATE_GAP_FILL || gapfillType == 
GapfillType.GAP_FILL) {
+      return findGapfillExpressionContext(queryContext);
+    } else if (gapfillType == GapfillType.GAP_FILL_AGGREGATE || gapfillType == 
GapfillType.AGGREGATE_GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT) {
+      return findGapfillExpressionContext(queryContext.getSubQueryContext());
+    } else {
+      return null;
+    }
+  }
+
+  public static int findTimeBucketColumnIndex(QueryContext queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT
+        || gapfillType == GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = queryContext.getSubQueryContext();
+    }
+    List<ExpressionContext> expressionContexts = 
queryContext.getSelectExpressions();
+    for (int i = 0; i < expressionContexts.size(); i++) {
+      if (isGapfill(expressionContexts.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static ExpressionContext 
getTimeSeriesOnExpressionContext(ExpressionContext gapFillSelection) {
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isTimeSeriesOn(args.get(i))) {
+        return args.get(i);
+      }
+    }
+    return null;
+  }
+
+  public static Map<String, ExpressionContext> 
getFillExpressions(ExpressionContext gapFillSelection) {
+    Map<String, ExpressionContext> fillExpressions = new HashMap<>();
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isFill(args.get(i))) {
+        ExpressionContext fillExpression = args.get(i);
+        
fillExpressions.put(fillExpression.getFunction().getArguments().get(0).getIdentifier(),
 fillExpression);
+      }
+    }
+    return fillExpressions;
+  }
+
+  public static String getTableName(PinotQuery pinotQuery) {
+    while (pinotQuery.getDataSource().getSubquery() != null) {
+      pinotQuery = pinotQuery.getDataSource().getSubquery();
+    }
+    return pinotQuery.getDataSource().getTableName();
+  }
+
+  public static BrokerRequest stripGapfill(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery().getDataSource() == null) {
+      return brokerRequest;
+    }
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    GapfillUtils.GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == null) {
+      return brokerRequest;
+    }
+    switch (gapfillType) {
+      // one sql query with gapfill only
+      case GAP_FILL:
+        return stripGapfill(brokerRequest.getPinotQuery());
+      // gapfill as subquery, the outer query may have the filter
+      case GAP_FILL_SELECT:
+        // gapfill as subquery, the outer query has the aggregation
+      case GAP_FILL_AGGREGATE:
+        // aggregation as subqery, the outer query is gapfill
+      case AGGREGATE_GAP_FILL:
+        return 
stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery());
+      // aggegration as second nesting subquery, gapfill as first nesting 
subquery, different aggregation as outer query
+      case AGGREGATE_GAP_FILL_AGGREGATE:
+        return 
stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery().getDataSource().getSubquery());
+      default:
+        return brokerRequest;
+    }
+  }
+
+  private static BrokerRequest stripGapfill(PinotQuery pinotQuery) {
+    PinotQuery copy = new PinotQuery(pinotQuery);
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(copy);
+    // Set table name in broker request because it is used for access control, 
query routing etc.
+    DataSource dataSource = copy.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+    List<Expression> selectList = copy.getSelectList();
+    for (int i = 0; i < selectList.size(); i++) {
+      Expression select = selectList.get(i);
+      if (select.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperator())) {

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to