This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 45fe88a  [improvement]Add an option to set the partition size of the final write stage (#60)
45fe88a is described below

commit 45fe88a3520c62791c87760cef423a61575108c6
Author: lexluo09 <39718951+lexlu...@users.noreply.github.com>
AuthorDate: Tue Dec 20 14:14:10 2022 +0800

    [improvement]Add an option to set the partition size of the final write stage (#60)
---
 .../org/apache/doris/spark/cfg/ConfigurationOptions.java   | 10 ++++++++++
 .../src/main/java/org/apache/doris/spark/cfg/Settings.java |  6 +++++-
 .../org/apache/doris/spark/sql/DorisSourceProvider.scala   | 12 +++++++++---
 .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala   | 11 +++++++++--
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 8cc4477..5ef9f19 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -75,4 +75,14 @@ public interface ConfigurationOptions {
     String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";
 
     String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties.";
+
+    String DORIS_SINK_TASK_PARTITION_SIZE = "doris.sink.task.partition.size";
+
+    /**
+     * Set doris sink task partition size. Coalescing to a small partition count avoids a shuffle but, without an intervening action, can force the upstream computation to run at that same reduced parallelism.
+     * To avoid this, enable repartition: it adds a shuffle step but keeps the current upstream partitions executing in parallel.
+     */
+    String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
+
+    boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
 }
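Together, doris.sink.task.partition.size and doris.sink.task.use.repartition cap the parallelism of the final write stage. A minimal batch-write sketch in Scala, assuming the connector's registered "doris" short name (see DorisSourceProvider below); the table identifier, fenodes address, and credentials are placeholders:

    df.write
      .format("doris")
      .option("doris.table.identifier", "example_db.example_tbl")  // placeholder
      .option("doris.fenodes", "fe-host:8030")                     // placeholder
      .option("user", "root")                                      // placeholder
      .option("password", "")                                      // placeholder
      .option("doris.sink.task.partition.size", "2")    // new: partitions in the final write stage
      .option("doris.sink.task.use.repartition", "true") // new: shuffle instead of coalesce
      .save()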
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 23f0cd7..d2e845a 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -47,7 +47,11 @@ public abstract class Settings {
         return value;
     }
 
-    public int getIntegerProperty(String name, int defaultValue) {
+    public Integer getIntegerProperty(String name) {
+        return getIntegerProperty(name, null);
+    }
+
+    public Integer getIntegerProperty(String name, Integer defaultValue) {
         try {
             if (getProperty(name) != null) {
                 return Integer.parseInt(getProperty(name));

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 31bd1aa..2922d63 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -29,9 +29,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.IOException
 import java.util
-
 import org.apache.doris.spark.rest.RestService
-
+import java.util.Objects
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
@@ -64,11 +63,18 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
     val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+    val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+    val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
 
     logger.info(s"maxRowCount ${maxRowCount}")
     logger.info(s"maxRetryTimes ${maxRetryTimes}")
 
-    data.rdd.foreachPartition(partition => {
+    var resultRdd = data.rdd
+    if (Objects.nonNull(sinkTaskPartitionSize)) {
+      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+    }
+
+    resultRdd.foreachPartition(partition => {
       val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
       partition.foreach(row => {
         val line: util.List[Object] = new util.ArrayList[Object]()
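Note that getIntegerProperty now returns a boxed Integer so that an unset doris.sink.task.partition.size yields null, meaning "leave the RDD untouched"; the Objects.nonNull guard above then chooses between coalesce and repartition. A self-contained Scala sketch of the behavioral difference the flag toggles (the local session and the expensive() stand-in are illustrative only, not connector code):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[4]").appName("resize-demo").getOrCreate()

    // Stand-in for real per-row work.
    def expensive(i: Int): Int = { Thread.sleep(1); i * 2 }

    val wide = spark.sparkContext.parallelize(1 to 100000, 100)

    // coalesce(1) narrows without a shuffle: Spark may collapse the whole
    // lineage into a single task, so expensive() runs with parallelism 1.
    val narrow = wide.map(expensive).coalesce(1)

    // repartition(1) inserts a shuffle: expensive() still runs across the
    // 100 upstream partitions; only the merged result lands in one partition.
    val shuffled = wide.map(expensive).repartition(1)

    println((narrow.getNumPartitions, shuffled.getNumPartitions)) // (1,1)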
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index afc5f31..130ce21 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 import java.io.IOException
-
+import org.apache.doris.spark.rest.RestService
+import java.util.Objects
 import scala.util.control.Breaks
 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
@@ -34,6 +35,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
   @volatile private var latestBatchId = -1L
   val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
   val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+  val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+  val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
   val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
@@ -47,8 +50,12 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
   def write(queryExecution: QueryExecution): Unit = {
     val schema = queryExecution.analyzed.output
 
+    var resultRdd = queryExecution.toRdd
+    if (Objects.nonNull(sinkTaskPartitionSize)) {
+      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+    }
     // write for each partition
-    queryExecution.toRdd.foreachPartition(iter => {
+    resultRdd.foreachPartition(iter => {
       val objectMapper = new ObjectMapper()
       val rowArray = objectMapper.createArrayNode()
       iter.foreach(row => {
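The same two options apply on this structured-streaming path. A hedged sketch, assuming a streaming DataFrame df and that the connector's "doris" format is usable from writeStream; the checkpoint path, table identifier, and fenodes address are placeholders:

    df.writeStream
      .format("doris")
      .option("checkpointLocation", "/tmp/doris-ckpt")             // placeholder
      .option("doris.table.identifier", "example_db.example_tbl")  // placeholder
      .option("doris.fenodes", "fe-host:8030")                     // placeholder
      .option("doris.sink.task.partition.size", "4")
      .option("doris.sink.task.use.repartition", "true")
      .start()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org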