This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit 24c633c9ae69072e0e178c0c0d0d7aab432d2266
Author: 董涛 <782112...@qq.com>
AuthorDate: Tue Jan 11 15:03:06 2022 +0800

    [improvement](spark-connector) Throw an exception when the data push fails 
and there are too many retries (#7531)
---
 .../java/org/apache/doris/spark/DorisStreamLoad.java     |  9 +++++++--
 .../org/apache/doris/spark/sql/DorisSourceProvider.scala | 16 ++++++++++++++--
 .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 13 ++++++++++++-
 3 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java 
b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index dd7e48c..ec3892d 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -138,7 +138,7 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
-    public void load(List<List<Object>> rows) throws StreamLoadException {
+    public String listToString(List<List<Object>> rows){
         StringJoiner lines = new StringJoiner(LINE_DELIMITER);
         for (List<Object> row : rows) {
             StringJoiner line = new StringJoiner(FIELD_DELIMITER);
@@ -151,9 +151,14 @@ public class DorisStreamLoad implements Serializable{
             }
             lines.add(line.toString());
         }
-        load(lines.toString());
+        return lines.toString();
     }
 
+
+    public void load(List<List<Object>> rows) throws StreamLoadException {
+        String records = listToString(rows);
+        load(records);
+    }
     public void load(String value) throws StreamLoadException {
         LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
         LoadResponse loadResponse = loadBatch(value);
diff --git 
a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala 
b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 12b7608..9b7d3f0 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -27,16 +27,19 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
-
 import java.io.IOException
 import java.util
+
+import org.apache.doris.spark.rest.RestService
+
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
   with CreatableRelationProvider
-  with StreamSinkProvider {
+  with StreamSinkProvider
+  with Serializable {
 
   private val logger: Logger = 
LoggerFactory.getLogger(classOf[DorisSourceProvider].getName)
 
@@ -97,14 +100,23 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister
             catch {
               case e: Exception =>
                 try {
+                  logger.warn("Failed to load data on BE: {} node ", 
dorisStreamLoader.getLoadUrlStr)
+                  //If the current BE node fails to execute Stream Load, 
randomly switch to other BE nodes and try again
+                  
dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger))
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
+                    logger.warn("Data that failed to load : " + 
dorisStreamLoader.listToString(rowsBuffer))
                     Thread.currentThread.interrupt()
                     throw new IOException("unable to flush; interrupted while 
doing another attempt", e)
                 }
             }
           }
+
+          if(!rowsBuffer.isEmpty){
+            logger.warn("Data that failed to load : " + 
dorisStreamLoader.listToString(rowsBuffer))
+            throw new IOException(s"Failed to load data on BE: 
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+          }
         }
 
       }
diff --git 
a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala 
b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index edd08f1..6e73698 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -23,9 +23,11 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
-
 import java.io.IOException
 import java.util
+
+import org.apache.doris.spark.rest.RestService
+
 import scala.util.control.Breaks
 
 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: 
SparkSettings) extends Sink with Serializable {
@@ -81,14 +83,23 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
             catch {
               case e: Exception =>
                 try {
+                  logger.warn("Failed to load data on BE: {} node ", 
dorisStreamLoader.getLoadUrlStr)
+                  //If the current BE node fails to execute Stream Load, 
randomly switch to other BE nodes and try again
+                  
dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger))
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
+                    logger.warn("Data that failed to load : " + 
dorisStreamLoader.listToString(rowsBuffer))
                     Thread.currentThread.interrupt()
                     throw new IOException("unable to flush; interrupted while 
doing another attempt", e)
                 }
             }
           }
+
+          if(!rowsBuffer.isEmpty){
+            logger.warn("Data that failed to load : " + 
dorisStreamLoader.listToString(rowsBuffer))
+            throw new IOException(s"Failed to load data on BE: 
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+          }
         }
       }
     })

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to