KYLIN-2292 workaround for CALCITE-1540
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4ae4333c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4ae4333c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4ae4333c Branch: refs/heads/KYLIN-2293 Commit: 4ae4333c82243c21d253008c9f5146f1e18f6e84 Parents: 6f9bd4a Author: Hongbin Ma <mahong...@apache.org> Authored: Fri Dec 16 17:21:37 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Dec 19 14:16:08 2016 +0800 ---------------------------------------------------------------------- .../adapter/enumerable/EnumerableWindow.java | 978 +++++++++++++++++++ .../calcite/adapter/enumerable/PhysType.java | 209 ++++ .../adapter/enumerable/PhysTypeImpl.java | 654 +++++++++++++ .../test/resources/query/sql_window/query11.sql | 23 + .../test/resources/query/sql_window/query12.sql | 26 + 5 files changed, 1890 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4ae4333c/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java ---------------------------------------------------------------------- diff --git a/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java b/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java new file mode 100644 index 0000000..203ce02 --- /dev/null +++ b/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java @@ -0,0 +1,978 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.enumerable; + +import org.apache.calcite.adapter.enumerable.impl.WinAggAddContextImpl; +import org.apache.calcite.adapter.enumerable.impl.WinAggResetContextImpl; +import org.apache.calcite.adapter.enumerable.impl.WinAggResultContextImpl; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.tree.BinaryExpression; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.BlockStatement; +import org.apache.calcite.linq4j.tree.DeclarationStatement; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.linq4j.tree.Statement; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.CalcitePrepareImpl; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import 
org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.runtime.SortedMultiMap; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; + +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/* + * OVERRIDE POINT: patching CALCITE-1540 on calcite 1.8.0 + */ + +/** Implementation of {@link org.apache.calcite.rel.core.Window} in + * {@link org.apache.calcite.adapter.enumerable.EnumerableConvention enumerable calling convention}. */ +public class EnumerableWindow extends Window implements EnumerableRel { + /** Creates an EnumerableWindowRel. */ + EnumerableWindow(RelOptCluster cluster, RelTraitSet traits, RelNode child, + List<RexLiteral> constants, RelDataType rowType, List<Group> groups) { + super(cluster, traits, child, constants, rowType, groups); + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new EnumerableWindow(getCluster(), traitSet, sole(inputs), + constants, rowType, groups); + } + + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq) + .multiplyBy(EnumerableConvention.COST_MULTIPLIER); + } + + /** Implementation of {@link RexToLixTranslator.InputGetter} + * suitable for generating implementations of windowed aggregate + * functions. 
*/ + private static class WindowRelInputGetter + implements RexToLixTranslator.InputGetter { + private final Expression row; + private final PhysType rowPhysType; + private final int actualInputFieldCount; + private final List<Expression> constants; + + private WindowRelInputGetter(Expression row, + PhysType rowPhysType, int actualInputFieldCount, + List<Expression> constants) { + this.row = row; + this.rowPhysType = rowPhysType; + this.actualInputFieldCount = actualInputFieldCount; + this.constants = constants; + } + + public Expression field(BlockBuilder list, int index, Type storageType) { + if (index < actualInputFieldCount) { + Expression current = list.append("current", row); + return rowPhysType.fieldReference(current, index, storageType); + } + return constants.get(index - actualInputFieldCount); + } + } + + private void sampleOfTheGeneratedWindowedAggregate() { + // Here's an overview of the generated code. + // For each list of rows that have the same partitioning key, evaluate + // all of the windowed aggregate functions.
+ + // builder + Iterator<Integer[]> iterator = null; + + // builder3 + Integer[] rows = iterator.next(); + + int prevStart = -1; + int prevEnd = -1; + + for (int i = 0; i < rows.length; i++) { + // builder4 + Integer row = rows[i]; + + int start = 0; + int end = 100; + if (start != prevStart || end != prevEnd) { + // builder5 + int actualStart = 0; + if (start != prevStart || end < prevEnd) { + // builder6 + // recompute + actualStart = start; + // implementReset + } else { // must be start == prevStart && end > prevEnd + actualStart = prevEnd + 1; + } + prevStart = start; + prevEnd = end; + + if (start != -1) { + for (int j = actualStart; j <= end; j++) { + // builder7 + // implementAdd + } + } + // implementResult + // list.add(new Xxx(row.deptno, row.empid, sum, count)); + } + } + // multiMap.clear(); // allows gc + // source = Linq4j.asEnumerable(list); + } + + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + final JavaTypeFactory typeFactory = implementor.getTypeFactory(); + final EnumerableRel child = (EnumerableRel) getInput(); + final BlockBuilder builder = new BlockBuilder(); + final Result result = implementor.visitChild(this, 0, child, pref); + Expression source_ = builder.append("source", result.block); + + final List<Expression> translatedConstants = + new ArrayList<Expression>(constants.size()); + for (RexLiteral constant : constants) { + translatedConstants.add( + RexToLixTranslator.translateLiteral(constant, constant.getType(), + typeFactory, RexImpTable.NullAs.NULL)); + } + + PhysType inputPhysType = result.physType; + + ParameterExpression prevStart = + Expressions.parameter(int.class, builder.newName("prevStart")); + ParameterExpression prevEnd = + Expressions.parameter(int.class, builder.newName("prevEnd")); + + builder.add(Expressions.declare(0, prevStart, null)); + builder.add(Expressions.declare(0, prevEnd, null)); + + for (int windowIdx = 0; windowIdx < groups.size(); windowIdx++) { + Group group = 
groups.get(windowIdx); + // Comparator: + // final Comparator<JdbcTest.Employee> comparator = + // new Comparator<JdbcTest.Employee>() { + // public int compare(JdbcTest.Employee o1, + // JdbcTest.Employee o2) { + // return Integer.compare(o1.empid, o2.empid); + // } + // }; + final Expression comparator_ = + builder.append( + "comparator", + inputPhysType.generateComparator( + group.collation())); + + Pair<Expression, Expression> partitionIterator = + getPartitionIterator(builder, source_, inputPhysType, group, + comparator_); + final Expression collectionExpr = partitionIterator.left; + final Expression iterator_ = partitionIterator.right; + + List<AggImpState> aggs = new ArrayList<AggImpState>(); + List<AggregateCall> aggregateCalls = group.getAggregateCalls(this); + for (int aggIdx = 0; aggIdx < aggregateCalls.size(); aggIdx++) { + AggregateCall call = aggregateCalls.get(aggIdx); + aggs.add(new AggImpState(aggIdx, call, true)); + } + + // The output from this stage is the input plus the aggregate functions. 
+ final RelDataTypeFactory.FieldInfoBuilder typeBuilder = + typeFactory.builder(); + typeBuilder.addAll(inputPhysType.getRowType().getFieldList()); + for (AggImpState agg : aggs) { + typeBuilder.add(agg.call.name, agg.call.type); + } + RelDataType outputRowType = typeBuilder.build(); + final PhysType outputPhysType = + PhysTypeImpl.of( + typeFactory, outputRowType, pref.prefer(result.format)); + + final Expression list_ = + builder.append( + "list", + Expressions.new_( + ArrayList.class, + Expressions.call( + collectionExpr, BuiltInMethod.COLLECTION_SIZE.method)), + false); + + Pair<Expression, Expression> collationKey = + getRowCollationKey(builder, inputPhysType, group, windowIdx); + Expression keySelector = collationKey.left; + Expression keyComparator = collationKey.right; + final BlockBuilder builder3 = new BlockBuilder(); + final Expression rows_ = + builder3.append( + "rows", + Expressions.convert_( + Expressions.call( + iterator_, BuiltInMethod.ITERATOR_NEXT.method), + Object[].class), + false); + + builder3.add( + Expressions.statement( + Expressions.assign(prevStart, Expressions.constant(-1)))); + builder3.add( + Expressions.statement( + Expressions.assign(prevEnd, + Expressions.constant(Integer.MAX_VALUE)))); + + final BlockBuilder builder4 = new BlockBuilder(); + + final ParameterExpression i_ = + Expressions.parameter(int.class, builder4.newName("i")); + + final Expression row_ = + builder4.append( + "row", + RexToLixTranslator.convert( + Expressions.arrayIndex(rows_, i_), + inputPhysType.getJavaRowType())); + + final RexToLixTranslator.InputGetter inputGetter = + new WindowRelInputGetter(row_, inputPhysType, + result.physType.getRowType().getFieldCount(), + translatedConstants); + + final RexToLixTranslator translator = + RexToLixTranslator.forAggregation(typeFactory, builder4, + inputGetter); + + final List<Expression> outputRow = new ArrayList<Expression>(); + int fieldCountWithAggResults = + inputPhysType.getRowType().getFieldCount(); + for (int i 
= 0; i < fieldCountWithAggResults; i++) { + outputRow.add( + inputPhysType.fieldReference( + row_, i, + outputPhysType.getJavaFieldType(i))); + } + + declareAndResetState(typeFactory, builder, result, windowIdx, aggs, + outputPhysType, outputRow); + + // There are assumptions that minX==0. If this ever changes, look for + // frameRowCount, bounds checking, etc. + final Expression minX = Expressions.constant(0); + final Expression partitionRowCount = + builder3.append("partRows", Expressions.field(rows_, "length")); + final Expression maxX = builder3.append("maxX", + Expressions.subtract( + partitionRowCount, Expressions.constant(1))); + + final Expression startUnchecked = builder4.append("start", + translateBound(translator, i_, row_, minX, maxX, rows_, + group, true, + inputPhysType, comparator_, keySelector, keyComparator)); + final Expression endUnchecked = builder4.append("end", + translateBound(translator, i_, row_, minX, maxX, rows_, + group, false, + inputPhysType, comparator_, keySelector, keyComparator)); + + final Expression startX; + final Expression endX; + final Expression hasRows; + if (group.isAlwaysNonEmpty()) { + startX = startUnchecked; + endX = endUnchecked; + hasRows = Expressions.constant(true); + } else { + Expression startTmp = + group.lowerBound.isUnbounded() || startUnchecked == i_ + ? startUnchecked + : builder4.append("startTmp", + Expressions.call(null, BuiltInMethod.MATH_MAX.method, + startUnchecked, minX)); + Expression endTmp = + group.upperBound.isUnbounded() || endUnchecked == i_ + ? 
endUnchecked + : builder4.append("endTmp", + Expressions.call(null, BuiltInMethod.MATH_MIN.method, + endUnchecked, maxX)); + + ParameterExpression startPe = Expressions.parameter(0, int.class, + builder4.newName("startChecked")); + ParameterExpression endPe = Expressions.parameter(0, int.class, + builder4.newName("endChecked")); + builder4.add(Expressions.declare(Modifier.FINAL, startPe, null)); + builder4.add(Expressions.declare(Modifier.FINAL, endPe, null)); + + hasRows = builder4.append("hasRows", + Expressions.lessThanOrEqual(startTmp, endTmp)); + builder4.add( + Expressions.ifThenElse(hasRows, + Expressions.block( + Expressions.statement( + Expressions.assign(startPe, startTmp)), + Expressions.statement( + Expressions.assign(endPe, endTmp))), + Expressions.block( + Expressions.statement( + Expressions.assign(startPe, Expressions.constant(-1))), + Expressions.statement( + Expressions.assign(endPe, Expressions.constant(-1)))))); + startX = startPe; + endX = endPe; + } + + final BlockBuilder builder5 = new BlockBuilder(true, builder4); + + BinaryExpression rowCountWhenNonEmpty = Expressions.add( + startX == minX ? 
endX : Expressions.subtract(endX, startX), + Expressions.constant(1)); + + final Expression frameRowCount; + + if (hasRows.equals(Expressions.constant(true))) { + frameRowCount = + builder4.append("totalRows", rowCountWhenNonEmpty); + } else { + frameRowCount = + builder4.append("totalRows", + Expressions.condition(hasRows, rowCountWhenNonEmpty, + Expressions.constant(0))); + } + + ParameterExpression actualStart = Expressions.parameter( + 0, int.class, builder5.newName("actualStart")); + + final BlockBuilder builder6 = new BlockBuilder(true, builder5); + builder6.add( + Expressions.statement(Expressions.assign(actualStart, startX))); + + for (final AggImpState agg : aggs) { + agg.implementor.implementReset(agg.context, + new WinAggResetContextImpl(builder6, agg.state, i_, startX, endX, + hasRows, partitionRowCount, frameRowCount)); + } + + Expression lowerBoundCanChange = + group.lowerBound.isUnbounded() && group.lowerBound.isPreceding() + ? Expressions.constant(false) + : Expressions.notEqual(startX, prevStart); + Expression needRecomputeWindow = Expressions.orElse( + lowerBoundCanChange, + Expressions.lessThan(endX, prevEnd)); + + BlockStatement resetWindowState = builder6.toBlock(); + if (resetWindowState.statements.size() == 1) { + builder5.add( + Expressions.declare(0, actualStart, + Expressions.condition(needRecomputeWindow, startX, + Expressions.add(prevEnd, Expressions.constant(1))))); + } else { + builder5.add( + Expressions.declare(0, actualStart, null)); + builder5.add( + Expressions.ifThenElse(needRecomputeWindow, + resetWindowState, + Expressions.statement( + Expressions.assign(actualStart, + Expressions.add(prevEnd, Expressions.constant(1)))))); + } + + if (lowerBoundCanChange instanceof BinaryExpression) { + builder5.add( + Expressions.statement(Expressions.assign(prevStart, startX))); + } + builder5.add( + Expressions.statement(Expressions.assign(prevEnd, endX))); + + final BlockBuilder builder7 = new BlockBuilder(true, builder5); + final 
DeclarationStatement jDecl = + Expressions.declare(0, "j", actualStart); + + final PhysType inputPhysTypeFinal = inputPhysType; + final Function<BlockBuilder, WinAggFrameResultContext> + resultContextBuilder = + getBlockBuilderWinAggFrameResultContextFunction(typeFactory, result, + translatedConstants, comparator_, rows_, i_, startX, endX, + minX, maxX, + hasRows, frameRowCount, partitionRowCount, + jDecl, inputPhysTypeFinal); + + final Function<AggImpState, List<RexNode>> rexArguments = + new Function<AggImpState, List<RexNode>>() { + public List<RexNode> apply(AggImpState agg) { + List<Integer> argList = agg.call.getArgList(); + List<RelDataType> inputTypes = + EnumUtils.fieldRowTypes( + result.physType.getRowType(), + constants, + argList); + List<RexNode> args = new ArrayList<RexNode>( + inputTypes.size()); + for (int i = 0; i < argList.size(); i++) { + Integer idx = argList.get(i); + args.add(new RexInputRef(idx, inputTypes.get(i))); + } + return args; + } + }; + + implementAdd(aggs, builder7, resultContextBuilder, rexArguments, jDecl); + + BlockStatement forBlock = builder7.toBlock(); + if (!forBlock.statements.isEmpty()) { + // For instance, row_number does not use for loop to compute the value + Statement forAggLoop = Expressions.for_( + Arrays.asList(jDecl), + Expressions.lessThanOrEqual(jDecl.parameter, endX), + Expressions.preIncrementAssign(jDecl.parameter), + forBlock); + if (!hasRows.equals(Expressions.constant(true))) { + forAggLoop = Expressions.ifThen(hasRows, forAggLoop); + } + builder5.add(forAggLoop); + } + + if (implementResult(aggs, builder5, resultContextBuilder, rexArguments, + true)) { + builder4.add( + Expressions.ifThen( + Expressions.orElse(lowerBoundCanChange, + Expressions.notEqual(endX, prevEnd)), + builder5.toBlock())); + } + + implementResult(aggs, builder4, resultContextBuilder, rexArguments, + false); + + builder4.add( + Expressions.statement( + Expressions.call( + list_, + BuiltInMethod.COLLECTION_ADD.method, + 
outputPhysType.record(outputRow)))); + + builder3.add( + Expressions.for_( + Expressions.declare(0, i_, Expressions.constant(0)), + Expressions.lessThan( + i_, + Expressions.field(rows_, "length")), + Expressions.preIncrementAssign(i_), + builder4.toBlock())); + + builder.add( + Expressions.while_( + Expressions.call( + iterator_, + BuiltInMethod.ITERATOR_HAS_NEXT.method), + builder3.toBlock())); + builder.add( + Expressions.statement( + Expressions.call( + collectionExpr, + BuiltInMethod.MAP_CLEAR.method))); + + // We're not assigning to "source". For each group, create a new + // final variable called "source" or "sourceN". + source_ = + builder.append( + "source", + Expressions.call( + BuiltInMethod.AS_ENUMERABLE.method, list_)); + + inputPhysType = outputPhysType; + } + + // return Linq4j.asEnumerable(list); + builder.add( + Expressions.return_(null, source_)); + return implementor.result(inputPhysType, builder.toBlock()); + } + + private Function<BlockBuilder, WinAggFrameResultContext> + getBlockBuilderWinAggFrameResultContextFunction( + final JavaTypeFactory typeFactory, final Result result, + final List<Expression> translatedConstants, + final Expression comparator_, + final Expression rows_, final ParameterExpression i_, + final Expression startX, final Expression endX, + final Expression minX, final Expression maxX, + final Expression hasRows, final Expression frameRowCount, + final Expression partitionRowCount, + final DeclarationStatement jDecl, + final PhysType inputPhysType) { + return new Function<BlockBuilder, + WinAggFrameResultContext>() { + public WinAggFrameResultContext apply( + final BlockBuilder block) { + return new WinAggFrameResultContext() { + public RexToLixTranslator rowTranslator(Expression rowIndex) { + Expression row = + getRow(rowIndex); + final RexToLixTranslator.InputGetter inputGetter = + new WindowRelInputGetter(row, inputPhysType, + result.physType.getRowType().getFieldCount(), + translatedConstants); + + return 
RexToLixTranslator.forAggregation(typeFactory, + block, inputGetter); + } + + public Expression computeIndex(Expression offset, + WinAggImplementor.SeekType seekType) { + Expression index; + if (seekType == WinAggImplementor.SeekType.AGG_INDEX) { + index = jDecl.parameter; + } else if (seekType == WinAggImplementor.SeekType.SET) { + index = i_; + } else if (seekType == WinAggImplementor.SeekType.START) { + index = startX; + } else if (seekType == WinAggImplementor.SeekType.END) { + index = endX; + } else { + throw new IllegalArgumentException("SeekSet " + seekType + + " is not supported"); + } + if (!Expressions.constant(0).equals(offset)) { + index = block.append("idx", Expressions.add(index, offset)); + } + return index; + } + + private Expression checkBounds(Expression rowIndex, + Expression minIndex, Expression maxIndex) { + if (rowIndex == i_ || rowIndex == startX || rowIndex == endX) { + // No additional bounds check required + return hasRows; + } + + //noinspection UnnecessaryLocalVariable + Expression res = block.append("rowInFrame", + Expressions.foldAnd( + ImmutableList.of(hasRows, + Expressions.greaterThanOrEqual(rowIndex, minIndex), + Expressions.lessThanOrEqual(rowIndex, maxIndex)))); + + return res; + } + + public Expression rowInFrame(Expression rowIndex) { + return checkBounds(rowIndex, startX, endX); + } + + public Expression rowInPartition(Expression rowIndex) { + return checkBounds(rowIndex, minX, maxX); + } + + public Expression compareRows(Expression a, Expression b) { + return Expressions.call(comparator_, + BuiltInMethod.COMPARATOR_COMPARE.method, + getRow(a), getRow(b)); + } + + public Expression getRow(Expression rowIndex) { + return block.append( + "jRow", + RexToLixTranslator.convert( + Expressions.arrayIndex(rows_, rowIndex), + inputPhysType.getJavaRowType())); + } + + public Expression index() { + return i_; + } + + public Expression startIndex() { + return startX; + } + + public Expression endIndex() { + return endX; + } + + public 
Expression hasRows() { + return hasRows; + } + + public Expression getFrameRowCount() { + return frameRowCount; + } + + public Expression getPartitionRowCount() { + return partitionRowCount; + } + }; + } + }; + } + + private Pair<Expression, Expression> getPartitionIterator( + BlockBuilder builder, + Expression source_, + PhysType inputPhysType, + Group group, + Expression comparator_) { + // Populate map of lists, one per partition + // final Map<Integer, List<Employee>> multiMap = + // new SortedMultiMap<Integer, List<Employee>>(); + // source.foreach( + // new Function1<Employee, Void>() { + // public Void apply(Employee v) { + // final Integer k = v.deptno; + // multiMap.putMulti(k, v); + // return null; + // } + // }); + // final List<Xxx> list = new ArrayList<Xxx>(multiMap.size()); + // Iterator<Employee[]> iterator = multiMap.arrays(comparator); + // + if (group.keys.isEmpty()) { + // If partition key is empty, no need to partition. + // + // final List<Employee> tempList = + // source.into(new ArrayList<Employee>()); + // Iterator<Employee[]> iterator = + // SortedMultiMap.singletonArrayIterator(comparator, tempList); + // final List<Xxx> list = new ArrayList<Xxx>(tempList.size()); + + final Expression tempList_ = builder.append( + "tempList", + Expressions.convert_( + Expressions.call( + source_, + BuiltInMethod.INTO.method, + Expressions.new_(ArrayList.class)), + List.class)); + return Pair.of(tempList_, + builder.append( + "iterator", + Expressions.call( + null, + BuiltInMethod.SORTED_MULTI_MAP_SINGLETON.method, + comparator_, + tempList_))); + } + Expression multiMap_ = + builder.append( + "multiMap", Expressions.new_(SortedMultiMap.class)); + final BlockBuilder builder2 = new BlockBuilder(); + final ParameterExpression v_ = + Expressions.parameter(inputPhysType.getJavaRowType(), + builder2.newName("v")); + + Pair<Type, List<Expression>> selector = inputPhysType.selector( + v_, + group.keys.asList(), + JavaRowFormat.CUSTOM); + final ParameterExpression 
key_; + if(selector.left instanceof Types.RecordType) { + Types.RecordType keyJavaType = (Types.RecordType) selector.left; + List<Expression> initExpressions = selector.right; + + key_ = Expressions.parameter(keyJavaType, "key"); + builder2.add(Expressions.declare(0, key_, null)); + builder2.add(Expressions.statement(Expressions.assign(key_, Expressions.new_(keyJavaType)))); + List<Types.RecordField> fieldList = keyJavaType.getRecordFields(); + for (int i = 0; i < initExpressions.size(); i++) { + Expression right = initExpressions.get(i); + builder2.add( + Expressions.statement( + Expressions.assign( + Expressions.field(key_, fieldList.get(i)), right))); + } + } + else + { + DeclarationStatement declare = Expressions.declare(0, "key", selector.right.get(0)); + builder2.add(declare); + key_ = declare.parameter; + } + builder2.add( + Expressions.statement( + Expressions.call( + multiMap_, + BuiltInMethod.SORTED_MULTI_MAP_PUT_MULTI.method, + key_, + v_))); + builder2.add( + Expressions.return_( + null, Expressions.constant(null))); + + builder.add( + Expressions.statement( + Expressions.call( + source_, + BuiltInMethod.ENUMERABLE_FOREACH.method, + Expressions.lambda( + builder2.toBlock(), v_)))); + + return Pair.of(multiMap_, + builder.append( + "iterator", + Expressions.call( + multiMap_, + BuiltInMethod.SORTED_MULTI_MAP_ARRAYS.method, + comparator_))); + } + + private Pair<Expression, Expression> getRowCollationKey( + BlockBuilder builder, PhysType inputPhysType, + Group group, int windowIdx) { + if (!(group.isRows || (group.upperBound.isUnbounded() + && group.lowerBound.isUnbounded()))) { + Pair<Expression, Expression> pair = + inputPhysType.generateCollationKey( + group.collation().getFieldCollations()); + // optimize=false to prevent inlining of object create into for-loops + return Pair.of( + builder.append("keySelector" + windowIdx, pair.left, false), + builder.append("keyComparator" + windowIdx, pair.right, false)); + } else { + return Pair.of(null, null); + } 
+ } + + private void declareAndResetState(final JavaTypeFactory typeFactory, + BlockBuilder builder, final Result result, int windowIdx, + List<AggImpState> aggs, PhysType outputPhysType, + List<Expression> outputRow) { + for (final AggImpState agg : aggs) { + agg.context = + new WinAggContext() { + public SqlAggFunction aggregation() { + return agg.call.getAggregation(); + } + + public RelDataType returnRelType() { + return agg.call.type; + } + + public Type returnType() { + return EnumUtils.javaClass(typeFactory, returnRelType()); + } + + public List<? extends Type> parameterTypes() { + return EnumUtils.fieldTypes(typeFactory, + parameterRelTypes()); + } + + public List<? extends RelDataType> parameterRelTypes() { + return EnumUtils.fieldRowTypes(result.physType.getRowType(), + constants, agg.call.getArgList()); + } + }; + String aggName = "a" + agg.aggIdx; + if (CalcitePrepareImpl.DEBUG) { + aggName = Util.toJavaId(agg.call.getAggregation().getName(), 0) + .substring("ID$0$".length()) + aggName; + } + List<Type> state = agg.implementor.getStateType(agg.context); + final List<Expression> decls = + new ArrayList<Expression>(state.size()); + for (int i = 0; i < state.size(); i++) { + Type type = state.get(i); + ParameterExpression pe = + Expressions.parameter(type, + builder.newName(aggName + + "s" + i + "w" + windowIdx)); + builder.add(Expressions.declare(0, pe, null)); + decls.add(pe); + } + agg.state = decls; + Type aggHolderType = agg.context.returnType(); + Type aggStorageType = + outputPhysType.getJavaFieldType(outputRow.size()); + if (Primitive.is(aggHolderType) && !Primitive.is(aggStorageType)) { + aggHolderType = Primitive.box(aggHolderType); + } + ParameterExpression aggRes = Expressions.parameter(0, + aggHolderType, + builder.newName(aggName + "w" + windowIdx)); + + builder.add( + Expressions.declare(0, aggRes, + Expressions.constant(Primitive.is(aggRes.getType()) + ? 
Primitive.of(aggRes.getType()).defaultValue + : null, + aggRes.getType()))); + agg.result = aggRes; + outputRow.add(aggRes); + agg.implementor.implementReset(agg.context, + new WinAggResetContextImpl(builder, agg.state, + null, null, null, null, null, null)); + } + } + + private void implementAdd(List<AggImpState> aggs, + final BlockBuilder builder7, + final Function<BlockBuilder, WinAggFrameResultContext> frame, + final Function<AggImpState, List<RexNode>> rexArguments, + final DeclarationStatement jDecl) { + for (final AggImpState agg : aggs) { + final WinAggAddContext addContext = + new WinAggAddContextImpl(builder7, agg.state, frame) { + public Expression currentPosition() { + return jDecl.parameter; + } + + public List<RexNode> rexArguments() { + return rexArguments.apply(agg); + } + + public RexNode rexFilterArgument() { + return null; // REVIEW + } + }; + agg.implementor.implementAdd(agg.context, addContext); + } + } + + private boolean implementResult(List<AggImpState> aggs, + final BlockBuilder builder, + final Function<BlockBuilder, WinAggFrameResultContext> frame, + final Function<AggImpState, List<RexNode>> rexArguments, + boolean cachedBlock) { + boolean nonEmpty = false; + for (final AggImpState agg : aggs) { + boolean needCache = true; + if (agg.implementor instanceof WinAggImplementor) { + WinAggImplementor imp = (WinAggImplementor) agg.implementor; + needCache = imp.needCacheWhenFrameIntact(); + } + if (needCache ^ cachedBlock) { + // Regular aggregates do not change when the windowing frame keeps + // the same. 
Thus skip aggregates whose caching mode does not match this pass. + continue; + } + nonEmpty = true; + Expression res = agg.implementor.implementResult(agg.context, + new WinAggResultContextImpl(builder, agg.state, frame) { + public List<RexNode> rexArguments() { + return rexArguments.apply(agg); + } + }); + // Several count(a) and count(b) might share the result + Expression aggRes = builder.append("a" + agg.aggIdx + "res", + RexToLixTranslator.convert(res, agg.result.getType())); + builder.add( + Expressions.statement(Expressions.assign(agg.result, aggRes))); + } + return nonEmpty; + } + + private Expression translateBound(RexToLixTranslator translator, + ParameterExpression i_, Expression row_, Expression min_, + Expression max_, Expression rows_, Group group, + boolean lower, + PhysType physType, Expression rowComparator, + Expression keySelector, Expression keyComparator) { + RexWindowBound bound = lower ? group.lowerBound : group.upperBound; + if (bound.isUnbounded()) { + return bound.isPreceding() ? min_ : max_; + } + if (group.isRows) { + if (bound.isCurrentRow()) { + return i_; + } + RexNode node = bound.getOffset(); + Expression offs = translator.translate(node); + // Floating offset does not make sense since we refer to array index. + // Nulls do not make sense as well. + offs = RexToLixTranslator.convert(offs, int.class); + + Expression b = i_; + if (bound.isFollowing()) { + b = Expressions.add(b, offs); + } else { + b = Expressions.subtract(b, offs); + } + return b; + } + Expression searchLower = min_; + Expression searchUpper = max_; + if (bound.isCurrentRow()) { + if (lower) { + searchUpper = i_; + } else { + searchLower = i_; + } + } + + List<RelFieldCollation> fieldCollations = + group.collation().getFieldCollations(); + if (bound.isCurrentRow() && fieldCollations.size() != 1) { + return Expressions.call( + (lower + ? 
BuiltInMethod.BINARY_SEARCH5_LOWER + : BuiltInMethod.BINARY_SEARCH5_UPPER).method, + rows_, row_, searchLower, searchUpper, keySelector, keyComparator); + } + assert fieldCollations.size() == 1 + : "When using range window specification, ORDER BY should have" + + " exactly one expression." + + " Actual collation is " + group.collation(); + // isRange + int orderKey = + fieldCollations.get(0).getFieldIndex(); + RelDataType keyType = + physType.getRowType().getFieldList().get(orderKey).getType(); + Type desiredKeyType = translator.typeFactory.getJavaClass(keyType); + if (bound.getOffset() == null) { + desiredKeyType = Primitive.box(desiredKeyType); + } + Expression val = translator.translate( + new RexInputRef(orderKey, keyType), desiredKeyType); + if (!bound.isCurrentRow()) { + RexNode node = bound.getOffset(); + Expression offs = translator.translate(node); + // TODO: support date + interval somehow + if (bound.isFollowing()) { + val = Expressions.add(val, offs); + } else { + val = Expressions.subtract(val, offs); + } + } + return Expressions.call( + (lower + ? BuiltInMethod.BINARY_SEARCH6_LOWER + : BuiltInMethod.BINARY_SEARCH6_UPPER).method, + rows_, val, searchLower, searchUpper, keySelector, keyComparator); + } +} + +// End EnumerableWindow.java http://git-wip-us.apache.org/repos/asf/kylin/blob/4ae4333c/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysType.java ---------------------------------------------------------------------- diff --git a/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysType.java b/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysType.java new file mode 100644 index 0000000..e37b196 --- /dev/null +++ b/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysType.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.enumerable; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.Pair; + +import java.lang.reflect.Type; +import java.util.List; + +/* + * OVERRIDE POINT: patching CALCITE-1540 on calcite 1.8.0 + */ + +/** + * Physical type of a row. + * + * <p>Consists of the SQL row type (returned by {@link #getRowType()}), the Java + * type of the row (returned by {@link #getJavaRowType()}), and methods to + * generate expressions to access fields, generate records, and so forth. + * Together, the records encapsulate how the logical type maps onto the physical + * type.</p> + */ +public interface PhysType { + /** Returns the Java type (often a Class) that represents a row. For + * example, in one row format, always returns {@code Object[].class}. */ + Type getJavaRowType(); + + /** + * Returns the Java class that is used to store the field with the given + * ordinal. 
+ * + * <p>For instance, when the java row type is {@code Object[]}, the java + * field type is {@code Object} even if the field is not nullable.</p> */ + Type getJavaFieldType(int field); + + /** Returns the physical type of a field. */ + PhysType field(int ordinal); + + /** Returns the physical type of a given field's component type. */ + PhysType component(int field); + + /** Returns the SQL row type. */ + RelDataType getRowType(); + + /** Returns the Java class of the field with the given ordinal. */ + Class fieldClass(int field); + + /** Returns whether a given field allows null values. */ + boolean fieldNullable(int index); + + /** Generates a reference to a given field in an expression. + * + * <p>For example given {@code expression=employee} and {@code field=2}, + * generates</p> + * + * <pre>{@code employee.deptno}</pre> + * + * @param expression Expression + * @param field Ordinal of field + * @return Expression to access the field of the expression + */ + Expression fieldReference(Expression expression, int field); + + /** Generates a reference to a given field in an expression. + * + * <p>This method optimizes for the target storage type (i.e. avoids + * casts).</p> + * + * <p>For example given {@code expression=employee} and {@code field=2}, + * generates</p> + * + * <pre>{@code employee.deptno}</pre> + * + * @param expression Expression + * @param field Ordinal of field + * @param storageType optional hint for storage class + * @return Expression to access the field of the expression + */ + Expression fieldReference(Expression expression, int field, + Type storageType); + + /** Generates an accessor function for a given list of fields. The resulting + * object is a {@link List} (implementing {@link Object#hashCode()} and + * {@link Object#equals(Object)} per that interface) and also implements + * {@link Comparable}. 
+ * + * <p>For example:</p> + * + * <pre>{@code + * new Function1<Employee, Object[]> { + * public Object[] apply(Employee v1) { + * return FlatLists.of(v1.<fieldN>, v1.<fieldM>); + * } + * } + * }</pre> + */ + Expression generateAccessor(List<Integer> fields); + + /** Generates a selector for the given fields from an expression, with the + * default row format. */ + Expression generateSelector( + ParameterExpression parameter, + List<Integer> fields); + + /** Generates a lambda expression that is a selector for the given fields from + * an expression. */ + Expression generateSelector( + ParameterExpression parameter, + List<Integer> fields, + JavaRowFormat targetFormat); + + /** Generates a lambda expression that is a selector for the given fields from + * an expression. + * + * <p>{@code usedFields} must be a subset of {@code fields}. + * For each field, there is a corresponding indicator field. + * If a field is used, its value is assigned and its indicator is left + * {@code false}. + * If a field is not used, its value is not assigned and its indicator is + * set to {@code true}; + * this will become a value of 1 when {@code GROUPING(field)} is called. */ + Expression generateSelector( + ParameterExpression parameter, + List<Integer> fields, + List<Integer> usedFields, + JavaRowFormat targetFormat); + + /** Generates a selector for the given fields from an expression. + * Only used by EnumerableWindow. */ + Pair<Type, List<Expression>> selector( + ParameterExpression parameter, + List<Integer> fields, + JavaRowFormat targetFormat); + + /** Projects a given collection of fields from this input record, into + * a particular preferred output format. The output format is optimized + * if there are 0 or 1 fields. */ + PhysType project( + List<Integer> integers, + JavaRowFormat format); + + /** Projects a given collection of fields from this input record, optionally + * with indicator fields, into a particular preferred output format. 
+ * + * <p>The output format is optimized if there are 0 or 1 fields + * and indicators are disabled. */ + PhysType project( + List<Integer> integers, + boolean indicator, + JavaRowFormat format); + + /** Returns a lambda to create a collation key and a comparator. The + * comparator is sometimes null. */ + Pair<Expression, Expression> generateCollationKey( + List<RelFieldCollation> collations); + + /** Returns a comparator. Unlike the comparator returned by + * {@link #generateCollationKey(java.util.List)}, this comparator acts on the + * whole element. */ + Expression generateComparator( + RelCollation collation); + + /** Returns an expression that yields a comparer, or null if this type + * is comparable. */ + Expression comparer(); + + /** Generates an expression that creates a record for a row, initializing + * its fields with the given expressions. There must be one expression per + * field. + * + * @param expressions Expression to initialize each field + * @return Expression to create a row + */ + Expression record(List<Expression> expressions); + + /** Returns the format. */ + JavaRowFormat getFormat(); + + List<Expression> accessors(Expression parameter, List<Integer> argList); + + /** Returns a copy of this type that allows nulls if {@code nullable} is + * true. */ + PhysType makeNullable(boolean nullable); + + /** Converts an enumerable of this physical type to an enumerable that uses a + * given physical type for its rows. 
*/ + Expression convertTo(Expression expression, PhysType targetPhysType); +} + +// End PhysType.java http://git-wip-us.apache.org/repos/asf/kylin/blob/4ae4333c/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysTypeImpl.java ---------------------------------------------------------------------- diff --git a/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysTypeImpl.java b/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysTypeImpl.java new file mode 100644 index 0000000..678b469 --- /dev/null +++ b/atopcalcite/src/main/java/org/apache/calcite/adapter/enumerable/PhysTypeImpl.java @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.enumerable; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.linq4j.function.Function1; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.MemberDeclaration; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.runtime.Utilities; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static org.apache.calcite.adapter.enumerable.EnumUtils.javaRowClass; +import static org.apache.calcite.adapter.enumerable.EnumUtils.overridingMethodDecl; + +/* + * OVERRIDE POINT: patching CALCITE-1540 on calcite 1.8.0 + */ + +/** Implementation of {@link PhysType}. */ +public class PhysTypeImpl implements PhysType { + private final JavaTypeFactory typeFactory; + private final RelDataType rowType; + private final Type javaRowClass; + private final List<Class> fieldClasses = new ArrayList<>(); + final JavaRowFormat format; + + /** Creates a PhysTypeImpl. 
*/ + PhysTypeImpl( + JavaTypeFactory typeFactory, + RelDataType rowType, + Type javaRowClass, + JavaRowFormat format) { + this.typeFactory = typeFactory; + this.rowType = rowType; + this.javaRowClass = javaRowClass; + this.format = format; + for (RelDataTypeField field : rowType.getFieldList()) { + fieldClasses.add(javaRowClass(typeFactory, field.getType())); + } + } + + public static PhysType of( + JavaTypeFactory typeFactory, + RelDataType rowType, + JavaRowFormat format) { + return of(typeFactory, rowType, format, true); + } + + public static PhysType of( + JavaTypeFactory typeFactory, + RelDataType rowType, + JavaRowFormat format, + boolean optimize) { + if (optimize) { + format = format.optimize(rowType); + } + final Type javaRowClass = format.javaRowClass(typeFactory, rowType); + return new PhysTypeImpl(typeFactory, rowType, javaRowClass, format); + } + + static PhysType of( + final JavaTypeFactory typeFactory, + Type javaRowClass) { + final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + if (javaRowClass instanceof Types.RecordType) { + final Types.RecordType recordType = (Types.RecordType) javaRowClass; + for (Types.RecordField field : recordType.getRecordFields()) { + builder.add(field.getName(), typeFactory.createType(field.getType())); + } + } + RelDataType rowType = builder.build(); + // Do not optimize if there are 0 or 1 fields. 
+ return new PhysTypeImpl(typeFactory, rowType, javaRowClass, + JavaRowFormat.CUSTOM); + } + + public JavaRowFormat getFormat() { + return format; + } + + public PhysType project(List<Integer> integers, JavaRowFormat format) { + return project(integers, false, format); + } + + public PhysType project(List<Integer> integers, boolean indicator, + JavaRowFormat format) { + final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + for (int index : integers) { + builder.add(rowType.getFieldList().get(index)); + } + if (indicator) { + final RelDataType booleanType = + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.BOOLEAN), false); + for (int index : integers) { + builder.add("i$" + rowType.getFieldList().get(index).getName(), + booleanType); + } + } + RelDataType projectedRowType = builder.build(); + return of(typeFactory, projectedRowType, format.optimize(projectedRowType)); + } + + public Expression generateSelector( + ParameterExpression parameter, + List<Integer> fields) { + return generateSelector(parameter, fields, format); + } + + public Expression generateSelector( + ParameterExpression parameter, + List<Integer> fields, + JavaRowFormat targetFormat) { + // Optimize target format + switch (fields.size()) { + case 0: + targetFormat = JavaRowFormat.LIST; + break; + case 1: + targetFormat = JavaRowFormat.SCALAR; + break; + } + final PhysType targetPhysType = + project(fields, targetFormat); + switch (format) { + case SCALAR: + return Expressions.call(BuiltInMethod.IDENTITY_SELECTOR.method); + default: + return Expressions.lambda(Function1.class, + targetPhysType.record(fieldReferences(parameter, fields)), parameter); + } + } + + public Expression generateSelector(final ParameterExpression parameter, + final List<Integer> fields, List<Integer> usedFields, + JavaRowFormat targetFormat) { + final PhysType targetPhysType = + project(fields, true, targetFormat); + final List<Expression> expressions = 
Lists.newArrayList(); + for (Ord<Integer> ord : Ord.zip(fields)) { + final Integer field = ord.e; + if (usedFields.contains(field)) { + expressions.add(fieldReference(parameter, field)); + } else { + final Primitive primitive = + Primitive.of(targetPhysType.fieldClass(ord.i)); + expressions.add( + Expressions.constant( + primitive != null ? primitive.defaultValue : null)); + } + } + for (Integer field : fields) { + expressions.add(Expressions.constant(!usedFields.contains(field))); + } + return Expressions.lambda(Function1.class, + targetPhysType.record(expressions), parameter); + } + + public Pair<Type, List<Expression>> selector( + ParameterExpression parameter, + List<Integer> fields, + JavaRowFormat targetFormat) { + // Optimize target format + switch (fields.size()) { + case 0: + targetFormat = JavaRowFormat.LIST; + break; + case 1: + targetFormat = JavaRowFormat.SCALAR; + break; + } + final PhysType targetPhysType = + project(fields, targetFormat); + switch (format) { + case SCALAR: + return Pair.of(parameter.getType(), Collections.<Expression>singletonList(parameter)); + default: + return Pair.of(targetPhysType.getJavaRowType(), fieldReferences(parameter, fields)); + } + } + + public List<Expression> accessors(Expression v1, List<Integer> argList) { + final List<Expression> expressions = new ArrayList<>(); + for (int field : argList) { + expressions.add( + Types.castIfNecessary( + fieldClass(field), + fieldReference(v1, field))); + } + return expressions; + } + + public PhysType makeNullable(boolean nullable) { + if (!nullable) { + return this; + } + return new PhysTypeImpl(typeFactory, + typeFactory.createTypeWithNullability(rowType, true), + Primitive.box(javaRowClass), format); + } + + public Expression convertTo(Expression exp, PhysType targetPhysType) { + final JavaRowFormat targetFormat = targetPhysType.getFormat(); + if (format == targetFormat) { + return exp; + } + final ParameterExpression o_ = + Expressions.parameter(javaRowClass, "o"); + final int 
fieldCount = rowType.getFieldCount(); + return Expressions.call(exp, BuiltInMethod.SELECT.method, + generateSelector(o_, Util.range(fieldCount), targetFormat)); + } + + public Pair<Expression, Expression> generateCollationKey( + final List<RelFieldCollation> collations) { + final Expression selector; + if (collations.size() == 1) { + RelFieldCollation collation = collations.get(0); + ParameterExpression parameter = + Expressions.parameter(javaRowClass, "v"); + selector = + Expressions.lambda( + Function1.class, + fieldReference(parameter, collation.getFieldIndex()), + parameter); + return Pair.<Expression, Expression>of( + selector, + Expressions.call( + BuiltInMethod.NULLS_COMPARATOR.method, + Expressions.constant( + collation.nullDirection + == RelFieldCollation.NullDirection.FIRST), + Expressions.constant( + collation.getDirection() + == RelFieldCollation.Direction.DESCENDING))); + } + selector = + Expressions.call(BuiltInMethod.IDENTITY_SELECTOR.method); + + // int c; + // c = Utilities.compare(v0, v1); + // if (c != 0) return c; // or -c if descending + // ... + // return 0; + BlockBuilder body = new BlockBuilder(); + final ParameterExpression parameterV0 = + Expressions.parameter(javaRowClass, "v0"); + final ParameterExpression parameterV1 = + Expressions.parameter(javaRowClass, "v1"); + final ParameterExpression parameterC = + Expressions.parameter(int.class, "c"); + final int mod = collations.size() == 1 ? 
Modifier.FINAL : 0; + body.add(Expressions.declare(mod, parameterC, null)); + for (RelFieldCollation collation : collations) { + final int index = collation.getFieldIndex(); + Expression arg0 = fieldReference(parameterV0, index); + Expression arg1 = fieldReference(parameterV1, index); + switch (Primitive.flavor(fieldClass(index))) { + case OBJECT: + arg0 = Types.castIfNecessary(Comparable.class, arg0); + arg1 = Types.castIfNecessary(Comparable.class, arg1); + } + final boolean nullsFirst = + collation.nullDirection + == RelFieldCollation.NullDirection.FIRST; + final boolean descending = + collation.getDirection() + == RelFieldCollation.Direction.DESCENDING; + final Method method = (fieldNullable(index) + ? (nullsFirst ^ descending + ? BuiltInMethod.COMPARE_NULLS_FIRST + : BuiltInMethod.COMPARE_NULLS_LAST) + : BuiltInMethod.COMPARE).method; + body.add( + Expressions.statement( + Expressions.assign( + parameterC, + Expressions.call(method.getDeclaringClass(), + method.getName(), + arg0, + arg1)))); + body.add( + Expressions.ifThen( + Expressions.notEqual( + parameterC, Expressions.constant(0)), + Expressions.return_( + null, + descending + ? 
Expressions.negate(parameterC) + : parameterC))); + } + body.add( + Expressions.return_(null, Expressions.constant(0))); + + final List<MemberDeclaration> memberDeclarations = + Expressions.<MemberDeclaration>list( + Expressions.methodDecl( + Modifier.PUBLIC, + int.class, + "compare", + ImmutableList.of( + parameterV0, parameterV1), + body.toBlock())); + + if (EnumerableRules.BRIDGE_METHODS) { + final ParameterExpression parameterO0 = + Expressions.parameter(Object.class, "o0"); + final ParameterExpression parameterO1 = + Expressions.parameter(Object.class, "o1"); + BlockBuilder bridgeBody = new BlockBuilder(); + bridgeBody.add( + Expressions.return_( + null, + Expressions.call( + Expressions.parameter( + Comparable.class, "this"), + BuiltInMethod.COMPARATOR_COMPARE.method, + Expressions.convert_( + parameterO0, + javaRowClass), + Expressions.convert_( + parameterO1, + javaRowClass)))); + memberDeclarations.add( + overridingMethodDecl( + BuiltInMethod.COMPARATOR_COMPARE.method, + ImmutableList.of(parameterO0, parameterO1), + bridgeBody.toBlock())); + } + return Pair.<Expression, Expression>of( + selector, + Expressions.new_( + Comparator.class, + Collections.<Expression>emptyList(), + memberDeclarations)); + } + + public Expression generateComparator(RelCollation collation) { + // int c; + // c = Utilities.compare(v0, v1); + // if (c != 0) return c; // or -c if descending + // ... + // return 0; + BlockBuilder body = new BlockBuilder(); + final Type javaRowClass = Primitive.box(this.javaRowClass); + final ParameterExpression parameterV0 = + Expressions.parameter(javaRowClass, "v0"); + final ParameterExpression parameterV1 = + Expressions.parameter(javaRowClass, "v1"); + final ParameterExpression parameterC = + Expressions.parameter(int.class, "c"); + final int mod = + collation.getFieldCollations().size() == 1 ? 
Modifier.FINAL : 0; + body.add(Expressions.declare(mod, parameterC, null)); + for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { + final int index = fieldCollation.getFieldIndex(); + Expression arg0 = fieldReference(parameterV0, index); + Expression arg1 = fieldReference(parameterV1, index); + switch (Primitive.flavor(fieldClass(index))) { + case OBJECT: + arg0 = Types.castIfNecessary(Comparable.class, arg0); + arg1 = Types.castIfNecessary(Comparable.class, arg1); + } + final boolean nullsFirst = + fieldCollation.nullDirection + == RelFieldCollation.NullDirection.FIRST; + final boolean descending = + fieldCollation.getDirection() + == RelFieldCollation.Direction.DESCENDING; + body.add( + Expressions.statement( + Expressions.assign( + parameterC, + Expressions.call( + Utilities.class, + fieldNullable(index) + ? (nullsFirst != descending + ? "compareNullsFirst" + : "compareNullsLast") + : "compare", + arg0, + arg1)))); + body.add( + Expressions.ifThen( + Expressions.notEqual( + parameterC, Expressions.constant(0)), + Expressions.return_( + null, + descending + ? 
Expressions.negate(parameterC) + : parameterC))); + } + body.add( + Expressions.return_(null, Expressions.constant(0))); + + final List<MemberDeclaration> memberDeclarations = + Expressions.<MemberDeclaration>list( + Expressions.methodDecl( + Modifier.PUBLIC, + int.class, + "compare", + ImmutableList.of(parameterV0, parameterV1), + body.toBlock())); + + if (EnumerableRules.BRIDGE_METHODS) { + final ParameterExpression parameterO0 = + Expressions.parameter(Object.class, "o0"); + final ParameterExpression parameterO1 = + Expressions.parameter(Object.class, "o1"); + BlockBuilder bridgeBody = new BlockBuilder(); + bridgeBody.add( + Expressions.return_( + null, + Expressions.call( + Expressions.parameter( + Comparable.class, "this"), + BuiltInMethod.COMPARATOR_COMPARE.method, + Expressions.convert_( + parameterO0, + javaRowClass), + Expressions.convert_( + parameterO1, + javaRowClass)))); + memberDeclarations.add( + overridingMethodDecl( + BuiltInMethod.COMPARATOR_COMPARE.method, + ImmutableList.of(parameterO0, parameterO1), + bridgeBody.toBlock())); + } + return Expressions.new_( + Comparator.class, + Collections.<Expression>emptyList(), + memberDeclarations); + } + + public RelDataType getRowType() { + return rowType; + } + + public Expression record(List<Expression> expressions) { + return format.record(javaRowClass, expressions); + } + + public Type getJavaRowType() { + return javaRowClass; + } + + public Type getJavaFieldType(int index) { + return format.javaFieldClass(typeFactory, rowType, index); + } + + public PhysType component(int fieldOrdinal) { + final RelDataTypeField field = rowType.getFieldList().get(fieldOrdinal); + return PhysTypeImpl.of(typeFactory, + toStruct(field.getType().getComponentType()), format, false); + } + + public PhysType field(int ordinal) { + final RelDataTypeField field = rowType.getFieldList().get(ordinal); + final RelDataType type = field.getType(); + return PhysTypeImpl.of(typeFactory, toStruct(type), format, false); + } + + private 
RelDataType toStruct(RelDataType type) { + if (type.isStruct()) { + return type; + } + return typeFactory.builder() + .add(SqlUtil.deriveAliasFromOrdinal(0), type) + .build(); + } + + public Expression comparer() { + return format.comparer(); + } + + private List<Expression> fieldReferences( + final Expression parameter, final List<Integer> fields) { + return new AbstractList<Expression>() { + public Expression get(int index) { + return fieldReference(parameter, fields.get(index)); + } + + public int size() { + return fields.size(); + } + }; + } + + public Class fieldClass(int field) { + return fieldClasses.get(field); + } + + public boolean fieldNullable(int field) { + return rowType.getFieldList().get(field).getType().isNullable(); + } + + public Expression generateAccessor( + List<Integer> fields) { + ParameterExpression v1 = + Expressions.parameter(javaRowClass, "v1"); + switch (fields.size()) { + case 0: + return Expressions.lambda( + Function1.class, + Expressions.field( + null, + BuiltInMethod.COMPARABLE_EMPTY_LIST.field), + v1); + case 1: + int field0 = fields.get(0); + + // new Function1<Employee, Res> { + // public Res apply(Employee v1) { + // return v1.<fieldN>; + // } + // } + Class returnType = fieldClasses.get(field0); + Expression fieldReference = + Types.castIfNecessary( + returnType, + fieldReference(v1, field0)); + return Expressions.lambda( + Function1.class, + fieldReference, + v1); + default: + // new Function1<Employee, List> { + // public List apply(Employee v1) { + // return Arrays.asList( + // new Object[] {v1.<fieldN>, v1.<fieldM>}); + // } + // } + Expressions.FluentList<Expression> list = Expressions.list(); + for (int field : fields) { + list.add(fieldReference(v1, field)); + } + switch (list.size()) { + case 2: + return Expressions.lambda( + Function1.class, + Expressions.call( + List.class, + null, + BuiltInMethod.LIST2.method, + list), + v1); + case 3: + return Expressions.lambda( + Function1.class, + Expressions.call( + List.class, 
+ null, + BuiltInMethod.LIST3.method, + list), + v1); + case 4: + return Expressions.lambda( + Function1.class, + Expressions.call( + List.class, + null, + BuiltInMethod.LIST4.method, + list), + v1); + case 5: + return Expressions.lambda( + Function1.class, + Expressions.call( + List.class, + null, + BuiltInMethod.LIST5.method, + list), + v1); + case 6: + return Expressions.lambda( + Function1.class, + Expressions.call( + List.class, + null, + BuiltInMethod.LIST6.method, + list), + v1); + default: + return Expressions.lambda( + Function1.class, + Expressions.call( + List.class, + null, + BuiltInMethod.LIST_N.method, + Expressions.newArrayInit( + Comparable.class, + list)), + v1); + } + } + } + + public Expression fieldReference( + Expression expression, int field) { + return fieldReference(expression, field, null); + } + + public Expression fieldReference( + Expression expression, int field, Type storageType) { + if (storageType == null) { + storageType = fieldClass(field); + } + return format.field(expression, field, storageType); + } +} + +// End PhysTypeImpl.java http://git-wip-us.apache.org/repos/asf/kylin/blob/4ae4333c/kylin-it/src/test/resources/query/sql_window/query11.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_window/query11.sql b/kylin-it/src/test/resources/query/sql_window/query11.sql new file mode 100644 index 0000000..3002f4c --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_window/query11.sql @@ -0,0 +1,23 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +select lstg_format_name,cal_dt, +sum(sum(price)) over(partition by lstg_format_name,cal_dt), +max(sum(price)) over(partition by lstg_format_name,cal_dt), +min(sum(price)) over(partition by lstg_format_name) +from test_kylin_fact +group by cal_dt, lstg_format_name \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/4ae4333c/kylin-it/src/test/resources/query/sql_window/query12.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_window/query12.sql b/kylin-it/src/test/resources/query/sql_window/query12.sql new file mode 100644 index 0000000..8073312 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_window/query12.sql @@ -0,0 +1,26 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +select * from( + select cal_dt, lstg_format_name, sum(price) as GMV, + 100*sum(price)/first_value(sum(price)) over (partition by lstg_format_name,SLR_SEGMENT_CD order by cast(cal_dt as timestamp) range interval '1' day PRECEDING) as "last_day", + first_value(sum(price)) over (partition by lstg_format_name order by cast(cal_dt as timestamp) range cast(366 as INTERVAL day) preceding) + from test_kylin_fact as "last_year" + where cal_dt between '2013-01-08' and '2013-01-15' or cal_dt between '2013-01-07' and '2013-01-15' or cal_dt between '2012-01-01' and '2012-01-15' + group by cal_dt, lstg_format_name,SLR_SEGMENT_CD +)t +where cal_dt between '2013-01-06' and '2013-01-15'