This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push: new 3e745e7 [Improve] decrease memory usage when csv&gzip is on (#212) 3e745e7 is described below commit 3e745e732fdade8a26856bd92026b44fd02d2787 Author: zhaorongsheng <zhaorongsh...@users.noreply.github.com> AuthorDate: Mon Jul 1 10:20:40 2024 +0800 [Improve] decrease memory usage when csv&gzip is on (#212) Co-authored-by: zhaorongsheng <zhaorongsh...@corp.netease.com> --- .../org/apache/doris/spark/load/StreamLoader.scala | 33 +++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 9481b6f..06bb56f 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -20,6 +20,7 @@ package org.apache.doris.spark.load import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.json.JsonMapper +import org.apache.commons.io.IOUtils import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} @@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType import org.slf4j.{Logger, LoggerFactory} -import java.io.{ByteArrayOutputStream, IOException} +import java.io.{ByteArrayOutputStream, IOException, InputStream} import java.net.{HttpURLConnection, URL} import java.nio.charset.StandardCharsets import java.util @@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader if (compressType.nonEmpty) { if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) { - val recordBatchString = 
new RecordBatchString(RecordBatch.newBuilder(iterator.asJava) + val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava) .format(format) .sep(columnSeparator) .delim(lineDelimiter) .schema(schema) .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough) - val content = recordBatchString.getContent - val compressedData = compressByGZ(content) + val compressedData = compressByGZ(recodeBatchInputStream) entity = Some(new ByteArrayEntity(compressedData)) } else { @@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader compressedData } + /** + * compress data by gzip + * + * @param contentInputStream data content + * @throws + * @return compressed byte array data + */ + @throws[IOException] + def compressByGZ(contentInputStream: InputStream): Array[Byte] = { + var compressedData: Array[Byte] = null + try { + val baos = new ByteArrayOutputStream + val gzipOutputStream = new GZIPOutputStream(baos) + try { + IOUtils.copy(contentInputStream, gzipOutputStream) + gzipOutputStream.finish() + compressedData = baos.toByteArray + } finally { + if (baos != null) baos.close() + if (gzipOutputStream != null) gzipOutputStream.close() + } + } + compressedData + } + /** * handle stream load response * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org