Repository: spark Updated Branches: refs/heads/master c5ed36953 -> b7f54119f
[SPARK-9420][SQL] Move expressions in sql/core package to catalyst. Since catalyst package already depends on Spark core, we can move those expressions into catalyst, and simplify function registry. This is a followup of #7478. Author: Reynold Xin <[email protected]> Closes #7735 from rxin/SPARK-8003 and squashes the following commits: 2ffbdc3 [Reynold Xin] [SPARK-8003][SQL] Move expressions in sql/core package to catalyst. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7f54119 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7f54119 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7f54119 Branch: refs/heads/master Commit: b7f54119f86f916481aeccc67f07e77dc2a924c7 Parents: c5ed369 Author: Reynold Xin <[email protected]> Authored: Tue Jul 28 17:03:59 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Jul 28 17:03:59 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 +- .../catalyst/analysis/FunctionRegistry.scala | 17 ++--- .../expressions/MonotonicallyIncreasingID.scala | 73 +++++++++++++++++++ .../catalyst/expressions/SparkPartitionID.scala | 50 +++++++++++++ .../expressions/NondeterministicSuite.scala | 30 ++++++++ .../scala/org/apache/spark/sql/SQLContext.scala | 11 +-- .../expressions/MonotonicallyIncreasingID.scala | 74 -------------------- .../expressions/SparkPartitionID.scala | 51 -------------- .../sql/execution/expressions/package.scala | 23 ------ .../scala/org/apache/spark/sql/functions.scala | 4 +- .../scala/org/apache/spark/sql/UDFSuite.scala | 4 +- .../expression/NondeterministicSuite.scala | 32 --------- .../org/apache/spark/sql/hive/HiveContext.scala | 13 +--- .../org/apache/spark/sql/hive/UDFSuite.scala | 4 +- 14 files changed, 173 insertions(+), 216 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a723e92..a309ee3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2, AggregateFunction2} import org.apache.spark.sql.catalyst.expressions._ @@ -25,7 +27,6 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf} import org.apache.spark.sql.types._ -import scala.collection.mutable.ArrayBuffer /** * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 9b60943..372f80d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -161,13 +161,6 @@ object FunctionRegistry { expression[ToDegrees]("degrees"), expression[ToRadians]("radians"), - // misc functions - expression[Md5]("md5"), - expression[Sha2]("sha2"), - expression[Sha1]("sha1"), - expression[Sha1]("sha"), - expression[Crc32]("crc32"), - // aggregate functions expression[Average]("avg"), expression[Count]("count"), @@ -229,7 +222,15 @@ object FunctionRegistry { expression[Year]("year"), // collection functions - expression[Size]("size") + expression[Size]("size"), + + // misc functions + expression[Crc32]("crc32"), + expression[Md5]("md5"), + expression[Sha1]("sha"), + expression[Sha1]("sha1"), + expression[Sha2]("sha2"), + expression[SparkPartitionID]("spark_partition_id") ) val builtin: FunctionRegistry = { http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala new file mode 100644 index 0000000..291b7a5 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} +import org.apache.spark.sql.types.{LongType, DataType} + +/** + * Returns monotonically increasing 64-bit integers. + * + * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. + * The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits + * represent the record number within each partition. The assumption is that the data frame has + * less than 1 billion partitions, and each partition has less than 8 billion records. + * + * Since this expression is stateful, it cannot be a case object. + */ +private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic { + + /** + * Record ID within each partition. By being transient, count's value is reset to 0 every time + * we serialize and deserialize and initialize it. + */ + @transient private[this] var count: Long = _ + + @transient private[this] var partitionMask: Long = _ + + override protected def initInternal(): Unit = { + count = 0L + partitionMask = TaskContext.getPartitionId().toLong << 33 + } + + override def nullable: Boolean = false + + override def dataType: DataType = LongType + + override protected def evalInternal(input: InternalRow): Long = { + val currentCount = count + count += 1 + partitionMask + currentCount + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val countTerm = ctx.freshName("count") + val partitionMaskTerm = ctx.freshName("partitionMask") + ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 0L;") + ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, + s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;") + + ev.isNull = "false" + s""" + final ${ctx.javaType(dataType)} ${ev.primitive} = $partitionMaskTerm + $countTerm; + $countTerm++; + """ + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala new file mode 100644 index 0000000..3f6480b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} +import org.apache.spark.sql.types.{IntegerType, DataType} + + +/** + * Expression that returns the current partition id of the Spark task. + */ +private[sql] case class SparkPartitionID() extends LeafExpression with Nondeterministic { + + override def nullable: Boolean = false + + override def dataType: DataType = IntegerType + + @transient private[this] var partitionId: Int = _ + + override protected def initInternal(): Unit = { + partitionId = TaskContext.getPartitionId() + } + + override protected def evalInternal(input: InternalRow): Int = partitionId + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val idTerm = ctx.freshName("partitionId") + ctx.addMutableState(ctx.JAVA_INT, idTerm, + s"$idTerm = org.apache.spark.TaskContext.getPartitionId();") + ev.isNull = "false" + s"final ${ctx.javaType(dataType)} ${ev.primitive} = $idTerm;" + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala new file mode 100644 index 0000000..8289482 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite + +class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { + test("MonotonicallyIncreasingID") { + checkEvaluation(MonotonicallyIncreasingID(), 0L) + } + + test("SparkPartitionID") { + checkEvaluation(SparkPartitionID(), 0) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 56cd8f2..dbb2a09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -31,8 +31,6 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder} -import org.apache.spark.sql.execution.expressions.SparkPartitionID import org.apache.spark.sql.SQLConf.SQLConfEntry import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.errors.DialectException @@ -142,14 +140,7 @@ class SQLContext(@transient val sparkContext: SparkContext) // TODO how to handle the temp function per user session? @transient - protected[sql] lazy val functionRegistry: FunctionRegistry = { - val reg = FunctionRegistry.builtin - val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))]( - FunctionExpression[SparkPartitionID]("spark__partition__id") - ) - extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) } - reg - } + protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin @transient protected[sql] lazy val analyzer: Analyzer = http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala deleted file mode 100644 index eca36b3..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.expressions - -import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} -import org.apache.spark.sql.types.{LongType, DataType} - -/** - * Returns monotonically increasing 64-bit integers. - * - * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. - * The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits - * represent the record number within each partition. The assumption is that the data frame has - * less than 1 billion partitions, and each partition has less than 8 billion records. - * - * Since this expression is stateful, it cannot be a case object. - */ -private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic { - - /** - * Record ID within each partition. By being transient, count's value is reset to 0 every time - * we serialize and deserialize and initialize it. - */ - @transient private[this] var count: Long = _ - - @transient private[this] var partitionMask: Long = _ - - override protected def initInternal(): Unit = { - count = 0L - partitionMask = TaskContext.getPartitionId().toLong << 33 - } - - override def nullable: Boolean = false - - override def dataType: DataType = LongType - - override protected def evalInternal(input: InternalRow): Long = { - val currentCount = count - count += 1 - partitionMask + currentCount - } - - override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val countTerm = ctx.freshName("count") - val partitionMaskTerm = ctx.freshName("partitionMask") - ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 0L;") - ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, - s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;") - - ev.isNull = "false" - s""" - final ${ctx.javaType(dataType)} ${ev.primitive} = $partitionMaskTerm + $countTerm; - $countTerm++; - """ - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala deleted file mode 100644 index 98c8eab..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.expressions - -import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} -import org.apache.spark.sql.types.{IntegerType, DataType} - - -/** - * Expression that returns the current partition id of the Spark task. - */ -private[sql] case class SparkPartitionID() extends LeafExpression with Nondeterministic { - - override def nullable: Boolean = false - - override def dataType: DataType = IntegerType - - @transient private[this] var partitionId: Int = _ - - override protected def initInternal(): Unit = { - partitionId = TaskContext.getPartitionId() - } - - override protected def evalInternal(input: InternalRow): Int = partitionId - - override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val idTerm = ctx.freshName("partitionId") - ctx.addMutableState(ctx.JAVA_INT, idTerm, - s"$idTerm = org.apache.spark.TaskContext.getPartitionId();") - ev.isNull = "false" - s"final ${ctx.javaType(dataType)} ${ev.primitive} = $idTerm;" - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala deleted file mode 100644 index 568b7ac..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -/** - * Package containing expressions that are specific to Spark runtime. - */ -package object expressions http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 0148991..4261a5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -634,7 +634,7 @@ object functions { * @group normal_funcs * @since 1.4.0 */ - def monotonicallyIncreasingId(): Column = execution.expressions.MonotonicallyIncreasingID() + def monotonicallyIncreasingId(): Column = MonotonicallyIncreasingID() /** * Return an alternative value `r` if `l` is NaN. @@ -741,7 +741,7 @@ object functions { * @group normal_funcs * @since 1.4.0 */ - def sparkPartitionId(): Column = execution.expressions.SparkPartitionID() + def sparkPartitionId(): Column = SparkPartitionID() /** * Computes the square root of the specified float value. http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 9b326c1..d9c8b38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -51,10 +51,10 @@ class UDFSuite extends QueryTest { df.selectExpr("count(distinct a)") } - test("SPARK-8003 spark__partition__id") { + test("SPARK-8003 spark_partition_id") { val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying") df.registerTempTable("tmp_table") - checkAnswer(ctx.sql("select spark__partition__id() from tmp_table").toDF(), Row(0)) + checkAnswer(ctx.sql("select spark_partition_id() from tmp_table").toDF(), Row(0)) ctx.dropTempTable("tmp_table") } http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala deleted file mode 100644 index b6e79ff..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.expression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions. ExpressionEvalHelper -import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID} - -class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { - test("MonotonicallyIncreasingID") { - checkEvaluation(MonotonicallyIncreasingID(), 0L) - } - - test("SparkPartitionID") { - checkEvaluation(SparkPartitionID(), 0) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 8b35c12..110f51a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -38,9 +38,6 @@ import org.apache.spark.Logging import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder} -import org.apache.spark.sql.catalyst.expressions.ExpressionInfo -import org.apache.spark.sql.execution.expressions.SparkPartitionID import org.apache.spark.sql.SQLConf.SQLConfEntry import org.apache.spark.sql.SQLConf.SQLConfEntry._ import org.apache.spark.sql.catalyst.{TableIdentifier, ParserDialect} @@ -375,14 +372,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { // Note that HiveUDFs will be overridden by functions registered in this context. @transient - override protected[sql] lazy val functionRegistry: FunctionRegistry = { - val reg = new HiveFunctionRegistry(FunctionRegistry.builtin) - val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))]( - FunctionExpression[SparkPartitionID]("spark__partition__id") - ) - extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) } - reg - } + override protected[sql] lazy val functionRegistry: FunctionRegistry = + new HiveFunctionRegistry(FunctionRegistry.builtin) /* An analyzer that uses the Hive metastore. */ @transient http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala index 9cea5d4..37afc21 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala @@ -35,9 +35,9 @@ class UDFSuite extends QueryTest { assert(ctx.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5) } - test("SPARK-8003 spark__partition__id") { + test("SPARK-8003 spark_partition_id") { val df = Seq((1, "Two Fiiiiive")).toDF("id", "saying") ctx.registerDataFrameAsTable(df, "test_table") - checkAnswer(ctx.sql("select spark__partition__id() from test_table LIMIT 1").toDF(), Row(0)) + checkAnswer(ctx.sql("select spark_partition_id() from test_table LIMIT 1").toDF(), Row(0)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
