Repository: spark
Updated Branches:
  refs/heads/master 4cee2ce25 -> 0e3312ee7


[SPARK-18256] Improve the performance of event log replay in HistoryServer

## What changes were proposed in this pull request?

This patch significantly improves the performance of event log replay in the 
HistoryServer via two simple changes:

- **Don't use `extractOpt`**: it turns out that `json4s`'s `extractOpt` method 
uses exceptions for control flow, causing huge performance bottlenecks due to 
the overhead of initializing exceptions. To avoid this overhead, we can simply 
use our own` Utils.jsonOption` method. This patch replaces all uses of 
`extractOpt` with `Utils.jsonOption` and adds a style checker rule to ban the 
use of the slow `extractOpt` method.
- **Don't call `Utils.getFormattedClassName` for every event**: the old code 
called` Utils.getFormattedClassName` dozens of times per replayed event in 
order to match up class names in events with SparkListener event names. By 
simply storing the results of these calls in constants rather than recomputing 
them, we're able to eliminate a huge performance hotspot by removing thousands 
of expensive `Class.getSimpleName` calls.

## How was this patch tested?

Tested by profiling the replay of a long event log using YourKit. For an event 
log containing 1000+ jobs, each of which had thousands of tasks, the changes in 
this patch cut the replay time in half:

![image](https://cloud.githubusercontent.com/assets/50748/19980953/31154622-a1bd-11e6-9be4-21fbb9b3f9a7.png)

Prior to this patch's changes, the two slowest methods in log replay were 
internal exceptions thrown by `Json4S` and calls to `Class.getSimpleName()`:

![image](https://cloud.githubusercontent.com/assets/50748/19981052/87416cce-a1bd-11e6-9f25-06a7cd391822.png)

After this patch, these hotspots are completely eliminated.

Author: Josh Rosen <[email protected]>

Closes #15756 from JoshRosen/speed-up-jsonprotocol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e3312ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e3312ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e3312ee

Branch: refs/heads/master
Commit: 0e3312ee72c44f4c9acafbd80d0c8a14f3aff875
Parents: 4cee2ce
Author: Josh Rosen <[email protected]>
Authored: Fri Nov 4 19:32:26 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Fri Nov 4 19:32:26 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/JsonProtocol.scala    | 106 +++++++++++--------
 scalastyle-config.xml                           |   6 ++
 2 files changed, 70 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e3312ee/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c11eb3f..6593aab 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -107,20 +107,20 @@ private[spark] object JsonProtocol {
   def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): 
JValue = {
     val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
     val properties = propertiesToJson(stageSubmitted.properties)
-    ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
     ("Stage Info" -> stageInfo) ~
     ("Properties" -> properties)
   }
 
   def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): 
JValue = {
     val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
-    ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
     ("Stage Info" -> stageInfo)
   }
 
   def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
     val taskInfo = taskStart.taskInfo
-    ("Event" -> Utils.getFormattedClassName(taskStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
     ("Stage ID" -> taskStart.stageId) ~
     ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
@@ -128,7 +128,7 @@ private[spark] object JsonProtocol {
 
   def taskGettingResultToJson(taskGettingResult: 
SparkListenerTaskGettingResult): JValue = {
     val taskInfo = taskGettingResult.taskInfo
-    ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
@@ -137,7 +137,7 @@ private[spark] object JsonProtocol {
     val taskInfo = taskEnd.taskInfo
     val taskMetrics = taskEnd.taskMetrics
     val taskMetricsJson = if (taskMetrics != null) 
taskMetricsToJson(taskMetrics) else JNothing
-    ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
     ("Stage ID" -> taskEnd.stageId) ~
     ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
@@ -148,7 +148,7 @@ private[spark] object JsonProtocol {
 
   def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
     val properties = propertiesToJson(jobStart.properties)
-    ("Event" -> Utils.getFormattedClassName(jobStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
     ("Job ID" -> jobStart.jobId) ~
     ("Submission Time" -> jobStart.time) ~
     ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in 
Spark 1.2.0
@@ -158,7 +158,7 @@ private[spark] object JsonProtocol {
 
   def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
     val jobResult = jobResultToJson(jobEnd.jobResult)
-    ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
     ("Job ID" -> jobEnd.jobId) ~
     ("Completion Time" -> jobEnd.time) ~
     ("Job Result" -> jobResult)
@@ -170,7 +170,7 @@ private[spark] object JsonProtocol {
     val sparkProperties = mapToJson(environmentDetails("Spark 
Properties").toMap)
     val systemProperties = mapToJson(environmentDetails("System 
Properties").toMap)
     val classpathEntries = mapToJson(environmentDetails("Classpath 
Entries").toMap)
-    ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
     ("JVM Information" -> jvmInformation) ~
     ("Spark Properties" -> sparkProperties) ~
     ("System Properties" -> systemProperties) ~
@@ -179,7 +179,7 @@ private[spark] object JsonProtocol {
 
   def blockManagerAddedToJson(blockManagerAdded: 
SparkListenerBlockManagerAdded): JValue = {
     val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Maximum Memory" -> blockManagerAdded.maxMem) ~
     ("Timestamp" -> blockManagerAdded.time)
@@ -187,18 +187,18 @@ private[spark] object JsonProtocol {
 
   def blockManagerRemovedToJson(blockManagerRemoved: 
SparkListenerBlockManagerRemoved): JValue = {
     val blockManagerId = 
blockManagerIdToJson(blockManagerRemoved.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
+    ("Event" -> 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Timestamp" -> blockManagerRemoved.time)
   }
 
   def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
-    ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~
     ("RDD ID" -> unpersistRDD.rddId)
   }
 
   def applicationStartToJson(applicationStart: SparkListenerApplicationStart): 
JValue = {
-    ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
     ("App Name" -> applicationStart.appName) ~
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
@@ -208,33 +208,33 @@ private[spark] object JsonProtocol {
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): 
JValue = {
-    ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~
     ("Timestamp" -> applicationEnd.time)
   }
 
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = 
{
-    ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
     ("Timestamp" -> executorAdded.time) ~
     ("Executor ID" -> executorAdded.executorId) ~
     ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
   }
 
   def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): 
JValue = {
-    ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~
     ("Timestamp" -> executorRemoved.time) ~
     ("Executor ID" -> executorRemoved.executorId) ~
     ("Removed Reason" -> executorRemoved.reason)
   }
 
   def logStartToJson(logStart: SparkListenerLogStart): JValue = {
-    ("Event" -> Utils.getFormattedClassName(logStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~
     ("Spark Version" -> SPARK_VERSION)
   }
 
   def executorMetricsUpdateToJson(metricsUpdate: 
SparkListenerExecutorMetricsUpdate): JValue = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
-    ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, 
stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
@@ -485,7 +485,7 @@ private[spark] object JsonProtocol {
    * JSON deserialization methods for SparkListenerEvents |
    * ---------------------------------------------------- */
 
-  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+  private object SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES {
     val stageSubmitted = 
Utils.getFormattedClassName(SparkListenerStageSubmitted)
     val stageCompleted = 
Utils.getFormattedClassName(SparkListenerStageCompleted)
     val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
@@ -503,6 +503,10 @@ private[spark] object JsonProtocol {
     val executorRemoved = 
Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = 
Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+  }
+
+  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+    import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -540,7 +544,8 @@ private[spark] object JsonProtocol {
 
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt 
ID").extractOpt[Int].getOrElse(0)
+    val stageAttemptId =
+      Utils.jsonOption(json \ "Stage Attempt 
ID").map(_.extract[Int]).getOrElse(0)
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
@@ -552,7 +557,8 @@ private[spark] object JsonProtocol {
 
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt 
ID").extractOpt[Int].getOrElse(0)
+    val stageAttemptId =
+      Utils.jsonOption(json \ "Stage Attempt 
ID").map(_.extract[Int]).getOrElse(0)
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
     val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -662,20 +668,22 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val attemptId = Utils.jsonOption(json \ "Stage Attempt 
ID").map(_.extract[Int]).getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD 
Info").extract[List[JValue]].map(rddInfoFromJson)
     val parentIds = Utils.jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
-    val details = (json \ "Details").extractOpt[String].getOrElse("")
+    val details = Utils.jsonOption(json \ 
"Details").map(_.extract[String]).getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission 
Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion 
Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure 
Reason").map(_.extract[String])
-    val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] 
match {
-      case Some(values) => values.map(accumulableInfoFromJson)
-      case None => Seq[AccumulableInfo]()
+    val accumulatedValues = {
+      Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) 
match {
+        case Some(values) => values.map(accumulableInfoFromJson)
+        case None => Seq[AccumulableInfo]()
+      }
     }
 
     val stageInfo = new StageInfo(
@@ -692,17 +700,17 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
-    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
+    val attempt = Utils.jsonOption(json \ 
"Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ 
"Locality").extract[String])
-    val speculative = (json \ 
"Speculative").extractOpt[Boolean].getOrElse(false)
+    val speculative = Utils.jsonOption(json \ 
"Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
-    val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
+    val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
+    val accumulables = Utils.jsonOption(json \ 
"Accumulables").map(_.extract[Seq[JValue]]) match {
       case Some(values) => values.map(accumulableInfoFromJson)
       case None => Seq[AccumulableInfo]()
     }
@@ -719,12 +727,13 @@ private[spark] object JsonProtocol {
 
   def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
     val id = (json \ "ID").extract[Long]
-    val name = (json \ "Name").extractOpt[String]
+    val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
     val update = Utils.jsonOption(json \ "Update").map { v => 
accumValueFromJson(name, v) }
     val value = Utils.jsonOption(json \ "Value").map { v => 
accumValueFromJson(name, v) }
-    val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
-    val countFailedValues = (json \ "Count Failed 
Values").extractOpt[Boolean].getOrElse(false)
-    val metadata = (json \ "Metadata").extractOpt[String]
+    val internal = Utils.jsonOption(json \ 
"Internal").exists(_.extract[Boolean])
+    val countFailedValues =
+      Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
+    val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
     new AccumulableInfo(id, name, update, value, internal, countFailedValues, 
metadata)
   }
 
@@ -782,9 +791,11 @@ private[spark] object JsonProtocol {
       readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks 
Fetched").extract[Int])
       readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks 
Fetched").extract[Int])
       readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes 
Read").extract[Long])
-      readMetrics.incLocalBytesRead((readJson \ "Local Bytes 
Read").extractOpt[Long].getOrElse(0L))
+      readMetrics.incLocalBytesRead(
+        Utils.jsonOption(readJson \ "Local Bytes 
Read").map(_.extract[Long]).getOrElse(0L))
       readMetrics.incFetchWaitTime((readJson \ "Fetch Wait 
Time").extract[Long])
-      readMetrics.incRecordsRead((readJson \ "Total Records 
Read").extractOpt[Long].getOrElse(0L))
+      readMetrics.incRecordsRead(
+        Utils.jsonOption(readJson \ "Total Records 
Read").map(_.extract[Long]).getOrElse(0L))
       metrics.mergeShuffleReadMetrics()
     }
 
@@ -793,8 +804,8 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
       val writeMetrics = metrics.shuffleWriteMetrics
       writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes 
Written").extract[Long])
-      writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
-        .extractOpt[Long].getOrElse(0L))
+      writeMetrics.incRecordsWritten(
+        Utils.jsonOption(writeJson \ "Shuffle Records 
Written").map(_.extract[Long]).getOrElse(0L))
       writeMetrics.incWriteTime((writeJson \ "Shuffle Write 
Time").extract[Long])
     }
 
@@ -802,14 +813,16 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
       val outputMetrics = metrics.outputMetrics
       outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
-      outputMetrics.setRecordsWritten((outJson \ "Records 
Written").extractOpt[Long].getOrElse(0L))
+      outputMetrics.setRecordsWritten(
+        Utils.jsonOption(outJson \ "Records 
Written").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Input metrics
     Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
       val inputMetrics = metrics.inputMetrics
       inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
-      inputMetrics.incRecordsRead((inJson \ "Records 
Read").extractOpt[Long].getOrElse(0L))
+      inputMetrics.incRecordsRead(
+        Utils.jsonOption(inJson \ "Records 
Read").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Updated blocks
@@ -824,7 +837,7 @@ private[spark] object JsonProtocol {
     metrics
   }
 
-  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+  private object TASK_END_REASON_FORMATTED_CLASS_NAMES {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)
     val fetchFailed = Utils.getFormattedClassName(FetchFailed)
@@ -834,6 +847,10 @@ private[spark] object JsonProtocol {
     val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
     val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
+  }
+
+  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+    import TASK_END_REASON_FORMATTED_CLASS_NAMES._
 
     (json \ "Reason").extract[String] match {
       case `success` => Success
@@ -850,7 +867,8 @@ private[spark] object JsonProtocol {
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
         val stackTrace = stackTraceFromJson(json \ "Stack Trace")
-        val fullStackTrace = (json \ "Full Stack 
Trace").extractOpt[String].orNull
+        val fullStackTrace =
+          Utils.jsonOption(json \ "Full Stack 
Trace").map(_.extract[String]).orNull
         // Fallback on getting accumulator updates from TaskMetrics, which was 
logged in Spark 1.x
         val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
@@ -891,9 +909,13 @@ private[spark] object JsonProtocol {
     BlockManagerId(executorId, host, port)
   }
 
-  def jobResultFromJson(json: JValue): JobResult = {
+  private object JOB_RESULT_FORMATTED_CLASS_NAMES {
     val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
     val jobFailed = Utils.getFormattedClassName(JobFailed)
+  }
+
+  def jobResultFromJson(json: JValue): JobResult = {
+    import JOB_RESULT_FORMATTED_CLASS_NAMES._
 
     (json \ "Result").extract[String] match {
       case `jobSucceeded` => JobSucceeded

http://git-wip-us.apache.org/repos/asf/spark/blob/0e3312ee/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 81d57d7..4833385 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -217,6 +217,12 @@ This file is divided into 3 sections:
     of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
   </check>
 
+  <check customId="extractopt" level="error" 
class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+    <parameters><parameter name="regex">extractOpt</parameter></parameters>
+    <customMessage>Use Utils.jsonOption(x).map(.extract[T]) instead of 
.extractOpt[T], as the latter
+    is slower.  </customMessage>
+  </check>
+
   <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" 
enabled="true">
     <parameters>
       <parameter name="groups">java,scala,3rdParty,spark</parameter>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to