spark git commit: [SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high
Repository: spark Updated Branches: refs/heads/master 80a824d36 -> 6fd9e70e3 [SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high We need to make sure that the last entry is indeed the last entry in the queue. Author: Burak Yavuz Closes #10110 from brkyvz/batch-wal-test-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fd9e70e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fd9e70e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fd9e70e Branch: refs/heads/master Commit: 6fd9e70e3ed43836a0685507fff9949f921234f4 Parents: 80a824d Author: Burak Yavuz Authored: Mon Dec 7 00:21:55 2015 -0800 Committer: Tathagata Das Committed: Mon Dec 7 00:21:55 2015 -0800 -- .../spark/streaming/util/BatchedWriteAheadLog.scala | 6 -- .../spark/streaming/util/WriteAheadLogSuite.scala | 14 ++ 2 files changed, 14 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6fd9e70e/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 7158abc..b2cd524 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -166,10 +166,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp var segment: WriteAheadLogRecordHandle = null if (buffer.length > 0) { logDebug(s"Batched ${buffer.length} records for Write Ahead Log write") +// threads may not be able to add items in order by time +val sortedByTime = buffer.sortBy(_.time) // We take the latest record for the timestamp. 
Please refer to the class Javadoc for // detailed explanation -val time = buffer.last.time -segment = wrappedLog.write(aggregate(buffer), time) +val time = sortedByTime.last.time +segment = wrappedLog.write(aggregate(sortedByTime), time) } buffer.foreach(_.promise.success(segment)) } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/6fd9e70e/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index eaa88ea..ef1e89d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( p } - test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") { + test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") { val blockingWal = new BlockingWriteAheadLog(wal, walHandle) val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf) @@ -500,8 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // rest of the records will be batched while it takes time for 3 to get written writeAsync(batchedWal, event2, 5L) writeAsync(batchedWal, event3, 8L) -writeAsync(batchedWal, event4, 12L) -writeAsync(batchedWal, event5, 10L) +// we would like event 5 to be written before event 4 in order to test that they get +// sorted before being aggregated +writeAsync(batchedWal, event5, 12L) +eventually(timeout(1 second)) { + assert(blockingWal.isBlocked) + assert(batchedWal.invokePrivate(queueLength()) === 3) +} +writeAsync(batchedWal, event4, 10L) eventually(timeout(1 second)) { assert(walBatchingThreadPool.getActiveCount === 5) assert(batchedWal.invokePrivate(queueLength()) === 4) @@ -517,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // the file name should be the timestamp of the last record, as events should be naturally // in order of timestamp, and we need the last element. val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer]) - verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L)) + verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L)) val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToStri
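The fix above amounts to sorting the queued records by timestamp before aggregating them, so the batched write is stamped with the highest timestamp in the batch rather than with whichever record happened to be enqueued last. Below is a minimal, self-contained Scala sketch of that idea; `Record`, `aggregate`, and `writeBatch` are simplified stand-ins for illustration, not the actual private members of `BatchedWriteAheadLog`.

```scala
import java.nio.ByteBuffer

// Simplified stand-in for a queued record: payload plus the time it was produced.
case class Record(data: ByteBuffer, time: Long)

// Toy aggregation: concatenate the payloads. The real log length-prefixes each record,
// but plain concatenation is enough to show the ordering concern.
def aggregate(records: Seq[Record]): ByteBuffer = {
  val out = ByteBuffer.allocate(records.map(_.data.remaining()).sum)
  records.foreach(r => out.put(r.data.duplicate()))
  out.flip()
  out
}

// Writer threads may enqueue records out of time order under load, so the batch is
// sorted before picking the timestamp that names the log segment.
def writeBatch(buffer: Seq[Record], write: (ByteBuffer, Long) => Unit): Unit = {
  if (buffer.nonEmpty) {
    val sortedByTime = buffer.sortBy(_.time)
    val time = sortedByTime.last.time // the highest timestamp, not merely the last enqueued
    write(aggregate(sortedByTime), time)
  }
}
```

Sorting the whole batch (rather than only taking the maximum timestamp) also keeps the aggregated payload in timestamp order, which is what the updated test exercises by deliberately queueing the 12L record before the 10L one and then verifying the write happens with 12L.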
spark git commit: [SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high
Repository: spark Updated Branches: refs/heads/branch-1.6 82a71aba0 -> c54b698ec [SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high We need to make sure that the last entry is indeed the last entry in the queue. Author: Burak Yavuz Closes #10110 from brkyvz/batch-wal-test-fix. (cherry picked from commit 6fd9e70e3ed43836a0685507fff9949f921234f4) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c54b698e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c54b698e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c54b698e Branch: refs/heads/branch-1.6 Commit: c54b698ecc284bce9b80c40ba46008bd6321c812 Parents: 82a71ab Author: Burak Yavuz Authored: Mon Dec 7 00:21:55 2015 -0800 Committer: Tathagata Das Committed: Mon Dec 7 00:22:06 2015 -0800 -- .../spark/streaming/util/BatchedWriteAheadLog.scala | 6 -- .../spark/streaming/util/WriteAheadLogSuite.scala | 14 ++ 2 files changed, 14 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c54b698e/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 6e6ed8d..862272b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -165,10 +165,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp var segment: WriteAheadLogRecordHandle = null if (buffer.length > 0) { logDebug(s"Batched ${buffer.length} records for Write Ahead Log write") +// threads may not be able to add items in order by time +val sortedByTime = buffer.sortBy(_.time) // We take the latest record for the timestamp. 
Please refer to the class Javadoc for // detailed explanation -val time = buffer.last.time -segment = wrappedLog.write(aggregate(buffer), time) +val time = sortedByTime.last.time +segment = wrappedLog.write(aggregate(sortedByTime), time) } buffer.foreach(_.promise.success(segment)) } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/c54b698e/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index eaa88ea..ef1e89d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( p } - test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") { + test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") { val blockingWal = new BlockingWriteAheadLog(wal, walHandle) val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf) @@ -500,8 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // rest of the records will be batched while it takes time for 3 to get written writeAsync(batchedWal, event2, 5L) writeAsync(batchedWal, event3, 8L) -writeAsync(batchedWal, event4, 12L) -writeAsync(batchedWal, event5, 10L) +// we would like event 5 to be written before event 4 in order to test that they get +// sorted before being aggregated +writeAsync(batchedWal, event5, 12L) +eventually(timeout(1 second)) { + assert(blockingWal.isBlocked) + assert(batchedWal.invokePrivate(queueLength()) === 3) +} +writeAsync(batchedWal, event4, 10L) eventually(timeout(1 second)) { assert(walBatchingThreadPool.getActiveCount === 5) assert(batchedWal.invokePrivate(queueLength()) === 4) @@ -517,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // the file name should be the timestamp of the last record, as events should be naturally // in order of timestamp, and we need the last element. val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer]) - verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L)) + verify(wal, times(1)).write(bufferCaptor.capture(),
spark git commit: [SPARK-12032] [SQL] Re-order inner joins to do join with conditions first
Repository: spark Updated Branches: refs/heads/master 6fd9e70e3 -> 9cde7d5fa [SPARK-12032] [SQL] Re-order inner joins to do join with conditions first Currently, the order of joins is exactly the same as in the SQL query, so some conditions may not be pushed down to the correct join; those joins then become cross products and are extremely slow. This patch tries to re-order the inner joins (which are common in SQL queries), picking the joins that have self-contained conditions first and delaying those that do not have conditions. After this patch, the TPCDS queries Q64/65 can run hundreds of times faster. cc marmbrus nongli Author: Davies Liu Closes #10073 from davies/reorder_joins. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cde7d5f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cde7d5f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cde7d5f Branch: refs/heads/master Commit: 9cde7d5fa87e7ddfff0b9c1212920a1d9000539b Parents: 6fd9e70 Author: Davies Liu Authored: Mon Dec 7 10:34:18 2015 -0800 Committer: Davies Liu Committed: Mon Dec 7 10:34:18 2015 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 56 ++-- .../spark/sql/catalyst/planning/patterns.scala | 40 - .../sql/catalyst/optimizer/JoinOrderSuite.scala | 95 3 files changed, 185 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9cde7d5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 06d14fc..f608869 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,14 +18,12 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet + import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.FullOuter -import org.apache.spark.sql.catalyst.plans.LeftOuter -import org.apache.spark.sql.catalyst.plans.RightOuter -import org.apache.spark.sql.catalyst.plans.LeftSemi +import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins +import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types._ @@ -44,6 +42,7 @@ object DefaultOptimizer extends Optimizer { // Operator push down SetOperationPushDown, SamplePushDown, + ReorderJoin, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -712,6 +711,53 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel } /** + * Reorder the joins and push all the conditions into join, so that the bottom ones have at least + * one condition. + * + * The order of joins will not be changed if all of them already have at least one condition. + */ +object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { + + /** +* Join a list of plans together and push down the conditions into them. 
+* +* The joined plan are picked from left to right, prefer those has at least one join condition. +* +* @param input a list of LogicalPlans to join. +* @param conditions a list of condition for join. +*/ + def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = { +assert(input.size >= 2) +if (input.size == 2) { + Join(input(0), input(1), Inner, conditions.reduceLeftOption(And)) +} else { + val left :: rest = input.toList + // find out the first join that have at least one join condition + val conditionalJoin = rest.find { plan => +val refs = left.outputSet ++ plan.outputSet +conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan)) + .exists(_.references.subsetOf(refs)) + } + // pick the next one if no condition left + val right = conditionalJoin.getOrElse(rest.head) + + val joinedRefs = left.outputSet ++ right.outputSet + val (joinConditions, others) = conditions.partition(_.references.subsetOf(joinedRefs)) + val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And)) +
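To make the new rule easier to follow, here is a heavily simplified, standalone sketch of the same greedy strategy: starting from the left-most relation, always pick next a relation that shares at least one join condition with what has been joined so far, so the bottom-most joins are never condition-free cross products. `Plan` and `Cond` below are toy stand-ins (a plan is just its output columns, a condition just the columns it references), not Catalyst's `LogicalPlan` and `Expression`.

```scala
// Toy model of the greedy reordering performed by ReorderJoin (sketch, not Catalyst code).
case class Plan(name: String, output: Set[String])
case class Cond(refs: Set[String])

def createOrderedJoin(input: Seq[Plan], conditions: Seq[Cond]): Plan = {
  require(input.size >= 2)
  if (input.size == 2) {
    Plan(s"Join(${input(0).name}, ${input(1).name})", input(0).output ++ input(1).output)
  } else {
    val left = input.head
    val rest = input.tail
    // Prefer the first remaining plan that shares a join condition with `left`.
    val right = rest.find { plan =>
      val refs = left.output ++ plan.output
      conditions.exists { c =>
        c.refs.subsetOf(refs) && !c.refs.subsetOf(left.output) && !c.refs.subsetOf(plan.output)
      }
    }.getOrElse(rest.head) // fall back to the original order if no condition matches
    val joined = Plan(s"Join(${left.name}, ${right.name})", left.output ++ right.output)
    createOrderedJoin(joined +: rest.filterNot(_ == right), conditions)
  }
}

// With conditions linking only a<->c and c<->b, `c` is joined to `a` first,
// avoiding the cross product of `a` and `b` that the written order would produce.
val a = Plan("a", Set("a.id"))
val b = Plan("b", Set("b.id"))
val c = Plan("c", Set("c.id"))
val conds = Seq(Cond(Set("a.id", "c.id")), Cond(Set("c.id", "b.id")))
println(createOrderedJoin(Seq(a, b, c), conds).name) // Join(Join(a, c), b)
```

The real rule additionally partitions the conditions into those that can be evaluated on the newly created join and those that must wait, which is what the `joinConditions`/`others` split in the diff above does.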
[3/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
http://git-wip-us.apache.org/repos/asf/spark/blob/3f230f7b/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R deleted file mode 100644 index 6ef03ae..000 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ /dev/null @@ -1,1722 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("SparkSQL functions") - -# Utility function for easily checking the values of a StructField -checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { - expect_equal(class(actual), "structField") - expect_equal(actual$name(), expectedName) - expect_equal(actual$dataType.toString(), expectedType) - expect_equal(actual$nullable(), expectedNullable) -} - -markUtf8 <- function(s) { - Encoding(s) <- "UTF-8" - s -} - -# Tests for SparkSQL functions in SparkR - -sc <- sparkR.init() - -sqlContext <- sparkRSQL.init(sc) - -mockLines <- c("{\"name\":\"Michael\"}", - "{\"name\":\"Andy\", \"age\":30}", - "{\"name\":\"Justin\", \"age\":19}") -jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") -parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet") -writeLines(mockLines, jsonPath) - -# For test nafunctions, like dropna(), fillna(),... 
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", - "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", - "{\"name\":\"David\",\"age\":60,\"height\":null}", - "{\"name\":\"Amy\",\"age\":null,\"height\":null}", - "{\"name\":null,\"age\":null,\"height\":null}") -jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp") -writeLines(mockLinesNa, jsonPathNa) - -# For test complex types in DataFrame -mockLinesComplexType <- - c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", -"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", -"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") -complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") -writeLines(mockLinesComplexType, complexTypeJsonPath) - -test_that("infer types and check types", { - expect_equal(infer_type(1L), "integer") - expect_equal(infer_type(1.0), "double") - expect_equal(infer_type("abc"), "string") - expect_equal(infer_type(TRUE), "boolean") - expect_equal(infer_type(as.Date("2015-03-11")), "date") - expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp") - expect_equal(infer_type(c(1L, 2L)), "array") - expect_equal(infer_type(list(1L, 2L)), "array") - expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct") - e <- new.env() - assign("a", 1L, envir = e) - expect_equal(infer_type(e), "map") - - expect_error(checkType("map"), "Key type in a map must be string or character") - - expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary") -}) - -test_that("structType and structField", { - testField <- structField("a", "string") - expect_is(testField, "structField") - expect_equal(testField$name(), "a") - expect_true(testField$nullable()) - - testSchema <- structType(testField, structField("b", "integer")) - expect_is(testSchema, "structType") - expect_is(testSchema$fields()[[2]], "structField") - expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType") -}) - -test_that("create DataFrame from RDD", { - rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) - df <- createDataFrame(sqlContext, rdd, list("a", "b")) - dfAsDF <- as.DataFrame(sqlContext, rdd, list("a", "b")) - expect_is(df, "DataFrame") - expect_is(dfAsDF, "DataFrame") - expect_equal(count(df), 10) - expect_equal(count(dfAsDF), 10) - expect_equal(nrow(df), 10) - expect_equal(nrow(dfAsDF), 10) - expect_equal(ncol(df), 2) - expect_equal(ncol(dfAsDF), 2) - expect_equal(dim(df), c(10, 2)) - expect_equal(dim(dfAsDF), c(10, 2)) - expect_equal(columns(df), c("a", "b")) - expect_equal(columns(dfAsDF), c("a", "b")) - expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) - expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "s
[4/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
[SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases. This PR: 1. Suppress all known warnings. 2. Cleanup test cases and fix some errors in test cases. 3. Fix errors in HiveContext related test cases. These test cases are actually not run previously due to a bug of creating TestHiveContext. 4. Support 'testthat' package version 0.11.0 which prefers that test cases be under 'tests/testthat' 5. Make sure the default Hadoop file system is local when running test cases. 6. Turn on warnings into errors. Author: Sun Rui Closes #10030 from sun-rui/SPARK-12034. (cherry picked from commit 39d677c8f1ee7ebd7e142bec0415cf8f90ac84b6) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f230f7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f230f7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f230f7b Branch: refs/heads/branch-1.6 Commit: 3f230f7b331cf6d67426cece570af3f1340f526e Parents: c54b698 Author: Sun Rui Authored: Mon Dec 7 10:38:17 2015 -0800 Committer: Shivaram Venkataraman Committed: Mon Dec 7 10:38:31 2015 -0800 -- R/pkg/inst/tests/jarTest.R | 32 - R/pkg/inst/tests/packageInAJarTest.R| 30 - R/pkg/inst/tests/test_Serde.R | 77 - R/pkg/inst/tests/test_binaryFile.R | 89 - R/pkg/inst/tests/test_binary_function.R | 101 - R/pkg/inst/tests/test_broadcast.R | 48 - R/pkg/inst/tests/test_client.R | 45 - R/pkg/inst/tests/test_context.R | 114 -- R/pkg/inst/tests/test_includeJAR.R | 37 - R/pkg/inst/tests/test_includePackage.R | 57 - R/pkg/inst/tests/test_mllib.R | 115 -- R/pkg/inst/tests/test_parallelize_collect.R | 109 -- R/pkg/inst/tests/test_rdd.R | 793 R/pkg/inst/tests/test_shuffle.R | 221 --- R/pkg/inst/tests/test_sparkSQL.R| 1722 - R/pkg/inst/tests/test_take.R| 66 - R/pkg/inst/tests/test_textFile.R| 161 -- R/pkg/inst/tests/test_utils.R | 140 -- R/pkg/inst/tests/testthat/jarTest.R | 32 + R/pkg/inst/tests/testthat/packageInAJarTest.R | 30 + R/pkg/inst/tests/testthat/test_Serde.R | 77 + R/pkg/inst/tests/testthat/test_binaryFile.R | 89 + .../inst/tests/testthat/test_binary_function.R | 101 + R/pkg/inst/tests/testthat/test_broadcast.R | 48 + R/pkg/inst/tests/testthat/test_client.R | 45 + R/pkg/inst/tests/testthat/test_context.R| 114 ++ R/pkg/inst/tests/testthat/test_includeJAR.R | 37 + R/pkg/inst/tests/testthat/test_includePackage.R | 57 + R/pkg/inst/tests/testthat/test_mllib.R | 115 ++ .../tests/testthat/test_parallelize_collect.R | 109 ++ R/pkg/inst/tests/testthat/test_rdd.R| 793 R/pkg/inst/tests/testthat/test_shuffle.R| 221 +++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 1730 ++ R/pkg/inst/tests/testthat/test_take.R | 66 + R/pkg/inst/tests/testthat/test_textFile.R | 161 ++ R/pkg/inst/tests/testthat/test_utils.R | 140 ++ R/pkg/tests/run-all.R |3 + R/run-tests.sh |2 +- 38 files changed, 3969 insertions(+), 3958 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3f230f7b/R/pkg/inst/tests/jarTest.R -- diff --git a/R/pkg/inst/tests/jarTest.R b/R/pkg/inst/tests/jarTest.R deleted file mode 100644 index d68bb20..000 --- a/R/pkg/inst/tests/jarTest.R +++ /dev/null @@ -1,32 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -library(SparkR) - -sc <- sparkR.init() - -helloTest <- SparkR:::callJStatic("sparkR.test.hello", - "helloWorld", - "Dave") - -basicFunction <- SparkR:::ca
[2/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_binaryFile.R -- diff --git a/R/pkg/inst/tests/testthat/test_binaryFile.R b/R/pkg/inst/tests/testthat/test_binaryFile.R new file mode 100644 index 000..f2452ed --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_binaryFile.R @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions on binary files") + +# JavaSparkContext handle +sc <- sparkR.init() + +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("saveAsObjectFile()/objectFile() following textFile() works", { + fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") + fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1, 1) + saveAsObjectFile(rdd, fileName2) + rdd <- objectFile(sc, fileName2) + expect_equal(collect(rdd), as.list(mockFile)) + + unlink(fileName1) + unlink(fileName2, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() works on a parallelized list", { + fileName <- tempfile(pattern="spark-test", fileext=".tmp") + + l <- list(1, 2, 3) + rdd <- parallelize(sc, l, 1) + saveAsObjectFile(rdd, fileName) + rdd <- objectFile(sc, fileName) + expect_equal(collect(rdd), l) + + unlink(fileName, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() following RDD transformations works", { + fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") + fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1) + + words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + counts <- reduceByKey(wordCount, "+", 2L) + + saveAsObjectFile(counts, fileName2) + counts <- objectFile(sc, fileName2) + + output <- collect(counts) + expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1), +list("is", 2)) + expect_equal(sortKeyValueList(output), sortKeyValueList(expected)) + + unlink(fileName1) + unlink(fileName2, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() works with multiple paths", { + fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") + fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") + + rdd1 <- parallelize(sc, "Spark is pretty.") + saveAsObjectFile(rdd1, fileName1) + rdd2 <- parallelize(sc, "Spark is awesome.") + saveAsObjectFile(rdd2, fileName2) + + rdd <- objectFile(sc, c(fileName1, fileName2)) + expect_equal(count(rdd), 2) + + unlink(fileName1, recursive = TRUE) + unlink(fileName2, recursive = TRUE) +}) http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_binary_function.R -- diff --git 
a/R/pkg/inst/tests/testthat/test_binary_function.R b/R/pkg/inst/tests/testthat/test_binary_function.R new file mode 100644 index 000..f054ac9 --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_binary_function.R @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("binary functions") + +# JavaSparkContext handle +sc <- sparkR.init() + +# Data +nums <- 1:10 +rdd <- parallelize(sc, nums, 2L) + +# File content +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("union on two RDDs"
[4/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
[SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases. This PR: 1. Suppress all known warnings. 2. Cleanup test cases and fix some errors in test cases. 3. Fix errors in HiveContext related test cases. These test cases are actually not run previously due to a bug of creating TestHiveContext. 4. Support 'testthat' package version 0.11.0 which prefers that test cases be under 'tests/testthat' 5. Make sure the default Hadoop file system is local when running test cases. 6. Turn on warnings into errors. Author: Sun Rui Closes #10030 from sun-rui/SPARK-12034. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39d677c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39d677c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39d677c8 Branch: refs/heads/master Commit: 39d677c8f1ee7ebd7e142bec0415cf8f90ac84b6 Parents: 9cde7d5 Author: Sun Rui Authored: Mon Dec 7 10:38:17 2015 -0800 Committer: Shivaram Venkataraman Committed: Mon Dec 7 10:38:17 2015 -0800 -- R/pkg/inst/tests/jarTest.R | 32 - R/pkg/inst/tests/packageInAJarTest.R| 30 - R/pkg/inst/tests/test_Serde.R | 77 - R/pkg/inst/tests/test_binaryFile.R | 89 - R/pkg/inst/tests/test_binary_function.R | 101 - R/pkg/inst/tests/test_broadcast.R | 48 - R/pkg/inst/tests/test_client.R | 45 - R/pkg/inst/tests/test_context.R | 114 -- R/pkg/inst/tests/test_includeJAR.R | 37 - R/pkg/inst/tests/test_includePackage.R | 57 - R/pkg/inst/tests/test_mllib.R | 115 -- R/pkg/inst/tests/test_parallelize_collect.R | 109 -- R/pkg/inst/tests/test_rdd.R | 793 R/pkg/inst/tests/test_shuffle.R | 221 --- R/pkg/inst/tests/test_sparkSQL.R| 1722 - R/pkg/inst/tests/test_take.R| 66 - R/pkg/inst/tests/test_textFile.R| 161 -- R/pkg/inst/tests/test_utils.R | 140 -- R/pkg/inst/tests/testthat/jarTest.R | 32 + R/pkg/inst/tests/testthat/packageInAJarTest.R | 30 + R/pkg/inst/tests/testthat/test_Serde.R | 77 + R/pkg/inst/tests/testthat/test_binaryFile.R | 89 + .../inst/tests/testthat/test_binary_function.R | 101 + R/pkg/inst/tests/testthat/test_broadcast.R | 48 + R/pkg/inst/tests/testthat/test_client.R | 45 + R/pkg/inst/tests/testthat/test_context.R| 114 ++ R/pkg/inst/tests/testthat/test_includeJAR.R | 37 + R/pkg/inst/tests/testthat/test_includePackage.R | 57 + R/pkg/inst/tests/testthat/test_mllib.R | 115 ++ .../tests/testthat/test_parallelize_collect.R | 109 ++ R/pkg/inst/tests/testthat/test_rdd.R| 793 R/pkg/inst/tests/testthat/test_shuffle.R| 221 +++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 1730 ++ R/pkg/inst/tests/testthat/test_take.R | 66 + R/pkg/inst/tests/testthat/test_textFile.R | 161 ++ R/pkg/inst/tests/testthat/test_utils.R | 140 ++ R/pkg/tests/run-all.R |3 + R/run-tests.sh |2 +- 38 files changed, 3969 insertions(+), 3958 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/jarTest.R -- diff --git a/R/pkg/inst/tests/jarTest.R b/R/pkg/inst/tests/jarTest.R deleted file mode 100644 index d68bb20..000 --- a/R/pkg/inst/tests/jarTest.R +++ /dev/null @@ -1,32 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -library(SparkR) - -sc <- sparkR.init() - -helloTest <- SparkR:::callJStatic("sparkR.test.hello", - "helloWorld", - "Dave") - -basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction", - "addStuff", -
[3/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R deleted file mode 100644 index 6ef03ae..000 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ /dev/null @@ -1,1722 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("SparkSQL functions") - -# Utility function for easily checking the values of a StructField -checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { - expect_equal(class(actual), "structField") - expect_equal(actual$name(), expectedName) - expect_equal(actual$dataType.toString(), expectedType) - expect_equal(actual$nullable(), expectedNullable) -} - -markUtf8 <- function(s) { - Encoding(s) <- "UTF-8" - s -} - -# Tests for SparkSQL functions in SparkR - -sc <- sparkR.init() - -sqlContext <- sparkRSQL.init(sc) - -mockLines <- c("{\"name\":\"Michael\"}", - "{\"name\":\"Andy\", \"age\":30}", - "{\"name\":\"Justin\", \"age\":19}") -jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") -parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet") -writeLines(mockLines, jsonPath) - -# For test nafunctions, like dropna(), fillna(),... 
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", - "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", - "{\"name\":\"David\",\"age\":60,\"height\":null}", - "{\"name\":\"Amy\",\"age\":null,\"height\":null}", - "{\"name\":null,\"age\":null,\"height\":null}") -jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp") -writeLines(mockLinesNa, jsonPathNa) - -# For test complex types in DataFrame -mockLinesComplexType <- - c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", -"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", -"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") -complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") -writeLines(mockLinesComplexType, complexTypeJsonPath) - -test_that("infer types and check types", { - expect_equal(infer_type(1L), "integer") - expect_equal(infer_type(1.0), "double") - expect_equal(infer_type("abc"), "string") - expect_equal(infer_type(TRUE), "boolean") - expect_equal(infer_type(as.Date("2015-03-11")), "date") - expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp") - expect_equal(infer_type(c(1L, 2L)), "array") - expect_equal(infer_type(list(1L, 2L)), "array") - expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct") - e <- new.env() - assign("a", 1L, envir = e) - expect_equal(infer_type(e), "map") - - expect_error(checkType("map"), "Key type in a map must be string or character") - - expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary") -}) - -test_that("structType and structField", { - testField <- structField("a", "string") - expect_is(testField, "structField") - expect_equal(testField$name(), "a") - expect_true(testField$nullable()) - - testSchema <- structType(testField, structField("b", "integer")) - expect_is(testSchema, "structType") - expect_is(testSchema$fields()[[2]], "structField") - expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType") -}) - -test_that("create DataFrame from RDD", { - rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) - df <- createDataFrame(sqlContext, rdd, list("a", "b")) - dfAsDF <- as.DataFrame(sqlContext, rdd, list("a", "b")) - expect_is(df, "DataFrame") - expect_is(dfAsDF, "DataFrame") - expect_equal(count(df), 10) - expect_equal(count(dfAsDF), 10) - expect_equal(nrow(df), 10) - expect_equal(nrow(dfAsDF), 10) - expect_equal(ncol(df), 2) - expect_equal(ncol(dfAsDF), 2) - expect_equal(dim(df), c(10, 2)) - expect_equal(dim(dfAsDF), c(10, 2)) - expect_equal(columns(df), c("a", "b")) - expect_equal(columns(dfAsDF), c("a", "b")) - expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) - expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "s
[1/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
Repository: spark Updated Branches: refs/heads/branch-1.6 c54b698ec -> 3f230f7b3 http://git-wip-us.apache.org/repos/asf/spark/blob/3f230f7b/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R new file mode 100644 index 000..39fc94a --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -0,0 +1,1730 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("SparkSQL functions") + +# Utility function for easily checking the values of a StructField +checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { + expect_equal(class(actual), "structField") + expect_equal(actual$name(), expectedName) + expect_equal(actual$dataType.toString(), expectedType) + expect_equal(actual$nullable(), expectedNullable) +} + +markUtf8 <- function(s) { + Encoding(s) <- "UTF-8" + s +} + +# Tests for SparkSQL functions in SparkR + +sc <- sparkR.init() + +sqlContext <- sparkRSQL.init(sc) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") +parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet") +writeLines(mockLines, jsonPath) + +# For test nafunctions, like dropna(), fillna(),... 
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}", + "{\"name\":\"Amy\",\"age\":null,\"height\":null}", + "{\"name\":null,\"age\":null,\"height\":null}") +jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp") +writeLines(mockLinesNa, jsonPathNa) + +# For test complex types in DataFrame +mockLinesComplexType <- + c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", +"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", +"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") +complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") +writeLines(mockLinesComplexType, complexTypeJsonPath) + +test_that("infer types and check types", { + expect_equal(infer_type(1L), "integer") + expect_equal(infer_type(1.0), "double") + expect_equal(infer_type("abc"), "string") + expect_equal(infer_type(TRUE), "boolean") + expect_equal(infer_type(as.Date("2015-03-11")), "date") + expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp") + expect_equal(infer_type(c(1L, 2L)), "array") + expect_equal(infer_type(list(1L, 2L)), "array") + expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct") + e <- new.env() + assign("a", 1L, envir = e) + expect_equal(infer_type(e), "map") + + expect_error(checkType("map"), "Key type in a map must be string or character") + + expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary") +}) + +test_that("structType and structField", { + testField <- structField("a", "string") + expect_is(testField, "structField") + expect_equal(testField$name(), "a") + expect_true(testField$nullable()) + + testSchema <- structType(testField, structField("b", "integer")) + expect_is(testSchema, "structType") + expect_is(testSchema$fields()[[2]], "structField") + expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType") +}) + +test_that("create DataFrame from RDD", { + rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) + df <- createDataFrame(sqlContext, rdd, list("a", "b")) + dfAsDF <- as.DataFrame(sqlContext, rdd, list("a", "b")) + expect_is(df, "DataFrame") + expect_is(dfAsDF, "DataFrame") + expect_equal(count(df), 10) + expect_equal(count(dfAsDF), 10) + expect_equal(nrow(df), 10) + expect_equal(nrow(dfAsDF), 10) + expect_equal(ncol(df), 2) + expect_equal(ncol(dfAsDF), 2) + expect_equal(dim(df), c(10, 2)) + expect_equal(dim(dfAsDF), c(10, 2)) + expect_equal(columns(df), c("a", "b")) + expect_equal(columns(dfAsDF), c("a", "b")) + expect_
[1/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
Repository: spark Updated Branches: refs/heads/master 9cde7d5fa -> 39d677c8f http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R new file mode 100644 index 000..39fc94a --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -0,0 +1,1730 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("SparkSQL functions") + +# Utility function for easily checking the values of a StructField +checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { + expect_equal(class(actual), "structField") + expect_equal(actual$name(), expectedName) + expect_equal(actual$dataType.toString(), expectedType) + expect_equal(actual$nullable(), expectedNullable) +} + +markUtf8 <- function(s) { + Encoding(s) <- "UTF-8" + s +} + +# Tests for SparkSQL functions in SparkR + +sc <- sparkR.init() + +sqlContext <- sparkRSQL.init(sc) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") +parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet") +writeLines(mockLines, jsonPath) + +# For test nafunctions, like dropna(), fillna(),... 
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}", + "{\"name\":\"Amy\",\"age\":null,\"height\":null}", + "{\"name\":null,\"age\":null,\"height\":null}") +jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp") +writeLines(mockLinesNa, jsonPathNa) + +# For test complex types in DataFrame +mockLinesComplexType <- + c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", +"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", +"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") +complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") +writeLines(mockLinesComplexType, complexTypeJsonPath) + +test_that("infer types and check types", { + expect_equal(infer_type(1L), "integer") + expect_equal(infer_type(1.0), "double") + expect_equal(infer_type("abc"), "string") + expect_equal(infer_type(TRUE), "boolean") + expect_equal(infer_type(as.Date("2015-03-11")), "date") + expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp") + expect_equal(infer_type(c(1L, 2L)), "array") + expect_equal(infer_type(list(1L, 2L)), "array") + expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct") + e <- new.env() + assign("a", 1L, envir = e) + expect_equal(infer_type(e), "map") + + expect_error(checkType("map"), "Key type in a map must be string or character") + + expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary") +}) + +test_that("structType and structField", { + testField <- structField("a", "string") + expect_is(testField, "structField") + expect_equal(testField$name(), "a") + expect_true(testField$nullable()) + + testSchema <- structType(testField, structField("b", "integer")) + expect_is(testSchema, "structType") + expect_is(testSchema$fields()[[2]], "structField") + expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType") +}) + +test_that("create DataFrame from RDD", { + rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) + df <- createDataFrame(sqlContext, rdd, list("a", "b")) + dfAsDF <- as.DataFrame(sqlContext, rdd, list("a", "b")) + expect_is(df, "DataFrame") + expect_is(dfAsDF, "DataFrame") + expect_equal(count(df), 10) + expect_equal(count(dfAsDF), 10) + expect_equal(nrow(df), 10) + expect_equal(nrow(dfAsDF), 10) + expect_equal(ncol(df), 2) + expect_equal(ncol(dfAsDF), 2) + expect_equal(dim(df), c(10, 2)) + expect_equal(dim(dfAsDF), c(10, 2)) + expect_equal(columns(df), c("a", "b")) + expect_equal(columns(dfAsDF), c("a", "b")) + expect_equa
[2/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
http://git-wip-us.apache.org/repos/asf/spark/blob/3f230f7b/R/pkg/inst/tests/testthat/test_binaryFile.R -- diff --git a/R/pkg/inst/tests/testthat/test_binaryFile.R b/R/pkg/inst/tests/testthat/test_binaryFile.R new file mode 100644 index 000..f2452ed --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_binaryFile.R @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions on binary files") + +# JavaSparkContext handle +sc <- sparkR.init() + +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("saveAsObjectFile()/objectFile() following textFile() works", { + fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") + fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1, 1) + saveAsObjectFile(rdd, fileName2) + rdd <- objectFile(sc, fileName2) + expect_equal(collect(rdd), as.list(mockFile)) + + unlink(fileName1) + unlink(fileName2, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() works on a parallelized list", { + fileName <- tempfile(pattern="spark-test", fileext=".tmp") + + l <- list(1, 2, 3) + rdd <- parallelize(sc, l, 1) + saveAsObjectFile(rdd, fileName) + rdd <- objectFile(sc, fileName) + expect_equal(collect(rdd), l) + + unlink(fileName, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() following RDD transformations works", { + fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") + fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1) + + words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + counts <- reduceByKey(wordCount, "+", 2L) + + saveAsObjectFile(counts, fileName2) + counts <- objectFile(sc, fileName2) + + output <- collect(counts) + expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1), +list("is", 2)) + expect_equal(sortKeyValueList(output), sortKeyValueList(expected)) + + unlink(fileName1) + unlink(fileName2, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() works with multiple paths", { + fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") + fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") + + rdd1 <- parallelize(sc, "Spark is pretty.") + saveAsObjectFile(rdd1, fileName1) + rdd2 <- parallelize(sc, "Spark is awesome.") + saveAsObjectFile(rdd2, fileName2) + + rdd <- objectFile(sc, c(fileName1, fileName2)) + expect_equal(count(rdd), 2) + + unlink(fileName1, recursive = TRUE) + unlink(fileName2, recursive = TRUE) +}) http://git-wip-us.apache.org/repos/asf/spark/blob/3f230f7b/R/pkg/inst/tests/testthat/test_binary_function.R -- diff --git 
a/R/pkg/inst/tests/testthat/test_binary_function.R b/R/pkg/inst/tests/testthat/test_binary_function.R new file mode 100644 index 000..f054ac9 --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_binary_function.R @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("binary functions") + +# JavaSparkContext handle +sc <- sparkR.init() + +# Data +nums <- 1:10 +rdd <- parallelize(sc, nums, 2L) + +# File content +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("union on two RDDs"
spark git commit: [SPARK-12132] [PYSPARK] raise KeyboardInterrupt inside SIGINT handler
Repository: spark Updated Branches: refs/heads/master 39d677c8f -> ef3f047c0 [SPARK-12132] [PYSPARK] raise KeyboardInterrupt inside SIGINT handler Currently, the current line is not cleared by Ctrl-C. After this patch ``` >>> asdfasdf^C Traceback (most recent call last): File "~/spark/python/pyspark/context.py", line 225, in signal_handler raise KeyboardInterrupt() KeyboardInterrupt ``` It's still worse than 1.5 (and before). Author: Davies Liu Closes #10134 from davies/fix_cltrc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef3f047c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef3f047c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef3f047c Branch: refs/heads/master Commit: ef3f047c07ef0ac4a3a97e6bc11e1c28c6c8f9a0 Parents: 39d677c Author: Davies Liu Authored: Mon Dec 7 11:00:25 2015 -0800 Committer: Davies Liu Committed: Mon Dec 7 11:00:25 2015 -0800 -- python/pyspark/context.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef3f047c/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 77710a1..529d16b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -222,6 +222,7 @@ class SparkContext(object): # create a signal handler which would be invoked on receiving SIGINT def signal_handler(signal, frame): self.cancelAllJobs() +raise KeyboardInterrupt() # see http://stackoverflow.com/questions/23206787/ if isinstance(threading.current_thread(), threading._MainThread):
spark git commit: [SPARK-12132] [PYSPARK] raise KeyboardInterrupt inside SIGINT handler
Repository: spark Updated Branches: refs/heads/branch-1.6 3f230f7b3 -> fed453821 [SPARK-12132] [PYSPARK] raise KeyboardInterrupt inside SIGINT handler Currently, the current line is not cleared by Ctrl-C. After this patch ``` >>> asdfasdf^C Traceback (most recent call last): File "~/spark/python/pyspark/context.py", line 225, in signal_handler raise KeyboardInterrupt() KeyboardInterrupt ``` It's still worse than 1.5 (and before). Author: Davies Liu Closes #10134 from davies/fix_cltrc. (cherry picked from commit ef3f047c07ef0ac4a3a97e6bc11e1c28c6c8f9a0) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fed45382 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fed45382 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fed45382 Branch: refs/heads/branch-1.6 Commit: fed453821d81470b9035d33e36fa6ef1df99c0de Parents: 3f230f7 Author: Davies Liu Authored: Mon Dec 7 11:00:25 2015 -0800 Committer: Davies Liu Committed: Mon Dec 7 11:00:34 2015 -0800 -- python/pyspark/context.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fed45382/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 77710a1..529d16b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -222,6 +222,7 @@ class SparkContext(object): # create a signal handler which would be invoked on receiving SIGINT def signal_handler(signal, frame): self.cancelAllJobs() +raise KeyboardInterrupt() # see http://stackoverflow.com/questions/23206787/ if isinstance(threading.current_thread(), threading._MainThread):
spark git commit: [SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present
Repository: spark Updated Branches: refs/heads/master ef3f047c0 -> 5d80d8c6a [SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004). While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is still a non-zero chance that saving and recovery fail. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected. Author: Tathagata Das Closes #9988 from tdas/SPARK-11932. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d80d8c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d80d8c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d80d8c6 Branch: refs/heads/master Commit: 5d80d8c6a54b2113022eff31187e6d97521bd2cf Parents: ef3f047 Author: Tathagata Das Authored: Mon Dec 7 11:03:59 2015 -0800 Committer: Tathagata Das Committed: Mon Dec 7 11:03:59 2015 -0800 -- .../org/apache/spark/streaming/Checkpoint.scala | 2 +- .../streaming/dstream/TrackStateDStream.scala | 39 ++-- .../spark/streaming/rdd/TrackStateRDD.scala | 29 ++- .../spark/streaming/CheckpointSuite.scala | 189 ++- .../apache/spark/streaming/TestSuiteBase.scala | 6 + .../spark/streaming/TrackStateByKeySuite.scala | 77 ++-- 6 files changed, 258 insertions(+), 84 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5d80d8c6/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index fd0e8d5..d0046af 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -277,7 +277,7 @@ class CheckpointWriter( val bytes = Checkpoint.serialize(checkpoint, conf) executor.execute(new CheckpointWriteHandler( checkpoint.checkpointTime, bytes, clearCheckpointDataLater)) - logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) http://git-wip-us.apache.org/repos/asf/spark/blob/5d80d8c6/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala index 0ada111..ea62134 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala @@ -132,22 +132,37 @@ class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassT /** Method that generates a RDD for the given time */ override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, S, E]]] = { // Get the previous state or create a new empty state RDD -val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse { 
- TrackStateRDD.createFromPairRDD[K, V, S, E]( -spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)), -partitioner, validTime - ) +val prevStateRDD = getOrCompute(validTime - slideDuration) match { + case Some(rdd) => +if (rdd.partitioner != Some(partitioner)) { + // If the RDD is not partitioned the right way, let us repartition it using the + // partition index as the key. This is to ensure that state RDD is always partitioned + // before creating another state RDD using it + TrackStateRDD.createFromRDD[K, V, S, E]( +rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime) +} else { + rdd +} + case None => +TrackStateRDD.createFromPairRDD[K, V, S, E]( + spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)), + partitioner, + validTime +) } + // Compute the new state RDD with pr
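The gist of the fix, stripped of the private streaming internals, is "check the recovered RDD's partitioner and repartition it if the expected partitioner is missing or different". A minimal sketch using only the public RDD API (the helper name below is illustrative, not actual Spark code):

import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Reuse the RDD as-is when it already carries the expected partitioner,
// otherwise shuffle it once (e.g. after checkpoint recovery lost the
// partitioner) so later per-key state updates stay cheap.
def ensurePartitioned[K: ClassTag, V: ClassTag](
    rdd: RDD[(K, V)],
    expected: Partitioner): RDD[(K, V)] = {
  if (rdd.partitioner == Some(expected)) rdd else rdd.partitionBy(expected)
}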
spark git commit: [SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present
Repository: spark Updated Branches: refs/heads/branch-1.6 fed453821 -> 539914f1a [SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004). While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is still a non-zero chance that saving and recovering the partitioner fails. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected. Author: Tathagata Das Closes #9988 from tdas/SPARK-11932. (cherry picked from commit 5d80d8c6a54b2113022eff31187e6d97521bd2cf) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/539914f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/539914f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/539914f1 Branch: refs/heads/branch-1.6 Commit: 539914f1a8d3a0f59e67c178f86e741927e7a658 Parents: fed4538 Author: Tathagata Das Authored: Mon Dec 7 11:03:59 2015 -0800 Committer: Tathagata Das Committed: Mon Dec 7 11:04:07 2015 -0800 -- .../org/apache/spark/streaming/Checkpoint.scala | 2 +- .../streaming/dstream/TrackStateDStream.scala | 39 ++-- .../spark/streaming/rdd/TrackStateRDD.scala | 29 ++- .../spark/streaming/CheckpointSuite.scala | 189 ++- .../apache/spark/streaming/TestSuiteBase.scala | 6 + .../spark/streaming/TrackStateByKeySuite.scala | 77 ++-- 6 files changed, 258 insertions(+), 84 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/539914f1/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index fd0e8d5..d0046af 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -277,7 +277,7 @@ class CheckpointWriter( val bytes = Checkpoint.serialize(checkpoint, conf) executor.execute(new CheckpointWriteHandler( checkpoint.checkpointTime, bytes, clearCheckpointDataLater)) - logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) http://git-wip-us.apache.org/repos/asf/spark/blob/539914f1/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala index 0ada111..ea62134 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala @@ -132,22 +132,37 @@ class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassT /** Method that generates a RDD for the given time */ override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, S, E]]] = { // Get the previous
state or create a new empty state RDD -val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse { - TrackStateRDD.createFromPairRDD[K, V, S, E]( -spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)), -partitioner, validTime - ) +val prevStateRDD = getOrCompute(validTime - slideDuration) match { + case Some(rdd) => +if (rdd.partitioner != Some(partitioner)) { + // If the RDD is not partitioned the right way, let us repartition it using the + // partition index as the key. This is to ensure that state RDD is always partitioned + // before creating another state RDD using it + TrackStateRDD.createFromRDD[K, V, S, E]( +rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime) +} else { + rdd +} + case None => +TrackStateRDD.createFromPairRDD[K, V, S, E]( + spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize
Repository: spark Updated Branches: refs/heads/master 5d80d8c6a -> 3f4efb5c2 [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize Merged #10051 again since #10083 is resolved. This reverts commit 328b757d5d4486ea3c2e246780792d7a57ee85e5. Author: Shixiong Zhu Closes #10167 from zsxwing/merge-SPARK-12060. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f4efb5c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f4efb5c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f4efb5c Branch: refs/heads/master Commit: 3f4efb5c23b029496b112760fa062ff070c20334 Parents: 5d80d8c Author: Shixiong Zhu Authored: Mon Dec 7 12:01:09 2015 -0800 Committer: Shixiong Zhu Committed: Mon Dec 7 12:01:09 2015 -0800 -- .../spark/serializer/JavaSerializer.scala | 7 ++--- .../spark/util/ByteBufferOutputStream.scala | 31 2 files changed, 34 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3f4efb5c/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index b463a71..ea718a0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,8 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.ByteBufferInputStream -import org.apache.spark.util.Utils +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) @@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance( extends SerializerInstance { override def serialize[T: ClassTag](t: T): ByteBuffer = { -val bos = new ByteArrayOutputStream() +val bos = new ByteBufferOutputStream() val out = serializeStream(bos) out.writeObject(t) out.close() -ByteBuffer.wrap(bos.toByteArray) +bos.toByteBuffer } override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/3f4efb5c/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala new file mode 100644 index 000..92e4522 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer + +/** + * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer + */ +private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream { + + def toByteBuffer: ByteBuffer = { +return ByteBuffer.wrap(buf, 0, count) + } +}
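For readers outside the Spark code base, the same trick can be sketched standalone (the class name here is illustrative): ByteArrayOutputStream.toByteArray copies its internal buffer, while a subclass can hand the protected buf/count fields straight to ByteBuffer.wrap and skip that copy.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Expose the internal buffer without the defensive copy made by toByteArray.
class NoCopyOutputStream extends ByteArrayOutputStream {
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}

val bos = new NoCopyOutputStream()
val out = new ObjectOutputStream(bos)
out.writeObject("some payload")            // serialize anything Serializable
out.close()
val bytes: ByteBuffer = bos.toByteBuffer   // wraps the existing array, no copy
// Note: further writes to the stream would be visible through this buffer,
// which is fine here because the stream is closed before the buffer is used.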
spark git commit: [SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient (backport 1.5)
Repository: spark Updated Branches: refs/heads/branch-1.5 93a0510a5 -> 3868ab644 [SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient (backport 1.5) backport #10108 to branch 1.5 Author: Shixiong Zhu Closes #10135 from zsxwing/fix-threadpool-1.5. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3868ab64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3868ab64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3868ab64 Branch: refs/heads/branch-1.5 Commit: 3868ab644cc87ce68ba1605f6da65c5e951ce412 Parents: 93a0510 Author: Shixiong Zhu Authored: Mon Dec 7 12:04:18 2015 -0800 Committer: Shixiong Zhu Committed: Mon Dec 7 12:04:18 2015 -0800 -- .../apache/spark/deploy/client/AppClient.scala | 10 -- .../org/apache/spark/deploy/worker/Worker.scala | 10 -- .../apache/spark/deploy/yarn/YarnAllocator.scala | 19 +-- 3 files changed, 13 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea692..bd28429 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -66,12 +66,10 @@ private[spark] class AppClient( // A thread pool for registering with masters. Because registering with a master is a blocking // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same // time so that we can register with all masters. -private val registerMasterThreadPool = new ThreadPoolExecutor( - 0, - masterRpcAddresses.size, // Make sure we can register with all masters at the same time - 60L, TimeUnit.SECONDS, - new SynchronousQueue[Runnable](), - ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) +private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "appclient-register-master-threadpool", + masterRpcAddresses.length // Make sure we can register with all masters at the same time +) // A scheduled executor for scheduling the registration actions private val registrationRetryThread = http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 79b1536..a898bb1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -147,12 +147,10 @@ private[deploy] class Worker( // A thread pool for registering with masters. Because registering with a master is a blocking // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same // time so that we can register with all masters. 
- private val registerMasterThreadPool = new ThreadPoolExecutor( -0, -masterRpcAddresses.size, // Make sure we can register with all masters at the same time -60L, TimeUnit.SECONDS, -new SynchronousQueue[Runnable](), -ThreadUtils.namedThreadFactory("worker-register-master-threadpool")) + private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool( +"worker-register-master-threadpool", +masterRpcAddresses.size // Make sure we can register with all masters at the same time + ) var coresUsed = 0 var memoryUsed = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 6a02848..52a3fd9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -21,26 +21,21 @@ import java.util.Collections import java.util.concurrent._ import java.util.regex.Pattern -import org.apache.spark.util.Utils - import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import com.google.common.util.concurrent.ThreadFactoryBuilder - import org.apache.hadoop.conf.Configuration import org.apache.
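The root cause is easy to reproduce with plain java.util.concurrent: a pool built on a SynchronousQueue has nowhere to park a task once maximumPoolSize threads are busy, so extra registration attempts are rejected instead of waiting. A before/after sketch (this mirrors the spirit of ThreadUtils.newDaemonCachedThreadPool, minus the daemon thread factory details):

import java.util.concurrent.{LinkedBlockingQueue, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Before: with a SynchronousQueue and a bounded max pool size, a fifth task
// submitted while four threads are busy fails with RejectedExecutionException.
val rejectingPool = new ThreadPoolExecutor(
  0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

// After: a bounded number of threads that time out when idle, backed by an
// unbounded queue, so surplus tasks wait in the queue instead of being rejected.
val cachedPool = new ThreadPoolExecutor(
  4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
cachedPool.allowCoreThreadTimeOut(true)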
spark git commit: [SPARK-11963][DOC] Add docs for QuantileDiscretizer
Repository: spark Updated Branches: refs/heads/master 3f4efb5c2 -> 871e85d9c [SPARK-11963][DOC] Add docs for QuantileDiscretizer https://issues.apache.org/jira/browse/SPARK-11963 Author: Xusen Yin Closes #9962 from yinxusen/SPARK-11963. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/871e85d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/871e85d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/871e85d9 Branch: refs/heads/master Commit: 871e85d9c14c6b19068cc732951a8ae8db61b411 Parents: 3f4efb5 Author: Xusen Yin Authored: Mon Dec 7 13:16:47 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Dec 7 13:16:47 2015 -0800 -- docs/ml-features.md | 65 ++ .../ml/JavaQuantileDiscretizerExample.java | 71 .../ml/QuantileDiscretizerExample.scala | 49 ++ 3 files changed, 185 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/871e85d9/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index 05c2c96..b499d6e 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1705,6 +1705,71 @@ print(output.select("features", "clicked").first()) +## QuantileDiscretizer + +`QuantileDiscretizer` takes a column with continuous features and outputs a column with binned +categorical features. +The bin ranges are chosen by taking a sample of the data and dividing it into roughly equal parts. +The lower and upper bin bounds will be `-Infinity` and `+Infinity`, covering all real values. +This attempts to find `numBuckets` partitions based on a sample of the given input data, but it may +find fewer depending on the data sample values. + +Note that the result may be different every time you run it, since the sample strategy behind it is +non-deterministic. + +**Examples** + +Assume that we have a DataFrame with the columns `id`, `hour`: + +~~~ + id | hour +|-- + 0 | 18.0 +|-- + 1 | 19.0 +|-- + 2 | 8.0 +|-- + 3 | 5.0 +|-- + 4 | 2.2 +~~~ + +`hour` is a continuous feature with `Double` type. We want to turn the continuous feature into +categorical one. Given `numBuckets = 3`, we should get the following DataFrame: + +~~~ + id | hour | result +|--|-- + 0 | 18.0 | 2.0 +|--|-- + 1 | 19.0 | 2.0 +|--|-- + 2 | 8.0 | 1.0 +|--|-- + 3 | 5.0 | 1.0 +|--|-- + 4 | 2.2 | 0.0 +~~~ + + + + +Refer to the [QuantileDiscretizer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.QuantileDiscretizer) +for more details on the API. + +{% include_example scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala %} + + + + +Refer to the [QuantileDiscretizer Java docs](api/java/org/apache/spark/ml/feature/QuantileDiscretizer.html) +for more details on the API. + +{% include_example java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java %} + + + # Feature Selectors ## VectorSlicer http://git-wip-us.apache.org/repos/asf/spark/blob/871e85d9/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java new file mode 100644 index 000..251ae79 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +// $example on$ +import java.util.Arrays; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.ml.feature.QuantileDiscretizer;
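A minimal Scala sketch of the documented usage, matching the id/hour table above (it assumes an existing sqlContext and the usual Params setters shown in the bundled examples):

import org.apache.spark.ml.feature.QuantileDiscretizer

val df = sqlContext.createDataFrame(
  Seq((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

// fit() chooses the bucket splits from a sample of the data; transform() applies them.
val result = discretizer.fit(df).transform(df)
result.show()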
spark git commit: [SPARK-11963][DOC] Add docs for QuantileDiscretizer
Repository: spark Updated Branches: refs/heads/branch-1.6 539914f1a -> c8aa5f201 [SPARK-11963][DOC] Add docs for QuantileDiscretizer https://issues.apache.org/jira/browse/SPARK-11963 Author: Xusen Yin Closes #9962 from yinxusen/SPARK-11963. (cherry picked from commit 871e85d9c14c6b19068cc732951a8ae8db61b411) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8aa5f20 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8aa5f20 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8aa5f20 Branch: refs/heads/branch-1.6 Commit: c8aa5f2011cf30a360d5206ee45202c4b1d61e21 Parents: 539914f Author: Xusen Yin Authored: Mon Dec 7 13:16:47 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Dec 7 13:17:00 2015 -0800 -- docs/ml-features.md | 65 ++ .../ml/JavaQuantileDiscretizerExample.java | 71 .../ml/QuantileDiscretizerExample.scala | 49 ++ 3 files changed, 185 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c8aa5f20/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index 05c2c96..b499d6e 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1705,6 +1705,71 @@ print(output.select("features", "clicked").first()) +## QuantileDiscretizer + +`QuantileDiscretizer` takes a column with continuous features and outputs a column with binned +categorical features. +The bin ranges are chosen by taking a sample of the data and dividing it into roughly equal parts. +The lower and upper bin bounds will be `-Infinity` and `+Infinity`, covering all real values. +This attempts to find `numBuckets` partitions based on a sample of the given input data, but it may +find fewer depending on the data sample values. + +Note that the result may be different every time you run it, since the sample strategy behind it is +non-deterministic. + +**Examples** + +Assume that we have a DataFrame with the columns `id`, `hour`: + +~~~ + id | hour +|-- + 0 | 18.0 +|-- + 1 | 19.0 +|-- + 2 | 8.0 +|-- + 3 | 5.0 +|-- + 4 | 2.2 +~~~ + +`hour` is a continuous feature with `Double` type. We want to turn the continuous feature into +categorical one. Given `numBuckets = 3`, we should get the following DataFrame: + +~~~ + id | hour | result +|--|-- + 0 | 18.0 | 2.0 +|--|-- + 1 | 19.0 | 2.0 +|--|-- + 2 | 8.0 | 1.0 +|--|-- + 3 | 5.0 | 1.0 +|--|-- + 4 | 2.2 | 0.0 +~~~ + + + + +Refer to the [QuantileDiscretizer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.QuantileDiscretizer) +for more details on the API. + +{% include_example scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala %} + + + + +Refer to the [QuantileDiscretizer Java docs](api/java/org/apache/spark/ml/feature/QuantileDiscretizer.html) +for more details on the API. 
+ +{% include_example java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java %} + + + # Feature Selectors ## VectorSlicer http://git-wip-us.apache.org/repos/asf/spark/blob/c8aa5f20/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java new file mode 100644 index 000..251ae79 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +// $example on$ +import java.ut
spark git commit: [SPARK-11884] Drop multiple columns in the DataFrame API
Repository: spark Updated Branches: refs/heads/master 871e85d9c -> 84b809445 [SPARK-11884] Drop multiple columns in the DataFrame API See the thread Ben started: http://search-hadoop.com/m/q3RTtveEuhjsr7g/ This PR adds a drop() method to DataFrame that accepts multiple column names. Author: tedyu Closes #9862 from ted-yu/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84b80944 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84b80944 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84b80944 Branch: refs/heads/master Commit: 84b809445f39b9030f272528bdaa39d1559cbc6e Parents: 871e85d Author: tedyu Authored: Mon Dec 7 14:58:09 2015 -0800 Committer: Michael Armbrust Committed: Mon Dec 7 14:58:09 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 24 +--- .../org/apache/spark/sql/DataFrameSuite.scala | 7 ++ 2 files changed, 23 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84b80944/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index eb87003..243a8c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1261,16 +1261,24 @@ class DataFrame private[sql]( * @since 1.4.0 */ def drop(colName: String): DataFrame = { +drop(Seq(colName) : _*) + } + + /** + * Returns a new [[DataFrame]] with columns dropped. + * This is a no-op if schema doesn't contain column name(s). + * @group dfops + * @since 1.6.0 + */ + @scala.annotation.varargs + def drop(colNames: String*): DataFrame = { val resolver = sqlContext.analyzer.resolver -val shouldDrop = schema.exists(f => resolver(f.name, colName)) -if (shouldDrop) { - val colsAfterDrop = schema.filter { field => -val name = field.name -!resolver(name, colName) - }.map(f => Column(f.name)) - select(colsAfterDrop : _*) -} else { +val remainingCols = + schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name)) +if (remainingCols.size == this.schema.size) { this +} else { + this.select(remainingCols: _*) } } http://git-wip-us.apache.org/repos/asf/spark/blob/84b80944/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 76e9648..605a654 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -378,6 +378,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df.schema.map(_.name) === Seq("value")) } + test("drop columns using drop") { +val src = Seq((0, 2, 3)).toDF("a", "b", "c") +val df = src.drop("a", "b") +checkAnswer(df, Row(3)) +assert(df.schema.map(_.name) === Seq("c")) + } + test("drop unknown column (no-op)") { val df = testData.drop("random") checkAnswer(
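A quick usage sketch of the new varargs overload, mirroring the test above (it assumes a SQLContext with its implicits imported):

import sqlContext.implicits._

val src = Seq((0, 2, 3)).toDF("a", "b", "c")
src.drop("a", "b").show()               // leaves only column "c"
src.drop("a", "does_not_exist").show()  // unknown names are ignored, so only "a" is dropped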
spark git commit: [SPARK-12184][PYTHON] Make Python API doc for pivot consistent with Scala doc
Repository: spark Updated Branches: refs/heads/master 84b809445 -> 36282f78b [SPARK-12184][PYTHON] Make Python API doc for pivot consistent with Scala doc In SPARK-11946 the API for pivot was changed a bit and its doc was updated, but the doc changes were not made for the Python API. This PR updates the Python doc to be consistent. Author: Andrew Ray Closes #10176 from aray/sql-pivot-python-doc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36282f78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36282f78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36282f78 Branch: refs/heads/master Commit: 36282f78b888743066843727426c6d806231aa97 Parents: 84b8094 Author: Andrew Ray Authored: Mon Dec 7 15:01:00 2015 -0800 Committer: Yin Huai Committed: Mon Dec 7 15:01:00 2015 -0800 -- python/pyspark/sql/group.py | 14 +- 1 file changed, 9 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36282f78/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 1911588..9ca303a 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -169,16 +169,20 @@ class GroupedData(object): @since(1.6) def pivot(self, pivot_col, values=None): -"""Pivots a column of the current DataFrame and perform the specified aggregation. +""" +Pivots a column of the current [[DataFrame]] and perform the specified aggregation. +There are two versions of pivot function: one that requires the caller to specify the list +of distinct values to pivot on, and one that does not. The latter is more concise but less +efficient, because Spark needs to first compute the list of distinct values internally. -:param pivot_col: Column to pivot -:param values: Optional list of values of pivot column that will be translated to columns in -the output DataFrame. If values are not provided the method will do an immediate call -to .distinct() on the pivot column. +:param pivot_col: Name of the column to pivot. +:param values: List of values that will be translated to columns in the output DataFrame. +// Compute the sum of earnings for each year by course with each course as a separate column >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() [Row(year=2012, dotNET=15000, Java=2), Row(year=2013, dotNET=48000, Java=3)] +// Or without specifying column values (less efficient) >>> df4.groupBy("year").pivot("course").sum("earnings").collect() [Row(year=2012, Java=2, dotNET=15000), Row(year=2013, Java=3, dotNET=48000)] """
spark git commit: [SPARK-12184][PYTHON] Make Python API doc for pivot consistent with Scala doc
Repository: spark Updated Branches: refs/heads/branch-1.6 c8aa5f201 -> cdeb89b34 [SPARK-12184][PYTHON] Make Python API doc for pivot consistent with Scala doc In SPARK-11946 the API for pivot was changed a bit and its doc was updated, but the doc changes were not made for the Python API. This PR updates the Python doc to be consistent. Author: Andrew Ray Closes #10176 from aray/sql-pivot-python-doc. (cherry picked from commit 36282f78b888743066843727426c6d806231aa97) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdeb89b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdeb89b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdeb89b3 Branch: refs/heads/branch-1.6 Commit: cdeb89b34614fb39062976c4796d187992333c88 Parents: c8aa5f2 Author: Andrew Ray Authored: Mon Dec 7 15:01:00 2015 -0800 Committer: Yin Huai Committed: Mon Dec 7 15:01:10 2015 -0800 -- python/pyspark/sql/group.py | 14 +- 1 file changed, 9 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cdeb89b3/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 1911588..9ca303a 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -169,16 +169,20 @@ class GroupedData(object): @since(1.6) def pivot(self, pivot_col, values=None): -"""Pivots a column of the current DataFrame and perform the specified aggregation. +""" +Pivots a column of the current [[DataFrame]] and perform the specified aggregation. +There are two versions of pivot function: one that requires the caller to specify the list +of distinct values to pivot on, and one that does not. The latter is more concise but less +efficient, because Spark needs to first compute the list of distinct values internally. -:param pivot_col: Column to pivot -:param values: Optional list of values of pivot column that will be translated to columns in -the output DataFrame. If values are not provided the method will do an immediate call -to .distinct() on the pivot column. +:param pivot_col: Name of the column to pivot. +:param values: List of values that will be translated to columns in the output DataFrame. +// Compute the sum of earnings for each year by course with each course as a separate column >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() [Row(year=2012, dotNET=15000, Java=2), Row(year=2013, dotNET=48000, Java=3)] +// Or without specifying column values (less efficient) >>> df4.groupBy("year").pivot("course").sum("earnings").collect() [Row(year=2012, Java=2, dotNET=15000), Row(year=2013, Java=3, dotNET=48000)] """
spark git commit: [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib
Repository: spark Updated Branches: refs/heads/branch-1.6 cdeb89b34 -> 115bfbdae [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib Switched from using SQLContext constructor to using getOrCreate, mainly in model save/load methods. This covers all instances in spark.mllib. There were no uses of the constructor in spark.ml. CC: mengxr yhuai Author: Joseph K. Bradley Closes #10161 from jkbradley/mllib-sqlcontext-fix. (cherry picked from commit 3e7e05f5ee763925ed60410d7de04cf36b723de1) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/115bfbda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/115bfbda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/115bfbda Branch: refs/heads/branch-1.6 Commit: 115bfbdae82b1c2804ea501ffd420d0aa17aac45 Parents: cdeb89b Author: Joseph K. Bradley Authored: Mon Dec 7 16:37:09 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 16:37:16 2015 -0800 -- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 6 +++--- .../org/apache/spark/mllib/classification/NaiveBayes.scala | 8 .../mllib/classification/impl/GLMClassificationModel.scala | 4 ++-- .../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 4 ++-- .../org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++-- .../spark/mllib/clustering/PowerIterationClustering.scala| 4 ++-- .../scala/org/apache/spark/mllib/feature/ChiSqSelector.scala | 4 ++-- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 4 ++-- .../mllib/recommendation/MatrixFactorizationModel.scala | 4 ++-- .../apache/spark/mllib/regression/IsotonicRegression.scala | 4 ++-- .../spark/mllib/regression/impl/GLMRegressionModel.scala | 4 ++-- .../apache/spark/mllib/tree/model/DecisionTreeModel.scala| 4 ++-- .../apache/spark/mllib/tree/model/treeEnsembleModels.scala | 4 ++-- 13 files changed, 29 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/115bfbda/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 54b03a9..2aa6aec 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1191,7 +1191,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = { // We use DataFrames for serialization of IndexedRows to Python, // so return a DataFrame. -val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext) +val sqlContext = SQLContext.getOrCreate(indexedRowMatrix.rows.sparkContext) sqlContext.createDataFrame(indexedRowMatrix.rows) } @@ -1201,7 +1201,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = { // We use DataFrames for serialization of MatrixEntry entries to // Python, so return a DataFrame. 
-val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext) +val sqlContext = SQLContext.getOrCreate(coordinateMatrix.entries.sparkContext) sqlContext.createDataFrame(coordinateMatrix.entries) } @@ -1211,7 +1211,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = { // We use DataFrames for serialization of sub-matrix blocks to // Python, so return a DataFrame. -val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext) +val sqlContext = SQLContext.getOrCreate(blockMatrix.blocks.sparkContext) sqlContext.createDataFrame(blockMatrix.blocks) } } http://git-wip-us.apache.org/repos/asf/spark/blob/115bfbda/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index a956084..aef9ef2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -192,7 +192,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { modelType: String) def save(sc: SparkContext, path: String, data: Data): Unit = { - val sqlContext = new SQLContext(sc) + val sqlContext = SQLContext.getOrCreate(sc)
spark git commit: [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib
Repository: spark Updated Branches: refs/heads/master 36282f78b -> 3e7e05f5e [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib Switched from using SQLContext constructor to using getOrCreate, mainly in model save/load methods. This covers all instances in spark.mllib. There were no uses of the constructor in spark.ml. CC: mengxr yhuai Author: Joseph K. Bradley Closes #10161 from jkbradley/mllib-sqlcontext-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e7e05f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e7e05f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e7e05f5 Branch: refs/heads/master Commit: 3e7e05f5ee763925ed60410d7de04cf36b723de1 Parents: 36282f7 Author: Joseph K. Bradley Authored: Mon Dec 7 16:37:09 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 16:37:09 2015 -0800 -- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 6 +++--- .../org/apache/spark/mllib/classification/NaiveBayes.scala | 8 .../mllib/classification/impl/GLMClassificationModel.scala | 4 ++-- .../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 4 ++-- .../org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++-- .../spark/mllib/clustering/PowerIterationClustering.scala| 4 ++-- .../scala/org/apache/spark/mllib/feature/ChiSqSelector.scala | 4 ++-- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 4 ++-- .../mllib/recommendation/MatrixFactorizationModel.scala | 4 ++-- .../apache/spark/mllib/regression/IsotonicRegression.scala | 4 ++-- .../spark/mllib/regression/impl/GLMRegressionModel.scala | 4 ++-- .../apache/spark/mllib/tree/model/DecisionTreeModel.scala| 4 ++-- .../apache/spark/mllib/tree/model/treeEnsembleModels.scala | 4 ++-- 13 files changed, 29 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e7e05f5/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 54b03a9..2aa6aec 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1191,7 +1191,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = { // We use DataFrames for serialization of IndexedRows to Python, // so return a DataFrame. -val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext) +val sqlContext = SQLContext.getOrCreate(indexedRowMatrix.rows.sparkContext) sqlContext.createDataFrame(indexedRowMatrix.rows) } @@ -1201,7 +1201,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = { // We use DataFrames for serialization of MatrixEntry entries to // Python, so return a DataFrame. -val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext) +val sqlContext = SQLContext.getOrCreate(coordinateMatrix.entries.sparkContext) sqlContext.createDataFrame(coordinateMatrix.entries) } @@ -1211,7 +1211,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = { // We use DataFrames for serialization of sub-matrix blocks to // Python, so return a DataFrame. 
-val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext) +val sqlContext = SQLContext.getOrCreate(blockMatrix.blocks.sparkContext) sqlContext.createDataFrame(blockMatrix.blocks) } } http://git-wip-us.apache.org/repos/asf/spark/blob/3e7e05f5/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index a956084..aef9ef2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -192,7 +192,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { modelType: String) def save(sc: SparkContext, path: String, data: Data): Unit = { - val sqlContext = new SQLContext(sc) + val sqlContext = SQLContext.getOrCreate(sc) import sqlContext.implicits._ // Create JSON metadata. @@ -208,7 +208,7 @@ object NaiveBayesMod
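The pattern being standardized here is worth spelling out: model save/load helpers should piggyback on whatever SQLContext the application already has (possibly a HiveContext) rather than constructing a fresh one per call. A hedged sketch of that shape (the helper name is made up for illustration):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

def saveModelData(sc: SparkContext): Unit = {
  // Reuses the active/last-created SQLContext instead of `new SQLContext(sc)`.
  val sqlContext = SQLContext.getOrCreate(sc)
  import sqlContext.implicits._
  // ... turn the model's data into a DataFrame and write it under the save path ...
}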
[1/3] spark git commit: [SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example
Repository: spark Updated Branches: refs/heads/master 3e7e05f5e -> 78209b0cc http://git-wip-us.apache.org/repos/asf/spark/blob/78209b0c/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala new file mode 100644 index 000..1be8a5f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.ml + +// $example on$ +import org.apache.spark.ml.feature.StringIndexer +// $example off$ +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +object StringIndexerExample { + def main(args: Array[String]): Unit = { +val conf = new SparkConf().setAppName("StringIndexerExample") +val sc = new SparkContext(conf) +val sqlContext = new SQLContext(sc) + +// $example on$ +val df = sqlContext.createDataFrame( + Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) +).toDF("id", "category") + +val indexer = new StringIndexer() + .setInputCol("category") + .setOutputCol("categoryIndex") + +val indexed = indexer.fit(df).transform(df) +indexed.show() +// $example off$ +sc.stop() + } +} +// scalastyle:on println + http://git-wip-us.apache.org/repos/asf/spark/blob/78209b0c/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala new file mode 100644 index 000..01e0d13 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// scalastyle:off println +package org.apache.spark.examples.ml + +// $example on$ +import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer} +// $example off$ +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +object TokenizerExample { + def main(args: Array[String]): Unit = { +val conf = new SparkConf().setAppName("TokenizerExample") +val sc = new SparkContext(conf) +val sqlContext = new SQLContext(sc) + +// $example on$ +val sentenceDataFrame = sqlContext.createDataFrame(Seq( + (0, "Hi I heard about Spark"), + (1, "I wish Java could use case classes"), + (2, "Logistic,regression,models,are,neat") +)).toDF("label", "sentence") + +val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") +val regexTokenizer = new RegexTokenizer() + .setInputCol("sentence") + .setOutputCol("words") + .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false) + +val tokenized = tokenizer.transform(sentenceDataFrame) +tokenized.select("words", "label").take(3).foreach(println) +val regexTokenized = regexTokenizer.transform(sentenceDataFrame) +regexTokenized.select("words", "label").take(3).foreach(println) +// $example off$ +sc.stop() + } +}
[2/3] spark git commit: [SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example
http://git-wip-us.apache.org/repos/asf/spark/blob/78209b0c/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java new file mode 100644 index 000..668f71e --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; + +// $example on$ +import java.util.Arrays; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.ml.feature.PolynomialExpansion; +import org.apache.spark.mllib.linalg.VectorUDT; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +// $example off$ + +public class JavaPolynomialExpansionExample { + public static void main(String[] args) { +SparkConf conf = new SparkConf().setAppName("JavaPolynomialExpansionExample"); +JavaSparkContext jsc = new JavaSparkContext(conf); +SQLContext jsql = new SQLContext(jsc); + +// $example on$ +PolynomialExpansion polyExpansion = new PolynomialExpansion() + .setInputCol("features") + .setOutputCol("polyFeatures") + .setDegree(3); + +JavaRDD data = jsc.parallelize(Arrays.asList( + RowFactory.create(Vectors.dense(-2.0, 2.3)), + RowFactory.create(Vectors.dense(0.0, 0.0)), + RowFactory.create(Vectors.dense(0.6, -1.1)) +)); + +StructType schema = new StructType(new StructField[]{ + new StructField("features", new VectorUDT(), false, Metadata.empty()), +}); + +DataFrame df = jsql.createDataFrame(data, schema); +DataFrame polyDF = polyExpansion.transform(df); + +Row[] row = polyDF.select("polyFeatures").take(3); +for (Row r : row) { + System.out.println(r.get(0)); +} +// $example off$ +jsc.stop(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/78209b0c/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java new file mode 100644 index 000..1e1062b --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; + +// $example on$ +import java.util.Arrays; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.ml.feature.RFormula; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory
[3/3] spark git commit: [SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example
[SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example Made a new patch containing only the markdown examples moved to the example/ folder. Only three Java examples were not shifted since they contained compilation errors; these classes are 1) StandardScaler 2) NormalizerExample 3) VectorIndexer. Author: Xusen Yin Author: somideshmukh Closes #10002 from somideshmukh/SomilBranch1.33. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78209b0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78209b0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78209b0c Branch: refs/heads/master Commit: 78209b0ccaf3f22b5e2345dfb2b98edfdb746819 Parents: 3e7e05f Author: somideshmukh Authored: Mon Dec 7 23:26:34 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:26:34 2015 -0800 -- docs/ml-features.md | 1109 +- .../spark/examples/ml/JavaBinarizerExample.java | 68 ++ .../examples/ml/JavaBucketizerExample.java | 70 ++ .../spark/examples/ml/JavaDCTExample.java | 65 + .../ml/JavaElementwiseProductExample.java | 75 ++ .../examples/ml/JavaMinMaxScalerExample.java| 50 + .../spark/examples/ml/JavaNGramExample.java | 71 ++ .../examples/ml/JavaNormalizerExample.java | 52 + .../examples/ml/JavaOneHotEncoderExample.java | 77 ++ .../spark/examples/ml/JavaPCAExample.java | 71 ++ .../ml/JavaPolynomialExpansionExample.java | 71 ++ .../spark/examples/ml/JavaRFormulaExample.java | 69 ++ .../examples/ml/JavaStandardScalerExample.java | 53 + .../ml/JavaStopWordsRemoverExample.java | 65 + .../examples/ml/JavaStringIndexerExample.java | 66 ++ .../spark/examples/ml/JavaTokenizerExample.java | 75 ++ .../examples/ml/JavaVectorAssemblerExample.java | 67 ++ .../examples/ml/JavaVectorIndexerExample.java | 60 + .../examples/ml/JavaVectorSlicerExample.java| 73 ++ .../src/main/python/ml/binarizer_example.py | 43 + .../src/main/python/ml/bucketizer_example.py| 42 + .../python/ml/elementwise_product_example.py| 39 + examples/src/main/python/ml/n_gram_example.py | 42 + .../src/main/python/ml/normalizer_example.py| 41 + .../main/python/ml/onehot_encoder_example.py| 47 + examples/src/main/python/ml/pca_example.py | 42 + .../python/ml/polynomial_expansion_example.py | 43 + examples/src/main/python/ml/rformula_example.py | 44 + .../main/python/ml/standard_scaler_example.py | 42 + .../main/python/ml/stopwords_remover_example.py | 40 + .../main/python/ml/string_indexer_example.py| 39 + .../src/main/python/ml/tokenizer_example.py | 44 + .../main/python/ml/vector_assembler_example.py | 42 + .../main/python/ml/vector_indexer_example.py| 39 + .../spark/examples/ml/BinarizerExample.scala| 48 + .../spark/examples/ml/BucketizerExample.scala | 51 + .../apache/spark/examples/ml/DCTExample.scala | 54 + .../examples/ml/ElementWiseProductExample.scala | 53 + .../spark/examples/ml/MinMaxScalerExample.scala | 49 + .../apache/spark/examples/ml/NGramExample.scala | 47 + .../spark/examples/ml/NormalizerExample.scala | 50 + .../examples/ml/OneHotEncoderExample.scala | 58 + .../apache/spark/examples/ml/PCAExample.scala | 54 + .../ml/PolynomialExpansionExample.scala | 53 + .../spark/examples/ml/RFormulaExample.scala | 49 + .../examples/ml/StandardScalerExample.scala | 51 + .../examples/ml/StopWordsRemoverExample.scala | 48 + .../examples/ml/StringIndexerExample.scala | 49 + .../spark/examples/ml/TokenizerExample.scala| 54 + .../examples/ml/VectorAssemblerExample.scala| 49 + .../examples/ml/VectorIndexerExample.scala | 53 +
.../spark/examples/ml/VectorSlicerExample.scala | 58 + 52 files changed, 2806 insertions(+), 1058 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/78209b0c/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index b499d6e..5105a94 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -170,25 +170,7 @@ Refer to the [Tokenizer Scala docs](api/scala/index.html#org.apache.spark.ml.fea and the [RegexTokenizer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.Tokenizer) for more details on the API. -{% highlight scala %} -import org.apache.spark.ml.feature.{Tokenizer, RegexTokenizer} - -val sentenceDataFrame = sqlContext.createDataFrame(Seq( - (0, "Hi I heard about Spark"), - (1, "I wish Java could use case classes"), - (2, "Logistic,regression,models,are,neat") -)).toDF("label", "sentence") -val tokenizer = new Tokenizer().setInp
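For readers unfamiliar with the include_example mechanism this diff relies on: the Markdown page references a compiled example file with a Jekyll tag such as {% include_example scala/org/apache/spark/examples/ml/TokenizerExample.scala %}, and only the code between the `// $example on$` and `// $example off$` markers is rendered into the page, so the docs always show code that actually compiles. A stripped-down skeleton of such a file (illustrative only; the real examples appear in full in this diff):

package org.apache.spark.examples.ml

// $example on$
import org.apache.spark.ml.feature.Tokenizer
// $example off$
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object IncludeExampleSkeleton {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("IncludeExampleSkeleton"))
    val sqlContext = new SQLContext(sc)
    // $example on$
    // ...only this marked region ends up in docs/ml-features.md...
    // $example off$
    sc.stop()
  }
}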
[3/3] spark git commit: [SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example
[SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example Made a new patch containing only the markdown examples moved to the example/ folder. Only three Java examples were not shifted since they contained compilation errors; these classes are 1) StandardScaler 2) NormalizerExample 3) VectorIndexer. Author: Xusen Yin Author: somideshmukh Closes #10002 from somideshmukh/SomilBranch1.33. (cherry picked from commit 78209b0ccaf3f22b5e2345dfb2b98edfdb746819) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c683ed5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c683ed5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c683ed5 Branch: refs/heads/branch-1.6 Commit: 3c683ed5ffe704a6fec7c6d434eeed784276470d Parents: 115bfbd Author: somideshmukh Authored: Mon Dec 7 23:26:34 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:26:56 2015 -0800 -- docs/ml-features.md | 1109 +- .../spark/examples/ml/JavaBinarizerExample.java | 68 ++ .../examples/ml/JavaBucketizerExample.java | 70 ++ .../spark/examples/ml/JavaDCTExample.java | 65 + .../ml/JavaElementwiseProductExample.java | 75 ++ .../examples/ml/JavaMinMaxScalerExample.java| 50 + .../spark/examples/ml/JavaNGramExample.java | 71 ++ .../examples/ml/JavaNormalizerExample.java | 52 + .../examples/ml/JavaOneHotEncoderExample.java | 77 ++ .../spark/examples/ml/JavaPCAExample.java | 71 ++ .../ml/JavaPolynomialExpansionExample.java | 71 ++ .../spark/examples/ml/JavaRFormulaExample.java | 69 ++ .../examples/ml/JavaStandardScalerExample.java | 53 + .../ml/JavaStopWordsRemoverExample.java | 65 + .../examples/ml/JavaStringIndexerExample.java | 66 ++ .../spark/examples/ml/JavaTokenizerExample.java | 75 ++ .../examples/ml/JavaVectorAssemblerExample.java | 67 ++ .../examples/ml/JavaVectorIndexerExample.java | 60 + .../examples/ml/JavaVectorSlicerExample.java| 73 ++ .../src/main/python/ml/binarizer_example.py | 43 + .../src/main/python/ml/bucketizer_example.py| 42 + .../python/ml/elementwise_product_example.py| 39 + examples/src/main/python/ml/n_gram_example.py | 42 + .../src/main/python/ml/normalizer_example.py| 41 + .../main/python/ml/onehot_encoder_example.py| 47 + examples/src/main/python/ml/pca_example.py | 42 + .../python/ml/polynomial_expansion_example.py | 43 + examples/src/main/python/ml/rformula_example.py | 44 + .../main/python/ml/standard_scaler_example.py | 42 + .../main/python/ml/stopwords_remover_example.py | 40 + .../main/python/ml/string_indexer_example.py| 39 + .../src/main/python/ml/tokenizer_example.py | 44 + .../main/python/ml/vector_assembler_example.py | 42 + .../main/python/ml/vector_indexer_example.py| 39 + .../spark/examples/ml/BinarizerExample.scala| 48 + .../spark/examples/ml/BucketizerExample.scala | 51 + .../apache/spark/examples/ml/DCTExample.scala | 54 + .../examples/ml/ElementWiseProductExample.scala | 53 + .../spark/examples/ml/MinMaxScalerExample.scala | 49 + .../apache/spark/examples/ml/NGramExample.scala | 47 + .../spark/examples/ml/NormalizerExample.scala | 50 + .../examples/ml/OneHotEncoderExample.scala | 58 + .../apache/spark/examples/ml/PCAExample.scala | 54 + .../ml/PolynomialExpansionExample.scala | 53 + .../spark/examples/ml/RFormulaExample.scala | 49 + .../examples/ml/StandardScalerExample.scala | 51 + .../examples/ml/StopWordsRemoverExample.scala | 48 + .../examples/ml/StringIndexerExample.scala | 49 + .../spark/examples/ml/TokenizerExample.scala| 54 +
.../examples/ml/VectorAssemblerExample.scala| 49 + .../examples/ml/VectorIndexerExample.scala | 53 + .../spark/examples/ml/VectorSlicerExample.scala | 58 + 52 files changed, 2806 insertions(+), 1058 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c683ed5/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index b499d6e..5105a94 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -170,25 +170,7 @@ Refer to the [Tokenizer Scala docs](api/scala/index.html#org.apache.spark.ml.fea and the [RegexTokenizer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.Tokenizer) for more details on the API. -{% highlight scala %} -import org.apache.spark.ml.feature.{Tokenizer, RegexTokenizer} - -val sentenceDataFrame = sqlContext.createDataFrame(Seq( - (0, "Hi I heard about Spark"), - (1, "I wish Java could use case classes"), - (2, "Log
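For readers unfamiliar with the include_example mechanism this series switches to: rather than embedding {% highlight %} blocks directly in ml-features.md, the guide now references a compiled example under examples/src/main/, and the docs build splices in only the region between the "$example on$" and "$example off$" markers. Below is a minimal Scala sketch of the shape such a file takes; the object name, file path, and the transformer used inside the markers are placeholders for illustration, not code from this patch.

// Hypothetical skeleton (not from this patch): only the code between the markers is
// rendered into ml-features.md, via a tag such as
// {% include_example scala/org/apache/spark/examples/ml/MyFeatureExample.scala %}
package org.apache.spark.examples.ml

// $example on$
import org.apache.spark.ml.feature.Tokenizer
// $example off$
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyFeatureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MyFeatureExample"))
    val sqlContext = new SQLContext(sc)

    // $example on$
    val df = sqlContext.createDataFrame(Seq(
      (0, "Hi I heard about Spark"),
      (1, "I wish Java could use case classes")
    )).toDF("label", "sentence")
    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    tokenizer.transform(df).select("words", "label").show()
    // $example off$

    sc.stop()
  }
}

The marker comments are what ties the markdown page to the source file, which is why every new example file in the diffstat above carries them.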
[2/3] spark git commit: [SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example
http://git-wip-us.apache.org/repos/asf/spark/blob/3c683ed5/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java new file mode 100644 index 000..668f71e --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; + +// $example on$ +import java.util.Arrays; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.ml.feature.PolynomialExpansion; +import org.apache.spark.mllib.linalg.VectorUDT; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +// $example off$ + +public class JavaPolynomialExpansionExample { + public static void main(String[] args) { +SparkConf conf = new SparkConf().setAppName("JavaPolynomialExpansionExample"); +JavaSparkContext jsc = new JavaSparkContext(conf); +SQLContext jsql = new SQLContext(jsc); + +// $example on$ +PolynomialExpansion polyExpansion = new PolynomialExpansion() + .setInputCol("features") + .setOutputCol("polyFeatures") + .setDegree(3); + +JavaRDD data = jsc.parallelize(Arrays.asList( + RowFactory.create(Vectors.dense(-2.0, 2.3)), + RowFactory.create(Vectors.dense(0.0, 0.0)), + RowFactory.create(Vectors.dense(0.6, -1.1)) +)); + +StructType schema = new StructType(new StructField[]{ + new StructField("features", new VectorUDT(), false, Metadata.empty()), +}); + +DataFrame df = jsql.createDataFrame(data, schema); +DataFrame polyDF = polyExpansion.transform(df); + +Row[] row = polyDF.select("polyFeatures").take(3); +for (Row r : row) { + System.out.println(r.get(0)); +} +// $example off$ +jsc.stop(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/3c683ed5/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java new file mode 100644 index 000..1e1062b --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; + +// $example on$ +import java.util.Arrays; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.ml.feature.RFormula; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory
[1/3] spark git commit: [SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md using include_example
Repository: spark Updated Branches: refs/heads/branch-1.6 115bfbdae -> 3c683ed5f http://git-wip-us.apache.org/repos/asf/spark/blob/3c683ed5/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala new file mode 100644 index 000..1be8a5f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.ml + +// $example on$ +import org.apache.spark.ml.feature.StringIndexer +// $example off$ +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +object StringIndexerExample { + def main(args: Array[String]): Unit = { +val conf = new SparkConf().setAppName("StringIndexerExample") +val sc = new SparkContext(conf) +val sqlContext = new SQLContext(sc) + +// $example on$ +val df = sqlContext.createDataFrame( + Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) +).toDF("id", "category") + +val indexer = new StringIndexer() + .setInputCol("category") + .setOutputCol("categoryIndex") + +val indexed = indexer.fit(df).transform(df) +indexed.show() +// $example off$ +sc.stop() + } +} +// scalastyle:on println + http://git-wip-us.apache.org/repos/asf/spark/blob/3c683ed5/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala new file mode 100644 index 000..01e0d13 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// scalastyle:off println +package org.apache.spark.examples.ml + +// $example on$ +import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer} +// $example off$ +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +object TokenizerExample { + def main(args: Array[String]): Unit = { +val conf = new SparkConf().setAppName("TokenizerExample") +val sc = new SparkContext(conf) +val sqlContext = new SQLContext(sc) + +// $example on$ +val sentenceDataFrame = sqlContext.createDataFrame(Seq( + (0, "Hi I heard about Spark"), + (1, "I wish Java could use case classes"), + (2, "Logistic,regression,models,are,neat") +)).toDF("label", "sentence") + +val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") +val regexTokenizer = new RegexTokenizer() + .setInputCol("sentence") + .setOutputCol("words") + .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false) + +val tokenized = tokenizer.transform(sentenceDataFrame) +tokenized.select("words", "label").take(3).foreach(println) +val regexTokenized = regexTokenizer.transform(sentenceDataFrame) +regexTokenized.select("words", "label").take(3).foreach(println) +// $example off$ +sc.stop() + }
spark git commit: Closes #10098
Repository: spark Updated Branches: refs/heads/master 78209b0cc -> 73896588d Closes #10098 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73896588 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73896588 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73896588 Branch: refs/heads/master Commit: 73896588dd3af6ba77c9692cd5120ee32448eb22 Parents: 78209b0 Author: Xiangrui Meng Authored: Mon Dec 7 23:34:16 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:34:16 2015 -0800 --
spark git commit: [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib - 1.5 backport
Repository: spark Updated Branches: refs/heads/branch-1.5 3868ab644 -> 2f30927a5 [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib - 1.5 backport This backports [https://github.com/apache/spark/pull/10161] to Spark 1.5, with the difference that ChiSqSelector does not require modification. Switched from using SQLContext constructor to using getOrCreate, mainly in model save/load methods. This covers all instances in spark.mllib. There were no uses of the constructor in spark.ml. CC: yhuai mengxr Author: Joseph K. Bradley Closes #10183 from jkbradley/sqlcontext-backport1.5. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f30927a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f30927a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f30927a Branch: refs/heads/branch-1.5 Commit: 2f30927a5f40f2862e777bfe97282ddcfc0a063a Parents: 3868ab6 Author: Joseph K. Bradley Authored: Mon Dec 7 23:37:23 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:37:23 2015 -0800 -- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 6 +++--- .../org/apache/spark/mllib/classification/NaiveBayes.scala | 8 .../mllib/classification/impl/GLMClassificationModel.scala | 4 ++-- .../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 4 ++-- .../org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++-- .../spark/mllib/clustering/PowerIterationClustering.scala| 4 ++-- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 4 ++-- .../mllib/recommendation/MatrixFactorizationModel.scala | 4 ++-- .../apache/spark/mllib/regression/IsotonicRegression.scala | 4 ++-- .../spark/mllib/regression/impl/GLMRegressionModel.scala | 4 ++-- .../apache/spark/mllib/tree/model/DecisionTreeModel.scala| 4 ++-- .../apache/spark/mllib/tree/model/treeEnsembleModels.scala | 4 ++-- 12 files changed, 27 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f585aac..06e13b7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1149,7 +1149,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = { // We use DataFrames for serialization of IndexedRows to Python, // so return a DataFrame. -val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext) +val sqlContext = SQLContext.getOrCreate(indexedRowMatrix.rows.sparkContext) sqlContext.createDataFrame(indexedRowMatrix.rows) } @@ -1159,7 +1159,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = { // We use DataFrames for serialization of MatrixEntry entries to // Python, so return a DataFrame. 
-val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext) +val sqlContext = SQLContext.getOrCreate(coordinateMatrix.entries.sparkContext) sqlContext.createDataFrame(coordinateMatrix.entries) } @@ -1169,7 +1169,7 @@ private[python] class PythonMLLibAPI extends Serializable { def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = { // We use DataFrames for serialization of sub-matrix blocks to // Python, so return a DataFrame. -val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext) +val sqlContext = SQLContext.getOrCreate(blockMatrix.blocks.sparkContext) sqlContext.createDataFrame(blockMatrix.blocks) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index a956084..aef9ef2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -192,7 +192,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { modelType: String) def save(sc: SparkContext, path: String, data: Data): Unit = { - val sqlContext = new SQLContext(sc) + val sqlContext = SQLContext.getOrCreate(sc) impor
spark git commit: [SPARK-10259][ML] Add @since annotation to ml.classification
Repository: spark Updated Branches: refs/heads/master 73896588d -> 7d05a6245 [SPARK-10259][ML] Add @since annotation to ml.classification Add since annotation to ml.classification Author: Takahashi Hiroshi Closes #8534 from taishi-oss/issue10259. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d05a624 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d05a624 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d05a624 Branch: refs/heads/master Commit: 7d05a624510f7299b3dd07f87c203db1ff7caa3e Parents: 7389658 Author: Takahashi Hiroshi Authored: Mon Dec 7 23:46:55 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:46:55 2015 -0800 -- .../classification/DecisionTreeClassifier.scala | 30 +++-- .../spark/ml/classification/GBTClassifier.scala | 35 +-- .../ml/classification/LogisticRegression.scala | 64 +++- .../MultilayerPerceptronClassifier.scala| 23 +-- .../spark/ml/classification/NaiveBayes.scala| 19 -- .../spark/ml/classification/OneVsRest.scala | 24 ++-- .../classification/RandomForestClassifier.scala | 34 +-- 7 files changed, 185 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7d05a624/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index c478aea..8c4cec1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.classification -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeClassifierParams} import org.apache.spark.ml.tree.impl.RandomForest @@ -36,32 +36,44 @@ import org.apache.spark.sql.DataFrame * It supports both binary and multiclass labels, as well as both continuous and categorical * features. */ +@Since("1.4.0") @Experimental -final class DecisionTreeClassifier(override val uid: String) +final class DecisionTreeClassifier @Since("1.4.0") ( +@Since("1.4.0") override val uid: String) extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] with DecisionTreeParams with TreeClassifierParams { + @Since("1.4.0") def this() = this(Identifiable.randomUID("dtc")) // Override parameter setters from parent trait for Java API compatibility. 
+ @Since("1.4.0") override def setMaxDepth(value: Int): this.type = super.setMaxDepth(value) + @Since("1.4.0") override def setMaxBins(value: Int): this.type = super.setMaxBins(value) + @Since("1.4.0") override def setMinInstancesPerNode(value: Int): this.type = super.setMinInstancesPerNode(value) + @Since("1.4.0") override def setMinInfoGain(value: Double): this.type = super.setMinInfoGain(value) + @Since("1.4.0") override def setMaxMemoryInMB(value: Int): this.type = super.setMaxMemoryInMB(value) + @Since("1.4.0") override def setCacheNodeIds(value: Boolean): this.type = super.setCacheNodeIds(value) + @Since("1.4.0") override def setCheckpointInterval(value: Int): this.type = super.setCheckpointInterval(value) + @Since("1.4.0") override def setImpurity(value: String): this.type = super.setImpurity(value) + @Since("1.6.0") override def setSeed(value: Long): this.type = super.setSeed(value) override protected def train(dataset: DataFrame): DecisionTreeClassificationModel = { @@ -89,12 +101,15 @@ final class DecisionTreeClassifier(override val uid: String) subsamplingRate = 1.0) } + @Since("1.4.1") override def copy(extra: ParamMap): DecisionTreeClassifier = defaultCopy(extra) } +@Since("1.4.0") @Experimental object DecisionTreeClassifier { /** Accessor for supported impurities: entropy, gini */ + @Since("1.4.0") final val supportedImpurities: Array[String] = TreeClassifierParams.supportedImpurities } @@ -104,12 +119,13 @@ object DecisionTreeClassifier { * It supports both binary and multiclass labels, as well as both continuous and categorical * features. */ +@Since("1.4.0") @Experimental final class DecisionTreeClassificationModel private[ml] ( -override val uid: String, -override val rootNode: Node, -override val numFeatures: Int, -override val numClasses: Int) +@Since("1.4.
spark git commit: [SPARK-10259][ML] Add @since annotation to ml.classification
Repository: spark Updated Branches: refs/heads/branch-1.6 3c683ed5f -> 8652fc03c [SPARK-10259][ML] Add @since annotation to ml.classification Add since annotation to ml.classification Author: Takahashi Hiroshi Closes #8534 from taishi-oss/issue10259. (cherry picked from commit 7d05a624510f7299b3dd07f87c203db1ff7caa3e) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8652fc03 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8652fc03 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8652fc03 Branch: refs/heads/branch-1.6 Commit: 8652fc03c21f79b41ce13f41991feba11fc7b29c Parents: 3c683ed Author: Takahashi Hiroshi Authored: Mon Dec 7 23:46:55 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:47:03 2015 -0800 -- .../classification/DecisionTreeClassifier.scala | 30 +++-- .../spark/ml/classification/GBTClassifier.scala | 35 +-- .../ml/classification/LogisticRegression.scala | 64 +++- .../MultilayerPerceptronClassifier.scala| 23 +-- .../spark/ml/classification/NaiveBayes.scala| 19 -- .../spark/ml/classification/OneVsRest.scala | 24 ++-- .../classification/RandomForestClassifier.scala | 34 +-- 7 files changed, 185 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8652fc03/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index c478aea..8c4cec1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.classification -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeClassifierParams} import org.apache.spark.ml.tree.impl.RandomForest @@ -36,32 +36,44 @@ import org.apache.spark.sql.DataFrame * It supports both binary and multiclass labels, as well as both continuous and categorical * features. */ +@Since("1.4.0") @Experimental -final class DecisionTreeClassifier(override val uid: String) +final class DecisionTreeClassifier @Since("1.4.0") ( +@Since("1.4.0") override val uid: String) extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] with DecisionTreeParams with TreeClassifierParams { + @Since("1.4.0") def this() = this(Identifiable.randomUID("dtc")) // Override parameter setters from parent trait for Java API compatibility. 
+ @Since("1.4.0") override def setMaxDepth(value: Int): this.type = super.setMaxDepth(value) + @Since("1.4.0") override def setMaxBins(value: Int): this.type = super.setMaxBins(value) + @Since("1.4.0") override def setMinInstancesPerNode(value: Int): this.type = super.setMinInstancesPerNode(value) + @Since("1.4.0") override def setMinInfoGain(value: Double): this.type = super.setMinInfoGain(value) + @Since("1.4.0") override def setMaxMemoryInMB(value: Int): this.type = super.setMaxMemoryInMB(value) + @Since("1.4.0") override def setCacheNodeIds(value: Boolean): this.type = super.setCacheNodeIds(value) + @Since("1.4.0") override def setCheckpointInterval(value: Int): this.type = super.setCheckpointInterval(value) + @Since("1.4.0") override def setImpurity(value: String): this.type = super.setImpurity(value) + @Since("1.6.0") override def setSeed(value: Long): this.type = super.setSeed(value) override protected def train(dataset: DataFrame): DecisionTreeClassificationModel = { @@ -89,12 +101,15 @@ final class DecisionTreeClassifier(override val uid: String) subsamplingRate = 1.0) } + @Since("1.4.1") override def copy(extra: ParamMap): DecisionTreeClassifier = defaultCopy(extra) } +@Since("1.4.0") @Experimental object DecisionTreeClassifier { /** Accessor for supported impurities: entropy, gini */ + @Since("1.4.0") final val supportedImpurities: Array[String] = TreeClassifierParams.supportedImpurities } @@ -104,12 +119,13 @@ object DecisionTreeClassifier { * It supports both binary and multiclass labels, as well as both continuous and categorical * features. */ +@Since("1.4.0") @Experimental final class DecisionTreeClassificationModel private[ml] ( -override val uid: String, -override
spark git commit: [SPARK-11958][SPARK-11957][ML][DOC] SQLTransformer user guide and example code
Repository: spark Updated Branches: refs/heads/branch-1.6 8652fc03c -> 5c8216920 [SPARK-11958][SPARK-11957][ML][DOC] SQLTransformer user guide and example code Add ```SQLTransformer``` user guide, example code and make Scala API doc more clear. Author: Yanbo Liang Closes #10006 from yanboliang/spark-11958. (cherry picked from commit 4a39b5a1bee28cec792d509654f6236390cafdcb) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c821692 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c821692 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c821692 Branch: refs/heads/branch-1.6 Commit: 5c8216920b4110d8fc4329e1fe52543ee17c4a54 Parents: 8652fc0 Author: Yanbo Liang Authored: Mon Dec 7 23:50:57 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:51:10 2015 -0800 -- docs/ml-features.md | 59 .../examples/ml/JavaSQLTransformerExample.java | 59 examples/src/main/python/ml/sql_transformer.py | 40 + .../examples/ml/SQLTransformerExample.scala | 45 +++ .../spark/ml/feature/SQLTransformer.scala | 11 +++- 5 files changed, 212 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c821692/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index 5105a94..f85e0d5 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -756,6 +756,65 @@ for more details on the API. +## SQLTransformer + +`SQLTransformer` implements the transformations which are defined by SQL statement. +Currently we only support SQL syntax like `"SELECT ... FROM __THIS__ ..."` +where `"__THIS__"` represents the underlying table of the input dataset. +The select clause specifies the fields, constants, and expressions to display in +the output, it can be any select clause that Spark SQL supports. Users can also +use Spark SQL built-in function and UDFs to operate on these selected columns. +For example, `SQLTransformer` supports statements like: + +* `SELECT a, a + b AS a_b FROM __THIS__` +* `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5` +* `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b` + +**Examples** + +Assume that we have the following DataFrame with columns `id`, `v1` and `v2`: + + + id | v1 | v2 +|-|- + 0 | 1.0 | 3.0 + 2 | 2.0 | 5.0 + + +This is the output of the `SQLTransformer` with statement `"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"`: + + + id | v1 | v2 | v3 | v4 +|-|-|-|- + 0 | 1.0 | 3.0 | 4.0 | 3.0 + 2 | 2.0 | 5.0 | 7.0 |10.0 + + + + + +Refer to the [SQLTransformer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.SQLTransformer) +for more details on the API. + +{% include_example scala/org/apache/spark/examples/ml/SQLTransformerExample.scala %} + + + + +Refer to the [SQLTransformer Java docs](api/java/org/apache/spark/ml/feature/SQLTransformer.html) +for more details on the API. + +{% include_example java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java %} + + + + +Refer to the [SQLTransformer Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.SQLTransformer) for more details on the API. 
+ +{% include_example python/ml/sql_transformer.py %} + + + ## VectorAssembler `VectorAssembler` is a transformer that combines a given list of columns into a single vector http://git-wip-us.apache.org/repos/asf/spark/blob/5c821692/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java new file mode 100644 index 000..d55c707 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or impli
spark git commit: [SPARK-11958][SPARK-11957][ML][DOC] SQLTransformer user guide and example code
Repository: spark Updated Branches: refs/heads/master 7d05a6245 -> 4a39b5a1b [SPARK-11958][SPARK-11957][ML][DOC] SQLTransformer user guide and example code Add ```SQLTransformer``` user guide, example code and make Scala API doc more clear. Author: Yanbo Liang Closes #10006 from yanboliang/spark-11958. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a39b5a1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a39b5a1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a39b5a1 Branch: refs/heads/master Commit: 4a39b5a1bee28cec792d509654f6236390cafdcb Parents: 7d05a62 Author: Yanbo Liang Authored: Mon Dec 7 23:50:57 2015 -0800 Committer: Xiangrui Meng Committed: Mon Dec 7 23:50:57 2015 -0800 -- docs/ml-features.md | 59 .../examples/ml/JavaSQLTransformerExample.java | 59 examples/src/main/python/ml/sql_transformer.py | 40 + .../examples/ml/SQLTransformerExample.scala | 45 +++ .../spark/ml/feature/SQLTransformer.scala | 11 +++- 5 files changed, 212 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a39b5a1/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index 5105a94..f85e0d5 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -756,6 +756,65 @@ for more details on the API. +## SQLTransformer + +`SQLTransformer` implements the transformations which are defined by SQL statement. +Currently we only support SQL syntax like `"SELECT ... FROM __THIS__ ..."` +where `"__THIS__"` represents the underlying table of the input dataset. +The select clause specifies the fields, constants, and expressions to display in +the output, it can be any select clause that Spark SQL supports. Users can also +use Spark SQL built-in function and UDFs to operate on these selected columns. +For example, `SQLTransformer` supports statements like: + +* `SELECT a, a + b AS a_b FROM __THIS__` +* `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5` +* `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b` + +**Examples** + +Assume that we have the following DataFrame with columns `id`, `v1` and `v2`: + + + id | v1 | v2 +|-|- + 0 | 1.0 | 3.0 + 2 | 2.0 | 5.0 + + +This is the output of the `SQLTransformer` with statement `"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"`: + + + id | v1 | v2 | v3 | v4 +|-|-|-|- + 0 | 1.0 | 3.0 | 4.0 | 3.0 + 2 | 2.0 | 5.0 | 7.0 |10.0 + + + + + +Refer to the [SQLTransformer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.SQLTransformer) +for more details on the API. + +{% include_example scala/org/apache/spark/examples/ml/SQLTransformerExample.scala %} + + + + +Refer to the [SQLTransformer Java docs](api/java/org/apache/spark/ml/feature/SQLTransformer.html) +for more details on the API. + +{% include_example java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java %} + + + + +Refer to the [SQLTransformer Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.SQLTransformer) for more details on the API. 
+ +{% include_example python/ml/sql_transformer.py %} + + + ## VectorAssembler `VectorAssembler` is a transformer that combines a given list of columns into a single vector http://git-wip-us.apache.org/repos/asf/spark/blob/4a39b5a1/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java new file mode 100644 index 000..d55c707 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the Licens