Repository: spark Updated Branches: refs/heads/branch-2.0 4462da707 -> 6eb8ec6f4
[SPARK-14031][SQL] speedup CSV writer ## What changes were proposed in this pull request? Currently, we create an CSVWriter for every row, it's very expensive and memory hungry, took about 15 seconds to write out 1 mm rows (two columns). This PR will write the rows in batch mode, create a CSVWriter for every 1k rows, which could write out 1 mm rows in about 1 seconds (15X faster). ## How was this patch tested? Manually benchmark it. Author: Davies Liu <[email protected]> Closes #13229 from davies/csv_writer. (cherry picked from commit 80091b8a6840b562cf76341926e5b828d4def7e2) Signed-off-by: Cheng Lian <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6eb8ec6f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6eb8ec6f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6eb8ec6f Branch: refs/heads/branch-2.0 Commit: 6eb8ec6f41d7e460ba7f706590d2da62c045aead Parents: 4462da7 Author: Davies Liu <[email protected]> Authored: Mon May 23 10:48:25 2016 -0700 Committer: Cheng Lian <[email protected]> Committed: Mon May 23 10:48:39 2016 -0700 ---------------------------------------------------------------------- .../execution/datasources/csv/CSVParser.scala | 19 +++++++++++----- .../execution/datasources/csv/CSVRelation.scala | 23 +++++++++++++------- 2 files changed, 29 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6eb8ec6f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index ae797a1..111995d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -76,17 +76,26 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten writerSettings.setQuoteAllFields(false) writerSettings.setHeaders(headers: _*) - def writeRow(row: Seq[String], includeHeader: Boolean): String = { - val buffer = new ByteArrayOutputStream() - val outputWriter = new OutputStreamWriter(buffer, StandardCharsets.UTF_8) - val writer = new CsvWriter(outputWriter, writerSettings) + private var buffer = new ByteArrayOutputStream() + private var writer = new CsvWriter( + new OutputStreamWriter(buffer, StandardCharsets.UTF_8), + writerSettings) + def writeRow(row: Seq[String], includeHeader: Boolean): Unit = { if (includeHeader) { writer.writeHeaders() } writer.writeRow(row.toArray: _*) + } + + def flush(): String = { writer.close() - buffer.toString.stripLineEnd + val lines = buffer.toString.stripLineEnd + buffer = new ByteArrayOutputStream() + writer = new CsvWriter( + new OutputStreamWriter(buffer, StandardCharsets.UTF_8), + writerSettings) + lines } } http://git-wip-us.apache.org/repos/asf/spark/blob/6eb8ec6f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 4f2d438..9849484 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -176,8 +176,8 @@ private[sql] class CsvOutputWriter( }.getRecordWriter(context) } - private var firstRow: Boolean = params.headerFlag - + private val FLUSH_BATCH_SIZE = 1024L + private var records: Long = 0L private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq) private def rowToString(row: Seq[Any]): Seq[String] = row.map { field => @@ -191,16 +191,23 @@ private[sql] class CsvOutputWriter( override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") override protected[sql] def writeInternal(row: InternalRow): Unit = { - // TODO: Instead of converting and writing every row, we should use the univocity buffer - val resultString = csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), firstRow) - if (firstRow) { - firstRow = false + csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag) + records += 1 + if (records % FLUSH_BATCH_SIZE == 0) { + flush() + } + } + + private def flush(): Unit = { + val lines = csvWriter.flush() + if (lines.nonEmpty) { + text.set(lines) + recordWriter.write(NullWritable.get(), text) } - text.set(resultString) - recordWriter.write(NullWritable.get(), text) } override def close(): Unit = { + flush() recordWriter.close(context) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
