This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new e51e7b6  [bug] fix stream dataframe writing to doris json parse exception (#48)
e51e7b6 is described below

commit e51e7b618acce07e98893134e3084c81d3286bc6
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Tue Sep 6 16:36:57 2022 +0800

    [bug] fix stream dataframe writing to doris json parse exception (#48)

    fix stream dataframe writing to doris json parse exception
---
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 2daaeb1..566eb3b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.IOException
-import java.util
 import org.apache.doris.spark.rest.RestService
 import scala.util.control.Breaks
@@ -48,21 +47,29 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
   }
 
   def write(queryExecution: QueryExecution): Unit = {
+    val schema = queryExecution.analyzed.output
+    // write for each partition
     queryExecution.toRdd.foreachPartition(iter => {
       val objectMapper = new ObjectMapper()
-      val arrayNode = objectMapper.createArrayNode()
+      val rowArray = objectMapper.createArrayNode()
       iter.foreach(row => {
-        val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
+        val rowNode = objectMapper.createObjectNode()
         for (i <- 0 until row.numFields) {
-          val field = row.copy().getUTF8String(i)
-          arrayNode.add(objectMapper.readTree(field.toString))
+          val colName = schema(i).name
+          val value = row.copy().getUTF8String(i)
+          if (value == null) {
+            rowNode.putNull(colName)
+          } else {
+            rowNode.put(colName, value.toString)
+          }
         }
-        if (arrayNode.size > maxRowCount - 1) {
+        rowArray.add(rowNode)
+        if (rowArray.size > maxRowCount - 1) {
           flush
         }
       })
       // flush buffer
-      if (!arrayNode.isEmpty) {
+      if (!rowArray.isEmpty) {
         flush
       }
 
@@ -76,8 +83,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
 
         for (i <- 0 to maxRetryTimes) {
           try {
-            dorisStreamLoader.load(arrayNode.toString)
-            arrayNode.removeAll()
+            dorisStreamLoader.load(rowArray.toString)
+            rowArray.removeAll()
             loop.break()
           }
           catch {
@@ -89,15 +96,15 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
                 Thread.sleep(1000 * i)
               } catch {
                 case ex: InterruptedException =>
-                  logger.warn("Data that failed to load : " + arrayNode.toString)
+                  logger.warn("Data that failed to load : " + rowArray.toString)
                   Thread.currentThread.interrupt()
                   throw new IOException("unable to flush; interrupted while doing another attempt", e)
               }
           }
         }
 
-        if (!arrayNode.isEmpty) {
-          logger.warn("Data that failed to load : " + arrayNode.toString)
+        if (!rowArray.isEmpty) {
+          logger.warn("Data that failed to load : " + rowArray.toString)
           throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
         }
       }
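For readers of the patch: the pre-change code fed each raw column value to `objectMapper.readTree`, which throws a parse exception whenever a value is not itself a JSON document (for example a plain string). The change builds one JSON object per row, keyed by column name, and loads the whole array. Below is a minimal, self-contained Scala sketch of that idea using only Jackson, outside Spark and the connector; the column names and sample values are illustrative assumptions, not data from any real table.

```scala
import com.fasterxml.jackson.databind.ObjectMapper

import scala.util.Try

object RowToJsonSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()

    // Illustrative column names and one row of raw string values;
    // the null stands in for a SQL NULL (assumptions for this sketch only).
    val columnNames = Seq("id", "city", "note")
    val rowValues: Seq[String] = Seq("1", "Beijing", null)

    // Pre-patch style: treating each field as a JSON document fails for
    // plain strings such as "Beijing" (Jackson reports an unrecognized token).
    println(Try(mapper.readTree("Beijing"))) // Failure(JsonParseException...)

    // Post-patch style: build one JSON object per row, keyed by column name,
    // and collect the row objects into an array.
    val rowArray = mapper.createArrayNode()
    val rowNode = mapper.createObjectNode()
    columnNames.zip(rowValues).foreach { case (name, value) =>
      if (value == null) rowNode.putNull(name) else rowNode.put(name, value)
    }
    rowArray.add(rowNode)

    // The serialized array is valid JSON regardless of the field contents.
    println(rowArray.toString) // [{"id":"1","city":"Beijing","note":null}]
  }
}
```

As in the patch, it is the serialized array (`rowArray.toString`) that gets handed off for loading, so the payload stays well-formed JSON even when individual column values are arbitrary strings or NULLs.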