Repository: spark
Updated Branches:
  refs/heads/branch-1.1 4da76fc81 -> 496f62d9a


SPARK-3025 [SQL]: Allow JDBC clients to set a fair scheduler pool

This definitely needs review as I am not familiar with this part of Spark.
I tested this locally and it did seem to work.

Author: Patrick Wendell <[email protected]>

Closes #1937 from pwendell/scheduler and squashes the following commits:

b858e33 [Patrick Wendell] SPARK-3025: Allow JDBC clients to set a fair 
scheduler pool

(cherry picked from commit 6bca8898a1aa4ca7161492229bac1748b3da2ad7)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/496f62d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/496f62d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/496f62d9

Branch: refs/heads/branch-1.1
Commit: 496f62d9a98067256d8a51fd1e7a485ff6492fa8
Parents: 4da76fc
Author: Patrick Wendell <[email protected]>
Authored: Mon Aug 18 10:52:20 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Mon Aug 18 10:52:36 2014 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  5 ++++
 .../scala/org/apache/spark/sql/SQLConf.scala    |  3 +++
 .../server/SparkSQLOperationManager.scala       | 27 +++++++++++++++-----
 3 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/496f62d9/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cd65439..34accad 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -605,6 +605,11 @@ Configuration of Hive is done by placing your 
`hive-site.xml` file in `conf/`.
 
 You may also use the beeline script that comes with Hive.
 
+To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a 
JDBC client session,
+users can set the `spark.sql.thriftserver.scheduler.pool` variable:
+
+    SET spark.sql.thriftserver.scheduler.pool=accounting;
+
 ### Migration Guide for Shark Users
 
 #### Reducer number

http://git-wip-us.apache.org/repos/asf/spark/blob/496f62d9/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 90de111..56face2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,9 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
 
+  // This is only used for the thriftserver
+  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/496f62d9/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 9338e81..699a110 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -17,24 +17,24 @@
 
 package org.apache.spark.sql.hive.thriftserver.server
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.math.{random, round}
-
 import java.sql.Timestamp
 import java.util.{Map => JMap}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.math.{random, round}
+
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, 
Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
-
 import org.apache.spark.Logging
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active 
queries.
@@ -43,6 +43,9 @@ class SparkSQLOperationManager(hiveContext: HiveContext) 
extends OperationManage
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
+  // TODO: Currently this will grow infinitely, even as sessions expire
+  val sessionToActivePool = Map[HiveSession, String]()
+
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
@@ -165,8 +168,18 @@ class SparkSQLOperationManager(hiveContext: HiveContext) 
extends OperationManage
         try {
           result = hiveContext.sql(statement)
           logDebug(result.queryExecution.toString())
+          result.queryExecution.logical match {
+            case SetCommand(Some(key), Some(value)) if (key == 
SQLConf.THRIFTSERVER_POOL) =>
+              sessionToActivePool(parentSession) = value
+              logInfo(s"Setting spark.scheduler.pool=$value for future 
statements in this session.")
+            case _ =>
+          }
+
           val groupId = round(random * 1000000).toString
           hiveContext.sparkContext.setJobGroup(groupId, statement)
+          sessionToActivePool.get(parentSession).foreach { pool =>
+            hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", 
pool)
+          }
           iter = {
             val resultRdd = result.queryExecution.toRdd
             val useIncrementalCollect =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to