This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new fc2d92d Update spark load doc (#2973) fc2d92d is described below commit fc2d92d68a5a6488d009770d723bd813e0d1a31c Author: wyb <wyb...@gmail.com> AuthorDate: Sat Feb 22 12:00:50 2020 +0800 Update spark load doc (#2973) --- docs/documentation/cn/internal/spark_load.md | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/docs/documentation/cn/internal/spark_load.md b/docs/documentation/cn/internal/spark_load.md index 799cf60..021b02b 100644 --- a/docs/documentation/cn/internal/spark_load.md +++ b/docs/documentation/cn/internal/spark_load.md @@ -29,6 +29,7 @@ spark load主要用于解决初次迁移,大量数据迁移doris的场景, * FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。 * BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。 * Tablet: 一个palo table的水平分片称为tablet。 +* Dpp:Data preprocessing,数据预处理模块,通过外部计算资源(Hadoop、Spark)完成对导入数据预处理,包括转化、清洗、分区、排序和聚合等。 ## 设计 @@ -68,7 +69,7 @@ Doris中现有的导入方式中,针对百G级别以上的数据的批量导 ) where k1 > 20 ) - with spark + with spark.cluster_name PROPERTIES ( "spark.master" = "yarn", @@ -78,7 +79,9 @@ Doris中现有的导入方式中,针对百G级别以上的数据的批量导 "max_filter_ratio" = "0.1", ); ``` -其中各个property的含义如下: +其中spark.cluster_name为用户导入使用的Spark集群名,可以通过SET PROPERTY来设置,可参考原来Hadoop集群的设置。 +property中的Spark集群设置会覆盖spark.cluster_name中对应的内容。 +各个property的含义如下: - spark.master是表示spark集群部署模式,支持包括yarn/standalone/local/k8s,预计先实现yarn的支持,并且使用yarn-cluster模式(yarn-client模式一般用于交互式的场景)。 - spark.executor.cores: executor的cpu个数 - spark.executor.memory: executor的内存大小 @@ -186,7 +189,7 @@ LoadLoadingTask可以复现现在的逻辑,但是,有一个地方跟BrokerLo 方案1可以最大限度的复用现有的导入框架,能够快速实现支持大数据量导入的功能。但是存在以下问题,就是经过spark etl处理之后的数据其实已经按照tablet划分好了,但是现有的Broker导入框架还是会对流式读取的数据进行分区和bucket计算,然后经过序列化通过rpc发送到对应的目标BE的机器,有一次序列化和网络IO的开销。 方案2是在SparkEtlJob生成数据的时候,直接生成doris的存储格式Segment文件,然后三个副本需要通过类似clone机制的方式,通过add_rowset接口,进行文件的导入。这种方案具体不一样的地方如下: -1. 需要在生成的文件中添加tabletid后续 +1. 需要在生成的文件中添加tabletid后缀 2. 在SparkLoadPendingTask类中增加一个接口protected Map<long, Pair<String, Long>> getFilePathMap()用于返回tabletid和文件之间的映射关系, 3. 在BE rpc服务中增加一个spark_push接口,实现拉取源端etl转化之后的文件到本地(可以通过broker读取),然后通过add_rowset接口完成数据的导入,类似克隆的逻辑 4. 生成新的导入任务SparkLoadLoadingTask,该SparkLoadLoadingTask主要功能就是读取job.json文件,解析其中的属性并且,将属性作为rpc参数,调用spark_push接口,向tablet所在的后端BE发送导入请求,进行数据的导入。BE中spark_push根据is_segment_file来决定如何处理,如果为true,则直接下载segment文件,进行add rowset;如果为false,则走pusher逻辑,实现数据导入。 @@ -195,6 +198,8 @@ LoadLoadingTask可以复现现在的逻辑,但是,有一个地方跟BrokerLo ## 总结 -综合以上三种方案,第一种方案的改动量比较小,是后面两种方案的基础。第二种方案的对导入框架的改动较大,而且需要依赖单副本导入的修改。相对来说第三种方案的性能提升可能会更好。所以,计划分两步完成spark load的工作。 -第一步,按照方案1,快读支持spark导入的功能。 -第二部,按照方案2,封装segment写入的库,并且增加一个rpc接口,实现类似clone的导入逻辑。 \ No newline at end of file +综合以上两种方案,第一种方案的改动量比较小,但是BE做了重复的工作。第二种方案可以参考原有的Hadoop导入框架。所以,计划分两步完成spark load的工作。 + +第一步,按照方案2,实现通过Spark完成导入数据的分区排序聚合,生成parquet格式文件。然后走Hadoop pusher的流程由BE转化格式。 + +第二步,封装segment写入的库,直接生成Doris底层的格式,并且增加一个rpc接口,实现类似clone的导入逻辑。 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org