This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 24c633c9ae69072e0e178c0c0d0d7aab432d2266
Author: 董涛 <782112...@qq.com>
AuthorDate: Tue Jan 11 15:03:06 2022 +0800

    [improvement](spark-connector) Throw an exception when the data push fails and there are too many retries (#7531)
---
 .../java/org/apache/doris/spark/DorisStreamLoad.java     |  9 +++++++--
 .../org/apache/doris/spark/sql/DorisSourceProvider.scala | 16 ++++++++++++++--
 .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 13 ++++++++++++-
 3 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index dd7e48c..ec3892d 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -138,7 +138,7 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
-    public void load(List<List<Object>> rows) throws StreamLoadException {
+    public String listToString(List<List<Object>> rows){
         StringJoiner lines = new StringJoiner(LINE_DELIMITER);
         for (List<Object> row : rows) {
             StringJoiner line = new StringJoiner(FIELD_DELIMITER);
@@ -151,9 +151,14 @@ public class DorisStreamLoad implements Serializable{
             }
             lines.add(line.toString());
         }
-        load(lines.toString());
+        return lines.toString();
     }
+
+    public void load(List<List<Object>> rows) throws StreamLoadException {
+        String records = listToString(rows);
+        load(records);
+    }
 
     public void load(String value) throws StreamLoadException {
         LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
         LoadResponse loadResponse = loadBatch(value);
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 12b7608..9b7d3f0 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -27,16 +27,19 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
-
 import java.io.IOException
 import java.util
+
+import org.apache.doris.spark.rest.RestService
+
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
   with CreatableRelationProvider
-  with StreamSinkProvider {
+  with StreamSinkProvider
+  with Serializable {
 
   private val logger: Logger = LoggerFactory.getLogger(classOf[DorisSourceProvider].getName)
 
@@ -97,14 +100,23 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
       catch {
         case e: Exception =>
           try {
+            logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
+            //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
+            dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger))
             Thread.sleep(1000 * i)
           } catch {
             case ex: InterruptedException =>
+              logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
               Thread.currentThread.interrupt()
               throw new IOException("unable to flush; interrupted while doing another attempt", e)
           }
       }
     }
+
+    if(!rowsBuffer.isEmpty){
+      logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
+      throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+    }
   }
 }
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index edd08f1..6e73698 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -23,9 +23,11 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
-
 import java.io.IOException
 import java.util
+
+import org.apache.doris.spark.rest.RestService
+
 import scala.util.control.Breaks
 
 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
@@ -81,14 +83,23 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
       catch {
         case e: Exception =>
           try {
+            logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
+            //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
+            dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger))
             Thread.sleep(1000 * i)
           } catch {
             case ex: InterruptedException =>
+              logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
               Thread.currentThread.interrupt()
               throw new IOException("unable to flush; interrupted while doing another attempt", e)
           }
       }
     }
+
+    if(!rowsBuffer.isEmpty){
+      logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
+      throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+    }
   }
 }
   })
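For readers skimming the diff: both sinks now follow the same retry shape — attempt the stream load, on failure switch to a randomly chosen BE node and back off linearly, and once retries are exhausted, log the un-flushed rows and throw instead of silently dropping the batch. Below is a minimal self-contained Scala sketch of that shape, not the connector's actual code: `loadBatch`, `pickRandomBackend`, `setHostPort`, and `maxRetries` are hypothetical stand-ins (the real calls in the diff are DorisStreamLoad.load, RestService.randomBackendV2, and DorisStreamLoad.setHostPort), and the sketch uses a boolean flag where the connector uses scala.util.control.Breaks.

    import java.io.IOException

    object RetrySketch {
      // Hypothetical stand-ins: loadBatch plays DorisStreamLoad.load,
      // pickRandomBackend plays RestService.randomBackendV2, and
      // setHostPort plays DorisStreamLoad.setHostPort.
      def flushWithRetry(rows: Seq[Seq[Any]],
                         maxRetries: Int,
                         loadBatch: Seq[Seq[Any]] => Unit,
                         pickRandomBackend: () => String,
                         setHostPort: String => Unit): Unit = {
        var attempt = 0
        var loaded = false
        while (!loaded && attempt <= maxRetries) {
          try {
            loadBatch(rows)
            loaded = true // success: nothing left to retry
          } catch {
            case e: Exception =>
              attempt += 1
              // Point the loader at a different BE node before the next try,
              // mirroring setHostPort(randomBackendV2(...)) in the diff.
              setHostPort(pickRandomBackend())
              // Linear backoff, as in the diff's Thread.sleep(1000 * i).
              try Thread.sleep(1000L * attempt)
              catch {
                case _: InterruptedException =>
                  Thread.currentThread().interrupt()
                  throw new IOException("unable to flush; interrupted while doing another attempt", e)
              }
          }
        }
        // The behavioral fix in this commit: exhausted retries now surface
        // as an error instead of the batch being silently dropped.
        if (!loaded)
          throw new IOException(s"failed to load ${rows.size} rows after $maxRetries retries")
      }
    }

Structurally, the commit adds only two things to each sink's existing loop: the node switch inside the catch block, and the final check after the loop that turns leftover buffered rows into an IOException.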
times.") + } } } diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala index edd08f1..6e73698 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala @@ -23,9 +23,11 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.{DataFrame, SQLContext} import org.slf4j.{Logger, LoggerFactory} - import java.io.IOException import java.util + +import org.apache.doris.spark.rest.RestService + import scala.util.control.Breaks private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable { @@ -81,14 +83,23 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe catch { case e: Exception => try { + logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr) + //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again + dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger)) Thread.sleep(1000 * i) } catch { case ex: InterruptedException => + logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer)) Thread.currentThread.interrupt() throw new IOException("unable to flush; interrupted while doing another attempt", e) } } } + + if(!rowsBuffer.isEmpty){ + logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer)) + throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.") + } } } }) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org