This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 4538a43 [refactor] Unified writing through DorisWriter (#104) 4538a43 is described below commit 4538a430e3321be4949a9637f35bb163e5f1256f Author: gnehil <adamlee...@gmail.com> AuthorDate: Tue May 30 22:55:04 2023 +0800 [refactor] Unified writing through DorisWriter (#104) * use writer to write data * resolve conflicts * unify jackson version * remove useless code --- spark-doris-connector/pom.xml | 2 +- .../{ => load}/CachedDorisStreamLoadClient.java | 7 +-- .../doris/spark/{ => load}/DorisStreamLoad.java | 39 +++++++------- .../doris/spark/sql/DorisSourceProvider.scala | 62 ++-------------------- .../doris/spark/sql/DorisStreamLoadSink.scala | 52 ++---------------- .../scala/org/apache/doris/spark/sql/Utils.scala | 6 ++- .../DorisWriter.scala} | 54 +++++++++---------- 7 files changed, 60 insertions(+), 162 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 77b37c1..e4b4c8b 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -70,7 +70,7 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.scm.id>github</project.scm.id> <netty.version>4.1.77.Final</netty.version> - <fasterxml.jackson.version>2.13.3</fasterxml.jackson.version> + <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version> <thrift-service.version>1.0.0</thrift-service.version> </properties> diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java similarity index 90% rename from spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java index 1d89126..d3dab49 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java @@ -15,17 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.spark; +package org.apache.doris.spark.load; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; import org.apache.doris.spark.cfg.SparkSettings; -import org.apache.doris.spark.exception.DorisException; - -import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java similarity index 96% rename from spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 6738c09..61379e3 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -14,15 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package org.apache.doris.spark; +package org.apache.doris.spark.load; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.SparkSettings; import org.apache.doris.spark.exception.StreamLoadException; @@ -30,6 +23,14 @@ import org.apache.doris.spark.rest.RestService; import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.RespContent; import org.apache.doris.spark.util.ListUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; @@ -45,10 +46,17 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.sql.Date; import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -73,13 +81,11 @@ public class DorisStreamLoad implements Serializable { private String tbl; private String authEncoded; private String columns; - private String[] dfColumns; private String maxFilterRatio; private Map<String, String> streamLoadProp; private static final long cacheExpireTimeout = 4 * 60; private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache; private final String fileType; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"); public DorisStreamLoad(SparkSettings settings) { String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\."); @@ -101,11 +107,6 @@ public class DorisStreamLoad implements Serializable { } } - public DorisStreamLoad(SparkSettings settings, String[] dfColumns) { - this(settings); - this.dfColumns = dfColumns; - } - public String getLoadUrlStr() { if (StringUtils.isEmpty(loadUrlStr)) { return ""; @@ -168,7 +169,7 @@ public class DorisStreamLoad implements Serializable { } - public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException { + public void loadV2(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException { if (fileType.equals("csv")) { load(listToString(rows)); } else if(fileType.equals("json")) { diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index e469f38..94fab9e 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -17,9 +17,9 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.DorisStreamLoad -import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} +import org.apache.doris.spark.cfg.SparkSettings import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME +import org.apache.doris.spark.writer.DorisWriter import org.apache.spark.SparkConf import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources._ @@ -28,12 +28,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import org.slf4j.{Logger, LoggerFactory} -import java.io.IOException -import java.time.Duration -import java.util -import java.util.Objects import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.util.{Failure, Success} private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider @@ -60,58 +55,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) sparkSettings.merge(Utils.params(parameters, logger).asJava) // init stream loader - val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns) + val writer = new DorisWriter(sparkSettings) + writer.write(data) - val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) - val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) - val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) - val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean - val batchInterValMs = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) - - logger.info(s"maxRowCount ${maxRowCount}") - logger.info(s"maxRetryTimes ${maxRetryTimes}") - logger.info(s"batchInterVarMs ${batchInterValMs}") - - var resultRdd = data.rdd - if (Objects.nonNull(sinkTaskPartitionSize)) { - resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) - } - - resultRdd.foreachPartition(partition => { - val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount) - partition.foreach(row => { - val line: util.List[Object] = new util.ArrayList[Object]() - for (i <- 0 until row.size) { - val field = row.get(i) - line.add(field.asInstanceOf[AnyRef]) - } - rowsBuffer.add(line) - if (rowsBuffer.size > maxRowCount - 1 ) { - flush() - } - }) - // flush buffer - if (!rowsBuffer.isEmpty) { - flush() - } - - /** - * flush data to Doris and do retry when flush error - * - */ - def flush(): Unit = { - Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) { - dorisStreamLoader.loadV2(rowsBuffer) - rowsBuffer.clear() - } match { - case Success(_) => - case Failure(e) => - throw new IOException( - s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e) - } - } - - }) new BaseRelation { override def sqlContext: SQLContext = unsupportedException diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala index 4644820..342e940 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala @@ -17,69 +17,27 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} -import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad} -import org.apache.spark.rdd.RDD +import org.apache.doris.spark.cfg.SparkSettings +import org.apache.doris.spark.writer.DorisWriter import org.apache.spark.sql.execution.streaming.Sink -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} import org.slf4j.{Logger, LoggerFactory} -import java.io.IOException -import java.time.Duration -import java.util -import java.util.Objects -import scala.collection.JavaConverters._ -import scala.util.{Failure, Success} - private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable { private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName) @volatile private var latestBatchId = -1L - val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) - val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) - val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) - val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean - val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) - val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) + private val writer = new DorisWriter(settings) override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= latestBatchId) { logger.info(s"Skipping already committed batch $batchId") } else { - write(data.rdd) + writer.write(data) latestBatchId = batchId } } - def write(rdd: RDD[Row]): Unit = { - var resultRdd = rdd - if (Objects.nonNull(sinkTaskPartitionSize)) { - resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) - } - resultRdd - .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava) - .foreachPartition(partition => { - partition - .grouped(batchSize) - .foreach(batch => flush(batch)) - }) - - /** - * flush data to Doris and do retry when flush error - * - */ - def flush(batch: Iterable[util.List[Object]]): Unit = { - Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) { - dorisStreamLoader.loadV2(batch.toList.asJava) - } match { - case Success(_) => - case Failure(e) => - throw new IOException( - s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e) - } - } - } - override def toString: String = "DorisStreamLoadSink" } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala index ba6fa86..2f3a5bb 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -31,7 +31,7 @@ import scala.annotation.tailrec import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} -private[sql] object Utils { +private[spark] object Utils { /** * quote column name * @param colName column name @@ -169,7 +169,9 @@ private[sql] object Utils { assert(retryTimes >= 0) val result = Try(f) result match { - case Success(result) => Success(result) + case Success(result) => + LockSupport.parkNanos(interval.toNanos) + Success(result) case Failure(exception: T) if retryTimes > 0 => logger.warn(s"Execution failed caused by: ", exception) logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms") diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala similarity index 51% copy from spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala copy to spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala index 4644820..3839ff7 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.spark.sql +package org.apache.doris.spark.writer import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} -import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.execution.streaming.Sink -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad} +import org.apache.doris.spark.sql.Utils +import org.apache.spark.sql.DataFrame import org.slf4j.{Logger, LoggerFactory} import java.io.IOException @@ -31,29 +30,25 @@ import java.util.Objects import scala.collection.JavaConverters._ import scala.util.{Failure, Success} -private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable { +class DorisWriter(settings: SparkSettings) extends Serializable { - private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName) - @volatile private var latestBatchId = -1L - val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) - val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) - val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) - val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean - val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) + private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter]) - val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) + val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, + ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) + private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, + ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) + private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) + private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, + ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean + private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, + ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) - override def addBatch(batchId: Long, data: DataFrame): Unit = { - if (batchId <= latestBatchId) { - logger.info(s"Skipping already committed batch $batchId") - } else { - write(data.rdd) - latestBatchId = batchId - } - } + private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) - def write(rdd: RDD[Row]): Unit = { - var resultRdd = rdd + def write(dataFrame: DataFrame): Unit = { + var resultRdd = dataFrame.rdd + val dfColumns = dataFrame.columns if (Objects.nonNull(sinkTaskPartitionSize)) { resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) } @@ -62,24 +57,25 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe .foreachPartition(partition => { partition .grouped(batchSize) - .foreach(batch => flush(batch)) + .foreach(batch => flush(batch, dfColumns)) }) /** * flush data to Doris and do retry when flush error * */ - def flush(batch: Iterable[util.List[Object]]): Unit = { + def flush(batch: Iterable[util.List[Object]], dfColumns: Array[String]): Unit = { Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) { - dorisStreamLoader.loadV2(batch.toList.asJava) + dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns) } match { case Success(_) => case Failure(e) => throw new IOException( - s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e) + s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e) } } + } - override def toString: String = "DorisStreamLoadSink" + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org