Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6871deb93 -> 9c20c7a33


[SPARK-15330][SQL] Implement Reset Command

#### What changes were proposed in this pull request?
Like `Set` Command in Hive, `Reset` is also supported by Hive. See the link: 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli

Below is the related Hive JIRA: https://issues.apache.org/jira/browse/HIVE-3202

This PR is to implement such a command for resetting the SQL-related 
configuration to the default values. One of the use case shown in HIVE-3202 is 
listed below:

> For the purpose of optimization we set various configs per query. It's worthy 
> but all those configs should be reset every time for next query.

#### How was this patch tested?
Added a test case.

Author: gatorsmile <[email protected]>
Author: xiaoli <[email protected]>
Author: Xiao Li <[email protected]>

Closes #13121 from gatorsmile/resetCommand.

(cherry picked from commit 8f0a3d5bcba313dc3b70d4aa9a8ba2aa2d276062)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c20c7a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c20c7a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c20c7a3

Branch: refs/heads/branch-2.0
Commit: 9c20c7a33b86c1db5899900b21e36f80f08a1b8a
Parents: 6871deb
Author: gatorsmile <[email protected]>
Authored: Sat May 21 20:07:34 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat May 21 20:07:47 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  4 +-
 .../spark/sql/execution/SparkSqlParser.scala    | 12 +++++
 .../sql/execution/command/SetCommand.scala      | 16 +++++++
 .../spark/sql/internal/SQLConfSuite.scala       | 49 +++++++++++++++++++-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |  6 +--
 5 files changed, 82 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 06ac37b..848c59e 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -120,6 +120,7 @@ statement
     | ADD identifier .*?                                               
#addResource
     | SET ROLE .*?                                                     
#failNativeCommand
     | SET .*?                                                          
#setConfiguration
+    | RESET                                                            
#resetConfiguration
     | unsupportedHiveNativeCommands .*?                                
#failNativeCommand
     ;
 
@@ -633,7 +634,7 @@ nonReserved
     | GROUPING | CUBE | ROLLUP
     | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
     | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
-    | SET
+    | SET | RESET
     | VIEW | REPLACE
     | IF
     | NO | DATA
@@ -748,6 +749,7 @@ MAP: 'MAP';
 STRUCT: 'STRUCT';
 COMMENT: 'COMMENT';
 SET: 'SET';
+RESET: 'RESET';
 DATA: 'DATA';
 START: 'START';
 TRANSACTION: 'TRANSACTION';

http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2966eef..2e3ac97 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -76,6 +76,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create a [[ResetCommand]] logical plan.
+   * Example SQL :
+   * {{{
+   *   RESET;
+   * }}}
+   */
+  override def visitResetConfiguration(
+      ctx: ResetConfigurationContext): LogicalPlan = withOrigin(ctx) {
+    ResetCommand
+  }
+
+  /**
    * Create an [[AnalyzeTableCommand]] command. This currently only implements 
the NOSCAN
    * option (other options are passed on to Hive) e.g.:
    * {{{

http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 282f26c..b0e2d03 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -116,3 +116,19 @@ case class SetCommand(kv: Option[(String, 
Option[String])]) extends RunnableComm
   override def run(sparkSession: SparkSession): Seq[Row] = 
runFunc(sparkSession)
 
 }
+
+/**
+ * This command is for resetting SQLConf to the default values. Command that 
runs
+ * {{{
+ *   reset;
+ * }}}
+ */
+case object ResetCommand extends RunnableCommand with Logging {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.conf.clear()
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 0296229..f8227e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -99,7 +99,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   test("deprecated property") {
     spark.sqlContext.conf.clear()
     val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
-    try{
+    try {
       sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
       assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10)
     } finally {
@@ -107,6 +107,53 @@ class SQLConfSuite extends QueryTest with SharedSQLContext 
{
     }
   }
 
+  test("reset - public conf") {
+    spark.sqlContext.conf.clear()
+    val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+    try {
+      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
+      sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
+      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
+      assert(sql(s"set").where(s"key = 
'${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
+      sql(s"reset")
+      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
+      assert(sql(s"set").where(s"key = 
'${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
+    } finally {
+      sql(s"set ${SQLConf.GROUP_BY_ORDINAL}=$original")
+    }
+  }
+
+  test("reset - internal conf") {
+    spark.sqlContext.conf.clear()
+    val original = spark.conf.get(SQLConf.NATIVE_VIEW)
+    try {
+      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
+      sql(s"set ${SQLConf.NATIVE_VIEW.key}=false")
+      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === false)
+      assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() 
== 1)
+      sql(s"reset")
+      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
+      assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() 
== 0)
+    } finally {
+      sql(s"set ${SQLConf.NATIVE_VIEW}=$original")
+    }
+  }
+
+  test("reset - user-defined conf") {
+    spark.sqlContext.conf.clear()
+    val userDefinedConf = "x.y.z.reset"
+    try {
+      assert(spark.conf.getOption(userDefinedConf).isEmpty)
+      sql(s"set $userDefinedConf=false")
+      assert(spark.conf.get(userDefinedConf) === "false")
+      assert(sql(s"set").where(s"key = '$userDefinedConf'").count() == 1)
+      sql(s"reset")
+      assert(spark.conf.getOption(userDefinedConf).isEmpty)
+    } finally {
+      spark.conf.unset(userDefinedConf)
+    }
+  }
+
   test("invalid conf value") {
     spark.sqlContext.conf.clear()
     val e = intercept[IllegalArgumentException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/9c20c7a3/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 1402e0a..33ff8ae 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -32,8 +32,8 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, 
HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, 
CommandProcessor,
-  CommandProcessorFactory, SetProcessor}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, 
CommandProcessor}
+import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, 
ResetProcessor, SetProcessor}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.thrift.transport.TSocket
 
@@ -312,7 +312,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
       if (proc != null) {
         // scalastyle:off println
         if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
-          proc.isInstanceOf[AddResourceProcessor]) {
+          proc.isInstanceOf[AddResourceProcessor] || 
proc.isInstanceOf[ResetProcessor]) {
           val driver = new SparkSQLDriver
 
           driver.init()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to