This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push: new e32dd8bc7af Add pipeline execution document (#1573) e32dd8bc7af is described below commit e32dd8bc7aff1052b89a0d5320a9b04e27f2e66a Author: wangbo <wan...@apache.org> AuthorDate: Tue Dec 24 13:54:08 2024 +0800 Add pipeline execution document (#1573) --- .../pipeline-execution-engine.md | 100 +++++++++---------- .../pipeline-execution-engine.md | 107 +++++++++------------ .../pipeline-execution-engine.md | 107 +++++++++------------ .../pipeline-execution-engine.md | 107 +++++++++------------ static/images/pip_exec_1.png | Bin 0 -> 43292 bytes static/images/pip_exec_2.png | Bin 0 -> 168656 bytes static/images/pip_exec_3.png | Bin 0 -> 157212 bytes static/images/pip_exec_4.png | Bin 0 -> 19075 bytes static/images/pip_exec_5.png | Bin 0 -> 41892 bytes static/images/pip_exec_6.png | Bin 0 -> 118931 bytes static/images/pip_exec_7.png | Bin 0 -> 358877 bytes .../pipeline-execution-engine.md | 100 +++++++++---------- .../pipeline-execution-engine.md | 99 +++++++++---------- 13 files changed, 273 insertions(+), 347 deletions(-) diff --git a/docs/query-acceleration/pipeline-execution-engine.md b/docs/query-acceleration/pipeline-execution-engine.md index 503b2a4d7b0..eee47ea0296 100644 --- a/docs/query-acceleration/pipeline-execution-engine.md +++ b/docs/query-acceleration/pipeline-execution-engine.md @@ -1,6 +1,6 @@ --- { - "title": "Pipeline Execution Engine", + "title": "Parallel Execution", "language": "en", "toc_min_heading_level": 2, "toc_max_heading_level": 4 @@ -28,87 +28,79 @@ under the License. -The Pipeline Execution Engine is an experimental feature in Apache Doris 2.0, which was later optimized and upgraded in version 2.1 (i.e., PipelineX). In versions 3.0 and later, PipelineX is used as the only execution engine in Doris and renamed to Pipeline Execution Engine. +The parallel execution model of Doris is a Pipeline execution model, primarily inspired by the implementation described in the Hyper paper (https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model fully leverages the computational power of multi-core CPUs while limiting the number of query threads in Doris, addressing the issue of thread explosion during execution. For details on its design, implementation, and effectiveness, refer to [DSIP-027](DSIP-027: Support Pipe [...] -The goal of pipeline execution engine is to replace the current execution engine of Doris's volcano model, fully release the computing power of multi-core CPUs, and limit the number of Doris's query threads to solve the problem of Doris's execution thread bloat. +Starting from Doris 3.0, the Pipeline execution model has completely replaced the original Volcano model. Based on the Pipeline execution model, Doris supports the parallel processing of Query, DDL, and DML statements. -Its specific design, implementation and effects can be found in [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine)) and [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine)). +## Physical Plan -## Principle +To better understand the Pipeline execution model, it is first necessary to introduce two important concepts in the physical query plan: PlanFragment and PlanNode. 
We will use the following SQL statement as an example: +``` +SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1); +``` -The current Doris SQL execution engine is designed based on the traditional volcano model, which has the following problems in a single multi-core scenario: +The FE will first translate it into the following logical plan, where each node represents a PlanNode. The specific meaning of each type of node can be found in the introduction to the physical plan. -* Inability to take full advantage of multi-core computing power to improve query performance,**most scenarios require manual setting of parallelism** for performance tuning, which is almost difficult to set in production environments. + -* Each instance of a standalone query corresponds to one thread of the thread pool, which introduces two additional problems. - - * Once the thread pool is hit full. **Doris' query engine will enter a pseudo-deadlock** and will not respond to subsequent queries. **At the same time there is a certain probability of entering a logical deadlock** situation: for example, all threads are executing an instance's probe task. - - * Blocking arithmetic will take up thread resources,**blocking thread resources can not be yielded to instances that can be scheduled**, the overall resource utilization does not go up. +Since Doris is built on an MPP architecture, each query aims to involve all BEs in parallel execution as much as possible to reduce query latency. Therefore, the logical plan must be transformed into a physical plan. The transformation essentially involves inserting DataSink and ExchangeNode into the logical plan. These two nodes facilitate the shuffling of data across multiple BEs. -* Blocking arithmetic relies on the OS thread scheduling mechanism, **thread switching overhead (especially in the scenario of system mixing))** +After the transformation, each PlanFragment corresponds to a portion of the PlanNode and can be sent as an independent task to a BE. Each BE processes the PlanNode contained within the PlanFragment and then uses the DataSink and ExchangeNode operators to shuffle data to other BEs for subsequent computation. -The resulting set of problems drove Doris to implement an execution engine adapted to the architecture of modern multi-core CPUs. + -And as shown in the figure below (quoted from[Push versus pull-based loop fusion in query engines]([jfp_1800010a (cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),The resulting set of problems drove Doris to implement an execution engine adapted to the architecture of modern multi-core CPUs.: +Doris's planning process is divided into three layers: +PLAN:The execution plan. A SQL statement is translated by the query planner into an execution plan, which is then provided to the execution engine for execution. - +FRAGMENT:Since Doris is a distributed execution engine, a complete execution plan is divided into multiple single-machine execution fragments. A FRAGMENT represents a complete single-machine execution fragment. Multiple fragments combine to form a complete PLAN. -1. Transformation of the traditional pull pull logic-driven execution process into a data-driven execution engine for the push model +PLAN NODE:Operators, which are the smallest units of the execution plan. 
A FRAGMENT consists of multiple operators, each responsible for a specific execution logic, such as aggregation or join operations. -2. Blocking operations are asynchronous, reducing the execution overhead caused by thread switching and thread blocking and making more efficient use of the CPU +## Pipeline Execution +A PlanFragment is the smallest unit of a task sent by the FE to the BE for execution. A BE may receive multiple different PlanFragments for the same query, and each PlanFragment is processed independently. Upon receiving a PlanFragment, the BE splits it into multiple Pipelines and then starts multiple PipelineTasks to achieve parallel execution, thereby improving query efficiency. -3. Controls the number of threads to be executed and reduces the resource congestion of large queries on small queries in mixed load scenarios by controlling time slice switching + -4. In terms of execution concurrency, pipelineX introduces local exchange optimization to fully utilize CPU resources, and distribute data evenly across different tasks to minimize data skewing. In addition, pipelineX will no longer be constrained by the number of tablets. -5. Logically, multiple pipeline tasks share all shared states of the same pipeline and eliminate additional initialization overhead, such as expressions and some const variables. +### Pipeline +A Pipeline consists of a SourceOperator, a SinkOperator, and several intermediate operators. The SourceOperator represents reading data from an external source, which can be a table (e.g., OlapTable) or a buffer (e.g., Exchange). The SinkOperator represents the data output, which can either be shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or output to a hash table (e.g., aggregation operators, join build hash tables, etc.). -6. In terms of scheduling logic, the blocking conditions of all pipeline tasks are encapsulated using Dependency, and the execution logic of the tasks is triggered by external events (such as rpc completion) to enter the runnable queue, thereby eliminating the overhead of blocking polling threads. + -7. Profile: Provide users with simple and easy to understand metrics. +Multiple Pipelines are actually interdependent. Take the JoinNode as an example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to build the hash table, while Pipeline-1 reads data from the table to perform the probe operation. These two Pipelines are connected by a dependency relationship, meaning Pipeline-1 can only execute after Pipeline-0 has completed. This dependency relationship is referred to as a Dependency. Once Pipeline-0 finishes execution, it calls the se [...] +### PipelineTask +A Pipeline is actually a logical concept; it is not an executable entity. Once a Pipeline is defined, it needs to be further instantiated into multiple PipelineTasks. The data that needs to be read is then distributed to different PipelineTasks, ultimately achieving parallel processing. The operators within the multiple PipelineTasks of the same Pipeline are identical, but they differ in their states. For example, they might read different data or build different hash tables. These diffe [...] -This improves the efficiency of CPU execution on mixed-load SQL and enhances the performance of SQL queries. +Each PipelineTask is eventually submitted to a thread pool to be executed as an independent task. With the Dependency trigger mechanism, this approach allows better utilization of multi-core CPUs and achieves full parallelism. 
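As a concrete illustration, the degree of PipelineTask parallelism can be tuned through the `parallel_pipeline_task_num` session variable documented in the (now-removed) usage section that follows in this diff; a minimal sketch, assuming a running Doris cluster:

```sql
-- 0 lets Doris pick the PipelineTask count automatically (roughly half
-- of the minimum CPU count across the cluster's machines, per the
-- usage notes later in this diff).
SET parallel_pipeline_task_num = 0;

-- Or pin the number of PipelineTasks per Pipeline explicitly, e.g. 8.
SET parallel_pipeline_task_num = 8;
```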
-## Usage +### Operator +In most cases, each operator in a Pipeline corresponds to a PlanNode, but there are some special operators with exceptions: +* JoinNode is split into JoinBuildOperator and JoinProbeOperator. +* AggNode is split into AggSinkOperator and AggSourceOperator. +* SortNode is split into SortSinkOperator and SortSourceOperator. +The basic principle is that for certain "breaking" operators (those that need to collect all the data before performing computation), the data ingestion part is split into a Sink, while the part that retrieves data from the operator is referred to as the Source. -### Query +## Scan 并行化 +Scanning data is a very heavy I/O operation, as it requires reading large amounts of data from local disks (or from HDFS or S3 in the case of data lake scenarios, which introduces even longer latency), consuming a significant amount of time. Therefore, we have introduced parallel scanning technology in the ScanOperator. The ScanOperator dynamically generates multiple Scanners, each of which scans around 1 to 2 million rows of data. While performing the scan, each Scanner handles tasks su [...] -1. enable_pipeline_engine + - Setting the session variable `enable_pipeline_engine` to `true` will make BE to use the Pipeline execution engine when performing query execution. +By using parallel scanning technology, we can effectively avoid issues where certain ScanOperators take an excessively long time due to improper bucketing or data skew, which would otherwise slow down the entire query latency. - ```sql - set enable_pipeline_engine = true; - ``` +## Local Shuffle +In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a technique that redistributes data locally across different execution tasks. It evenly distributes all the data output by the upstream Pipeline to all the tasks in the downstream Pipeline using methods like HASH or Round Robin. This helps solve the problem of data skew during execution, ensuring that the execution model is no longer limited by data storage or the query plan. Let's now provide an example to illus [...] -2. parallel_pipeline_task_num +We will further explain how Local Exchange can prevent data skew using Pipeline-1 from the previous example. - The `parallel_pipeline_task_num` represents the number of Pipeline Tasks for a SQL query to be queried concurrently.The default configuration of Doris is `0`, in which case the number of Pipeline Tasks will be automatically set to half of the minimum number of CPUs in the current cluster machine. You can also adjust it according to your own situation. + - ```sql - set parallel_pipeline_task_num = 0; - ``` +As shown in the figure above, by inserting a Local Exchange in Pipeline-1, we further split Pipeline-1 into Pipeline-1-0 and Pipeline-1-1. - You can limit the automatically configured concurrency by setting `max_instance_num`(The default value is 64) +Now, let's assume the current concurrency level is 3 (each Pipeline has 3 tasks), and each task reads one bucket from the storage layer. The number of rows in the three buckets is 1, 1, and 7, respectively. The execution before and after inserting the Local Exchange changes as follows: -3. enable_local_shuffle + - Set `enable_local_shuffle` to true will enable local shuffle optimization. Local shuffle will try to evenly distribute data among different pipeline tasks to avoid data skewing as much as possible. 
+As can be seen from the figure on the right, the amount of data that the HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3), thereby avoiding data skew. - ```sql - set enable_local_shuffle = true; - ``` - -4. ignore_storage_data_distribution - - Settings `ignore_storage_data_distribution` is true, it means ignoring the data distribution of the storage layer. When used in conjunction with local shuffle, the concurrency capability of the pipelineX engine will no longer be constrained by the number of storage layer tables, thus fully utilizing machine resources. - - ```sql - set ignore_storage_data_distribution = true; - ``` - -### Load - -The engine selected for import are detailed in the [Load](../data-operate/import/load-manual) documentation. +In Doris, Local Exchange is planned based on a series of rules. For example, when a query involves time-consuming operators like Join, Aggregation, or Window Functions, Local Exchange is used to minimize data skew as much as possible. \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md index 5d806cac159..32df1f590a0 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md @@ -1,6 +1,6 @@ --- { - "title": "Pipeline 执行引擎", + "title": "并行执行", "language": "zh-CN", "toc_min_heading_level": 2, "toc_max_heading_level": 4 @@ -28,88 +28,71 @@ under the License. -:::info 备注 -Pipeline 执行引擎 是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX)。在 3.0 以及之后的版本中,Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。 -::: +Doris的并行执行模型是一种Pipeline 执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation)。 +Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 语句的并行处理。 -Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。 +## 物理计划 +为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。我们使用下面这条SQL 作为例子: +``` +SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1); +``` -它的具体设计、实现和效果可以参阅 [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine)) 以及 [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine))。 +FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode,每种Node的具体含义,可以参考查看物理计划的介绍。 -## 原理 + -当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题: +由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink和ExchangeNode,通过这两个Node完成了数据在多个BE 之间的Shuffle。拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 这两个算子把数据shuffle到其他BE上来进行接下来的计算。 -* 无法充分利用多核计算能力,提升查询性能,**多数场景下进行性能调优时需要手动设置并行度**,在生产环境中几乎很难进行设定。 + -* 单机查询的每个 Instance 对应线程池的一个线程,这会带来额外的两个问题。 - - * 线程池一旦打满。**Doris 
的查询引擎会进入假性死锁**,对后续的查询无法响应。**同时有一定概率进入逻辑死锁**的情况:比如所有的线程都在执行一个 Instance 的 Probe 任务。 - - * 阻塞的算子会占用线程资源,**而阻塞的线程资源无法让渡给能够调度的 Instance**,整体资源利用率上不去。 +所以Doris的规划分为3层: +PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。 -* 阻塞算子依赖操作系统的线程调度机制,**线程切换开销较大(尤其在系统混布的场景中)** +FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。 -由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。 +PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等 -而如下图所示(引用自[Push versus pull-based loop fusion in query engines]([jfp_1800010a (cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),Pipeline 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎: +## Pipeline 执行 +PlanFragment 是FE 发往BE 执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。 - + -1. 将传统 Pull 拉取的逻辑驱动的执行流程改造为 Push 模型的数据驱动的执行引擎 -2. 阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于 CPU 的利用更为高效 +### Pipeline +一个Pipeline 有一个SourceOperator 和 一个SinkOperator 以及中间的多个其他Operator组成。SourceOperator 代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。SinkOperator 表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。 -3. 控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题 + -4. 执行并发上,依赖 Local Exchange 使 Pipeline 充分并发,可以让数据被均匀分布到不同的 Task 中,尽可能减少数据倾斜,此外,Pipeline 也将不再受存储层 Tablet 数量的制约。 +多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 是从表里读取数据,来进行Probe。这2个Pipeline 之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 运行完毕后,会调用Dependency的set_ready 方法通知Pipeline-1 可执行。 -5. 执行逻辑上,多个 Pipeline Task 共享同一个 Pipeline 的全部共享状态,例如表达式和一些 Const 变量,消除了额外的初始化开销。 +### PipelineTask +Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 最终实现并行处理。同一个Pipeline的多个PipelineTask 之间的Operator 完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 不一样,这些不一样的状态,我们称之为LocalState。 +每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 这种触发机制下,可以更好的利用多核CPU,实现充分的并行。 -6. 调度逻辑上,所有 Pipeline Task 的阻塞条件都使用 Dependency 进行了封装,通过外部事件(例如 RPC 完成)触发 task 的执行逻辑进入 Runnable 队列,从而消除了阻塞轮询线程的开销。 +### Operator +在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外: +- JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator +- AggNode 被拆分为AggSinkOperator和AggSourceOperator +- SortNode 被拆分为SortSinkOperator 和 SortSourceOperator +基本原理是,对于一些breaking 算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。 -7. Profile:为用户提供简单易懂的指标。 +## Scan 并行化 +扫描数据是一个非常重的IO 操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。所以我们在ScanOperator 中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。 -从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。 + -## 使用方式 +通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。 -### 查询 +## Local Shuffle +在Pipeline执行模型中,Local Exchange作为一个Pipeline Breaker出现,是在本地将数据重新分发至各个执行任务的技术。它把上游Pipeline输出的全部数据以某种方式(HASH / Round Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。接下来我们举例来说明Local Exchange的工作逻辑。 +我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。 -1. 
enable_pipeline_engine + - 将 Session 变量 `enable_pipeline_engine` 设置为 `true`,则 BE 在进行查询执行时将会使用 Pipeline 执行引擎。 +如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 1-0和Pipeline 1-1。 +此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。则插入Local Exchange前后的执行变化如下: - ```sql - set enable_pipeline_engine = true; - ``` + -2. parallel_pipeline_task_num - - `parallel_pipeline_task_num` 代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris 默认的配置为 `0`,此时 Pipeline Task 数目将自动设置为当前集群机器中最少的 CPU 数量的一半。用户也可以根据自己的实际情况进行调整。 - - ```sql - set parallel_pipeline_task_num = 0; - ``` - - 可以通过设置 `max_instance_num` 来限制自动设置的并发数 (默认为 64) - -3. enable_local_shuffle - - 设置`enable_local_shuffle`为 True 则打开 Local Shuffle 优化。Local Shuffle 将尽可能将数据均匀分布给不同的 Pipeline Task 从而尽可能避免数据倾斜。 - - ```sql - set enable_local_shuffle = true; - ``` - -4. ignore_storage_data_distribution - - 设置`ignore_storage_data_distribution`为 True 则表示忽略存储层的数据分布。结合 Local Shuffle 一起使用,则 Pipeline 引擎的并发能力将不再受到存储层 Tablet 数量的制约,从而充分利用机器资源。 - - ```sql - set ignore_storage_data_distribution = true; - ``` - -### 导入 - -导入的引擎选择设置,详见[导入](../data-operate/import/load-manual)文档。 +从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。 +在Doris中,Local Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local Exchange来尽可能避免数据倾斜。 \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md index 5d806cac159..0b483ed239f 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md @@ -1,6 +1,6 @@ --- { - "title": "Pipeline 执行引擎", + "title": "并行执行", "language": "zh-CN", "toc_min_heading_level": 2, "toc_max_heading_level": 4 @@ -28,88 +28,71 @@ under the License. 
-:::info 备注 -Pipeline 执行引擎 是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX)。在 3.0 以及之后的版本中,Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。 -::: +Doris的并行执行模型是一种Pipeline 执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation)。 +Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 语句的并行处理。 -Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。 +## 物理计划 +为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。我们使用下面这条SQL 作为例子: +``` +SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1); +``` -它的具体设计、实现和效果可以参阅 [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine)) 以及 [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine))。 +FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode,每种Node的具体含义,可以参考查看物理计划的介绍。 -## 原理 + -当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题: +由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink和ExchangeNode,通过这两个Node完成了数据在多个BE 之间的Shuffle。拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 这两个算子把数据shuffle到其他BE上来进行接下来的计算。 -* 无法充分利用多核计算能力,提升查询性能,**多数场景下进行性能调优时需要手动设置并行度**,在生产环境中几乎很难进行设定。 + -* 单机查询的每个 Instance 对应线程池的一个线程,这会带来额外的两个问题。 - - * 线程池一旦打满。**Doris 的查询引擎会进入假性死锁**,对后续的查询无法响应。**同时有一定概率进入逻辑死锁**的情况:比如所有的线程都在执行一个 Instance 的 Probe 任务。 - - * 阻塞的算子会占用线程资源,**而阻塞的线程资源无法让渡给能够调度的 Instance**,整体资源利用率上不去。 +所以Doris的规划分为3层: +PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。 -* 阻塞算子依赖操作系统的线程调度机制,**线程切换开销较大(尤其在系统混布的场景中)** +FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。 -由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。 +PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等 -而如下图所示(引用自[Push versus pull-based loop fusion in query engines]([jfp_1800010a (cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),Pipeline 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎: +## Pipeline 执行 +PlanFragment 是FE 发往BE 执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。 - + -1. 将传统 Pull 拉取的逻辑驱动的执行流程改造为 Push 模型的数据驱动的执行引擎 -2. 阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于 CPU 的利用更为高效 +### Pipeline +一个Pipeline 有一个SourceOperator 和 一个SinkOperator 以及中间的多个其他Operator组成。SourceOperator 代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。SinkOperator 表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。 -3. 控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题 + -4. 
执行并发上,依赖 Local Exchange 使 Pipeline 充分并发,可以让数据被均匀分布到不同的 Task 中,尽可能减少数据倾斜,此外,Pipeline 也将不再受存储层 Tablet 数量的制约。 +多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 是从表里读取数据,来进行Probe。这2个Pipeline 之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 运行完毕后,会调用Dependency的set_ready 方法通知Pipeline-1 可执行。 -5. 执行逻辑上,多个 Pipeline Task 共享同一个 Pipeline 的全部共享状态,例如表达式和一些 Const 变量,消除了额外的初始化开销。 +### PipelineTask +Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 最终实现并行处理。同一个Pipeline的多个PipelineTask 之间的Operator 完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 不一样,这些不一样的状态,我们称之为LocalState。 +每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 这种触发机制下,可以更好的利用多核CPU,实现充分的并行。 -6. 调度逻辑上,所有 Pipeline Task 的阻塞条件都使用 Dependency 进行了封装,通过外部事件(例如 RPC 完成)触发 task 的执行逻辑进入 Runnable 队列,从而消除了阻塞轮询线程的开销。 +### Operator +在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外: +- JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator +- AggNode 被拆分为AggSinkOperator和AggSourceOperator +- SortNode 被拆分为SortSinkOperator 和 SortSourceOperator + 基本原理是,对于一些breaking 算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。 -7. Profile:为用户提供简单易懂的指标。 +## Scan 并行化 +扫描数据是一个非常重的IO 操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。所以我们在ScanOperator 中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。 -从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。 + -## 使用方式 +通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。 -### 查询 +## Local Shuffle +在Pipeline执行模型中,Local Exchange作为一个Pipeline Breaker出现,是在本地将数据重新分发至各个执行任务的技术。它把上游Pipeline输出的全部数据以某种方式(HASH / Round Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。接下来我们举例来说明Local Exchange的工作逻辑。 +我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。 -1. enable_pipeline_engine + - 将 Session 变量 `enable_pipeline_engine` 设置为 `true`,则 BE 在进行查询执行时将会使用 Pipeline 执行引擎。 +如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 1-0和Pipeline 1-1。 +此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。则插入Local Exchange前后的执行变化如下: - ```sql - set enable_pipeline_engine = true; - ``` + -2. parallel_pipeline_task_num - - `parallel_pipeline_task_num` 代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris 默认的配置为 `0`,此时 Pipeline Task 数目将自动设置为当前集群机器中最少的 CPU 数量的一半。用户也可以根据自己的实际情况进行调整。 - - ```sql - set parallel_pipeline_task_num = 0; - ``` - - 可以通过设置 `max_instance_num` 来限制自动设置的并发数 (默认为 64) - -3. enable_local_shuffle - - 设置`enable_local_shuffle`为 True 则打开 Local Shuffle 优化。Local Shuffle 将尽可能将数据均匀分布给不同的 Pipeline Task 从而尽可能避免数据倾斜。 - - ```sql - set enable_local_shuffle = true; - ``` - -4. 
ignore_storage_data_distribution - - 设置`ignore_storage_data_distribution`为 True 则表示忽略存储层的数据分布。结合 Local Shuffle 一起使用,则 Pipeline 引擎的并发能力将不再受到存储层 Tablet 数量的制约,从而充分利用机器资源。 - - ```sql - set ignore_storage_data_distribution = true; - ``` - -### 导入 - -导入的引擎选择设置,详见[导入](../data-operate/import/load-manual)文档。 +从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。 +在Doris中,Local Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local Exchange来尽可能避免数据倾斜。 \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md index 5d806cac159..0b483ed239f 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md @@ -1,6 +1,6 @@ --- { - "title": "Pipeline 执行引擎", + "title": "并行执行", "language": "zh-CN", "toc_min_heading_level": 2, "toc_max_heading_level": 4 @@ -28,88 +28,71 @@ under the License. -:::info 备注 -Pipeline 执行引擎 是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX)。在 3.0 以及之后的版本中,Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。 -::: +Doris的并行执行模型是一种Pipeline 执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation)。 +Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 语句的并行处理。 -Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。 +## 物理计划 +为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。我们使用下面这条SQL 作为例子: +``` +SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1); +``` -它的具体设计、实现和效果可以参阅 [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine)) 以及 [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine))。 +FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode,每种Node的具体含义,可以参考查看物理计划的介绍。 -## 原理 + -当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题: +由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink和ExchangeNode,通过这两个Node完成了数据在多个BE 之间的Shuffle。拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 这两个算子把数据shuffle到其他BE上来进行接下来的计算。 -* 无法充分利用多核计算能力,提升查询性能,**多数场景下进行性能调优时需要手动设置并行度**,在生产环境中几乎很难进行设定。 + -* 单机查询的每个 Instance 对应线程池的一个线程,这会带来额外的两个问题。 - - * 线程池一旦打满。**Doris 的查询引擎会进入假性死锁**,对后续的查询无法响应。**同时有一定概率进入逻辑死锁**的情况:比如所有的线程都在执行一个 Instance 的 Probe 任务。 - - * 阻塞的算子会占用线程资源,**而阻塞的线程资源无法让渡给能够调度的 Instance**,整体资源利用率上不去。 +所以Doris的规划分为3层: +PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。 -* 阻塞算子依赖操作系统的线程调度机制,**线程切换开销较大(尤其在系统混布的场景中)** +FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。 -由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。 +PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等 -而如下图所示(引用自[Push versus pull-based loop fusion in query 
engines]([jfp_1800010a (cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),Pipeline 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎: +## Pipeline 执行 +PlanFragment 是FE 发往BE 执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。 - + -1. 将传统 Pull 拉取的逻辑驱动的执行流程改造为 Push 模型的数据驱动的执行引擎 -2. 阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于 CPU 的利用更为高效 +### Pipeline +一个Pipeline 有一个SourceOperator 和 一个SinkOperator 以及中间的多个其他Operator组成。SourceOperator 代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。SinkOperator 表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。 -3. 控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题 + -4. 执行并发上,依赖 Local Exchange 使 Pipeline 充分并发,可以让数据被均匀分布到不同的 Task 中,尽可能减少数据倾斜,此外,Pipeline 也将不再受存储层 Tablet 数量的制约。 +多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 是从表里读取数据,来进行Probe。这2个Pipeline 之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 运行完毕后,会调用Dependency的set_ready 方法通知Pipeline-1 可执行。 -5. 执行逻辑上,多个 Pipeline Task 共享同一个 Pipeline 的全部共享状态,例如表达式和一些 Const 变量,消除了额外的初始化开销。 +### PipelineTask +Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 最终实现并行处理。同一个Pipeline的多个PipelineTask 之间的Operator 完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 不一样,这些不一样的状态,我们称之为LocalState。 +每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 这种触发机制下,可以更好的利用多核CPU,实现充分的并行。 -6. 调度逻辑上,所有 Pipeline Task 的阻塞条件都使用 Dependency 进行了封装,通过外部事件(例如 RPC 完成)触发 task 的执行逻辑进入 Runnable 队列,从而消除了阻塞轮询线程的开销。 +### Operator +在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外: +- JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator +- AggNode 被拆分为AggSinkOperator和AggSourceOperator +- SortNode 被拆分为SortSinkOperator 和 SortSourceOperator + 基本原理是,对于一些breaking 算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。 -7. Profile:为用户提供简单易懂的指标。 +## Scan 并行化 +扫描数据是一个非常重的IO 操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。所以我们在ScanOperator 中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。 -从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。 + -## 使用方式 +通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。 -### 查询 +## Local Shuffle +在Pipeline执行模型中,Local Exchange作为一个Pipeline Breaker出现,是在本地将数据重新分发至各个执行任务的技术。它把上游Pipeline输出的全部数据以某种方式(HASH / Round Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。接下来我们举例来说明Local Exchange的工作逻辑。 +我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。 -1. enable_pipeline_engine + - 将 Session 变量 `enable_pipeline_engine` 设置为 `true`,则 BE 在进行查询执行时将会使用 Pipeline 执行引擎。 +如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 1-0和Pipeline 1-1。 +此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。则插入Local Exchange前后的执行变化如下: - ```sql - set enable_pipeline_engine = true; - ``` + -2. parallel_pipeline_task_num - - `parallel_pipeline_task_num` 代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris 默认的配置为 `0`,此时 Pipeline Task 数目将自动设置为当前集群机器中最少的 CPU 数量的一半。用户也可以根据自己的实际情况进行调整。 - - ```sql - set parallel_pipeline_task_num = 0; - ``` - - 可以通过设置 `max_instance_num` 来限制自动设置的并发数 (默认为 64) - -3. 
enable_local_shuffle - - 设置`enable_local_shuffle`为 True 则打开 Local Shuffle 优化。Local Shuffle 将尽可能将数据均匀分布给不同的 Pipeline Task 从而尽可能避免数据倾斜。 - - ```sql - set enable_local_shuffle = true; - ``` - -4. ignore_storage_data_distribution - - 设置`ignore_storage_data_distribution`为 True 则表示忽略存储层的数据分布。结合 Local Shuffle 一起使用,则 Pipeline 引擎的并发能力将不再受到存储层 Tablet 数量的制约,从而充分利用机器资源。 - - ```sql - set ignore_storage_data_distribution = true; - ``` - -### 导入 - -导入的引擎选择设置,详见[导入](../data-operate/import/load-manual)文档。 +从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。 +在Doris中,Local Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local Exchange来尽可能避免数据倾斜。 \ No newline at end of file diff --git a/static/images/pip_exec_1.png b/static/images/pip_exec_1.png new file mode 100644 index 00000000000..8f258bd9fcf Binary files /dev/null and b/static/images/pip_exec_1.png differ diff --git a/static/images/pip_exec_2.png b/static/images/pip_exec_2.png new file mode 100644 index 00000000000..184babf1735 Binary files /dev/null and b/static/images/pip_exec_2.png differ diff --git a/static/images/pip_exec_3.png b/static/images/pip_exec_3.png new file mode 100644 index 00000000000..49ba05472ba Binary files /dev/null and b/static/images/pip_exec_3.png differ diff --git a/static/images/pip_exec_4.png b/static/images/pip_exec_4.png new file mode 100644 index 00000000000..e8fd4e5d452 Binary files /dev/null and b/static/images/pip_exec_4.png differ diff --git a/static/images/pip_exec_5.png b/static/images/pip_exec_5.png new file mode 100644 index 00000000000..840410d1e29 Binary files /dev/null and b/static/images/pip_exec_5.png differ diff --git a/static/images/pip_exec_6.png b/static/images/pip_exec_6.png new file mode 100644 index 00000000000..d6c8325adf8 Binary files /dev/null and b/static/images/pip_exec_6.png differ diff --git a/static/images/pip_exec_7.png b/static/images/pip_exec_7.png new file mode 100644 index 00000000000..58648e3e37f Binary files /dev/null and b/static/images/pip_exec_7.png differ diff --git a/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md b/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md index 503b2a4d7b0..788f1d9385c 100644 --- a/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md +++ b/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md @@ -1,6 +1,6 @@ --- { - "title": "Pipeline Execution Engine", + "title": "Parallel Execution", "language": "en", "toc_min_heading_level": 2, "toc_max_heading_level": 4 @@ -28,87 +28,79 @@ under the License. -The Pipeline Execution Engine is an experimental feature in Apache Doris 2.0, which was later optimized and upgraded in version 2.1 (i.e., PipelineX). In versions 3.0 and later, PipelineX is used as the only execution engine in Doris and renamed to Pipeline Execution Engine. +The parallel execution model of Doris is a Pipeline execution model, primarily inspired by the implementation described in the Hyper paper (https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model fully leverages the computational power of multi-core CPUs while limiting the number of query threads in Doris, addressing the issue of thread explosion during execution. For details on its design, implementation, and effectiveness, refer to [DSIP-027](DSIP-027: Support Pipe [...] 
-The goal of pipeline execution engine is to replace the current execution engine of Doris's volcano model, fully release the computing power of multi-core CPUs, and limit the number of Doris's query threads to solve the problem of Doris's execution thread bloat. +Starting from Doris 3.0, the Pipeline execution model has completely replaced the original Volcano model. Based on the Pipeline execution model, Doris supports the parallel processing of Query, DDL, and DML statements. -Its specific design, implementation and effects can be found in [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine)) and [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine)). +## Physical Plan -## Principle +To better understand the Pipeline execution model, it is first necessary to introduce two important concepts in the physical query plan: PlanFragment and PlanNode. We will use the following SQL statement as an example: +``` +SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1); +``` -The current Doris SQL execution engine is designed based on the traditional volcano model, which has the following problems in a single multi-core scenario: +The FE will first translate it into the following logical plan, where each node represents a PlanNode. The specific meaning of each type of node can be found in the introduction to the physical plan. -* Inability to take full advantage of multi-core computing power to improve query performance,**most scenarios require manual setting of parallelism** for performance tuning, which is almost difficult to set in production environments. + -* Each instance of a standalone query corresponds to one thread of the thread pool, which introduces two additional problems. - - * Once the thread pool is hit full. **Doris' query engine will enter a pseudo-deadlock** and will not respond to subsequent queries. **At the same time there is a certain probability of entering a logical deadlock** situation: for example, all threads are executing an instance's probe task. - - * Blocking arithmetic will take up thread resources,**blocking thread resources can not be yielded to instances that can be scheduled**, the overall resource utilization does not go up. +Since Doris is built on an MPP architecture, each query aims to involve all BEs in parallel execution as much as possible to reduce query latency. Therefore, the logical plan must be transformed into a physical plan. The transformation essentially involves inserting DataSink and ExchangeNode into the logical plan. These two nodes facilitate the shuffling of data across multiple BEs. -* Blocking arithmetic relies on the OS thread scheduling mechanism, **thread switching overhead (especially in the scenario of system mixing))** +After the transformation, each PlanFragment corresponds to a portion of the PlanNode and can be sent as an independent task to a BE. Each BE processes the PlanNode contained within the PlanFragment and then uses the DataSink and ExchangeNode operators to shuffle data to other BEs for subsequent computation. -The resulting set of problems drove Doris to implement an execution engine adapted to the architecture of modern multi-core CPUs. 
+ -And as shown in the figure below (quoted from[Push versus pull-based loop fusion in query engines]([jfp_1800010a (cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),The resulting set of problems drove Doris to implement an execution engine adapted to the architecture of modern multi-core CPUs.: +Doris's planning process is divided into three layers: +PLAN:The execution plan. A SQL statement is translated by the query planner into an execution plan, which is then provided to the execution engine for execution. - +FRAGMENT:Since Doris is a distributed execution engine, a complete execution plan is divided into multiple single-machine execution fragments. A FRAGMENT represents a complete single-machine execution fragment. Multiple fragments combine to form a complete PLAN. -1. Transformation of the traditional pull pull logic-driven execution process into a data-driven execution engine for the push model +PLAN NODE:Operators, which are the smallest units of the execution plan. A FRAGMENT consists of multiple operators, each responsible for a specific execution logic, such as aggregation or join operations. -2. Blocking operations are asynchronous, reducing the execution overhead caused by thread switching and thread blocking and making more efficient use of the CPU +## Pipeline Execution +A PlanFragment is the smallest unit of a task sent by the FE to the BE for execution. A BE may receive multiple different PlanFragments for the same query, and each PlanFragment is processed independently. Upon receiving a PlanFragment, the BE splits it into multiple Pipelines and then starts multiple PipelineTasks to achieve parallel execution, thereby improving query efficiency. -3. Controls the number of threads to be executed and reduces the resource congestion of large queries on small queries in mixed load scenarios by controlling time slice switching + -4. In terms of execution concurrency, pipelineX introduces local exchange optimization to fully utilize CPU resources, and distribute data evenly across different tasks to minimize data skewing. In addition, pipelineX will no longer be constrained by the number of tablets. -5. Logically, multiple pipeline tasks share all shared states of the same pipeline and eliminate additional initialization overhead, such as expressions and some const variables. +### Pipeline +A Pipeline consists of a SourceOperator, a SinkOperator, and several intermediate operators. The SourceOperator represents reading data from an external source, which can be a table (e.g., OlapTable) or a buffer (e.g., Exchange). The SinkOperator represents the data output, which can either be shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or output to a hash table (e.g., aggregation operators, join build hash tables, etc.). -6. In terms of scheduling logic, the blocking conditions of all pipeline tasks are encapsulated using Dependency, and the execution logic of the tasks is triggered by external events (such as rpc completion) to enter the runnable queue, thereby eliminating the overhead of blocking polling threads. + -7. Profile: Provide users with simple and easy to understand metrics. +Multiple Pipelines are actually interdependent. Take the JoinNode as an example—it is split into two Pipelines. 
Pipeline-0 reads data from Exchange to build the hash table, while Pipeline-1 reads data from the table to perform the probe operation. These two Pipelines are connected by a dependency relationship, meaning Pipeline-1 can only execute after Pipeline-0 has completed. This dependency relationship is referred to as a Dependency. Once Pipeline-0 finishes execution, it calls the se [...] +### PipelineTask +A Pipeline is actually a logical concept; it is not an executable entity. Once a Pipeline is defined, it needs to be further instantiated into multiple PipelineTasks. The data that needs to be read is then distributed to different PipelineTasks, ultimately achieving parallel processing. The operators within the multiple PipelineTasks of the same Pipeline are identical, but they differ in their states. For example, they might read different data or build different hash tables. These diffe [...] -This improves the efficiency of CPU execution on mixed-load SQL and enhances the performance of SQL queries. +Each PipelineTask is eventually submitted to a thread pool to be executed as an independent task. With the Dependency trigger mechanism, this approach allows better utilization of multi-core CPUs and achieves full parallelism. -## Usage +### Operator +In most cases, each operator in a Pipeline corresponds to a PlanNode, but there are some special operators with exceptions: +* JoinNode is split into JoinBuildOperator and JoinProbeOperator. +* AggNode is split into AggSinkOperator and AggSourceOperator. +* SortNode is split into SortSinkOperator and SortSourceOperator. + The basic principle is that for certain "breaking" operators (those that need to collect all the data before performing computation), the data ingestion part is split into a Sink, while the part that retrieves data from the operator is referred to as the Source. -### Query +## Scan 并行化 +Scanning data is a very heavy I/O operation, as it requires reading large amounts of data from local disks (or from HDFS or S3 in the case of data lake scenarios, which introduces even longer latency), consuming a significant amount of time. Therefore, we have introduced parallel scanning technology in the ScanOperator. The ScanOperator dynamically generates multiple Scanners, each of which scans around 1 to 2 million rows of data. While performing the scan, each Scanner handles tasks su [...] -1. enable_pipeline_engine + - Setting the session variable `enable_pipeline_engine` to `true` will make BE to use the Pipeline execution engine when performing query execution. +By using parallel scanning technology, we can effectively avoid issues where certain ScanOperators take an excessively long time due to improper bucketing or data skew, which would otherwise slow down the entire query latency. - ```sql - set enable_pipeline_engine = true; - ``` +## Local Shuffle +In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a technique that redistributes data locally across different execution tasks. It evenly distributes all the data output by the upstream Pipeline to all the tasks in the downstream Pipeline using methods like HASH or Round Robin. This helps solve the problem of data skew during execution, ensuring that the execution model is no longer limited by data storage or the query plan. Let's now provide an example to illus [...] -2. parallel_pipeline_task_num +We will further explain how Local Exchange can prevent data skew using Pipeline-1 from the previous example. 
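For the 2.1 line specifically, the local shuffle behavior described here was exposed through session variables that the old usage section (removed later in this diff) documents; a minimal sketch, assuming those 2.1 variable names:

```sql
-- Redistribute data evenly across PipelineTasks to reduce skew.
SET enable_local_shuffle = true;

-- Used together with local shuffle: ignore the storage layer's data
-- distribution so concurrency is not bound by the tablet count.
SET ignore_storage_data_distribution = true;
```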
- The `parallel_pipeline_task_num` represents the number of Pipeline Tasks for a SQL query to be queried concurrently.The default configuration of Doris is `0`, in which case the number of Pipeline Tasks will be automatically set to half of the minimum number of CPUs in the current cluster machine. You can also adjust it according to your own situation. + - ```sql - set parallel_pipeline_task_num = 0; - ``` +As shown in the figure above, by inserting a Local Exchange in Pipeline-1, we further split Pipeline-1 into Pipeline-1-0 and Pipeline-1-1. - You can limit the automatically configured concurrency by setting `max_instance_num`(The default value is 64) +Now, let's assume the current concurrency level is 3 (each Pipeline has 3 tasks), and each task reads one bucket from the storage layer. The number of rows in the three buckets is 1, 1, and 7, respectively. The execution before and after inserting the Local Exchange changes as follows: -3. enable_local_shuffle + - Set `enable_local_shuffle` to true will enable local shuffle optimization. Local shuffle will try to evenly distribute data among different pipeline tasks to avoid data skewing as much as possible. +As can be seen from the figure on the right, the amount of data that the HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3), thereby avoiding data skew. - ```sql - set enable_local_shuffle = true; - ``` - -4. ignore_storage_data_distribution - - Settings `ignore_storage_data_distribution` is true, it means ignoring the data distribution of the storage layer. When used in conjunction with local shuffle, the concurrency capability of the pipelineX engine will no longer be constrained by the number of storage layer tables, thus fully utilizing machine resources. - - ```sql - set ignore_storage_data_distribution = true; - ``` - -### Load - -The engine selected for import are detailed in the [Load](../data-operate/import/load-manual) documentation. +In Doris, Local Exchange is planned based on a series of rules. For example, when a query involves time-consuming operators like Join, Aggregation, or Window Functions, Local Exchange is used to minimize data skew as much as possible. diff --git a/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md b/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md index 503b2a4d7b0..8239554ba03 100644 --- a/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md +++ b/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md @@ -1,6 +1,6 @@ --- { - "title": "Pipeline Execution Engine", + "title": "Parallel Execution", "language": "en", "toc_min_heading_level": 2, "toc_max_heading_level": 4 @@ -28,87 +28,80 @@ under the License. -The Pipeline Execution Engine is an experimental feature in Apache Doris 2.0, which was later optimized and upgraded in version 2.1 (i.e., PipelineX). In versions 3.0 and later, PipelineX is used as the only execution engine in Doris and renamed to Pipeline Execution Engine. +The parallel execution model of Doris is a Pipeline execution model, primarily inspired by the implementation described in the Hyper paper (https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model fully leverages the computational power of multi-core CPUs while limiting the number of query threads in Doris, addressing the issue of thread explosion during execution. For details on its design, implementation, and effectiveness, refer to [DSIP-027](DSIP-027: Support Pipe [...] 
-The goal of pipeline execution engine is to replace the current execution engine of Doris's volcano model, fully release the computing power of multi-core CPUs, and limit the number of Doris's query threads to solve the problem of Doris's execution thread bloat. +Starting from Doris 3.0, the Pipeline execution model has completely replaced the original Volcano model. Based on the Pipeline execution model, Doris supports the parallel processing of Query, DDL, and DML statements. -Its specific design, implementation and effects can be found in [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine)) and [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine)). +## Physical Plan -## Principle +To better understand the Pipeline execution model, it is first necessary to introduce two important concepts in the physical query plan: PlanFragment and PlanNode. We will use the following SQL statement as an example: +``` +SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1); +``` -The current Doris SQL execution engine is designed based on the traditional volcano model, which has the following problems in a single multi-core scenario: +The FE will first translate it into the following logical plan, where each node represents a PlanNode. The specific meaning of each type of node can be found in the introduction to the physical plan. -* Inability to take full advantage of multi-core computing power to improve query performance,**most scenarios require manual setting of parallelism** for performance tuning, which is almost difficult to set in production environments. + -* Each instance of a standalone query corresponds to one thread of the thread pool, which introduces two additional problems. - - * Once the thread pool is hit full. **Doris' query engine will enter a pseudo-deadlock** and will not respond to subsequent queries. **At the same time there is a certain probability of entering a logical deadlock** situation: for example, all threads are executing an instance's probe task. - - * Blocking arithmetic will take up thread resources,**blocking thread resources can not be yielded to instances that can be scheduled**, the overall resource utilization does not go up. +Since Doris is built on an MPP architecture, each query aims to involve all BEs in parallel execution as much as possible to reduce query latency. Therefore, the logical plan must be transformed into a physical plan. The transformation essentially involves inserting DataSink and ExchangeNode into the logical plan. These two nodes facilitate the shuffling of data across multiple BEs. -* Blocking arithmetic relies on the OS thread scheduling mechanism, **thread switching overhead (especially in the scenario of system mixing))** +After the transformation, each PlanFragment corresponds to a portion of the PlanNode and can be sent as an independent task to a BE. Each BE processes the PlanNode contained within the PlanFragment and then uses the DataSink and ExchangeNode operators to shuffle data to other BEs for subsequent computation. -The resulting set of problems drove Doris to implement an execution engine adapted to the architecture of modern multi-core CPUs. 
+ -And as shown in the figure below (quoted from[Push versus pull-based loop fusion in query engines]([jfp_1800010a (cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),The resulting set of problems drove Doris to implement an execution engine adapted to the architecture of modern multi-core CPUs.: +Doris's planning process is divided into three layers: +PLAN:The execution plan. A SQL statement is translated by the query planner into an execution plan, which is then provided to the execution engine for execution. - +FRAGMENT:Since Doris is a distributed execution engine, a complete execution plan is divided into multiple single-machine execution fragments. A FRAGMENT represents a complete single-machine execution fragment. Multiple fragments combine to form a complete PLAN. -1. Transformation of the traditional pull pull logic-driven execution process into a data-driven execution engine for the push model +PLAN NODE:Operators, which are the smallest units of the execution plan. A FRAGMENT consists of multiple operators, each responsible for a specific execution logic, such as aggregation or join operations. -2. Blocking operations are asynchronous, reducing the execution overhead caused by thread switching and thread blocking and making more efficient use of the CPU +## Pipeline Execution +A PlanFragment is the smallest unit of a task sent by the FE to the BE for execution. A BE may receive multiple different PlanFragments for the same query, and each PlanFragment is processed independently. Upon receiving a PlanFragment, the BE splits it into multiple Pipelines and then starts multiple PipelineTasks to achieve parallel execution, thereby improving query efficiency. -3. Controls the number of threads to be executed and reduces the resource congestion of large queries on small queries in mixed load scenarios by controlling time slice switching + -4. In terms of execution concurrency, pipelineX introduces local exchange optimization to fully utilize CPU resources, and distribute data evenly across different tasks to minimize data skewing. In addition, pipelineX will no longer be constrained by the number of tablets. -5. Logically, multiple pipeline tasks share all shared states of the same pipeline and eliminate additional initialization overhead, such as expressions and some const variables. +### Pipeline +A Pipeline consists of a SourceOperator, a SinkOperator, and several intermediate operators. The SourceOperator represents reading data from an external source, which can be a table (e.g., OlapTable) or a buffer (e.g., Exchange). The SinkOperator represents the data output, which can either be shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or output to a hash table (e.g., aggregation operators, join build hash tables, etc.). -6. In terms of scheduling logic, the blocking conditions of all pipeline tasks are encapsulated using Dependency, and the execution logic of the tasks is triggered by external events (such as rpc completion) to enter the runnable queue, thereby eliminating the overhead of blocking polling threads. + -7. Profile: Provide users with simple and easy to understand metrics. +Multiple Pipelines are actually interdependent. Take the JoinNode as an example—it is split into two Pipelines. 
+Multiple Pipelines are actually interdependent. Take the JoinNode as an example: it is split into two Pipelines. Pipeline-0 reads data from Exchange to build the hash table, while Pipeline-1 reads data from the table to perform the probe operation. These two Pipelines are connected by a dependency relationship, meaning Pipeline-1 can only execute after Pipeline-0 has completed. This dependency relationship is referred to as a Dependency. Once Pipeline-0 finishes execution, it marks this Dependency as ready so that Pipeline-1 can be scheduled for execution.
+### PipelineTask
+A Pipeline is actually a logical concept; it is not an executable entity. Once a Pipeline is defined, it needs to be further instantiated into multiple PipelineTasks. The data that needs to be read is then distributed to different PipelineTasks, ultimately achieving parallel processing. The operators within the multiple PipelineTasks of the same Pipeline are identical, but they differ in their states. For example, they might read different data or build different hash tables. These differing states are maintained independently by each PipelineTask.
-This improves the efficiency of CPU execution on mixed-load SQL and enhances the performance of SQL queries.
+Each PipelineTask is eventually submitted to a thread pool to be executed as an independent task. With the Dependency trigger mechanism, this approach allows better utilization of multi-core CPUs and achieves full parallelism.
-## Usage
+### Operator
+In most cases, each operator in a Pipeline corresponds to a PlanNode, but there are some special operators with exceptions:
+* JoinNode is split into JoinBuildOperator and JoinProbeOperator.
+* AggNode is split into AggSinkOperator and AggSourceOperator.
+* SortNode is split into SortSinkOperator and SortSourceOperator.
+The basic principle is that for certain "breaking" operators (those that need to collect all the data before performing computation), the data ingestion part is split into a Sink, while the part that retrieves data from the operator is referred to as the Source.
-### Query
+## Scan Parallelization
+Scanning data is a very heavy I/O operation, as it requires reading large amounts of data from local disks (or from HDFS or S3 in the case of data lake scenarios, which introduces even longer latency), consuming a significant amount of time. Therefore, we have introduced parallel scanning technology in the ScanOperator. The ScanOperator dynamically generates multiple Scanners, each of which scans around 1 to 2 million rows of data. While performing the scan, each Scanner handles tasks such as decompressing and filtering the data it reads.
-1. enable_pipeline_engine
-   Setting the session variable `enable_pipeline_engine` to `true` will make BE to use the Pipeline execution engine when performing query execution.
+By using parallel scanning technology, we can effectively avoid issues where certain ScanOperators take an excessively long time due to improper bucketing or data skew, which would otherwise slow down the entire query.
-   ```sql
-   set enable_pipeline_engine = true;
-   ```
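Since each Pipeline is instantiated into multiple PipelineTasks, the session-level parallelism setting determines how many tasks run concurrently. As a hedged sketch based on the `parallel_pipeline_task_num` variable referenced elsewhere in this diff (its default and auto-tuning behavior can differ between Doris versions):

```sql
-- 0 (the default) lets Doris derive the number of PipelineTasks per
-- Pipeline automatically from the available CPU cores.
SET parallel_pipeline_task_num = 0;

-- Alternatively, pin the parallelism explicitly for the current session.
SET parallel_pipeline_task_num = 8;
```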
+## Local Shuffle
+In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a technique that redistributes data locally across different execution tasks. It evenly distributes all the data output by the upstream Pipeline to all the tasks in the downstream Pipeline using methods like HASH or Round Robin. This helps solve the problem of data skew during execution, ensuring that the execution model is no longer limited by data storage or the query plan. Let's now provide an example to illustrate this.
-2. parallel_pipeline_task_num
+We will further explain how Local Exchange can prevent data skew using Pipeline-1 from the previous example.
-   The `parallel_pipeline_task_num` represents the number of Pipeline Tasks for a SQL query to be queried concurrently.The default configuration of Doris is `0`, in which case the number of Pipeline Tasks will be automatically set to half of the minimum number of CPUs in the current cluster machine. You can also adjust it according to your own situation.
-   ```sql
-   set parallel_pipeline_task_num = 0;
-   ```
+As shown in the figure above, by inserting a Local Exchange in Pipeline-1, we further split Pipeline-1 into Pipeline-1-0 and Pipeline-1-1.
-   You can limit the automatically configured concurrency by setting `max_instance_num`(The default value is 64)
+Now, let's assume the current concurrency level is 3 (each Pipeline has 3 tasks), and each task reads one bucket from the storage layer. The number of rows in the three buckets is 1, 1, and 7, respectively. The execution before and after inserting the Local Exchange changes as follows:
-3. enable_local_shuffle
-   Set `enable_local_shuffle` to true will enable local shuffle optimization. Local shuffle will try to evenly distribute data among different pipeline tasks to avoid data skewing as much as possible.
+As can be seen from the figure on the right, the amount of data that the HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3), thereby avoiding data skew.
-   ```sql
-   set enable_local_shuffle = true;
-   ```
+In Doris, Local Exchange is planned based on a series of rules. For example, when a query involves time-consuming operators like Join, Aggregation, or Window Functions, Local Exchange is used to minimize data skew as much as possible.
-4. ignore_storage_data_distribution
-   Settings `ignore_storage_data_distribution` is true, it means ignoring the data distribution of the storage layer. When used in conjunction with local shuffle, the concurrency capability of the pipelineX engine will no longer be constrained by the number of storage layer tables, thus fully utilizing machine resources.
-   ```sql
-   set ignore_storage_data_distribution = true;
-   ```
-### Load
-The engine selected for import are detailed in the [Load](../data-operate/import/load-manual) documentation.
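To make the bucket-skew scenario concrete, here is a hedged sketch; the table definition below is illustrative only. A table hashed into 3 buckets on `k2` can end up with very uneven buckets when the `k2` values are concentrated, and the join-plus-aggregation query from the beginning of this document is the kind of plan for which Doris plans a Local Exchange.

```sql
-- Illustrative only: 3 buckets hashed on k2; if most rows share one k2
-- value, one bucket ends up far larger than the others (e.g. 1 / 1 / 7).
CREATE TABLE B (
    k2 INT,
    v2 BIGINT
) DUPLICATE KEY(k2)
DISTRIBUTED BY HASH(k2) BUCKETS 3
PROPERTIES ("replication_num" = "1");

-- Each PipelineTask initially reads one bucket; the Local Exchange inserted
-- for the join/aggregation redistributes the rows so that the downstream
-- tasks each process a comparable amount of data.
SELECT k1, SUM(v1) FROM A, B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
```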