This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 45fe88a  [improvement]Add an option to set the partition size of the 
final write stage (#60)
45fe88a is described below

commit 45fe88a3520c62791c87760cef423a61575108c6
Author: lexluo09 <39718951+lexlu...@users.noreply.github.com>
AuthorDate: Tue Dec 20 14:14:10 2022 +0800

    [improvement]Add an option to set the partition size of the final write 
stage (#60)
---
 .../org/apache/doris/spark/cfg/ConfigurationOptions.java     | 10 ++++++++++
 .../src/main/java/org/apache/doris/spark/cfg/Settings.java   |  6 +++++-
 .../org/apache/doris/spark/sql/DorisSourceProvider.scala     | 12 +++++++++---
 .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala     | 11 +++++++++--
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 8cc4477..5ef9f19 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -75,4 +75,14 @@ public interface ConfigurationOptions {
     String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";
 
     String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties.";
+
+    String DORIS_SINK_TASK_PARTITION_SIZE = "doris.sink.task.partition.size";
+
+    /**
+     * Set doris sink task partition size. If you set a small coalesce size 
and you don't have the action operations, this may result in the same 
parallelism in your computation.
+     * To avoid this, you can use repartition operations. This will add a 
shuffle step, but means the current upstream partitions will be executed in 
parallel.
+     */
+    String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
+
+    boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
 }
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 23f0cd7..d2e845a 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -47,7 +47,11 @@ public abstract class Settings {
         return value;
     }
 
-    public int getIntegerProperty(String name, int defaultValue) {
+    public Integer getIntegerProperty(String name) {
+        return getIntegerProperty(name, null);
+    }
+
+    public Integer getIntegerProperty(String name, Integer defaultValue) {
         try {
             if (getProperty(name) != null) {
                  return Integer.parseInt(getProperty(name));
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 31bd1aa..2922d63 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -29,9 +29,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
 import java.io.IOException
 import java.util
-
 import org.apache.doris.spark.rest.RestService
-
+import java.util.Objects
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
@@ -64,11 +63,18 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister
 
     val maxRowCount = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, 
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
     val maxRetryTimes = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, 
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+    val sinkTaskPartitionSize = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+    val sinkTaskUseRepartition = 
sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, 
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
 
     logger.info(s"maxRowCount ${maxRowCount}")
     logger.info(s"maxRetryTimes ${maxRetryTimes}")
 
-    data.rdd.foreachPartition(partition => {
+    var resultRdd = data.rdd
+    if (Objects.nonNull(sinkTaskPartitionSize)) {
+      resultRdd = if (sinkTaskUseRepartition) 
resultRdd.repartition(sinkTaskPartitionSize) else 
resultRdd.coalesce(sinkTaskPartitionSize)
+    }
+
+    resultRdd.foreachPartition(partition => {
       val rowsBuffer: util.List[util.List[Object]] = new 
util.ArrayList[util.List[Object]](maxRowCount)
       partition.foreach(row => {
         val line: util.List[Object] = new util.ArrayList[Object]()
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index afc5f31..130ce21 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 import java.io.IOException
-
+import org.apache.doris.spark.rest.RestService
+import java.util.Objects
 import scala.util.control.Breaks
 
 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: 
SparkSettings) extends Sink with Serializable {
@@ -34,6 +35,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
   @volatile private var latestBatchId = -1L
   val maxRowCount: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, 
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
   val maxRetryTimes: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, 
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+  val sinkTaskPartitionSize = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+  val sinkTaskUseRepartition = 
settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, 
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
   val dorisStreamLoader: DorisStreamLoad = 
CachedDorisStreamLoadClient.getOrCreate(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
@@ -47,8 +50,12 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
 
   def write(queryExecution: QueryExecution): Unit = {
     val schema = queryExecution.analyzed.output
+    var resultRdd = queryExecution.toRdd
+    if (Objects.nonNull(sinkTaskPartitionSize)) {
+      resultRdd = if (sinkTaskUseRepartition) 
resultRdd.repartition(sinkTaskPartitionSize) else 
resultRdd.coalesce(sinkTaskPartitionSize)
+    }
     // write for each partition
-    queryExecution.toRdd.foreachPartition(iter => {
+    resultRdd.foreachPartition(iter => {
       val objectMapper = new ObjectMapper()
       val rowArray = objectMapper.createArrayNode()
       iter.foreach(row => {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to