Repository: spark
Updated Branches:
  refs/heads/master 37ad3b724 -> afd757a24


http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
deleted file mode 100644
index a4e1f3e..0000000
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver.server
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.math.{random, round}
-
-import java.sql.Timestamp
-import java.util.{Map => JMap}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hive.service.cli._
-import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, 
Operation, OperationManager}
-import org.apache.hive.service.cli.session.HiveSession
-
-import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{Logging, SchemaRDD, Row => SparkRow}
-
-/**
- * Executes queries using Spark SQL, and maintains a list of handles to active 
queries.
- */
-class SparkSQLOperationManager(hiveContext: HiveContext) extends 
OperationManager with Logging {
-  val handleToOperation = ReflectionUtils
-    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
-
-  override def newExecuteStatementOperation(
-      parentSession: HiveSession,
-      statement: String,
-      confOverlay: JMap[String, String],
-      async: Boolean): ExecuteStatementOperation = synchronized {
-
-    val operation = new ExecuteStatementOperation(parentSession, statement, 
confOverlay) {
-      private var result: SchemaRDD = _
-      private var iter: Iterator[SparkRow] = _
-      private var dataTypes: Array[DataType] = _
-
-      def close(): Unit = {
-        // RDDs will be cleaned automatically upon garbage collection.
-        logger.debug("CLOSING")
-      }
-
-      def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
-        if (!iter.hasNext) {
-          new RowSet()
-        } else {
-          val maxRows = maxRowsL.toInt // Do you really want a row batch 
larger than Int Max? No.
-          var curRow = 0
-          var rowSet = new ArrayBuffer[Row](maxRows)
-
-          while (curRow < maxRows && iter.hasNext) {
-            val sparkRow = iter.next()
-            val row = new Row()
-            var curCol = 0
-
-            while (curCol < sparkRow.length) {
-              dataTypes(curCol) match {
-                case StringType =>
-                  row.addString(sparkRow(curCol).asInstanceOf[String])
-                case IntegerType =>
-                  
row.addColumnValue(ColumnValue.intValue(sparkRow.getInt(curCol)))
-                case BooleanType =>
-                  
row.addColumnValue(ColumnValue.booleanValue(sparkRow.getBoolean(curCol)))
-                case DoubleType =>
-                  
row.addColumnValue(ColumnValue.doubleValue(sparkRow.getDouble(curCol)))
-                case FloatType =>
-                  
row.addColumnValue(ColumnValue.floatValue(sparkRow.getFloat(curCol)))
-                case DecimalType =>
-                  val hiveDecimal = 
sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal
-                  row.addColumnValue(ColumnValue.stringValue(new 
HiveDecimal(hiveDecimal)))
-                case LongType =>
-                  
row.addColumnValue(ColumnValue.longValue(sparkRow.getLong(curCol)))
-                case ByteType =>
-                  
row.addColumnValue(ColumnValue.byteValue(sparkRow.getByte(curCol)))
-                case ShortType =>
-                  
row.addColumnValue(ColumnValue.intValue(sparkRow.getShort(curCol)))
-                case TimestampType =>
-                  row.addColumnValue(
-                    
ColumnValue.timestampValue(sparkRow.get(curCol).asInstanceOf[Timestamp]))
-                case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-                  val hiveString = result
-                    .queryExecution
-                    .asInstanceOf[HiveContext#QueryExecution]
-                    .toHiveString((sparkRow.get(curCol), dataTypes(curCol)))
-                  row.addColumnValue(ColumnValue.stringValue(hiveString))
-              }
-              curCol += 1
-            }
-            rowSet += row
-            curRow += 1
-          }
-          new RowSet(rowSet, 0)
-        }
-      }
-
-      def getResultSetSchema: TableSchema = {
-        logger.warn(s"Result Schema: ${result.queryExecution.analyzed.output}")
-        if (result.queryExecution.analyzed.output.size == 0) {
-          new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
-        } else {
-          val schema = result.queryExecution.analyzed.output.map { attr =>
-            new FieldSchema(attr.name, 
HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
-          }
-          new TableSchema(schema)
-        }
-      }
-
-      def run(): Unit = {
-        logger.info(s"Running query '$statement'")
-        setState(OperationState.RUNNING)
-        try {
-          result = hiveContext.hql(statement)
-          logger.debug(result.queryExecution.toString())
-          val groupId = round(random * 1000000).toString
-          hiveContext.sparkContext.setJobGroup(groupId, statement)
-          iter = result.queryExecution.toRdd.toLocalIterator
-          dataTypes = 
result.queryExecution.analyzed.output.map(_.dataType).toArray
-          setHasResultSet(true)
-        } catch {
-          // Actually do need to catch Throwable as some failures don't 
inherit from Exception and
-          // HiveServer will silently swallow them.
-          case e: Throwable =>
-            logger.error("Error executing query:",e)
-            throw new HiveSQLException(e.toString)
-        }
-        setState(OperationState.FINISHED)
-      }
-    }
-
-   handleToOperation.put(operation.getHandle, operation)
-   operation
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt 
b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
deleted file mode 100644
index 850f801..0000000
--- a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-238val_238
-86val_86
-311val_311
-27val_27
-165val_165

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
deleted file mode 100644
index b90670a..0000000
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import java.io.{BufferedReader, InputStreamReader, PrintWriter}
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.sql.hive.test.TestHive
-
-class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
-  val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli")
-  val METASTORE_PATH = TestUtils.getMetastorePath("cli")
-
-  override def beforeAll() {
-    val pb = new ProcessBuilder(
-      "../../bin/spark-sql",
-      "--master",
-      "local",
-      "--hiveconf",
-      
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
-      "--hiveconf",
-      "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)
-
-    process = pb.start()
-    outputWriter = new PrintWriter(process.getOutputStream, true)
-    inputReader = new BufferedReader(new 
InputStreamReader(process.getInputStream))
-    errorReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream))
-    waitForOutput(inputReader, "spark-sql>")
-  }
-
-  override def afterAll() {
-    process.destroy()
-    process.waitFor()
-  }
-
-  test("simple commands") {
-    val dataFilePath = getDataFile("data/files/small_kv.txt")
-    executeQuery("create table hive_test1(key int, val string);")
-    executeQuery("load data local inpath '" + dataFilePath+ "' overwrite into 
table hive_test1;")
-    executeQuery("cache table hive_test1", "Time taken")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
deleted file mode 100644
index 59f4952..0000000
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent._
-
-import java.io.{BufferedReader, InputStreamReader}
-import java.sql.{Connection, DriverManager, Statement}
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.sql.Logging
-import org.apache.spark.sql.catalyst.util.getTempFilePath
-
-/**
- * Test for the HiveThriftServer2 using JDBC.
- */
-class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with 
TestUtils with Logging {
-
-  val WAREHOUSE_PATH = getTempFilePath("warehouse")
-  val METASTORE_PATH = getTempFilePath("metastore")
-
-  val DRIVER_NAME  = "org.apache.hive.jdbc.HiveDriver"
-  val TABLE = "test"
-  // use a different port, than the hive standard 10000,
-  // for tests to avoid issues with the port being taken on some machines
-  val PORT = "10000"
-
-  // If verbose is true, the test program will print all outputs coming from 
the Hive Thrift server.
-  val VERBOSE = 
Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean
-
-  Class.forName(DRIVER_NAME)
-
-  override def beforeAll() { launchServer() }
-
-  override def afterAll() { stopServer() }
-
-  private def launchServer(args: Seq[String] = Seq.empty) {
-    // Forking a new process to start the Hive Thrift server. The reason to do 
this is it is
-    // hard to clean up Hive resources entirely, so we just start a new 
process and kill
-    // that process for cleanup.
-    val defaultArgs = Seq(
-      "../../sbin/start-thriftserver.sh",
-      "--master local",
-      "--hiveconf",
-      "hive.root.logger=INFO,console",
-      "--hiveconf",
-      
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
-      "--hiveconf",
-      s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
-    val pb = new ProcessBuilder(defaultArgs ++ args)
-    process = pb.start()
-    inputReader = new BufferedReader(new 
InputStreamReader(process.getInputStream))
-    errorReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream))
-    waitForOutput(inputReader, "ThriftBinaryCLIService listening on")
-
-    // Spawn a thread to read the output from the forked process.
-    // Note that this is necessary since in some configurations, log4j could 
be blocked
-    // if its output to stderr are not read, and eventually blocking the 
entire test suite.
-    future {
-      while (true) {
-        val stdout = readFrom(inputReader)
-        val stderr = readFrom(errorReader)
-        if (VERBOSE && stdout.length > 0) {
-          println(stdout)
-        }
-        if (VERBOSE && stderr.length > 0) {
-          println(stderr)
-        }
-        Thread.sleep(50)
-      }
-    }
-  }
-
-  private def stopServer() {
-    process.destroy()
-    process.waitFor()
-  }
-
-  test("test query execution against a Hive Thrift server") {
-    Thread.sleep(5 * 1000)
-    val dataFilePath = getDataFile("data/files/small_kv.txt")
-    val stmt = createStatement()
-    stmt.execute("DROP TABLE IF EXISTS test")
-    stmt.execute("DROP TABLE IF EXISTS test_cached")
-    stmt.execute("CREATE TABLE test(key int, val string)")
-    stmt.execute(s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE 
test")
-    stmt.execute("CREATE TABLE test_cached as select * from test limit 4")
-    stmt.execute("CACHE TABLE test_cached")
-
-    var rs = stmt.executeQuery("select count(*) from test")
-    rs.next()
-    assert(rs.getInt(1) === 5)
-
-    rs = stmt.executeQuery("select count(*) from test_cached")
-    rs.next()
-    assert(rs.getInt(1) === 4)
-
-    stmt.close()
-  }
-
-  def getConnection: Connection = {
-    val connectURI = s"jdbc:hive2://localhost:$PORT/"
-    DriverManager.getConnection(connectURI, System.getProperty("user.name"), 
"")
-  }
-
-  def createStatement(): Statement = getConnection.createStatement()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
deleted file mode 100644
index bb22426..0000000
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import java.io.{BufferedReader, PrintWriter}
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.hadoop.hive.common.LogUtils
-import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
-
-object TestUtils {
-  val timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss")
-
-  def getWarehousePath(prefix: String): String = {
-    System.getProperty("user.dir") + "/test_warehouses/" + prefix + 
"-warehouse-" +
-      timestamp.format(new Date)
-  }
-
-  def getMetastorePath(prefix: String): String = {
-    System.getProperty("user.dir") + "/test_warehouses/" + prefix + 
"-metastore-" +
-      timestamp.format(new Date)
-  }
-
-  // Dummy function for initialize the log4j properties.
-  def init() { }
-
-  // initialize log4j
-  try {
-    LogUtils.initHiveLog4j()
-  } catch {
-    case e: LogInitializationException => // Ignore the error.
-  }
-}
-
-trait TestUtils {
-  var process : Process = null
-  var outputWriter : PrintWriter = null
-  var inputReader : BufferedReader = null
-  var errorReader : BufferedReader = null
-
-  def executeQuery(
-    cmd: String, outputMessage: String = "OK", timeout: Long = 15000): String 
= {
-    println("Executing: " + cmd + ", expecting output: " + outputMessage)
-    outputWriter.write(cmd + "\n")
-    outputWriter.flush()
-    waitForQuery(timeout, outputMessage)
-  }
-
-  protected def waitForQuery(timeout: Long, message: String): String = {
-    if (waitForOutput(errorReader, message, timeout)) {
-      Thread.sleep(500)
-      readOutput()
-    } else {
-      assert(false, "Didn't find \"" + message + "\" in the output:\n" + 
readOutput())
-      null
-    }
-  }
-
-  // Wait for the specified str to appear in the output.
-  protected def waitForOutput(
-    reader: BufferedReader, str: String, timeout: Long = 10000): Boolean = {
-    val startTime = System.currentTimeMillis
-    var out = ""
-    while (!out.contains(str) && System.currentTimeMillis < (startTime + 
timeout)) {
-      out += readFrom(reader)
-    }
-    out.contains(str)
-  }
-
-  // Read stdout output and filter out garbage collection messages.
-  protected def readOutput(): String = {
-    val output = readFrom(inputReader)
-    // Remove GC Messages
-    val filteredOutput = output.lines.filterNot(x => x.contains("[GC") || 
x.contains("[Full GC"))
-      .mkString("\n")
-    filteredOutput
-  }
-
-  protected def readFrom(reader: BufferedReader): String = {
-    var out = ""
-    var c = 0
-    while (reader.ready) {
-      c = reader.read()
-      out += c.asInstanceOf[Char]
-    }
-    out
-  }
-
-  protected def getDataFile(name: String) = {
-    Thread.currentThread().getContextClassLoader.getResource(name)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 93d00f7..1699ffe 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -32,7 +32,7 @@
   <name>Spark Project Hive</name>
   <url>http://spark.apache.org/</url>
   <properties>
-    <sbt.project.name>hive</sbt.project.name>
+     <sbt.project.name>hive</sbt.project.name>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 84d43ea..201c85f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -255,7 +255,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       Seq(StringType, IntegerType, LongType, DoubleType, FloatType, 
BooleanType, ByteType,
         ShortType, DecimalType, TimestampType, BinaryType)
 
-    protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
+    protected def toHiveString(a: (Any, DataType)): String = a match {
       case (struct: Row, StructType(fields)) =>
         struct.zip(fields).map {
           case (v, t) => s""""${t.name}":${toHiveStructString(v, 
t.dataType)}"""

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 8489f2a..6f36a4f 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -416,10 +416,10 @@ class HiveQuerySuite extends HiveComparisonTest {
     hql(s"set $testKey=$testVal")
     assert(get(testKey, testVal + "_") == testVal)
 
-    hql("set some.property=20")
-    assert(get("some.property", "0") == "20")
-    hql("set some.property = 40")
-    assert(get("some.property", "0") == "40")
+    hql("set mapred.reduce.tasks=20")
+    assert(get("mapred.reduce.tasks", "0") == "20")
+    hql("set mapred.reduce.tasks = 40")
+    assert(get("mapred.reduce.tasks", "0") == "40")
 
     hql(s"set $testKey=$testVal")
     assert(get(testKey, "0") == testVal)
@@ -433,61 +433,63 @@ class HiveQuerySuite extends HiveComparisonTest {
     val testKey = "spark.sql.key.usedfortestonly"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
+    def collectResults(rdd: SchemaRDD): Set[(String, String)] =
+      rdd.collect().map { case Row(key: String, value: String) => key -> value 
}.toSet
 
     clear()
 
     // "set" itself returns all config variables currently specified in 
SQLConf.
     assert(hql("SET").collect().size == 0)
 
-    assertResult(Array(s"$testKey=$testVal")) {
-      hql(s"SET $testKey=$testVal").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal)) {
+      collectResults(hql(s"SET $testKey=$testVal"))
     }
 
     assert(hiveconf.get(testKey, "") == testVal)
-    assertResult(Array(s"$testKey=$testVal")) {
-      hql(s"SET $testKey=$testVal").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal)) {
+      collectResults(hql("SET"))
     }
 
     hql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal 
+ testVal}")) {
-      hql(s"SET").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + 
testVal))) {
+      collectResults(hql("SET"))
     }
 
     // "set key"
-    assertResult(Array(s"$testKey=$testVal")) {
-      hql(s"SET $testKey").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal)) {
+      collectResults(hql(s"SET $testKey"))
     }
 
-    assertResult(Array(s"$nonexistentKey=<undefined>")) {
-      hql(s"SET $nonexistentKey").collect().map(_.getString(0))
+    assertResult(Set(nonexistentKey -> "<undefined>")) {
+      collectResults(hql(s"SET $nonexistentKey"))
     }
 
     // Assert that sql() should have the same effects as hql() by repeating 
the above using sql().
     clear()
     assert(sql("SET").collect().size == 0)
 
-    assertResult(Array(s"$testKey=$testVal")) {
-      sql(s"SET $testKey=$testVal").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal)) {
+      collectResults(sql(s"SET $testKey=$testVal"))
     }
 
     assert(hiveconf.get(testKey, "") == testVal)
-    assertResult(Array(s"$testKey=$testVal")) {
-      sql("SET").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal)) {
+      collectResults(sql("SET"))
     }
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal 
+ testVal}")) {
-      sql("SET").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + 
testVal))) {
+      collectResults(sql("SET"))
     }
 
-    assertResult(Array(s"$testKey=$testVal")) {
-      sql(s"SET $testKey").collect().map(_.getString(0))
+    assertResult(Set(testKey -> testVal)) {
+      collectResults(sql(s"SET $testKey"))
     }
 
-    assertResult(Array(s"$nonexistentKey=<undefined>")) {
-      sql(s"SET $nonexistentKey").collect().map(_.getString(0))
+    assertResult(Set(nonexistentKey -> "<undefined>")) {
+      collectResults(sql(s"SET $nonexistentKey"))
     }
 
     clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index b99f306..f60697c 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.10</artifactId>
   <properties>
-    <sbt.project.name>streaming</sbt.project.name>
+     <sbt.project.name>streaming</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Streaming</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index 97abb6b..c0ee8fa 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -27,7 +27,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-tools_2.10</artifactId>
   <properties>
-    <sbt.project.name>tools</sbt.project.name>
+     <sbt.project.name>tools</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Tools</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/yarn/alpha/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
index 51744ec..5b13a1f 100644
--- a/yarn/alpha/pom.xml
+++ b/yarn/alpha/pom.xml
@@ -24,7 +24,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
   <properties>
-    <sbt.project.name>yarn-alpha</sbt.project.name>
+     <sbt.project.name>yarn-alpha</sbt.project.name>
   </properties>
 
   <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 3faaf05..efb473a 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -29,7 +29,7 @@
   <packaging>pom</packaging>
   <name>Spark Project YARN Parent POM</name>
   <properties>
-    <sbt.project.name>yarn</sbt.project.name>
+     <sbt.project.name>yarn</sbt.project.name>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/afd757a2/yarn/stable/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
index b6c8456..ceaf9f9 100644
--- a/yarn/stable/pom.xml
+++ b/yarn/stable/pom.xml
@@ -24,7 +24,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
   <properties>
-    <sbt.project.name>yarn-stable</sbt.project.name>
+     <sbt.project.name>yarn-stable</sbt.project.name>
   </properties>
 
   <groupId>org.apache.spark</groupId>

Reply via email to