Repository: spark Updated Branches: refs/heads/master cf2e0ae72 -> 9a430a027
[SPARK-11068] [SQL] [FOLLOW-UP] move execution listener to util Author: Wenchen Fan <[email protected]> Closes #9119 from cloud-fan/callback. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a430a02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a430a02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a430a02 Branch: refs/heads/master Commit: 9a430a027faafb083ca569698effb697af26a1db Parents: cf2e0ae Author: Wenchen Fan <[email protected]> Authored: Wed Oct 14 15:08:13 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed Oct 14 15:08:13 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/QueryExecutionListener.scala | 136 ------------------- .../scala/org/apache/spark/sql/SQLContext.scala | 1 + .../spark/sql/util/QueryExecutionListener.scala | 136 +++++++++++++++++++ .../spark/sql/DataFrameCallbackSuite.scala | 82 ----------- .../spark/sql/util/DataFrameCallbackSuite.scala | 83 +++++++++++ 5 files changed, 220 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala deleted file mode 100644 index 14fbebb..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import java.util.concurrent.locks.ReentrantReadWriteLock -import scala.collection.mutable.ListBuffer - -import org.apache.spark.annotation.{DeveloperApi, Experimental} -import org.apache.spark.Logging -import org.apache.spark.sql.execution.QueryExecution - - -/** - * The interface of query execution listener that can be used to analyze execution metrics. - * - * Note that implementations should guarantee thread-safety as they will be used in a non - * thread-safe way. - */ -@Experimental -trait QueryExecutionListener { - - /** - * A callback function that will be called when a query executed successfully. - * Implementations should guarantee thread-safe. - * - * @param funcName the name of the action that triggered this query. - * @param qe the QueryExecution object that carries detail information like logical plan, - * physical plan, etc. - * @param duration the execution time for this query in nanoseconds. - */ - @DeveloperApi - def onSuccess(funcName: String, qe: QueryExecution, duration: Long) - - /** - * A callback function that will be called when a query execution failed. - * Implementations should guarantee thread-safe. - * - * @param funcName the name of the action that triggered this query. - * @param qe the QueryExecution object that carries detail information like logical plan, - * physical plan, etc. - * @param exception the exception that failed this query. - */ - @DeveloperApi - def onFailure(funcName: String, qe: QueryExecution, exception: Exception) -} - -@Experimental -class ExecutionListenerManager extends Logging { - private[this] val listeners = ListBuffer.empty[QueryExecutionListener] - private[this] val lock = new ReentrantReadWriteLock() - - /** Acquires a read lock on the cache for the duration of `f`. */ - private def readLock[A](f: => A): A = { - val rl = lock.readLock() - rl.lock() - try f finally { - rl.unlock() - } - } - - /** Acquires a write lock on the cache for the duration of `f`. */ - private def writeLock[A](f: => A): A = { - val wl = lock.writeLock() - wl.lock() - try f finally { - wl.unlock() - } - } - - /** - * Registers the specified QueryExecutionListener. - */ - @DeveloperApi - def register(listener: QueryExecutionListener): Unit = writeLock { - listeners += listener - } - - /** - * Unregisters the specified QueryExecutionListener. - */ - @DeveloperApi - def unregister(listener: QueryExecutionListener): Unit = writeLock { - listeners -= listener - } - - /** - * clears out all registered QueryExecutionListeners. - */ - @DeveloperApi - def clear(): Unit = writeLock { - listeners.clear() - } - - private[sql] def onSuccess( - funcName: String, - qe: QueryExecution, - duration: Long): Unit = readLock { - withErrorHandling { listener => - listener.onSuccess(funcName, qe, duration) - } - } - - private[sql] def onFailure( - funcName: String, - qe: QueryExecution, - exception: Exception): Unit = readLock { - withErrorHandling { listener => - listener.onFailure(funcName, qe, exception) - } - } - - private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = { - for (listener <- listeners) { - try { - f(listener) - } catch { - case e: Exception => logWarning("error executing query execution listener", e) - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a835408..3d5e35a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ import org.apache.spark.sql.{execution => sparkexecution} +import org.apache.spark.sql.util.ExecutionListenerManager import org.apache.spark.util.Utils /** http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala new file mode 100644 index 0000000..909a8ab --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.util.concurrent.locks.ReentrantReadWriteLock +import scala.collection.mutable.ListBuffer + +import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.Logging +import org.apache.spark.sql.execution.QueryExecution + + +/** + * The interface of query execution listener that can be used to analyze execution metrics. + * + * Note that implementations should guarantee thread-safety as they will be used in a non + * thread-safe way. + */ +@Experimental +trait QueryExecutionListener { + + /** + * A callback function that will be called when a query executed successfully. + * Implementations should guarantee thread-safe. + * + * @param funcName the name of the action that triggered this query. + * @param qe the QueryExecution object that carries detail information like logical plan, + * physical plan, etc. + * @param duration the execution time for this query in nanoseconds. + */ + @DeveloperApi + def onSuccess(funcName: String, qe: QueryExecution, duration: Long) + + /** + * A callback function that will be called when a query execution failed. + * Implementations should guarantee thread-safe. + * + * @param funcName the name of the action that triggered this query. + * @param qe the QueryExecution object that carries detail information like logical plan, + * physical plan, etc. + * @param exception the exception that failed this query. + */ + @DeveloperApi + def onFailure(funcName: String, qe: QueryExecution, exception: Exception) +} + +@Experimental +class ExecutionListenerManager extends Logging { + private[this] val listeners = ListBuffer.empty[QueryExecutionListener] + private[this] val lock = new ReentrantReadWriteLock() + + /** Acquires a read lock on the cache for the duration of `f`. */ + private def readLock[A](f: => A): A = { + val rl = lock.readLock() + rl.lock() + try f finally { + rl.unlock() + } + } + + /** Acquires a write lock on the cache for the duration of `f`. */ + private def writeLock[A](f: => A): A = { + val wl = lock.writeLock() + wl.lock() + try f finally { + wl.unlock() + } + } + + /** + * Registers the specified QueryExecutionListener. + */ + @DeveloperApi + def register(listener: QueryExecutionListener): Unit = writeLock { + listeners += listener + } + + /** + * Unregisters the specified QueryExecutionListener. + */ + @DeveloperApi + def unregister(listener: QueryExecutionListener): Unit = writeLock { + listeners -= listener + } + + /** + * clears out all registered QueryExecutionListeners. + */ + @DeveloperApi + def clear(): Unit = writeLock { + listeners.clear() + } + + private[sql] def onSuccess( + funcName: String, + qe: QueryExecution, + duration: Long): Unit = readLock { + withErrorHandling { listener => + listener.onSuccess(funcName, qe, duration) + } + } + + private[sql] def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = readLock { + withErrorHandling { listener => + listener.onFailure(funcName, qe, exception) + } + } + + private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = { + for (listener <- listeners) { + try { + f(listener) + } catch { + case e: Exception => logWarning("error executing query execution listener", e) + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala deleted file mode 100644 index 4e286a0..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.test.SharedSQLContext - -import scala.collection.mutable.ArrayBuffer - -class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { - import testImplicits._ - import functions._ - - test("execute callback functions when a DataFrame action finished successfully") { - val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] - val listener = new QueryExecutionListener { - // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - - override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { - metrics += ((funcName, qe, duration)) - } - } - sqlContext.listenerManager.register(listener) - - val df = Seq(1 -> "a").toDF("i", "j") - df.select("i").collect() - df.filter($"i" > 0).count() - - assert(metrics.length == 2) - - assert(metrics(0)._1 == "collect") - assert(metrics(0)._2.analyzed.isInstanceOf[Project]) - assert(metrics(0)._3 > 0) - - assert(metrics(1)._1 == "count") - assert(metrics(1)._2.analyzed.isInstanceOf[Aggregate]) - assert(metrics(1)._3 > 0) - } - - test("execute callback functions when a DataFrame action failed") { - val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)] - val listener = new QueryExecutionListener { - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { - metrics += ((funcName, qe, exception)) - } - - // Only test failed case here, so no need to implement `onSuccess` - override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {} - } - sqlContext.listenerManager.register(listener) - - val errorUdf = udf[Int, Int] { _ => throw new RuntimeException("udf error") } - val df = sparkContext.makeRDD(Seq(1 -> "a")).toDF("i", "j") - - // Ignore the log when we are expecting an exception. - sparkContext.setLogLevel("FATAL") - val e = intercept[SparkException](df.select(errorUdf($"i")).collect()) - - assert(metrics.length == 1) - assert(metrics(0)._1 == "collect") - assert(metrics(0)._2.analyzed.isInstanceOf[Project]) - assert(metrics(0)._3.getMessage == e.getMessage) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala new file mode 100644 index 0000000..eb056cd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.test.SharedSQLContext + +import scala.collection.mutable.ArrayBuffer + +class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + import functions._ + + test("execute callback functions when a DataFrame action finished successfully") { + val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] + val listener = new QueryExecutionListener { + // Only test successful case here, so no need to implement `onFailure` + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + metrics += ((funcName, qe, duration)) + } + } + sqlContext.listenerManager.register(listener) + + val df = Seq(1 -> "a").toDF("i", "j") + df.select("i").collect() + df.filter($"i" > 0).count() + + assert(metrics.length == 2) + + assert(metrics(0)._1 == "collect") + assert(metrics(0)._2.analyzed.isInstanceOf[Project]) + assert(metrics(0)._3 > 0) + + assert(metrics(1)._1 == "count") + assert(metrics(1)._2.analyzed.isInstanceOf[Aggregate]) + assert(metrics(1)._3 > 0) + } + + test("execute callback functions when a DataFrame action failed") { + val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)] + val listener = new QueryExecutionListener { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + metrics += ((funcName, qe, exception)) + } + + // Only test failed case here, so no need to implement `onSuccess` + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {} + } + sqlContext.listenerManager.register(listener) + + val errorUdf = udf[Int, Int] { _ => throw new RuntimeException("udf error") } + val df = sparkContext.makeRDD(Seq(1 -> "a")).toDF("i", "j") + + // Ignore the log when we are expecting an exception. + sparkContext.setLogLevel("FATAL") + val e = intercept[SparkException](df.select(errorUdf($"i")).collect()) + + assert(metrics.length == 1) + assert(metrics(0)._1 == "collect") + assert(metrics(0)._2.analyzed.isInstanceOf[Project]) + assert(metrics(0)._3.getMessage == e.getMessage) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
