This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e745e7  [Improve] decrease memory usage when csv&gzip is on (#212)
3e745e7 is described below

commit 3e745e732fdade8a26856bd92026b44fd02d2787
Author: zhaorongsheng <zhaorongsh...@users.noreply.github.com>
AuthorDate: Mon Jul 1 10:20:40 2024 +0800

    [Improve] decrease memory usage when csv&gzip is on (#212)
    
    Co-authored-by: zhaorongsheng <zhaorongsh...@corp.netease.com>
---
 .../org/apache/doris/spark/load/StreamLoader.scala | 33 +++++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 9481b6f..06bb56f 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -20,6 +20,7 @@ package org.apache.doris.spark.load
 import com.fasterxml.jackson.core.`type`.TypeReference
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.databind.json.JsonMapper
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{ByteArrayOutputStream, IOException}
+import java.io.{ByteArrayOutputStream, IOException, InputStream}
 import java.net.{HttpURLConnection, URL}
 import java.nio.charset.StandardCharsets
 import java.util
@@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
 
     if (compressType.nonEmpty) {
       if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) {
-        val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
+        val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava)
           .format(format)
           .sep(columnSeparator)
           .delim(lineDelimiter)
           .schema(schema)
           .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
-        val content = recordBatchString.getContent
-        val compressedData = compressByGZ(content)
+        val compressedData = compressByGZ(recodeBatchInputStream)
         entity = Some(new ByteArrayEntity(compressedData))
       }
       else {
@@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
     compressedData
   }
 
+  /**
+   * Gzip-compresses every byte read from the given stream into an in-memory byte array.
+   *
+   * @param contentInputStream uncompressed content; copied until exhausted and not closed by this method
+   * @throws IOException if reading the content or writing the gzip output fails
+   * @return the gzip-compressed bytes of the whole input
+   */
+  @throws[IOException]
+  def compressByGZ(contentInputStream: InputStream): Array[Byte] = {
+    var compressedData: Array[Byte] = null // assigned only after the copy and finish() succeed
+    try { // NOTE(review): this outer try has no catch/finally, so it adds no handling
+      val baos = new ByteArrayOutputStream
+      val gzipOutputStream = new GZIPOutputStream(baos)
+      try {
+        IOUtils.copy(contentInputStream, gzipOutputStream)
+        gzipOutputStream.finish() // complete the gzip stream before the buffer is snapshotted
+        compressedData = baos.toByteArray
+      } finally {
+        if (baos != null) baos.close() // NOTE(review): baos is a val assigned from `new`, so never null here
+        if (gzipOutputStream != null) gzipOutputStream.close()
+      }
+    }
+    compressedData
+  }
+
   /**
    * handle stream load response
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to