Repository: spark
Updated Branches:
  refs/heads/master d2f4f30b1 -> 9e4b4bd08
Revert "SPARK-2038: rename "conf" parameters in the saveAsHadoop functions" This reverts commit 443f5e1bbcf9ec55e5ce6e4f738a002a47818100. This commit unfortunately would break source compatibility if users have named the hadoopConf parameter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e4b4bd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e4b4bd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e4b4bd0 Branch: refs/heads/master Commit: 9e4b4bd0837cfc4ef1af1edcbc56290821e49e92 Parents: d2f4f30 Author: Patrick Wendell <[email protected]> Authored: Tue Jun 17 19:34:17 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Tue Jun 17 19:34:17 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/rdd/PairRDDFunctions.scala | 49 ++++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9e4b4bd0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index bff77b4..fe36c80 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - hadoopConf: Configuration = self.context.hadoopConfiguration) + conf: Configuration = self.context.hadoopConfiguration) { - val job = new NewAPIHadoopJob(hadoopConf) + val job = new NewAPIHadoopJob(conf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) job.setOutputFormatClass(outputFormatClass) @@ -752,25 +752,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], - hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration), + conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None) { - hadoopConf.setOutputKeyClass(keyClass) - hadoopConf.setOutputValueClass(valueClass) + conf.setOutputKeyClass(keyClass) + conf.setOutputValueClass(valueClass) // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? 
     // conf.setOutputFormat(outputFormatClass)
-    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    conf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      hadoopConf.setCompressMapOutput(true)
-      hadoopConf.set("mapred.output.compress", "true")
-      hadoopConf.setMapOutputCompressorClass(c)
-      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
-      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      conf.setCompressMapOutput(true)
+      conf.set("mapred.output.compress", "true")
+      conf.setMapOutputCompressorClass(c)
+      conf.set("mapred.output.compression.codec", c.getCanonicalName)
+      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(hadoopConf,
-      SparkHadoopWriter.createPathFromString(path, hadoopConf))
-    saveAsHadoopDataset(hadoopConf)
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+    saveAsHadoopDataset(conf)
   }
 
   /**
@@ -779,8 +778,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
-    val job = new NewAPIHadoopJob(hadoopConf)
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -836,10 +835,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(hadoopConf: JobConf) {
-    val outputFormatInstance = hadoopConf.getOutputFormat
-    val keyClass = hadoopConf.getOutputKeyClass
-    val valueClass = hadoopConf.getOutputValueClass
+  def saveAsHadoopDataset(conf: JobConf) {
+    val outputFormatInstance = conf.getOutputFormat
+    val keyClass = conf.getOutputKeyClass
+    val valueClass = conf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -849,18 +848,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(conf)
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(hadoopConf)
-      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
+      val ignoredFs = FileSystem.get(conf)
+      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
     }
 
-    val writer = new SparkHadoopWriter(hadoopConf)
+    val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
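
----------------------------------------------------------------------

A note on the rationale above: in Scala, arguments can be passed by name, so a
public method's parameter names are part of its source-compatible API surface,
and renaming "conf" to "hadoopConf" breaks any caller that spells the name out.
The sketch below is hypothetical caller code, not part of this commit; the
object name, output path, and sample data are made up for illustration. It
compiles against the restored signature, whose last parameter is named "conf";
under the reverted rename, the same call would fail to compile unless rewritten
as "hadoopConf = customConf".

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // implicit rddToPairRDDFunctions

    object NamedArgExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "named-arg-example")
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
        // Hypothetical: a caller-supplied Hadoop configuration.
        val customConf = new Configuration(sc.hadoopConfiguration)

        pairs.saveAsNewAPIHadoopFile(
          "/tmp/named-arg-example",  // illustrative output path
          classOf[String],
          classOf[java.lang.Integer],
          classOf[TextOutputFormat[String, java.lang.Integer]],
          conf = customConf)  // by-name argument: renaming the parameter
                              // "conf" is a source-breaking change here
        sc.stop()
      }
    }

Positional callers are unaffected by the rename; only by-name usage like the
last argument above ties caller source code to the parameter's name, which is
why the rename had to be reverted rather than shipped in a minor release.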
