This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new fe87bd1  Extract a common method for retrying functions in Utils (#89)
fe87bd1 is described below

commit fe87bd17d9180cc7578d5280ded5c308fa4bbefb
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Mon May 29 16:27:47 2023 +0800

    Extract a common method for retrying functions in Utils (#89)

    * extract retry method in Utils

    * revert irrelevant changes

    * revert unnecessary change
---
 .../apache/doris/spark/rdd/ScalaValueReader.scala  |  7 ++++
 .../doris/spark/sql/DorisSourceProvider.scala      | 48 +++++++---------
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 39 +++++-------------
 .../scala/org/apache/doris/spark/sql/Utils.scala   | 20 +++++++++
 4 files changed, 51 insertions(+), 63 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index d8ecf8d..9c12cf7 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
 import scala.collection.JavaConversions._
 import scala.util.Try
+
 import org.apache.doris.spark.backend.BackendClient
 import org.apache.doris.spark.cfg.ConfigurationOptions._
 import org.apache.doris.spark.cfg.Settings
@@ -36,10 +37,16 @@ import org.apache.doris.spark.util.ErrorMessages
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.spark.internal.Logging

+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+import scala.collection.JavaConversions._
+import scala.util.Try
 import scala.util.control.Breaks

 /**
  * read data from Doris BE to array.
+ *
  * @param partition Doris RDD partition
  * @param settings request configuration
  */
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 671c4b8..e469f38 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -27,12 +27,13 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
+
 import java.io.IOException
+import java.time.Duration
 import java.util
-import org.apache.doris.spark.rest.RestService
 import java.util.Objects
 import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.control.Breaks
+import scala.util.{Failure, Success}

 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
@@ -86,49 +87,28 @@
         }
         rowsBuffer.add(line)
         if (rowsBuffer.size > maxRowCount - 1 ) {
-          flush
+          flush()
         }
       })

       // flush buffer
       if (!rowsBuffer.isEmpty) {
-        flush
+        flush()
       }

       /**
        * flush data to Doris and do retry when flush error
        *
        */
-      def flush = {
-        val loop = new Breaks
-        var err: Exception = null
-        loop.breakable {
-
-          for (i <- 1 to maxRetryTimes) {
-            try {
-              dorisStreamLoader.loadV2(rowsBuffer)
-              rowsBuffer.clear()
-              Thread.sleep(batchInterValMs.longValue())
-              loop.break()
-            }
-            catch {
-              case e: Exception =>
-                try {
-                  logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                  if (err == null) err = e
-                  Thread.sleep(1000 * i)
-                } catch {
-                  case ex: InterruptedException =>
-                    Thread.currentThread.interrupt()
-                    throw new IOException("unable to flush; interrupted while doing another attempt", e)
-                }
-            }
-          }
-
-          if (!rowsBuffer.isEmpty) {
-            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
-          }
+      def flush(): Unit = {
+        Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+          dorisStreamLoader.loadV2(rowsBuffer)
+          rowsBuffer.clear()
+        } match {
+          case Success(_) =>
+          case Failure(e) =>
+            throw new IOException(
+              s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
         }
-      }
     })
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index e91e8fa..4644820 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -24,11 +24,12 @@ import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}

-import collection.JavaConverters._
 import java.io.IOException
+import java.time.Duration
 import java.util
 import java.util.Objects

-import scala.util.control.Breaks
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}

 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
@@ -69,33 +70,13 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
    *
    */
  def flush(batch: Iterable[util.List[Object]]): Unit = {
-    val loop = new Breaks
-    var err: Exception = null
-    var loadSuccess: Boolean = false;
-    loop.breakable {
-      (1 to maxRetryTimes).foreach { i =>
-        try {
-          dorisStreamLoader.loadV2(batch.toList.asJava)
-          loadSuccess = true
-          Thread.sleep(batchInterValMs.longValue())
-          loop.break()
-        } catch {
-          case e: Exception =>
-            try {
-              logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-              if (err == null) err = e
-              Thread.sleep(1000 * i)
-            } catch {
-              case ex: InterruptedException =>
-                Thread.currentThread.interrupt()
-                throw new IOException("unable to flush; interrupted while doing another attempt", ex)
-            }
-        }
-      }
-      // check load success, if not throw exception
-      if (!loadSuccess) {
-        throw new IOException(s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
-      }
+    Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+      dorisStreamLoader.loadV2(batch.toList.asJava)
+    } match {
+      case Success(_) =>
+      case Failure(e) =>
+        throw new IOException(
+          s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e)
     }
   }
 }
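Both call sites now reduce to the same shape: run the load action under Utils.retry, then turn a terminal Failure into an IOException. One behavioral difference from the old loops is worth noting: the old code slept a growing 1000 * i ms between attempts and also slept batchInterValMs after a successful load, whereas the new code waits a fixed batchInterValMs between attempts and returns immediately on success. Below is a minimal sketch of the new call-site shape, assuming it lives in the org.apache.doris.spark.sql package so the private[sql] Utils.retry is visible; FlushSketch, doLoad, and the constructor parameters are hypothetical stand-ins, not the connector's real members:

package org.apache.doris.spark.sql

import java.io.IOException
import java.time.Duration

import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Success}

// Hypothetical sink skeleton; only the flush() shape mirrors this commit.
class FlushSketch(maxRetryTimes: Int, batchIntervalMs: Long) {

  private val logger: Logger = LoggerFactory.getLogger(classOf[FlushSketch])

  // Stand-in for dorisStreamLoader.loadV2(...); throws on a failed load.
  private def doLoad(): Unit = ()

  def flush(): Unit = {
    Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchIntervalMs), logger) {
      doLoad() // re-run up to maxRetryTimes more times if it throws an Exception
    } match {
      case Success(_) => // loaded; the batch sink clears its row buffer at this point
      case Failure(e) =>
        throw new IOException(
          s"Failed to load batch data and exceeded the max $maxRetryTimes retry times.", e)
    }
  }
}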
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 18dd3b2..ba6fa86 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -25,6 +25,11 @@ import org.apache.spark.sql.sources._
 import org.slf4j.Logger

 import java.sql.{Date, Timestamp}
+import java.time.Duration
+import java.util.concurrent.locks.LockSupport
+import scala.annotation.tailrec
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}

 private[sql] object Utils {
   /**
@@ -158,4 +163,19 @@ private[sql] object Utils {
     finalParams
   }
+
+  @tailrec
+  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
+    assert(retryTimes >= 0)
+    val result = Try(f)
+    result match {
+      case Success(result) => Success(result)
+      case Failure(exception: T) if retryTimes > 0 =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
+        LockSupport.parkNanos(interval.toNanos)
+        retry(retryTimes - 1, interval, logger)(f)
+      case Failure(exception) => Failure(exception)
+    }
+  }

 }
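For reference, here is the extracted helper exercised end to end. The retry body below copies the method added to Utils; the RetryDemo wrapper and the flaky action are hypothetical. Semantics to note: retryTimes is the number of retries remaining after the first attempt (so 3 permits up to 4 executions of f), only failures whose exception matches the type parameter T are retried, and the wait uses LockSupport.parkNanos instead of Thread.sleep, so the helper never throws the InterruptedException that both old loops had to handle:

import java.time.Duration
import java.util.concurrent.locks.LockSupport

import org.slf4j.{Logger, LoggerFactory}

import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

object RetryDemo {

  // Same body as the retry method added to Utils in this commit.
  @tailrec
  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
    assert(retryTimes >= 0)
    Try(f) match {
      case Success(result) => Success(result)
      case Failure(exception: T) if retryTimes > 0 =>
        logger.warn(s"Execution failed caused by: ", exception)
        logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
        LockSupport.parkNanos(interval.toNanos)
        retry(retryTimes - 1, interval, logger)(f)
      case Failure(exception) => Failure(exception)
    }
  }

  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger("RetryDemo")
    var attempts = 0

    // A hypothetical flaky action: fails twice, then succeeds on the third try.
    val result = retry[String, RuntimeException](3, Duration.ofMillis(100), logger) {
      attempts += 1
      if (attempts < 3) throw new RuntimeException(s"attempt $attempts failed")
      s"succeeded on attempt $attempts"
    }

    println(result) // Success(succeeded on attempt 3)
  }
}

A failure whose exception is not a subtype of T is returned as Failure without any retry; passing Exception as T therefore gives every non-fatal load exception the full retry budget.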