Repository: spark Updated Branches: refs/heads/master 836c95b10 -> f7a574a6c
[SPARK-18708][CORE] Improvement/improve docs in spark context file ## What changes were proposed in this pull request? SparkContext.scala was created a long time ago and contains several types of Scaladocs/Javadocs mixed together. Public methods/fields should have a Scaladoc that is formatted in the same way everywhere. This pull request also adds scaladoc to methods/fields that did not have it before. ## How was this patch tested? No actual code was modified, only comments. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Aliaksandr.Bedrytski <[email protected]> Closes #16137 from Mironor/improvement/improve-docs-in-spark-context-file. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7a574a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7a574a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7a574a6 Branch: refs/heads/master Commit: f7a574a6cbfbf7adce677819ddc892ceab905ce2 Parents: 836c95b Author: Aliaksandr.Bedrytski <[email protected]> Authored: Fri Dec 16 17:47:08 2016 +0000 Committer: Sean Owen <[email protected]> Committed: Fri Dec 16 17:47:08 2016 +0000 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 198 ++++++++++++++++--- 1 file changed, 169 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f7a574a6/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bd3f454..cae22d7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -673,10 +673,10 @@ class SparkContext(config: 
SparkConf) extends Logging { * sc.cancelJobGroup("some_job_to_cancel") * }}} * - * If interruptOnCancel is set to true for the job group, then job cancellation will result - * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure - * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, - * where HDFS may respond to Thread.interrupt() by marking nodes as dead. + * @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()` + * being called on the job's executor threads. This is useful to help ensure that the tasks + * are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS + * may respond to Thread.interrupt() by marking nodes as dead. */ def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description) @@ -712,6 +712,9 @@ class SparkContext(config: SparkConf) extends Logging { * modified collection. Pass a copy of the argument to avoid this. * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions. + * @param seq Scala collection to distribute + * @param numSlices number of partitions to divide the collection into + * @return RDD representing distributed collection */ def parallelize[T: ClassTag]( seq: Seq[T], @@ -729,8 +732,8 @@ class SparkContext(config: SparkConf) extends Logging { * @param start the start value. * @param end the end value. * @param step the incremental step - * @param numSlices the partition number of the new RDD. 
- * @return + * @param numSlices number of partitions to divide the range into + * @return RDD representing distributed range */ def range( start: Long, @@ -795,6 +798,9 @@ class SparkContext(config: SparkConf) extends Logging { /** Distribute a local Scala collection to form an RDD. * * This method is identical to `parallelize`. + * @param seq Scala collection to distribute + * @param numSlices number of partitions to divide the collection into + * @return RDD representing distributed collection */ def makeRDD[T: ClassTag]( seq: Seq[T], @@ -806,6 +812,8 @@ class SparkContext(config: SparkConf) extends Logging { * Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. + * @param seq list of tuples of data and location preferences (hostnames of Spark nodes) + * @return RDD representing data partitioned according to location preferences */ def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() @@ -816,6 +824,9 @@ class SparkContext(config: SparkConf) extends Logging { /** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. + * @param path path to the text file on a supported file system + * @param minPartitions suggested minimum number of partitions for the resulting RDD + * @return RDD of lines of the text file */ def textFile( path: String, @@ -857,6 +868,7 @@ class SparkContext(config: SparkConf) extends Logging { * @param path Directory to the input data files, the path can be comma separated paths as the * list of inputs. * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ * @return RDD representing tuples of file path and the corresponding file content */ def wholeTextFiles( path: String, @@ -908,6 +920,7 @@ class SparkContext(config: SparkConf) extends Logging { * @param path Directory to the input data files, the path can be comma separated paths as the * list of inputs. * @param minPartitions A suggestion value of the minimal splitting number for input data. + * @return RDD representing tuples of file path and corresponding file content */ def binaryFiles( path: String, @@ -968,10 +981,11 @@ class SparkContext(config: SparkConf) extends Logging { * Therefore if you plan to reuse this conf to create multiple RDDs, you need to make * sure you won't modify the conf. A safe approach is always creating a new conf for * a new RDD. - * @param inputFormatClass Class of the InputFormat - * @param keyClass Class of the keys - * @param valueClass Class of the values + * @param inputFormatClass storage format of the data to be read + * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter + * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter * @param minPartitions Minimum number of Hadoop Splits to generate. + * @return RDD of tuples of key and corresponding value * * @note Because Hadoop's RecordReader class re-uses the same Writable object for each * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle @@ -1003,6 +1017,13 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. 
+ * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param inputFormatClass storage format of the data to be read + * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter + * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter + * @param minPartitions suggested minimum number of partitions for the resulting RDD + * @return RDD of tuples of key and corresponding value */ def hadoopFile[K, V]( path: String, @@ -1042,6 +1063,10 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. + * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param minPartitions suggested minimum number of partitions for the resulting RDD + * @return RDD of tuples of key and corresponding value */ def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) @@ -1066,13 +1091,32 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. + * @param path directory to the input data files, the path can be comma separated paths as + * a list of inputs + * @return RDD of tuples of key and corresponding value */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile[K, V, F](path, defaultMinPartitions) } - /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. 
*/ + /** + * Smarter version of `newAPIHadoopFile` that uses class tags to figure out the classes of keys, + * values and the `org.apache.hadoop.mapreduce.InputFormat` (new MapReduce API) so that users + * don't need to pass them directly. Instead, callers can just write, for example: + * {{{ + * val file = sparkContext.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path) + * }}} + * + * @note Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @return RDD of tuples of key and corresponding value + */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] (path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { @@ -1092,6 +1136,13 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param fClass storage format of the data to be read + * @param kClass `Class` of the key associated with the `fClass` parameter + * @param vClass `Class` of the value associated with the `fClass` parameter + * @param conf Hadoop configuration + * @return RDD of tuples of key and corresponding value */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( path: String, @@ -1123,9 +1174,9 @@ class SparkContext(config: SparkConf) extends Logging { * Therefore if you plan to reuse this conf to create multiple RDDs, you need to make * sure you won't modify the conf. A safe approach is always creating a new conf for * a new RDD. - * @param fClass Class of the InputFormat - * @param kClass Class of the keys - * @param vClass Class of the values + * @param fClass storage format of the data to be read + * @param kClass `Class` of the key associated with the `fClass` parameter + * @param vClass `Class` of the value associated with the `fClass` parameter * * @note Because Hadoop's RecordReader class re-uses the same Writable object for each * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle @@ -1158,6 +1209,12 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. 
+ * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param keyClass `Class` of the key associated with `SequenceFileInputFormat` + * @param valueClass `Class` of the value associated with `SequenceFileInputFormat` + * @param minPartitions suggested minimum number of partitions for the resulting RDD + * @return RDD of tuples of key and corresponding value */ def sequenceFile[K, V](path: String, keyClass: Class[K], @@ -1177,6 +1234,11 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. + * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param keyClass `Class` of the key associated with `SequenceFileInputFormat` + * @param valueClass `Class` of the value associated with `SequenceFileInputFormat` + * @return RDD of tuples of key and corresponding value */ def sequenceFile[K, V]( path: String, @@ -1207,6 +1269,10 @@ class SparkContext(config: SparkConf) extends Logging { * operation will create many references to the same object. * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. + * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param minPartitions suggested minimum number of partitions for the resulting RDD + * @return RDD of tuples of key and corresponding value */ def sequenceFile[K, V] (path: String, minPartitions: Int = defaultMinPartitions) @@ -1231,6 +1297,11 @@ class SparkContext(config: SparkConf) extends Logging { * be pretty slow if you use the default serializer (Java serialization), * though the nice thing about it is that there's very little effort required to save arbitrary * objects. 
+ * + * @param path directory to the input data files, the path can be comma separated paths + * as a list of inputs + * @param minPartitions suggested minimum number of partitions for the resulting RDD + * @return RDD representing deserialized data from the file(s) */ def objectFile[T: ClassTag]( path: String, @@ -1410,6 +1481,9 @@ class SparkContext(config: SparkConf) extends Logging { * Broadcast a read-only variable to the cluster, returning a * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. * The variable will be sent to each cluster only once. + * + * @param value value to broadcast to the Spark nodes + * @return `Broadcast` object, a read-only variable cached on each machine */ def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() @@ -1424,8 +1498,9 @@ class SparkContext(config: SparkConf) extends Logging { /** * Add a file to be downloaded with this Spark job on every node. - * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported - * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + * + * @param path can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, * use `SparkFiles.get(fileName)` to find its download location. */ def addFile(path: String): Unit = { @@ -1439,12 +1514,12 @@ class SparkContext(config: SparkConf) extends Logging { /** * Add a file to be downloaded with this Spark job on every node. - * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported - * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, - * use `SparkFiles.get(fileName)` to find its download location. * - * A directory can be given if the recursive option is set to true. Currently directories are only - * supported for Hadoop-supported filesystems. 
+ * @param path can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + * use `SparkFiles.get(fileName)` to find its download location. + * @param recursive if true, a directory can be given in `path`. Currently directories are + * only supported for Hadoop-supported filesystems. */ def addFile(path: String, recursive: Boolean): Unit = { val uri = new Path(path).toUri @@ -1715,9 +1790,9 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. - * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported - * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future. + * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), + * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String) { if (path == null) { @@ -1907,6 +1982,12 @@ class SparkContext(config: SparkConf) extends Logging { /** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like `first()` + * @param resultHandler callback to pass each result to */ def runJob[T, U: ClassTag]( rdd: RDD[T], @@ -1929,6 +2010,14 @@ class SparkContext(config: SparkConf) extends Logging { /** * Run a function on a given set of partitions in an RDD and return the results as an array. 
+ * The function that is run against each partition additionally takes a `TaskContext` argument. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like `first()` + * @return in-memory collection with a result of the job (each collection element will contain + * a result from one partition) */ def runJob[T, U: ClassTag]( rdd: RDD[T], @@ -1940,8 +2029,14 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Run a job on a given set of partitions of an RDD, but take a function of type - * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. + * Run a function on a given set of partitions in an RDD and return the results as an array. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like `first()` + * @return in-memory collection with a result of the job (each collection element will contain + * a result from one partition) */ def runJob[T, U: ClassTag]( rdd: RDD[T], @@ -1952,7 +2047,13 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Run a job on all partitions in an RDD and return the results in an array. + * Run a job on all partitions in an RDD and return the results in an array. The function + * that is run against each partition additionally takes a `TaskContext` argument.
+ * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @return in-memory collection with a result of the job (each collection element will contain + * a result from one partition) */ def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) @@ -1960,13 +2061,23 @@ class SparkContext(config: SparkConf) extends Logging { /** * Run a job on all partitions in an RDD and return the results in an array. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @return in-memory collection with a result of the job (each collection element will contain + * a result from one partition) */ def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) } /** - * Run a job on all partitions in an RDD and pass the results to a handler function. + * Run a job on all partitions in an RDD and pass the results to a handler function. The function + * that is run against each partition additionally takes a `TaskContext` argument. + * + * @param rdd target RDD to run tasks on + * @param processPartition a function to run on each partition of the RDD + * @param resultHandler callback to pass each result to */ def runJob[T, U: ClassTag]( rdd: RDD[T], @@ -1978,6 +2089,10 @@ class SparkContext(config: SparkConf) extends Logging { /** * Run a job on all partitions in an RDD and pass the results to a handler function. + * + * @param rdd target RDD to run tasks on + * @param processPartition a function to run on each partition of the RDD + * @param resultHandler callback to pass each result to */ def runJob[T, U: ClassTag]( rdd: RDD[T], @@ -1991,6 +2106,13 @@ class SparkContext(config: SparkConf) extends Logging { /** * :: DeveloperApi :: * Run a job that can return approximate results.
+ * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param evaluator `ApproximateEvaluator` to receive the partial results + * @param timeout maximum time to wait for the job, in milliseconds + * @return partial result (how partial depends on whether the job was finished before or + * after timeout) */ @DeveloperApi def runApproximateJob[T, U, R]( @@ -2012,6 +2134,13 @@ class SparkContext(config: SparkConf) extends Logging { /** * Submit a job for execution and return a FutureJob holding the result. + * + * @param rdd target RDD to run tasks on + * @param processPartition a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like `first()` + * @param resultHandler callback to pass each result to + * @param resultFunc function to be executed when the result is ready */ def submitJob[T, U, R]( rdd: RDD[T], @@ -2096,6 +2225,7 @@ class SparkContext(config: SparkConf) extends Logging { * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability * @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not * serializable + * @return the cleaned closure */ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { ClosureCleaner.clean(f, checkSerializable) @@ -2103,8 +2233,9 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Set the directory under which RDDs are going to be checkpointed. The directory must - * be a HDFS path if running on a cluster. + * Set the directory under which RDDs are going to be checkpointed. 
+ * @param directory path to the directory where checkpoint files will be stored + * (must be an HDFS path if running on a cluster) */ def setCheckpointDir(directory: String) { @@ -2311,6 +2442,8 @@ object SparkContext extends Logging { * * @note This function cannot be used to create multiple SparkContext instances * even if multiple contexts are allowed. + * @param config `SparkConf` that will be used for initialisation of the `SparkContext` + * @return current `SparkContext` (or a new one if it wasn't created before the function call) */ def getOrCreate(config: SparkConf): SparkContext = { // Synchronize to ensure that multiple create requests don't trigger an exception @@ -2336,6 +2469,7 @@ object SparkContext extends Logging { * * @note This function cannot be used to create multiple SparkContext instances * even if multiple contexts are allowed. + * @return current `SparkContext` (or a new one if it wasn't created before the function call) */ def getOrCreate(): SparkContext = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { @@ -2416,6 +2550,9 @@ object SparkContext extends Logging { /** * Find the JAR from which a given class was loaded, to make it easy for users to pass * their JARs to SparkContext. + * + * @param cls class that should be inside of the jar + * @return jar that contains the Class, `None` if not found */ def jarOfClass(cls: Class[_]): Option[String] = { val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class") @@ -2437,6 +2574,9 @@ object SparkContext extends Logging { * Find the JAR that contains the class of a particular object, to make it easy for users * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in * your driver program.
+ * + * @param obj reference to an instance whose class should be inside of the jar + * @return jar that contains the class of the instance, `None` if not found */ def jarOfObject(obj: AnyRef): Option[String] = jarOfClass(obj.getClass) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
