This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3789128  [improvement] add retry interval when load fail (#307)
3789128 is described below

commit 3789128a60cd13152ba3ac38227748be94276442
Author: wudi <676366...@qq.com>
AuthorDate: Tue Apr 8 21:56:59 2025 +0800

    [improvement] add retry interval when load fail (#307)
---
 .../main/java/org/apache/doris/spark/config/DorisOptions.java    | 1 +
 .../src/main/scala/org/apache/doris/spark/sql/Utils.scala        | 1 -
 .../org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala   | 4 ++--
 .../main/scala/org/apache/doris/spark/writer/DorisWriter.scala   | 9 ++++++---
 .../scala/org/apache/doris/spark/write/DorisDataWriter.scala     | 4 +++-
 5 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 4319688..a8d29ae 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -68,6 +68,7 @@ public class DorisOptions {
     public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = 
ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(100000).withDescription("");
 
     public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = 
ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
+    public static final ConfigOption<Integer> DORIS_SINK_RETRY_INTERVAL_MS = 
ConfigOptions.name("doris.sink.retry.interval.ms").intType().defaultValue(10000).withDescription("The
 interval at which the Spark connector tries to load the batch of data again 
after load fails.");
 
     public static final ConfigOption<String> DORIS_MAX_FILTER_RATIO = 
ConfigOptions.name("doris.max.filter.ratio").stringType().withoutDefaultValue().withDescription("");
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 3135281..7bb3961 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -189,7 +189,6 @@ private[spark] object Utils {
     val result = Try(f)
     result match {
       case Success(result) =>
-        LockSupport.parkNanos(interval.toNanos)
         Success(result)
       case Failure(exception: T) if retryTimes > 0 =>
         logger.warn(s"Execution failed caused by: ", exception)
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
index 7c1d48f..dd73f53 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -70,9 +70,9 @@ class DorisWriterFailoverITCase extends 
AbstractContainerTestBase {
          | "fenodes"="${getFenodes}",
          | "user"="${getDorisUsername}",
          | "password"="${getDorisPassword}",
-         | "doris.sink.batch.interval.ms"="1000",
+         | "doris.sink.retry.interval.ms"="10000",
          | "doris.sink.batch.size"="1",
-         | "doris.sink.max-retries"="100",
+         | "doris.sink.max-retries"="3",
          | "doris.sink.enable-2pc"="false"
          |)
          |""".stripMargin)
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
 
b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index d9d312c..52b1b13 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.CollectionAccumulator
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.time.Duration
+import java.util.concurrent.locks.LockSupport
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success}
@@ -45,10 +46,11 @@ class DorisWriter(config: DorisConfig,
 
   private val maxRetryTimes: Int = 
config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
   private val batchSize: Int = 
config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
-  private val batchInterValMs: Int = 
config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
+  private val batchIntervalMs: Int = 
config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
+  private val retryIntervalMs: Int = 
config.getValue(DorisOptions.DORIS_SINK_RETRY_INTERVAL_MS)
 
   if (maxRetryTimes > 0) {
-    logger.info(s"batch retry enabled, size is $batchSize, interval is 
$batchInterValMs")
+    logger.info(s"batch retry enabled, size is $batchSize, retry interval is 
$retryIntervalMs")
   }
 
   private val enable2PC: Boolean = 
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
@@ -87,7 +89,7 @@ class DorisWriter(config: DorisConfig,
     resultRdd.foreachPartition(iterator => {
       while (iterator.hasNext) {
         val batchIterator = new BatchIterator(iterator, batchSize, 
maxRetryTimes > 0)
-        val retry = Utils.retry[Option[CommitMessage], 
Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
+        val retry = Utils.retry[Option[CommitMessage], 
Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) _
         retry(loadFunc(batchIterator, schema))(batchIterator.reset()) match {
           case Success(msg) =>
             if (enable2PC) handleLoadSuccess(msg, txnAcc)
@@ -97,6 +99,7 @@ class DorisWriter(config: DorisConfig,
             batchIterator.close()
             throw e
         }
+        LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs).toNanos)
       }
 
     })
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index 6628e9a..b6edf3c 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -45,6 +45,8 @@ class DorisDataWriter(config: DorisConfig, schema: 
StructType, partitionId: Int,
 
   private val retries = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
 
+  private val retryIntervalMs = 
config.getValue(DorisOptions.DORIS_SINK_RETRY_INTERVAL_MS)
+
   private val twoPhaseCommitEnabled = 
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
 
   private val committedMessages = mutable.Buffer[String]()
@@ -81,7 +83,7 @@ class DorisDataWriter(config: DorisConfig, schema: 
StructType, partitionId: Int,
   @throws[Exception]
   private def loadBatchWithRetries(record: InternalRow): Unit = {
     var isRetrying = false
-    Retry.exec[Unit, Exception](retries, 
Duration.ofMillis(batchIntervalMs.toLong), log) {
+    Retry.exec[Unit, Exception](retries, 
Duration.ofMillis(retryIntervalMs.toLong), log) {
       if (isRetrying) {
         // retrying, reload data from buffer
         do {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to