Repository: spark Updated Branches: refs/heads/branch-2.0 6871deb93 -> 9c20c7a33
[SPARK-15330][SQL] Implement Reset Command #### What changes were proposed in this pull request? Like `Set` Command in Hive, `Reset` is also supported by Hive. See the link: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli Below is the related Hive JIRA: https://issues.apache.org/jira/browse/HIVE-3202 This PR is to implement such a command for resetting the SQL-related configuration to the default values. One of the use cases shown in HIVE-3202 is listed below: > For the purpose of optimization we set various configs per query. It's worthy > but all those configs should be reset every time for next query. #### How was this patch tested? Added a test case. Author: gatorsmile <[email protected]> Author: xiaoli <[email protected]> Author: Xiao Li <[email protected]> Closes #13121 from gatorsmile/resetCommand. (cherry picked from commit 8f0a3d5bcba313dc3b70d4aa9a8ba2aa2d276062) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c20c7a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c20c7a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c20c7a3 Branch: refs/heads/branch-2.0 Commit: 9c20c7a33b86c1db5899900b21e36f80f08a1b8a Parents: 6871deb Author: gatorsmile <[email protected]> Authored: Sat May 21 20:07:34 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Sat May 21 20:07:47 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../spark/sql/execution/SparkSqlParser.scala | 12 +++++ .../sql/execution/command/SetCommand.scala | 16 +++++++ .../spark/sql/internal/SQLConfSuite.scala | 49 +++++++++++++++++++- .../hive/thriftserver/SparkSQLCLIDriver.scala | 6 +-- 5 files changed, 82 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 06ac37b..848c59e 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -120,6 +120,7 @@ statement | ADD identifier .*? #addResource | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration + | RESET #resetConfiguration | unsupportedHiveNativeCommands .*? #failNativeCommand ; @@ -633,7 +634,7 @@ nonReserved | GROUPING | CUBE | ROLLUP | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF - | SET + | SET | RESET | VIEW | REPLACE | IF | NO | DATA @@ -748,6 +749,7 @@ MAP: 'MAP'; STRUCT: 'STRUCT'; COMMENT: 'COMMENT'; SET: 'SET'; +RESET: 'RESET'; DATA: 'DATA'; START: 'START'; TRANSACTION: 'TRANSACTION'; http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 2966eef..2e3ac97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -76,6 +76,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * Create a [[ResetCommand]] logical plan. 
+ * Example SQL : + * {{{ + * RESET; + * }}} + */ + override def visitResetConfiguration( + ctx: ResetConfigurationContext): LogicalPlan = withOrigin(ctx) { + ResetCommand + } + + /** * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN * option (other options are passed on to Hive) e.g.: * {{{ http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 282f26c..b0e2d03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -116,3 +116,19 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm override def run(sparkSession: SparkSession): Seq[Row] = runFunc(sparkSession) } + +/** + * This command is for resetting SQLConf to the default values. 
Command that runs + * {{{ + * reset; + * }}} + */ +case object ResetCommand extends RunnableCommand with Logging { + + override def run(sparkSession: SparkSession): Seq[Row] = { + sparkSession.sessionState.conf.clear() + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index 0296229..f8227e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -99,7 +99,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { test("deprecated property") { spark.sqlContext.conf.clear() val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) - try{ + try { sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10) } finally { @@ -107,6 +107,53 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } } + test("reset - public conf") { + spark.sqlContext.conf.clear() + val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL) + try { + assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true) + sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false") + assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false) + assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1) + sql(s"reset") + assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true) + assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0) + } finally { + sql(s"set ${SQLConf.GROUP_BY_ORDINAL}=$original") + } + } + + test("reset - internal conf") { + spark.sqlContext.conf.clear() + val original = 
spark.conf.get(SQLConf.NATIVE_VIEW) + try { + assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true) + sql(s"set ${SQLConf.NATIVE_VIEW.key}=false") + assert(spark.conf.get(SQLConf.NATIVE_VIEW) === false) + assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 1) + sql(s"reset") + assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true) + assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 0) + } finally { + sql(s"set ${SQLConf.NATIVE_VIEW}=$original") + } + } + + test("reset - user-defined conf") { + spark.sqlContext.conf.clear() + val userDefinedConf = "x.y.z.reset" + try { + assert(spark.conf.getOption(userDefinedConf).isEmpty) + sql(s"set $userDefinedConf=false") + assert(spark.conf.get(userDefinedConf) === "false") + assert(sql(s"set").where(s"key = '$userDefinedConf'").count() == 1) + sql(s"reset") + assert(spark.conf.getOption(userDefinedConf).isEmpty) + } finally { + spark.conf.unset(userDefinedConf) + } + } + test("invalid conf value") { spark.sqlContext.conf.clear() val e = intercept[IllegalArgumentException] { http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 1402e0a..33ff8ae 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -32,8 +32,8 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.exec.Utilities -import 
org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor, - CommandProcessorFactory, SetProcessor} +import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor} +import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, ResetProcessor, SetProcessor} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.thrift.transport.TSocket @@ -312,7 +312,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (proc != null) { // scalastyle:off println if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] || - proc.isInstanceOf[AddResourceProcessor]) { + proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ResetProcessor]) { val driver = new SparkSQLDriver driver.init() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
