Repository: spark Updated Branches: refs/heads/master 65338de5f -> 44c400315
[SPARK-22400][SQL] rename some APIs and classes to make their meaning clearer ## What changes were proposed in this pull request? Both `ReadSupport` and `ReadTask` have a method called `createReader`, but they create different things. This could cause some confusion for data source developers. The same issue exists between `WriteSupport` and `DataWriterFactory`, both of which have a method called `createWriter`. This PR renames the methods of `ReadTask`/`DataWriterFactory` to `createDataReader`/`createDataWriter`. In addition, the name `RowToInternalRowDataWriterFactory` is misleading, because the class actually converts `InternalRow`s to `Row`s; this PR renames it to `InternalRowDataWriterFactory`. ## How was this patch tested? This change is only a rename, so it should be covered by existing tests. Author: Zhenhua Wang <[email protected]> Closes #19610 from wzhfy/rename. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44c40031 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44c40031 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44c40031 Branch: refs/heads/master Commit: 44c4003155c1d243ffe0f73d5537b4c8b3f3b564 Parents: 65338de Author: Zhenhua Wang <[email protected]> Authored: Mon Oct 30 10:21:05 2017 -0700 Committer: gatorsmile <[email protected]> Committed: Mon Oct 30 10:21:05 2017 -0700 ---------------------------------------------------------------------- .../spark/sql/sources/v2/reader/DataReader.java | 4 ++-- .../apache/spark/sql/sources/v2/reader/ReadTask.java | 2 +- .../spark/sql/sources/v2/writer/DataWriter.java | 6 +++--- .../sql/sources/v2/writer/DataWriterFactory.java | 2 +- .../sql/execution/datasources/v2/DataSourceRDD.scala | 2 +- .../datasources/v2/DataSourceV2ScanExec.scala | 5 +++-- .../datasources/v2/WriteToDataSourceV2.scala | 14 +++++++------- .../sql/sources/v2/JavaAdvancedDataSourceV2.java | 2 +- .../spark/sql/sources/v2/JavaSimpleDataSourceV2.java | 2 +- 
.../sql/sources/v2/JavaUnsafeRowDataSourceV2.java | 2 +- .../spark/sql/sources/v2/DataSourceV2Suite.scala | 8 +++++--- .../sql/sources/v2/SimpleWritableDataSource.scala | 9 ++++----- 12 files changed, 30 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java index 95e0915..52bb138 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java @@ -22,8 +22,8 @@ import java.io.Closeable; import org.apache.spark.annotation.InterfaceStability; /** - * A data reader returned by {@link ReadTask#createReader()} and is responsible for outputting data - * for a RDD partition. + * A data reader returned by {@link ReadTask#createDataReader()} and is responsible for + * outputting data for a RDD partition. 
* * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java index 01362df..44786db 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java @@ -45,5 +45,5 @@ public interface ReadTask<T> extends Serializable { /** * Returns a data reader to do the actual reading work for this read task. */ - DataReader<T> createReader(); + DataReader<T> createDataReader(); } http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index 1426141..d84afba 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.writer; import org.apache.spark.annotation.InterfaceStability; /** - * A data writer returned by {@link DataWriterFactory#createWriter(int, int)} and is + * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is * responsible for writing data for an input RDD partition. 
* * One Spark task has one exclusive data writer, so there is no thread-safe concern. @@ -34,7 +34,7 @@ import org.apache.spark.annotation.InterfaceStability; * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} with commit messages from other data * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an * exception will be sent to the driver side, and Spark will retry this writing task for some times, - * each time {@link DataWriterFactory#createWriter(int, int)} gets a different `attemptNumber`, + * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`, * and finally call {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if all retry fail. * * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task @@ -64,7 +64,7 @@ public interface DataWriter<T> { /** * Commits this writer after all records are written successfully, returns a commit message which - * will be send back to driver side and pass to + * will be sent back to driver side and passed to * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}. * * The written data should only be visible to data source readers after http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index f812d10..fe56cc0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -46,5 +46,5 @@ public interface DataWriterFactory<T> extends Serializable { * tasks with the same task id running at the same time. 
Implementations can * use this attempt number to distinguish writers of different task attempts. */ - DataWriter<T> createWriter(int partitionId, int attemptNumber); + DataWriter<T> createDataWriter(int partitionId, int attemptNumber); } http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index b8fe5ac..5f30be5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -39,7 +39,7 @@ class DataSourceRDD( } override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { - val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createReader() + val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader() context.addTaskCompletionListener(_ => reader.close()) val iter = new Iterator[UnsafeRow] { private[this] var valuePrepared = false http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index addc12a..3f243dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -67,8 +67,9 @@ class 
RowToUnsafeRowReadTask(rowReadTask: ReadTask[Row], schema: StructType) override def preferredLocations: Array[String] = rowReadTask.preferredLocations - override def createReader: DataReader[UnsafeRow] = { - new RowToUnsafeDataReader(rowReadTask.createReader, RowEncoder.apply(schema).resolveAndBind()) + override def createDataReader: DataReader[UnsafeRow] = { + new RowToUnsafeDataReader( + rowReadTask.createDataReader, RowEncoder.apply(schema).resolveAndBind()) } } http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index 92c1e1f..b72d15e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -48,7 +48,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) override protected def doExecute(): RDD[InternalRow] = { val writeTask = writer match { case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() - case _ => new RowToInternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) + case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) } val rdd = query.execute() @@ -93,7 +93,7 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { - val dataWriter = writeTask.createWriter(context.partitionId(), context.attemptNumber()) + val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) // write the data and 
commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { @@ -111,18 +111,18 @@ object DataWritingSparkTask extends Logging { } } -class RowToInternalRowDataWriterFactory( +class InternalRowDataWriterFactory( rowWriterFactory: DataWriterFactory[Row], schema: StructType) extends DataWriterFactory[InternalRow] { - override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { - new RowToInternalRowDataWriter( - rowWriterFactory.createWriter(partitionId, attemptNumber), + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { + new InternalRowDataWriter( + rowWriterFactory.createDataWriter(partitionId, attemptNumber), RowEncoder.apply(schema).resolveAndBind()) } } -class RowToInternalRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row]) +class InternalRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row]) extends DataWriter[InternalRow] { override def write(record: InternalRow): Unit = rowWriter.write(encoder.fromRow(record)) http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java index da2c13f..1cfdc08 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -100,7 +100,7 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport { } @Override - public DataReader<Row> createReader() { + public DataReader<Row> createDataReader() { return new JavaAdvancedReadTask(start - 1, end, requiredSchema); } 
http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java index 08469f1..2d458b7 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -58,7 +58,7 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport { } @Override - public DataReader<Row> createReader() { + public DataReader<Row> createDataReader() { return new JavaSimpleReadTask(start - 1, end); } http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java index 9efe7c7..f6aa008 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java @@ -58,7 +58,7 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport { } @Override - public DataReader<UnsafeRow> createReader() { + public DataReader<UnsafeRow> createDataReader() { return new JavaUnsafeRowReadTask(start - 1, end); } http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 092702a..ab37e49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -167,7 +167,7 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport { class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader[Row] { private var current = start - 1 - override def createReader(): DataReader[Row] = new SimpleReadTask(start, end) + override def createDataReader(): DataReader[Row] = new SimpleReadTask(start, end) override def next(): Boolean = { current += 1 @@ -233,7 +233,9 @@ class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType) private var current = start - 1 - override def createReader(): DataReader[Row] = new AdvancedReadTask(start, end, requiredSchema) + override def createDataReader(): DataReader[Row] = { + new AdvancedReadTask(start, end, requiredSchema) + } override def close(): Unit = {} @@ -273,7 +275,7 @@ class UnsafeRowReadTask(start: Int, end: Int) private var current = start - 1 - override def createReader(): DataReader[UnsafeRow] = new UnsafeRowReadTask(start, end) + override def createDataReader(): DataReader[UnsafeRow] = new UnsafeRowReadTask(start, end) override def next(): Boolean = { current += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 6fb60f4..cd7252e 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.sources.v2 import java.io.{BufferedReader, InputStreamReader, IOException} -import java.text.SimpleDateFormat -import java.util.{Collections, Date, List => JList, Locale, Optional, UUID} +import java.util.{Collections, List => JList, Optional} import scala.collection.JavaConverters._ @@ -157,7 +156,7 @@ class SimpleCSVReadTask(path: String, conf: SerializableConfiguration) @transient private var currentLine: String = _ @transient private var inputStream: FSDataInputStream = _ - override def createReader(): DataReader[Row] = { + override def createDataReader(): DataReader[Row] = { val filePath = new Path(path) val fs = filePath.getFileSystem(conf.value) inputStream = fs.open(filePath) @@ -185,7 +184,7 @@ class SimpleCSVReadTask(path: String, conf: SerializableConfiguration) class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration) extends DataWriterFactory[Row] { - override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { val jobPath = new Path(new Path(path, "_temporary"), jobId) val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber") val fs = filePath.getFileSystem(conf.value) @@ -218,7 +217,7 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] { class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration) extends DataWriterFactory[InternalRow] { - override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { val jobPath = new Path(new Path(path, "_temporary"), jobId) val 
filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber") val fs = filePath.getFileSystem(conf.value) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
