This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 3789128 [improvement] add retry interval when load fail (#307) 3789128 is described below commit 3789128a60cd13152ba3ac38227748be94276442 Author: wudi <676366...@qq.com> AuthorDate: Tue Apr 8 21:56:59 2025 +0800 [improvement] add retry interval when load fail (#307) --- .../main/java/org/apache/doris/spark/config/DorisOptions.java | 1 + .../src/main/scala/org/apache/doris/spark/sql/Utils.scala | 1 - .../org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala | 4 ++-- .../main/scala/org/apache/doris/spark/writer/DorisWriter.scala | 9 ++++++--- .../scala/org/apache/doris/spark/write/DorisDataWriter.scala | 4 +++- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java index 4319688..a8d29ae 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java @@ -68,6 +68,7 @@ public class DorisOptions { public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(100000).withDescription(""); public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription(""); + public static final ConfigOption<Integer> DORIS_SINK_RETRY_INTERVAL_MS = ConfigOptions.name("doris.sink.retry.interval.ms").intType().defaultValue(10000).withDescription("The interval at which the Spark connector tries to load the batch of data again after load fails."); public static final ConfigOption<String> DORIS_MAX_FILTER_RATIO = ConfigOptions.name("doris.max.filter.ratio").stringType().withoutDefaultValue().withDescription(""); diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala index 3135281..7bb3961 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -189,7 +189,6 @@ private[spark] object Utils { val result = Try(f) result match { case Success(result) => - LockSupport.parkNanos(interval.toNanos) Success(result) case Failure(exception: T) if retryTimes > 0 => logger.warn(s"Execution failed caused by: ", exception) diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala index 7c1d48f..dd73f53 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala @@ -70,9 +70,9 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase { | "fenodes"="${getFenodes}", | "user"="${getDorisUsername}", | "password"="${getDorisPassword}", - | "doris.sink.batch.interval.ms"="1000", + | "doris.sink.retry.interval.ms"="10000", | "doris.sink.batch.size"="1", - | "doris.sink.max-retries"="100", + | "doris.sink.max-retries"="3", | "doris.sink.enable-2pc"="false" |) |""".stripMargin) diff --git a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala index d9d312c..52b1b13 100644 --- a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala +++ b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.CollectionAccumulator import org.slf4j.{Logger, LoggerFactory} import java.time.Duration +import java.util.concurrent.locks.LockSupport import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success} @@ -45,10 +46,11 @@ class DorisWriter(config: DorisConfig, private val maxRetryTimes: Int = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES) private val batchSize: Int = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE) - private val batchInterValMs: Int = config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS) + private val batchIntervalMs: Int = config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS) + private val retryIntervalMs: Int = config.getValue(DorisOptions.DORIS_SINK_RETRY_INTERVAL_MS) if (maxRetryTimes > 0) { - logger.info(s"batch retry enabled, size is $batchSize, interval is $batchInterValMs") + logger.info(s"batch retry enabled, size is $batchSize, retry interval is $retryIntervalMs") } private val enable2PC: Boolean = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC) @@ -87,7 +89,7 @@ class DorisWriter(config: DorisConfig, resultRdd.foreachPartition(iterator => { while (iterator.hasNext) { val batchIterator = new BatchIterator(iterator, batchSize, maxRetryTimes > 0) - val retry = Utils.retry[Option[CommitMessage], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _ + val retry = Utils.retry[Option[CommitMessage], Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) _ retry(loadFunc(batchIterator, schema))(batchIterator.reset()) match { case Success(msg) => if (enable2PC) handleLoadSuccess(msg, txnAcc) @@ -97,6 +99,7 @@ class DorisWriter(config: DorisConfig, batchIterator.close() throw e } + LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs).toNanos) } }) diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala index 6628e9a..b6edf3c 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala @@ -45,6 +45,8 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, private val retries = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES) + private val retryIntervalMs = config.getValue(DorisOptions.DORIS_SINK_RETRY_INTERVAL_MS) + private val twoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC) private val committedMessages = mutable.Buffer[String]() @@ -81,7 +83,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, @throws[Exception] private def loadBatchWithRetries(record: InternalRow): Unit = { var isRetrying = false - Retry.exec[Unit, Exception](retries, Duration.ofMillis(batchIntervalMs.toLong), log) { + Retry.exec[Unit, Exception](retries, Duration.ofMillis(retryIntervalMs.toLong), log) { if (isRetrying) { // retrying, reload data from buffer do { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org