This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e51e7b6  [bug] fix stream dataframe writing to doris json parse 
exception (#48)
e51e7b6 is described below

commit e51e7b618acce07e98893134e3084c81d3286bc6
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Tue Sep 6 16:36:57 2022 +0800

    [bug] fix stream dataframe writing to doris json parse exception (#48)
    
     fix stream dataframe writing to doris json parse exception
---
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 2daaeb1..566eb3b 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 import java.io.IOException
-import java.util
 import org.apache.doris.spark.rest.RestService
 
 import scala.util.control.Breaks
@@ -48,21 +47,29 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
   }
 
   def write(queryExecution: QueryExecution): Unit = {
+    val schema = queryExecution.analyzed.output
+    // write for each partition
     queryExecution.toRdd.foreachPartition(iter => {
       val objectMapper = new ObjectMapper()
-      val arrayNode = objectMapper.createArrayNode()
+      val rowArray = objectMapper.createArrayNode()
       iter.foreach(row => {
-        val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
+        val rowNode = objectMapper.createObjectNode()
         for (i <- 0 until row.numFields) {
-          val field = row.copy().getUTF8String(i)
-          arrayNode.add(objectMapper.readTree(field.toString))
+          val colName = schema(i).name
+          val value = row.copy().getUTF8String(i)
+          if (value == null) {
+            rowNode.putNull(colName)
+          } else {
+            rowNode.put(colName, value.toString)
+          }
         }
-        if (arrayNode.size > maxRowCount - 1) {
+        rowArray.add(rowNode)
+        if (rowArray.size > maxRowCount - 1) {
           flush
         }
       })
       // flush buffer
-      if (!arrayNode.isEmpty) {
+      if (!rowArray.isEmpty) {
         flush
       }
 
@@ -76,8 +83,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
 
           for (i <- 0 to maxRetryTimes) {
             try {
-              dorisStreamLoader.load(arrayNode.toString)
-              arrayNode.removeAll()
+              dorisStreamLoader.load(rowArray.toString)
+              rowArray.removeAll()
               loop.break()
             }
             catch {
@@ -89,15 +96,15 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
-                    logger.warn("Data that failed to load : " + 
arrayNode.toString)
+                    logger.warn("Data that failed to load : " + 
rowArray.toString)
                     Thread.currentThread.interrupt()
                     throw new IOException("unable to flush; interrupted while 
doing another attempt", e)
                 }
             }
           }
 
-          if (!arrayNode.isEmpty) {
-            logger.warn("Data that failed to load : " + arrayNode.toString)
+          if (!rowArray.isEmpty) {
+            logger.warn("Data that failed to load : " + rowArray.toString)
             throw new IOException(s"Failed to load data on BE: 
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to