Repository: spark
Updated Branches:
refs/heads/branch-1.6 85e6a2205 -> f7898f9e2
[SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec
In SparkSQLCLI, we have created a `CliSessionState`, but then we call
`SparkSQLEnv.init()`, which will start another `SessionState`. This would lead
to exception because `processCmd` need to get the `CliSessionState` instance by
calling `SessionState.get()`, but the return value would be a instance of
`SessionState`. See the exception below.
spark-sql> !echo "test";
Exception in thread "main" java.lang.ClassCastException:
org.apache.hadoop.hive.ql.session.SessionState cannot be cast to
org.apache.hadoop.hive.cli.CliSessionState
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:112)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:301)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:242)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:691)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Author: Daoyuan Wang <[email protected]>
Closes #9589 from adrian-wang/clicommand.
(cherry picked from commit 5d80fac58f837933b5359a8057676f45539e53af)
Signed-off-by: Michael Armbrust <[email protected]>
Conflicts:
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7898f9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7898f9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7898f9e
Branch: refs/heads/branch-1.6
Commit: f7898f9e2df131fa78200f6034508e74a78c2a44
Parents: 85e6a22
Author: Daoyuan Wang <[email protected]>
Authored: Mon Feb 22 18:13:32 2016 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Mon Feb 22 18:20:06 2016 -0800
----------------------------------------------------------------------
.../hive/thriftserver/SparkSQLCLIDriver.scala | 7 ++-
.../spark/sql/hive/thriftserver/CliSuite.scala | 5 ++
sql/hive/pom.xml | 6 +++
.../spark/sql/hive/client/ClientWrapper.scala | 53 ++++++++++++--------
4 files changed, 48 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f7898f9e/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6419002..4be9bd9 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -276,8 +276,11 @@ private[hive] class SparkSQLCLIDriver extends CliDriver
with Logging {
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
if (cmd_lower.equals("quit") ||
- cmd_lower.equals("exit") ||
- tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
+ cmd_lower.equals("exit")) {
+ sessionState.close()
+ System.exit(0)
+ }
+ if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
cmd_trimmed.startsWith("!") ||
tokens(0).toLowerCase.equals("list") ||
isRemoteMode) {
http://git-wip-us.apache.org/repos/asf/spark/blob/f7898f9e/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index fcf0399..58ebc00 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -234,4 +234,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll
with Logging {
-> "Error in query: Table not found: nonexistent_table;"
)
}
+
+ test("SPARK-11624 Spark SQL CLI should set sessionState only once") {
+ runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for
Spark-11624\";"))(
+ "" -> "This is a test for Spark-11624")
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f7898f9e/sql/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index d96f3e2..57c867f 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -72,6 +72,12 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
+-->
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-cli</artifactId>
+ </dependency>
+<!--
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-common</artifactId>
http://git-wip-us.apache.org/repos/asf/spark/blob/f7898f9e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 598ccde..e9b2e20 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
import org.apache.hadoop.hive.metastore.{TableType => HTableType}
@@ -172,29 +173,39 @@ private[hive] class ClientWrapper(
}
val ret = try {
- val initialConf = new HiveConf(classOf[SessionState])
- // HiveConf is a Hadoop Configuration, which has a field of classLoader
and
- // the initial value will be the current thread's context class loader
- // (i.e. initClassLoader at here).
- // We call initialConf.setClassLoader(initClassLoader) at here to make
- // this action explicit.
- initialConf.setClassLoader(initClassLoader)
- config.foreach { case (k, v) =>
- if (k.toLowerCase.contains("password")) {
- logDebug(s"Hive Config: $k=xxx")
- } else {
- logDebug(s"Hive Config: $k=$v")
+ // originState will be created if not exists, will never be null
+ val originalState = SessionState.get()
+ if (originalState.isInstanceOf[CliSessionState]) {
+ // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
+ // which contains information like configurations from command line.
Later
+ // we call `SparkSQLEnv.init()` there, which would run into this part
again.
+ // so we should keep `conf` and reuse the existing instance of
`CliSessionState`.
+ originalState
+ } else {
+ val initialConf = new HiveConf(classOf[SessionState])
+ // HiveConf is a Hadoop Configuration, which has a field of
classLoader and
+ // the initial value will be the current thread's context class loader
+ // (i.e. initClassLoader at here).
+ // We call initialConf.setClassLoader(initClassLoader) at here to make
+ // this action explicit.
+ initialConf.setClassLoader(initClassLoader)
+ config.foreach { case (k, v) =>
+ if (k.toLowerCase.contains("password")) {
+ logDebug(s"Hive Config: $k=xxx")
+ } else {
+ logDebug(s"Hive Config: $k=$v")
+ }
+ initialConf.set(k, v)
}
- initialConf.set(k, v)
- }
- val state = new SessionState(initialConf)
- if (clientLoader.cachedHive != null) {
- Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+ val state = new SessionState(initialConf)
+ if (clientLoader.cachedHive != null) {
+ Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+ }
+ SessionState.start(state)
+ state.out = new PrintStream(outputBuffer, true, "UTF-8")
+ state.err = new PrintStream(outputBuffer, true, "UTF-8")
+ state
}
- SessionState.start(state)
- state.out = new PrintStream(outputBuffer, true, "UTF-8")
- state.err = new PrintStream(outputBuffer, true, "UTF-8")
- state
} finally {
Thread.currentThread().setContextClassLoader(original)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]