This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git


The following commit(s) were added to refs/heads/master by this push:
     new e32dd8bc7af Add pipeline execution document (#1573)
e32dd8bc7af is described below

commit e32dd8bc7aff1052b89a0d5320a9b04e27f2e66a
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Dec 24 13:54:08 2024 +0800

    Add pipeline execution document (#1573)
---
 .../pipeline-execution-engine.md                   | 100 +++++++++----------
 .../pipeline-execution-engine.md                   | 107 +++++++++------------
 .../pipeline-execution-engine.md                   | 107 +++++++++------------
 .../pipeline-execution-engine.md                   | 107 +++++++++------------
 static/images/pip_exec_1.png                       | Bin 0 -> 43292 bytes
 static/images/pip_exec_2.png                       | Bin 0 -> 168656 bytes
 static/images/pip_exec_3.png                       | Bin 0 -> 157212 bytes
 static/images/pip_exec_4.png                       | Bin 0 -> 19075 bytes
 static/images/pip_exec_5.png                       | Bin 0 -> 41892 bytes
 static/images/pip_exec_6.png                       | Bin 0 -> 118931 bytes
 static/images/pip_exec_7.png                       | Bin 0 -> 358877 bytes
 .../pipeline-execution-engine.md                   | 100 +++++++++----------
 .../pipeline-execution-engine.md                   |  99 +++++++++----------
 13 files changed, 273 insertions(+), 347 deletions(-)

diff --git a/docs/query-acceleration/pipeline-execution-engine.md 
b/docs/query-acceleration/pipeline-execution-engine.md
index 503b2a4d7b0..eee47ea0296 100644
--- a/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/query-acceleration/pipeline-execution-engine.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Pipeline Execution Engine",
+    "title": "Parallel Execution",
     "language": "en",
     "toc_min_heading_level": 2,
     "toc_max_heading_level": 4
@@ -28,87 +28,79 @@ under the License.
 
 
 
-The Pipeline Execution Engine is an experimental feature in Apache Doris 2.0, 
which was later optimized and upgraded in version 2.1 (i.e., PipelineX). In 
versions 3.0 and later, PipelineX is used as the only execution engine in Doris 
and renamed to Pipeline Execution Engine.
+The parallel execution model of Doris is a Pipeline execution model, primarily 
inspired by the implementation described in the Hyper paper 
(https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model 
fully leverages the computational power of multi-core CPUs while limiting the 
number of query threads in Doris, addressing the issue of thread explosion 
during execution. For details on its design, implementation, and effectiveness, 
refer to [DSIP-027](DSIP-027: Support Pipe [...]
 
-The goal of pipeline execution engine is to replace the current execution 
engine of Doris's volcano model, fully release the computing power of 
multi-core CPUs, and limit the number of Doris's query threads to solve the 
problem of Doris's execution thread bloat.
+Starting from Doris 3.0, the Pipeline execution model has completely replaced 
the original Volcano model. Based on the Pipeline execution model, Doris 
supports the parallel processing of Query, DDL, and DML statements.
 
-Its specific design, implementation and effects can be found in 
[DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine))
 and [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine)).
+## Physical Plan
 
-## Principle
+To better understand the Pipeline execution model, it is first necessary to 
introduce two important concepts in the physical query plan: PlanFragment and 
PlanNode. We will use the following SQL statement as an example:
+```
+SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
+```
 
-The current Doris SQL execution engine is designed based on the traditional 
volcano model, which has the following problems in a single multi-core scenario:
+The FE will first translate it into the following logical plan, where each 
node represents a PlanNode. The specific meaning of each type of node can be 
found in the introduction to the physical plan.
 
-* Inability to take full advantage of multi-core computing power to improve 
query performance,**most scenarios require manual setting of parallelism** for 
performance tuning, which is almost difficult to set in production environments.
+![pip_exec_1](/images/pip_exec_1.png)
 
-* Each instance of a standalone query corresponds to one thread of the thread 
pool, which introduces two additional problems.
-    
-    * Once the thread pool is hit full. **Doris' query engine will enter a 
pseudo-deadlock** and will not respond to subsequent queries. **At the same 
time there is a certain probability of entering a logical deadlock** situation: 
for example, all threads are executing an instance's probe task.
-    
-    * Blocking arithmetic will take up thread resources,**blocking thread 
resources can not be yielded to instances that can be scheduled**, the overall 
resource utilization does not go up.
+Since Doris is built on an MPP architecture, each query aims to involve all 
BEs in parallel execution as much as possible to reduce query latency. 
Therefore, the logical plan must be transformed into a physical plan. The 
transformation essentially involves inserting DataSink and ExchangeNode into 
the logical plan. These two nodes facilitate the shuffling of data across 
multiple BEs.
 
-* Blocking arithmetic relies on the OS thread scheduling mechanism, **thread 
switching overhead (especially in the scenario of system mixing))**
+After the transformation, each PlanFragment corresponds to a portion of the 
PlanNode and can be sent as an independent task to a BE. Each BE processes the 
PlanNode contained within the PlanFragment and then uses the DataSink and 
ExchangeNode operators to shuffle data to other BEs for subsequent computation.
 
-The resulting set of problems drove Doris to implement an execution engine 
adapted to the architecture of modern multi-core CPUs.
+![pip_exec_2](/images/pip_exec_2.png)
 
-And as shown in the figure below (quoted from[Push versus pull-based loop 
fusion in query engines]([jfp_1800010a 
(cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),The
 resulting set of problems drove Doris to implement an execution engine adapted 
to the architecture of modern multi-core CPUs.:
+Doris's planning process is divided into three layers:
+PLAN:The execution plan. A SQL statement is translated by the query planner 
into an execution plan, which is then provided to the execution engine for 
execution.
 
-![image.png](/images/pipeline-execution-engine.png)
+FRAGMENT:Since Doris is a distributed execution engine, a complete execution 
plan is divided into multiple single-machine execution fragments. A FRAGMENT 
represents a complete single-machine execution fragment. Multiple fragments 
combine to form a complete PLAN.
 
-1. Transformation of the traditional pull pull logic-driven execution process 
into a data-driven execution engine for the push model
+PLAN NODE:Operators, which are the smallest units of the execution plan. A 
FRAGMENT consists of multiple operators, each responsible for a specific 
execution logic, such as aggregation or join operations.
 
-2. Blocking operations are asynchronous, reducing the execution overhead 
caused by thread switching and thread blocking and making more efficient use of 
the CPU
+## Pipeline Execution
+A PlanFragment is the smallest unit of a task sent by the FE to the BE for 
execution. A BE may receive multiple different PlanFragments for the same 
query, and each PlanFragment is processed independently. Upon receiving a 
PlanFragment, the BE splits it into multiple Pipelines and then starts multiple 
PipelineTasks to achieve parallel execution, thereby improving query efficiency.
 
-3. Controls the number of threads to be executed and reduces the resource 
congestion of large queries on small queries in mixed load scenarios by 
controlling time slice switching
+![pip_exec_3](/images/pip_exec_3.png)
 
-4. In terms of execution concurrency, pipelineX introduces local exchange 
optimization to fully utilize CPU resources, and distribute data evenly across 
different tasks to minimize data skewing. In addition, pipelineX will no longer 
be constrained by the number of tablets.
 
-5. Logically, multiple pipeline tasks share all shared states of the same 
pipeline and eliminate additional initialization overhead, such as expressions 
and some const variables.
+### Pipeline
+A Pipeline consists of a SourceOperator, a SinkOperator, and several 
intermediate operators. The SourceOperator represents reading data from an 
external source, which can be a table (e.g., OlapTable) or a buffer (e.g., 
Exchange). The SinkOperator represents the data output, which can either be 
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or 
output to a hash table (e.g., aggregation operators, join build hash tables, 
etc.).
 
-6. In terms of scheduling logic, the blocking conditions of all pipeline tasks 
are encapsulated using Dependency, and the execution logic of the tasks is 
triggered by external events (such as rpc completion) to enter the runnable 
queue, thereby eliminating the overhead of blocking polling threads.
+![pip_exec_4](/images/pip_exec_4.png)
 
-7. Profile: Provide users with simple and easy to understand metrics.
+Multiple Pipelines are actually interdependent. Take the JoinNode as an 
example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to 
build the hash table, while Pipeline-1 reads data from the table to perform the 
probe operation. These two Pipelines are connected by a dependency 
relationship, meaning Pipeline-1 can only execute after Pipeline-0 has 
completed. This dependency relationship is referred to as a Dependency. Once 
Pipeline-0 finishes execution, it calls the se [...]
 
+### PipelineTask
+A Pipeline is actually a logical concept; it is not an executable entity. Once 
a Pipeline is defined, it needs to be further instantiated into multiple 
PipelineTasks. The data that needs to be read is then distributed to different 
PipelineTasks, ultimately achieving parallel processing. The operators within 
the multiple PipelineTasks of the same Pipeline are identical, but they differ 
in their states. For example, they might read different data or build different 
hash tables. These diffe [...]
 
-This improves the efficiency of CPU execution on mixed-load SQL and enhances 
the performance of SQL queries.
+Each PipelineTask is eventually submitted to a thread pool to be executed as 
an independent task. With the Dependency trigger mechanism, this approach 
allows better utilization of multi-core CPUs and achieves full parallelism.
 
-## Usage
+### Operator
+In most cases, each operator in a Pipeline corresponds to a PlanNode, but 
there are some special operators with exceptions:
+* JoinNode is split into JoinBuildOperator and JoinProbeOperator.
+* AggNode is split into AggSinkOperator and AggSourceOperator.
+* SortNode is split into SortSinkOperator and SortSourceOperator.
+The basic principle is that for certain "breaking" operators (those that need 
to collect all the data before performing computation), the data ingestion part 
is split into a Sink, while the part that retrieves data from the operator is 
referred to as the Source.
 
-### Query
+## Scan 并行化
+Scanning data is a very heavy I/O operation, as it requires reading large 
amounts of data from local disks (or from HDFS or S3 in the case of data lake 
scenarios, which introduces even longer latency), consuming a significant 
amount of time. Therefore, we have introduced parallel scanning technology in 
the ScanOperator. The ScanOperator dynamically generates multiple Scanners, 
each of which scans around 1 to 2 million rows of data. While performing the 
scan, each Scanner handles tasks su [...]
 
-1. enable_pipeline_engine
+![pip_exec_5](/images/pip_exec_5.png)
 
-  Setting the session variable `enable_pipeline_engine` to `true` will make BE 
to use the Pipeline execution engine when performing query execution.
+By using parallel scanning technology, we can effectively avoid issues where 
certain ScanOperators take an excessively long time due to improper bucketing 
or data skew, which would otherwise slow down the entire query latency.
 
-  ```sql
-  set enable_pipeline_engine = true;
-  ```
+## Local Shuffle
+In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a 
technique that redistributes data locally across different execution tasks. It 
evenly distributes all the data output by the upstream Pipeline to all the 
tasks in the downstream Pipeline using methods like HASH or Round Robin. This 
helps solve the problem of data skew during execution, ensuring that the 
execution model is no longer limited by data storage or the query plan. Let's 
now provide an example to illus [...]
 
-2. parallel_pipeline_task_num
+We will further explain how Local Exchange can prevent data skew using 
Pipeline-1 from the previous example.
 
-  The `parallel_pipeline_task_num` represents the number of Pipeline Tasks for 
a SQL query to be queried concurrently.The default configuration of Doris is 
`0`, in which case the number of Pipeline Tasks will be automatically set to 
half of the minimum number of CPUs in the current cluster machine. You can also 
adjust it according to your own situation.
+![pip_exec_6](/images/pip_exec_6.png)
 
-  ```sql
-  set parallel_pipeline_task_num = 0;
-  ```
+As shown in the figure above, by inserting a Local Exchange in Pipeline-1, we 
further split Pipeline-1 into Pipeline-1-0 and Pipeline-1-1.
 
-  You can limit the automatically configured concurrency by setting 
`max_instance_num`(The default value is 64)
+Now, let's assume the current concurrency level is 3 (each Pipeline has 3 
tasks), and each task reads one bucket from the storage layer. The number of 
rows in the three buckets is 1, 1, and 7, respectively. The execution before 
and after inserting the Local Exchange changes as follows:
 
-3. enable_local_shuffle
+![pip_exec_7](/images/pip_exec_7.png)
 
-  Set `enable_local_shuffle` to true will enable local shuffle optimization. 
Local shuffle will try to evenly distribute data among different pipeline tasks 
to avoid data skewing as much as possible.
+As can be seen from the figure on the right, the amount of data that the 
HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3), 
thereby avoiding data skew.
 
-  ```sql
-  set enable_local_shuffle = true;
-  ```
-
-4. ignore_storage_data_distribution
-
-  Settings `ignore_storage_data_distribution` is true, it means ignoring the 
data distribution of the storage layer. When used in conjunction with local 
shuffle, the concurrency capability of the pipelineX engine will no longer be 
constrained by the number of storage layer tables, thus fully utilizing machine 
resources.
-
-  ```sql
-  set ignore_storage_data_distribution = true;
-  ```
-
-### Load
-
-The engine selected for import are detailed in the 
[Load](../data-operate/import/load-manual) documentation.
+In Doris, Local Exchange is planned based on a series of rules. For example, 
when a query involves time-consuming operators like Join, Aggregation, or 
Window Functions, Local Exchange is used to minimize data skew as much as 
possible.
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md
index 5d806cac159..32df1f590a0 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/pipeline-execution-engine.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Pipeline 执行引擎",
+    "title": "并行执行",
     "language": "zh-CN",
     "toc_min_heading_level": 2,
     "toc_max_heading_level": 4
@@ -28,88 +28,71 @@ under the License.
 
 
 
-:::info 备注
-Pipeline 执行引擎 是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX)。在 3.0 
以及之后的版本中,Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。
-:::
+Doris的并行执行模型是一种Pipeline 
执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline
 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine - 
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution 
Engine - DORIS - Apache Software Foundation)。
+Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 
语句的并行处理。
 
-Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 
的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。
+## 物理计划
+为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。我们使用下面这条SQL 
作为例子:
+```
+SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
+```
 
-它的具体设计、实现和效果可以参阅 [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - 
Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine))
 以及 [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine))。
+FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode,每种Node的具体含义,可以参考查看物理计划的介绍。
 
-## 原理
+![pip_exec_1](/images/pip_exec_1.png)
 
-当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题:
+由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 
都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink和ExchangeNode,通过这两个Node完成了数据在多个BE
 之间的Shuffle。拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 
完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 
这两个算子把数据shuffle到其他BE上来进行接下来的计算。
 
-* 无法充分利用多核计算能力,提升查询性能,**多数场景下进行性能调优时需要手动设置并行度**,在生产环境中几乎很难进行设定。
+![pip_exec_2](/images/pip_exec_2.png)
 
-* 单机查询的每个 Instance 对应线程池的一个线程,这会带来额外的两个问题。
-    
-    * 线程池一旦打满。**Doris 
的查询引擎会进入假性死锁**,对后续的查询无法响应。**同时有一定概率进入逻辑死锁**的情况:比如所有的线程都在执行一个 Instance 的 Probe 
任务。
-    
-    * 阻塞的算子会占用线程资源,**而阻塞的线程资源无法让渡给能够调度的 Instance**,整体资源利用率上不去。
+所以Doris的规划分为3层:
+PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
 
-* 阻塞算子依赖操作系统的线程调度机制,**线程切换开销较大(尤其在系统混布的场景中)**
+FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
 
-由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。
+PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
 
-而如下图所示(引用自[Push versus pull-based loop fusion in query engines]([jfp_1800010a 
(cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),Pipeline
 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎:
+## Pipeline 执行
+PlanFragment 是FE 发往BE 
执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment
 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。
 
-![image.png](/images/pipeline-execution-engine.png)
+![pip_exec_3](/images/pip_exec_3.png)
 
-1. 将传统 Pull 拉取的逻辑驱动的执行流程改造为 Push 模型的数据驱动的执行引擎
 
-2. 阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于 CPU 的利用更为高效
+### Pipeline
+一个Pipeline 有一个SourceOperator 和 一个SinkOperator 
以及中间的多个其他Operator组成。SourceOperator 
代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。SinkOperator 
表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。
 
-3. 控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题
+![pip_exec_4](/images/pip_exec_4.png)
 
-4. 执行并发上,依赖 Local Exchange 使 Pipeline 充分并发,可以让数据被均匀分布到不同的 Task 
中,尽可能减少数据倾斜,此外,Pipeline 也将不再受存储层 Tablet 数量的制约。
+多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 
里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 
是从表里读取数据,来进行Probe。这2个Pipeline 
之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 
运行完毕后,会调用Dependency的set_ready 方法通知Pipeline-1 可执行。
 
-5. 执行逻辑上,多个 Pipeline Task 共享同一个 Pipeline 的全部共享状态,例如表达式和一些 Const 变量,消除了额外的初始化开销。
+### PipelineTask
+Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 
实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 
最终实现并行处理。同一个Pipeline的多个PipelineTask 之间的Operator 
完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 
不一样,这些不一样的状态,我们称之为LocalState。
+每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 
这种触发机制下,可以更好的利用多核CPU,实现充分的并行。
 
-6. 调度逻辑上,所有 Pipeline Task 的阻塞条件都使用 Dependency 进行了封装,通过外部事件(例如 RPC 完成)触发 task 
的执行逻辑进入 Runnable 队列,从而消除了阻塞轮询线程的开销。
+### Operator
+在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外:
+- JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator
+- AggNode 被拆分为AggSinkOperator和AggSourceOperator
+- SortNode 被拆分为SortSinkOperator 和 SortSourceOperator
+基本原理是,对于一些breaking 
算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。
 
-7. Profile:为用户提供简单易懂的指标。
+## Scan 并行化 
+扫描数据是一个非常重的IO 
操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。所以我们在ScanOperator 
中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 
在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。
 
-从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。
+![pip_exec_5](/images/pip_exec_5.png)
 
-## 使用方式
+通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。
 
-### 查询
+## Local Shuffle
+在Pipeline执行模型中,Local Exchange作为一个Pipeline 
Breaker出现,是在本地将数据重新分发至各个执行任务的技术。它把上游Pipeline输出的全部数据以某种方式(HASH / Round 
Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。接下来我们举例来说明Local
 Exchange的工作逻辑。
+我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。
 
-1. enable_pipeline_engine
+![pip_exec_6](/images/pip_exec_6.png)
 
-  将 Session 变量 `enable_pipeline_engine` 设置为 `true`,则 BE 在进行查询执行时将会使用 Pipeline 
执行引擎。
+如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 
1-0和Pipeline 1-1。
+此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。则插入Local
 Exchange前后的执行变化如下:
 
-  ```sql
-  set enable_pipeline_engine = true;
-  ```
+![pip_exec_7](/images/pip_exec_7.png)
 
-2. parallel_pipeline_task_num
-
-  `parallel_pipeline_task_num` 代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris 默认的配置为 
`0`,此时 Pipeline Task 数目将自动设置为当前集群机器中最少的 CPU 数量的一半。用户也可以根据自己的实际情况进行调整。
-
-  ```sql
-  set parallel_pipeline_task_num = 0;
-  ```
-
-  可以通过设置 `max_instance_num` 来限制自动设置的并发数 (默认为 64)
-
-3. enable_local_shuffle
-
-  设置`enable_local_shuffle`为 True 则打开 Local Shuffle 优化。Local Shuffle 
将尽可能将数据均匀分布给不同的 Pipeline Task 从而尽可能避免数据倾斜。
-
-  ```sql
-  set enable_local_shuffle = true;
-  ```
-
-4. ignore_storage_data_distribution
-
-  设置`ignore_storage_data_distribution`为 True 则表示忽略存储层的数据分布。结合 Local Shuffle 
一起使用,则 Pipeline 引擎的并发能力将不再受到存储层 Tablet 数量的制约,从而充分利用机器资源。
-
-  ```sql
-  set ignore_storage_data_distribution = true;
-  ```
-
-### 导入
-
-导入的引擎选择设置,详见[导入](../data-operate/import/load-manual)文档。
+从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。
+在Doris中,Local 
Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local 
Exchange来尽可能避免数据倾斜。
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
index 5d806cac159..0b483ed239f 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Pipeline 执行引擎",
+    "title": "并行执行",
     "language": "zh-CN",
     "toc_min_heading_level": 2,
     "toc_max_heading_level": 4
@@ -28,88 +28,71 @@ under the License.
 
 
 
-:::info 备注
-Pipeline 执行引擎 是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX)。在 3.0 
以及之后的版本中,Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。
-:::
+Doris的并行执行模型是一种Pipeline 
执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline
 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine - 
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution 
Engine - DORIS - Apache Software Foundation)。
+Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 
语句的并行处理。
 
-Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 
的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。
+## 物理计划
+为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。我们使用下面这条SQL 
作为例子:
+```
+SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
+```
 
-它的具体设计、实现和效果可以参阅 [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - 
Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine))
 以及 [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine))。
+FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode,每种Node的具体含义,可以参考查看物理计划的介绍。
 
-## 原理
+![pip_exec_1](/images/pip_exec_1.png)
 
-当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题:
+由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 
都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink和ExchangeNode,通过这两个Node完成了数据在多个BE
 之间的Shuffle。拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 
完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 
这两个算子把数据shuffle到其他BE上来进行接下来的计算。
 
-* 无法充分利用多核计算能力,提升查询性能,**多数场景下进行性能调优时需要手动设置并行度**,在生产环境中几乎很难进行设定。
+![pip_exec_2](/images/pip_exec_2.png)
 
-* 单机查询的每个 Instance 对应线程池的一个线程,这会带来额外的两个问题。
-    
-    * 线程池一旦打满。**Doris 
的查询引擎会进入假性死锁**,对后续的查询无法响应。**同时有一定概率进入逻辑死锁**的情况:比如所有的线程都在执行一个 Instance 的 Probe 
任务。
-    
-    * 阻塞的算子会占用线程资源,**而阻塞的线程资源无法让渡给能够调度的 Instance**,整体资源利用率上不去。
+所以Doris的规划分为3层:
+PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
 
-* 阻塞算子依赖操作系统的线程调度机制,**线程切换开销较大(尤其在系统混布的场景中)**
+FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
 
-由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。
+PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
 
-而如下图所示(引用自[Push versus pull-based loop fusion in query engines]([jfp_1800010a 
(cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),Pipeline
 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎:
+## Pipeline 执行
+PlanFragment 是FE 发往BE 
执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment
 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。
 
-![image.png](/images/pipeline-execution-engine.png)
+![pip_exec_3](/images/pip_exec_3.png)
 
-1. 将传统 Pull 拉取的逻辑驱动的执行流程改造为 Push 模型的数据驱动的执行引擎
 
-2. 阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于 CPU 的利用更为高效
+### Pipeline
+一个Pipeline 有一个SourceOperator 和 一个SinkOperator 
以及中间的多个其他Operator组成。SourceOperator 
代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。SinkOperator 
表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。
 
-3. 控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题
+![pip_exec_4](/images/pip_exec_4.png)
 
-4. 执行并发上,依赖 Local Exchange 使 Pipeline 充分并发,可以让数据被均匀分布到不同的 Task 
中,尽可能减少数据倾斜,此外,Pipeline 也将不再受存储层 Tablet 数量的制约。
+多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 
里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 
是从表里读取数据,来进行Probe。这2个Pipeline 
之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 
运行完毕后,会调用Dependency的set_ready 方法通知Pipeline-1 可执行。
 
-5. 执行逻辑上,多个 Pipeline Task 共享同一个 Pipeline 的全部共享状态,例如表达式和一些 Const 变量,消除了额外的初始化开销。
+### PipelineTask
+Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 
实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 
最终实现并行处理。同一个Pipeline的多个PipelineTask 之间的Operator 
完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 
不一样,这些不一样的状态,我们称之为LocalState。
+每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 
这种触发机制下,可以更好的利用多核CPU,实现充分的并行。
 
-6. 调度逻辑上,所有 Pipeline Task 的阻塞条件都使用 Dependency 进行了封装,通过外部事件(例如 RPC 完成)触发 task 
的执行逻辑进入 Runnable 队列,从而消除了阻塞轮询线程的开销。
+### Operator
+在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外:
+- JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator
+- AggNode 被拆分为AggSinkOperator和AggSourceOperator
+- SortNode 被拆分为SortSinkOperator 和 SortSourceOperator
+  基本原理是,对于一些breaking 
算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。
 
-7. Profile:为用户提供简单易懂的指标。
+## Scan 并行化
+扫描数据是一个非常重的IO 
操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。所以我们在ScanOperator 
中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 
在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。
 
-从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。
+![pip_exec_5](/images/pip_exec_5.png)
 
-## 使用方式
+通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。
 
-### 查询
+## Local Shuffle
+在Pipeline执行模型中,Local Exchange作为一个Pipeline 
Breaker出现,是在本地将数据重新分发至各个执行任务的技术。它把上游Pipeline输出的全部数据以某种方式(HASH / Round 
Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。接下来我们举例来说明Local
 Exchange的工作逻辑。
+我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。
 
-1. enable_pipeline_engine
+![pip_exec_6](/images/pip_exec_6.png)
 
-  将 Session 变量 `enable_pipeline_engine` 设置为 `true`,则 BE 在进行查询执行时将会使用 Pipeline 
执行引擎。
+如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 
1-0和Pipeline 1-1。
+此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。则插入Local
 Exchange前后的执行变化如下:
 
-  ```sql
-  set enable_pipeline_engine = true;
-  ```
+![pip_exec_7](/images/pip_exec_7.png)
 
-2. parallel_pipeline_task_num
-
-  `parallel_pipeline_task_num` 代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris 默认的配置为 
`0`,此时 Pipeline Task 数目将自动设置为当前集群机器中最少的 CPU 数量的一半。用户也可以根据自己的实际情况进行调整。
-
-  ```sql
-  set parallel_pipeline_task_num = 0;
-  ```
-
-  可以通过设置 `max_instance_num` 来限制自动设置的并发数 (默认为 64)
-
-3. enable_local_shuffle
-
-  设置`enable_local_shuffle`为 True 则打开 Local Shuffle 优化。Local Shuffle 
将尽可能将数据均匀分布给不同的 Pipeline Task 从而尽可能避免数据倾斜。
-
-  ```sql
-  set enable_local_shuffle = true;
-  ```
-
-4. ignore_storage_data_distribution
-
-  设置`ignore_storage_data_distribution`为 True 则表示忽略存储层的数据分布。结合 Local Shuffle 
一起使用,则 Pipeline 引擎的并发能力将不再受到存储层 Tablet 数量的制约,从而充分利用机器资源。
-
-  ```sql
-  set ignore_storage_data_distribution = true;
-  ```
-
-### 导入
-
-导入的引擎选择设置,详见[导入](../data-operate/import/load-manual)文档。
+从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。
+在Doris中,Local 
Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local 
Exchange来尽可能避免数据倾斜。
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
index 5d806cac159..0b483ed239f 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Pipeline 执行引擎",
+    "title": "并行执行",
     "language": "zh-CN",
     "toc_min_heading_level": 2,
     "toc_max_heading_level": 4
@@ -28,88 +28,71 @@ under the License.
 
 
 
-:::info 备注
-Pipeline 执行引擎 是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX)。在 3.0 
以及之后的版本中,Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。
-:::
+Doris的并行执行模型是一种Pipeline 
执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline
 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine - 
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution 
Engine - DORIS - Apache Software Foundation)。
+Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 
语句的并行处理。
 
-Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 
的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。
+## 物理计划
+为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。我们使用下面这条SQL 
作为例子:
+```
+SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
+```
 
-它的具体设计、实现和效果可以参阅 [DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - 
Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine))
 以及 [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine))。
+FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode,每种Node的具体含义,可以参考查看物理计划的介绍。
 
-## 原理
+![pip_exec_1](/images/pip_exec_1.png)
 
-当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题:
+由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 
都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink和ExchangeNode,通过这两个Node完成了数据在多个BE
 之间的Shuffle。拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 
完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 
这两个算子把数据shuffle到其他BE上来进行接下来的计算。
 
-* 无法充分利用多核计算能力,提升查询性能,**多数场景下进行性能调优时需要手动设置并行度**,在生产环境中几乎很难进行设定。
+![pip_exec_2](/images/pip_exec_2.png)
 
-* 单机查询的每个 Instance 对应线程池的一个线程,这会带来额外的两个问题。
-    
-    * 线程池一旦打满。**Doris 
的查询引擎会进入假性死锁**,对后续的查询无法响应。**同时有一定概率进入逻辑死锁**的情况:比如所有的线程都在执行一个 Instance 的 Probe 
任务。
-    
-    * 阻塞的算子会占用线程资源,**而阻塞的线程资源无法让渡给能够调度的 Instance**,整体资源利用率上不去。
+所以Doris的规划分为3层:
+PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
 
-* 阻塞算子依赖操作系统的线程调度机制,**线程切换开销较大(尤其在系统混布的场景中)**
+FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
 
-由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。
+PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
 
-而如下图所示(引用自[Push versus pull-based loop fusion in query engines]([jfp_1800010a 
(cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),Pipeline
 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎:
+## Pipeline 执行
+PlanFragment 是FE 发往BE 
执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment
 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。
 
-![image.png](/images/pipeline-execution-engine.png)
+![pip_exec_3](/images/pip_exec_3.png)
 
-1. 将传统 Pull 拉取的逻辑驱动的执行流程改造为 Push 模型的数据驱动的执行引擎
 
-2. 阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于 CPU 的利用更为高效
+### Pipeline
+一个Pipeline 有一个SourceOperator 和 一个SinkOperator 
以及中间的多个其他Operator组成。SourceOperator 
代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。SinkOperator 
表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。
 
-3. 控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题
+![pip_exec_4](/images/pip_exec_4.png)
 
-4. 执行并发上,依赖 Local Exchange 使 Pipeline 充分并发,可以让数据被均匀分布到不同的 Task 
中,尽可能减少数据倾斜,此外,Pipeline 也将不再受存储层 Tablet 数量的制约。
+多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 
里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 
是从表里读取数据,来进行Probe。这2个Pipeline 
之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 
运行完毕后,会调用Dependency的set_ready 方法通知Pipeline-1 可执行。
 
-5. 执行逻辑上,多个 Pipeline Task 共享同一个 Pipeline 的全部共享状态,例如表达式和一些 Const 变量,消除了额外的初始化开销。
+### PipelineTask
+Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 
实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 
最终实现并行处理。同一个Pipeline的多个PipelineTask 之间的Operator 
完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 
不一样,这些不一样的状态,我们称之为LocalState。
+每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 
这种触发机制下,可以更好的利用多核CPU,实现充分的并行。
 
-6. 调度逻辑上,所有 Pipeline Task 的阻塞条件都使用 Dependency 进行了封装,通过外部事件(例如 RPC 完成)触发 task 
的执行逻辑进入 Runnable 队列,从而消除了阻塞轮询线程的开销。
+### Operator
+在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外:
+- JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator
+- AggNode 被拆分为AggSinkOperator和AggSourceOperator
+- SortNode 被拆分为SortSinkOperator 和 SortSourceOperator
+  基本原理是,对于一些breaking 
算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。
 
-7. Profile:为用户提供简单易懂的指标。
+## Scan 并行化
+扫描数据是一个非常重的IO 
操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。所以我们在ScanOperator 
中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 
在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。
 
-从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。
+![pip_exec_5](/images/pip_exec_5.png)
 
-## 使用方式
+通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。
 
-### 查询
+## Local Shuffle
+在Pipeline执行模型中,Local Exchange作为一个Pipeline 
Breaker出现,是在本地将数据重新分发至各个执行任务的技术。它把上游Pipeline输出的全部数据以某种方式(HASH / Round 
Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。接下来我们举例来说明Local
 Exchange的工作逻辑。
+我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。
 
-1. enable_pipeline_engine
+![pip_exec_6](/images/pip_exec_6.png)
 
-  将 Session 变量 `enable_pipeline_engine` 设置为 `true`,则 BE 在进行查询执行时将会使用 Pipeline 
执行引擎。
+如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 
1-0和Pipeline 1-1。
+此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。则插入Local
 Exchange前后的执行变化如下:
 
-  ```sql
-  set enable_pipeline_engine = true;
-  ```
+![pip_exec_7](/images/pip_exec_7.png)
 
-2. parallel_pipeline_task_num
-
-  `parallel_pipeline_task_num` 代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris 默认的配置为 
`0`,此时 Pipeline Task 数目将自动设置为当前集群机器中最少的 CPU 数量的一半。用户也可以根据自己的实际情况进行调整。
-
-  ```sql
-  set parallel_pipeline_task_num = 0;
-  ```
-
-  可以通过设置 `max_instance_num` 来限制自动设置的并发数 (默认为 64)
-
-3. enable_local_shuffle
-
-  设置`enable_local_shuffle`为 True 则打开 Local Shuffle 优化。Local Shuffle 
将尽可能将数据均匀分布给不同的 Pipeline Task 从而尽可能避免数据倾斜。
-
-  ```sql
-  set enable_local_shuffle = true;
-  ```
-
-4. ignore_storage_data_distribution
-
-  设置`ignore_storage_data_distribution`为 True 则表示忽略存储层的数据分布。结合 Local Shuffle 
一起使用,则 Pipeline 引擎的并发能力将不再受到存储层 Tablet 数量的制约,从而充分利用机器资源。
-
-  ```sql
-  set ignore_storage_data_distribution = true;
-  ```
-
-### 导入
-
-导入的引擎选择设置,详见[导入](../data-operate/import/load-manual)文档。
+从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。
+在Doris中,Local 
Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local 
Exchange来尽可能避免数据倾斜。
\ No newline at end of file
diff --git a/static/images/pip_exec_1.png b/static/images/pip_exec_1.png
new file mode 100644
index 00000000000..8f258bd9fcf
Binary files /dev/null and b/static/images/pip_exec_1.png differ
diff --git a/static/images/pip_exec_2.png b/static/images/pip_exec_2.png
new file mode 100644
index 00000000000..184babf1735
Binary files /dev/null and b/static/images/pip_exec_2.png differ
diff --git a/static/images/pip_exec_3.png b/static/images/pip_exec_3.png
new file mode 100644
index 00000000000..49ba05472ba
Binary files /dev/null and b/static/images/pip_exec_3.png differ
diff --git a/static/images/pip_exec_4.png b/static/images/pip_exec_4.png
new file mode 100644
index 00000000000..e8fd4e5d452
Binary files /dev/null and b/static/images/pip_exec_4.png differ
diff --git a/static/images/pip_exec_5.png b/static/images/pip_exec_5.png
new file mode 100644
index 00000000000..840410d1e29
Binary files /dev/null and b/static/images/pip_exec_5.png differ
diff --git a/static/images/pip_exec_6.png b/static/images/pip_exec_6.png
new file mode 100644
index 00000000000..d6c8325adf8
Binary files /dev/null and b/static/images/pip_exec_6.png differ
diff --git a/static/images/pip_exec_7.png b/static/images/pip_exec_7.png
new file mode 100644
index 00000000000..58648e3e37f
Binary files /dev/null and b/static/images/pip_exec_7.png differ
diff --git 
a/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md 
b/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
index 503b2a4d7b0..788f1d9385c 100644
--- a/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
+++ b/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Pipeline Execution Engine",
+    "title": "Parallel Execution",
     "language": "en",
     "toc_min_heading_level": 2,
     "toc_max_heading_level": 4
@@ -28,87 +28,79 @@ under the License.
 
 
 
-The Pipeline Execution Engine is an experimental feature in Apache Doris 2.0, 
which was later optimized and upgraded in version 2.1 (i.e., PipelineX). In 
versions 3.0 and later, PipelineX is used as the only execution engine in Doris 
and renamed to Pipeline Execution Engine.
+The parallel execution model of Doris is a Pipeline execution model, primarily 
inspired by the implementation described in the Hyper paper 
(https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model 
fully leverages the computational power of multi-core CPUs while limiting the 
number of query threads in Doris, addressing the issue of thread explosion 
during execution. For details on its design, implementation, and effectiveness, 
refer to [DSIP-027](DSIP-027: Support Pipe [...]
 
-The goal of pipeline execution engine is to replace the current execution 
engine of Doris's volcano model, fully release the computing power of 
multi-core CPUs, and limit the number of Doris's query threads to solve the 
problem of Doris's execution thread bloat.
+Starting from Doris 3.0, the Pipeline execution model has completely replaced 
the original Volcano model. Based on the Pipeline execution model, Doris 
supports the parallel processing of Query, DDL, and DML statements.
 
-Its specific design, implementation and effects can be found in 
[DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine))
 and [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine)).
+## Physical Plan
 
-## Principle
+To better understand the Pipeline execution model, it is first necessary to 
introduce two important concepts in the physical query plan: PlanFragment and 
PlanNode. We will use the following SQL statement as an example:
+```
+SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
+```
 
-The current Doris SQL execution engine is designed based on the traditional 
volcano model, which has the following problems in a single multi-core scenario:
+The FE will first translate it into the following logical plan, where each 
node represents a PlanNode. The specific meaning of each type of node can be 
found in the introduction to the physical plan.
 
-* Inability to take full advantage of multi-core computing power to improve 
query performance,**most scenarios require manual setting of parallelism** for 
performance tuning, which is almost difficult to set in production environments.
+![pip_exec_1](/images/pip_exec_1.png)
 
-* Each instance of a standalone query corresponds to one thread of the thread 
pool, which introduces two additional problems.
-    
-    * Once the thread pool is hit full. **Doris' query engine will enter a 
pseudo-deadlock** and will not respond to subsequent queries. **At the same 
time there is a certain probability of entering a logical deadlock** situation: 
for example, all threads are executing an instance's probe task.
-    
-    * Blocking arithmetic will take up thread resources,**blocking thread 
resources can not be yielded to instances that can be scheduled**, the overall 
resource utilization does not go up.
+Since Doris is built on an MPP architecture, each query aims to involve all 
BEs in parallel execution as much as possible to reduce query latency. 
Therefore, the logical plan must be transformed into a physical plan. The 
transformation essentially involves inserting DataSink and ExchangeNode into 
the logical plan. These two nodes facilitate the shuffling of data across 
multiple BEs.
 
-* Blocking arithmetic relies on the OS thread scheduling mechanism, **thread 
switching overhead (especially in the scenario of system mixing))**
+After the transformation, each PlanFragment corresponds to a portion of the 
PlanNode and can be sent as an independent task to a BE. Each BE processes the 
PlanNode contained within the PlanFragment and then uses the DataSink and 
ExchangeNode operators to shuffle data to other BEs for subsequent computation.
 
-The resulting set of problems drove Doris to implement an execution engine 
adapted to the architecture of modern multi-core CPUs.
+![pip_exec_2](/images/pip_exec_2.png)
 
-And as shown in the figure below (quoted from[Push versus pull-based loop 
fusion in query engines]([jfp_1800010a 
(cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),The
 resulting set of problems drove Doris to implement an execution engine adapted 
to the architecture of modern multi-core CPUs.:
+Doris's planning process is divided into three layers:
+PLAN:The execution plan. A SQL statement is translated by the query planner 
into an execution plan, which is then provided to the execution engine for 
execution.
 
-![image.png](/images/pipeline-execution-engine.png)
+FRAGMENT:Since Doris is a distributed execution engine, a complete execution 
plan is divided into multiple single-machine execution fragments. A FRAGMENT 
represents a complete single-machine execution fragment. Multiple fragments 
combine to form a complete PLAN.
 
-1. Transformation of the traditional pull pull logic-driven execution process 
into a data-driven execution engine for the push model
+PLAN NODE:Operators, which are the smallest units of the execution plan. A 
FRAGMENT consists of multiple operators, each responsible for a specific 
execution logic, such as aggregation or join operations.
 
-2. Blocking operations are asynchronous, reducing the execution overhead 
caused by thread switching and thread blocking and making more efficient use of 
the CPU
+## Pipeline Execution
+A PlanFragment is the smallest unit of a task sent by the FE to the BE for 
execution. A BE may receive multiple different PlanFragments for the same 
query, and each PlanFragment is processed independently. Upon receiving a 
PlanFragment, the BE splits it into multiple Pipelines and then starts multiple 
PipelineTasks to achieve parallel execution, thereby improving query efficiency.
 
-3. Controls the number of threads to be executed and reduces the resource 
congestion of large queries on small queries in mixed load scenarios by 
controlling time slice switching
+![pip_exec_3](/images/pip_exec_3.png)
 
-4. In terms of execution concurrency, pipelineX introduces local exchange 
optimization to fully utilize CPU resources, and distribute data evenly across 
different tasks to minimize data skewing. In addition, pipelineX will no longer 
be constrained by the number of tablets.
 
-5. Logically, multiple pipeline tasks share all shared states of the same 
pipeline and eliminate additional initialization overhead, such as expressions 
and some const variables.
+### Pipeline
+A Pipeline consists of a SourceOperator, a SinkOperator, and several 
intermediate operators. The SourceOperator represents reading data from an 
external source, which can be a table (e.g., OlapTable) or a buffer (e.g., 
Exchange). The SinkOperator represents the data output, which can either be 
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or 
output to a hash table (e.g., aggregation operators, join build hash tables, 
etc.).
 
-6. In terms of scheduling logic, the blocking conditions of all pipeline tasks 
are encapsulated using Dependency, and the execution logic of the tasks is 
triggered by external events (such as rpc completion) to enter the runnable 
queue, thereby eliminating the overhead of blocking polling threads.
+![pip_exec_4](/images/pip_exec_4.png)
 
-7. Profile: Provide users with simple and easy to understand metrics.
+Multiple Pipelines are actually interdependent. Take the JoinNode as an 
example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to 
build the hash table, while Pipeline-1 reads data from the table to perform the 
probe operation. These two Pipelines are connected by a dependency 
relationship, meaning Pipeline-1 can only execute after Pipeline-0 has 
completed. This dependency relationship is referred to as a Dependency. Once 
Pipeline-0 finishes execution, it calls the se [...]
 
+### PipelineTask
+A Pipeline is actually a logical concept; it is not an executable entity. Once 
a Pipeline is defined, it needs to be further instantiated into multiple 
PipelineTasks. The data that needs to be read is then distributed to different 
PipelineTasks, ultimately achieving parallel processing. The operators within 
the multiple PipelineTasks of the same Pipeline are identical, but they differ 
in their states. For example, they might read different data or build different 
hash tables. These diffe [...]
 
-This improves the efficiency of CPU execution on mixed-load SQL and enhances 
the performance of SQL queries.
+Each PipelineTask is eventually submitted to a thread pool to be executed as 
an independent task. With the Dependency trigger mechanism, this approach 
allows better utilization of multi-core CPUs and achieves full parallelism.
 
-## Usage
+### Operator
+In most cases, each operator in a Pipeline corresponds to a PlanNode, but 
there are some special operators with exceptions:
+* JoinNode is split into JoinBuildOperator and JoinProbeOperator.
+* AggNode is split into AggSinkOperator and AggSourceOperator.
+* SortNode is split into SortSinkOperator and SortSourceOperator.
+  The basic principle is that for certain "breaking" operators (those that 
need to collect all the data before performing computation), the data ingestion 
part is split into a Sink, while the part that retrieves data from the operator 
is referred to as the Source.
 
-### Query
+## Scan 并行化
+Scanning data is a very heavy I/O operation, as it requires reading large 
amounts of data from local disks (or from HDFS or S3 in the case of data lake 
scenarios, which introduces even longer latency), consuming a significant 
amount of time. Therefore, we have introduced parallel scanning technology in 
the ScanOperator. The ScanOperator dynamically generates multiple Scanners, 
each of which scans around 1 to 2 million rows of data. While performing the 
scan, each Scanner handles tasks su [...]
 
-1. enable_pipeline_engine
+![pip_exec_5](/images/pip_exec_5.png)
 
-  Setting the session variable `enable_pipeline_engine` to `true` will make BE 
to use the Pipeline execution engine when performing query execution.
+By using parallel scanning technology, we can effectively avoid issues where 
certain ScanOperators take an excessively long time due to improper bucketing 
or data skew, which would otherwise slow down the entire query latency.
 
-  ```sql
-  set enable_pipeline_engine = true;
-  ```
+## Local Shuffle
+In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a 
technique that redistributes data locally across different execution tasks. It 
evenly distributes all the data output by the upstream Pipeline to all the 
tasks in the downstream Pipeline using methods like HASH or Round Robin. This 
helps solve the problem of data skew during execution, ensuring that the 
execution model is no longer limited by data storage or the query plan. Let's 
now provide an example to illus [...]
 
-2. parallel_pipeline_task_num
+We will further explain how Local Exchange can prevent data skew using 
Pipeline-1 from the previous example.
 
-  The `parallel_pipeline_task_num` represents the number of Pipeline Tasks for 
a SQL query to be queried concurrently.The default configuration of Doris is 
`0`, in which case the number of Pipeline Tasks will be automatically set to 
half of the minimum number of CPUs in the current cluster machine. You can also 
adjust it according to your own situation.
+![pip_exec_6](/images/pip_exec_6.png)
 
-  ```sql
-  set parallel_pipeline_task_num = 0;
-  ```
+As shown in the figure above, by inserting a Local Exchange in Pipeline-1, we 
further split Pipeline-1 into Pipeline-1-0 and Pipeline-1-1.
 
-  You can limit the automatically configured concurrency by setting 
`max_instance_num`(The default value is 64)
+Now, let's assume the current concurrency level is 3 (each Pipeline has 3 
tasks), and each task reads one bucket from the storage layer. The number of 
rows in the three buckets is 1, 1, and 7, respectively. The execution before 
and after inserting the Local Exchange changes as follows:
 
-3. enable_local_shuffle
+![pip_exec_7](/images/pip_exec_7.png)
 
-  Set `enable_local_shuffle` to true will enable local shuffle optimization. 
Local shuffle will try to evenly distribute data among different pipeline tasks 
to avoid data skewing as much as possible.
+As can be seen from the figure on the right, the amount of data that the 
HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3), 
thereby avoiding data skew.
 
-  ```sql
-  set enable_local_shuffle = true;
-  ```
-
-4. ignore_storage_data_distribution
-
-  Settings `ignore_storage_data_distribution` is true, it means ignoring the 
data distribution of the storage layer. When used in conjunction with local 
shuffle, the concurrency capability of the pipelineX engine will no longer be 
constrained by the number of storage layer tables, thus fully utilizing machine 
resources.
-
-  ```sql
-  set ignore_storage_data_distribution = true;
-  ```
-
-### Load
-
-The engine selected for import are detailed in the 
[Load](../data-operate/import/load-manual) documentation.
+In Doris, Local Exchange is planned based on a series of rules. For example, 
when a query involves time-consuming operators like Join, Aggregation, or 
Window Functions, Local Exchange is used to minimize data skew as much as 
possible.
diff --git 
a/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md 
b/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
index 503b2a4d7b0..8239554ba03 100644
--- a/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
+++ b/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Pipeline Execution Engine",
+    "title": "Parallel Execution",
     "language": "en",
     "toc_min_heading_level": 2,
     "toc_max_heading_level": 4
@@ -28,87 +28,80 @@ under the License.
 
 
 
-The Pipeline Execution Engine is an experimental feature in Apache Doris 2.0, 
which was later optimized and upgraded in version 2.1 (i.e., PipelineX). In 
versions 3.0 and later, PipelineX is used as the only execution engine in Doris 
and renamed to Pipeline Execution Engine.
+The parallel execution model of Doris is a Pipeline execution model, primarily 
inspired by the implementation described in the Hyper paper 
(https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model 
fully leverages the computational power of multi-core CPUs while limiting the 
number of query threads in Doris, addressing the issue of thread explosion 
during execution. For details on its design, implementation, and effectiveness, 
refer to [DSIP-027](DSIP-027: Support Pipe [...]
 
-The goal of pipeline execution engine is to replace the current execution 
engine of Doris's volcano model, fully release the computing power of 
multi-core CPUs, and limit the number of Doris's query threads to solve the 
problem of Doris's execution thread bloat.
+Starting from Doris 3.0, the Pipeline execution model has completely replaced 
the original Volcano model. Based on the Pipeline execution model, Doris 
supports the parallel processing of Query, DDL, and DML statements.
 
-Its specific design, implementation and effects can be found in 
[DSIP-027]([DSIP-027: Support Pipeline Exec Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine))
 and [DSIP-035]([DSIP-035: PipelineX Execution Engine - DORIS - Apache Software 
Foundation](https://cwiki.apache.org/confluence/display/DORIS/DSIP-035%3A+PipelineX+Execution+Engine)).
+## Physical Plan
 
-## Principle
+To better understand the Pipeline execution model, it is first necessary to 
introduce two important concepts in the physical query plan: PlanFragment and 
PlanNode. We will use the following SQL statement as an example:
+```
+SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
+```
 
-The current Doris SQL execution engine is designed based on the traditional 
volcano model, which has the following problems in a single multi-core scenario:
+The FE will first translate it into the following logical plan, where each 
node represents a PlanNode. The specific meaning of each type of node can be 
found in the introduction to the physical plan.
 
-* Inability to take full advantage of multi-core computing power to improve 
query performance,**most scenarios require manual setting of parallelism** for 
performance tuning, which is almost difficult to set in production environments.
+![pip_exec_1](/images/pip_exec_1.png)
 
-* Each instance of a standalone query corresponds to one thread of the thread 
pool, which introduces two additional problems.
-    
-    * Once the thread pool is hit full. **Doris' query engine will enter a 
pseudo-deadlock** and will not respond to subsequent queries. **At the same 
time there is a certain probability of entering a logical deadlock** situation: 
for example, all threads are executing an instance's probe task.
-    
-    * Blocking arithmetic will take up thread resources,**blocking thread 
resources can not be yielded to instances that can be scheduled**, the overall 
resource utilization does not go up.
+Since Doris is built on an MPP architecture, each query aims to involve all 
BEs in parallel execution as much as possible to reduce query latency. 
Therefore, the logical plan must be transformed into a physical plan. The 
transformation essentially involves inserting DataSink and ExchangeNode into 
the logical plan. These two nodes facilitate the shuffling of data across 
multiple BEs.
 
-* Blocking arithmetic relies on the OS thread scheduling mechanism, **thread 
switching overhead (especially in the scenario of system mixing))**
+After the transformation, each PlanFragment corresponds to a portion of the 
PlanNode and can be sent as an independent task to a BE. Each BE processes the 
PlanNode contained within the PlanFragment and then uses the DataSink and 
ExchangeNode operators to shuffle data to other BEs for subsequent computation.
 
-The resulting set of problems drove Doris to implement an execution engine 
adapted to the architecture of modern multi-core CPUs.
+![pip_exec_2](/images/pip_exec_2.png)
 
-And as shown in the figure below (quoted from[Push versus pull-based loop 
fusion in query engines]([jfp_1800010a 
(cambridge.org)](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf))),The
 resulting set of problems drove Doris to implement an execution engine adapted 
to the architecture of modern multi-core CPUs.:
+Doris's planning process is divided into three layers:
+PLAN:The execution plan. A SQL statement is translated by the query planner 
into an execution plan, which is then provided to the execution engine for 
execution.
 
-![image.png](/images/pipeline-execution-engine.png)
+FRAGMENT:Since Doris is a distributed execution engine, a complete execution 
plan is divided into multiple single-machine execution fragments. A FRAGMENT 
represents a complete single-machine execution fragment. Multiple fragments 
combine to form a complete PLAN.
 
-1. Transformation of the traditional pull pull logic-driven execution process 
into a data-driven execution engine for the push model
+PLAN NODE:Operators, which are the smallest units of the execution plan. A 
FRAGMENT consists of multiple operators, each responsible for a specific 
execution logic, such as aggregation or join operations.
 
-2. Blocking operations are asynchronous, reducing the execution overhead 
caused by thread switching and thread blocking and making more efficient use of 
the CPU
+## Pipeline Execution
+A PlanFragment is the smallest unit of a task sent by the FE to the BE for 
execution. A BE may receive multiple different PlanFragments for the same 
query, and each PlanFragment is processed independently. Upon receiving a 
PlanFragment, the BE splits it into multiple Pipelines and then starts multiple 
PipelineTasks to achieve parallel execution, thereby improving query efficiency.
 
-3. Controls the number of threads to be executed and reduces the resource 
congestion of large queries on small queries in mixed load scenarios by 
controlling time slice switching
+![pip_exec_3](/images/pip_exec_3.png)
 
-4. In terms of execution concurrency, pipelineX introduces local exchange 
optimization to fully utilize CPU resources, and distribute data evenly across 
different tasks to minimize data skewing. In addition, pipelineX will no longer 
be constrained by the number of tablets.
 
-5. Logically, multiple pipeline tasks share all shared states of the same 
pipeline and eliminate additional initialization overhead, such as expressions 
and some const variables.
+### Pipeline
+A Pipeline consists of a SourceOperator, a SinkOperator, and several 
intermediate operators. The SourceOperator represents reading data from an 
external source, which can be a table (e.g., OlapTable) or a buffer (e.g., 
Exchange). The SinkOperator represents the data output, which can either be 
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or 
output to a hash table (e.g., aggregation operators, join build hash tables, 
etc.).
 
-6. In terms of scheduling logic, the blocking conditions of all pipeline tasks 
are encapsulated using Dependency, and the execution logic of the tasks is 
triggered by external events (such as rpc completion) to enter the runnable 
queue, thereby eliminating the overhead of blocking polling threads.
+![pip_exec_4](/images/pip_exec_4.png)
 
-7. Profile: Provide users with simple and easy to understand metrics.
+Multiple Pipelines are actually interdependent. Take the JoinNode as an 
example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to 
build the hash table, while Pipeline-1 reads data from the table to perform the 
probe operation. These two Pipelines are connected by a dependency 
relationship, meaning Pipeline-1 can only execute after Pipeline-0 has 
completed. This dependency relationship is referred to as a Dependency. Once 
Pipeline-0 finishes execution, it calls the se [...]
 
+### PipelineTask
+A Pipeline is actually a logical concept; it is not an executable entity. Once 
a Pipeline is defined, it needs to be further instantiated into multiple 
PipelineTasks. The data that needs to be read is then distributed to different 
PipelineTasks, ultimately achieving parallel processing. The operators within 
the multiple PipelineTasks of the same Pipeline are identical, but they differ 
in their states. For example, they might read different data or build different 
hash tables. These diffe [...]
 
-This improves the efficiency of CPU execution on mixed-load SQL and enhances 
the performance of SQL queries.
+Each PipelineTask is eventually submitted to a thread pool to be executed as 
an independent task. With the Dependency trigger mechanism, this approach 
allows better utilization of multi-core CPUs and achieves full parallelism.
 
-## Usage
+### Operator
+In most cases, each operator in a Pipeline corresponds to a PlanNode, but 
there are some special operators with exceptions:
+* JoinNode is split into JoinBuildOperator and JoinProbeOperator.
+* AggNode is split into AggSinkOperator and AggSourceOperator.
+* SortNode is split into SortSinkOperator and SortSourceOperator.
+  The basic principle is that for certain "breaking" operators (those that 
need to collect all the data before performing computation), the data ingestion 
part is split into a Sink, while the part that retrieves data from the operator 
is referred to as the Source.
 
-### Query
+## Scan 并行化
+Scanning data is a very heavy I/O operation, as it requires reading large 
amounts of data from local disks (or from HDFS or S3 in the case of data lake 
scenarios, which introduces even longer latency), consuming a significant 
amount of time. Therefore, we have introduced parallel scanning technology in 
the ScanOperator. The ScanOperator dynamically generates multiple Scanners, 
each of which scans around 1 to 2 million rows of data. While performing the 
scan, each Scanner handles tasks su [...]
 
-1. enable_pipeline_engine
+![pip_exec_5](/images/pip_exec_5.png)
 
-  Setting the session variable `enable_pipeline_engine` to `true` will make BE 
to use the Pipeline execution engine when performing query execution.
+By using parallel scanning technology, we can effectively avoid issues where 
certain ScanOperators take an excessively long time due to improper bucketing 
or data skew, which would otherwise slow down the entire query latency.
 
-  ```sql
-  set enable_pipeline_engine = true;
-  ```
+## Local Shuffle
+In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a 
technique that redistributes data locally across different execution tasks. It 
evenly distributes all the data output by the upstream Pipeline to all the 
tasks in the downstream Pipeline using methods like HASH or Round Robin. This 
helps solve the problem of data skew during execution, ensuring that the 
execution model is no longer limited by data storage or the query plan. Let's 
now provide an example to illus [...]
 
-2. parallel_pipeline_task_num
+We will further explain how Local Exchange can prevent data skew using 
Pipeline-1 from the previous example.
 
-  The `parallel_pipeline_task_num` represents the number of Pipeline Tasks for 
a SQL query to be queried concurrently.The default configuration of Doris is 
`0`, in which case the number of Pipeline Tasks will be automatically set to 
half of the minimum number of CPUs in the current cluster machine. You can also 
adjust it according to your own situation.
+![pip_exec_6](/images/pip_exec_6.png)
 
-  ```sql
-  set parallel_pipeline_task_num = 0;
-  ```
+As shown in the figure above, by inserting a Local Exchange in Pipeline-1, we 
further split Pipeline-1 into Pipeline-1-0 and Pipeline-1-1.
 
-  You can limit the automatically configured concurrency by setting 
`max_instance_num`(The default value is 64)
+Now, let's assume the current concurrency level is 3 (each Pipeline has 3 
tasks), and each task reads one bucket from the storage layer. The number of 
rows in the three buckets is 1, 1, and 7, respectively. The execution before 
and after inserting the Local Exchange changes as follows:
 
-3. enable_local_shuffle
+![pip_exec_7](/images/pip_exec_7.png)
 
-  Set `enable_local_shuffle` to true will enable local shuffle optimization. 
Local shuffle will try to evenly distribute data among different pipeline tasks 
to avoid data skewing as much as possible.
+As can be seen from the figure on the right, the amount of data that the 
HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3), 
thereby avoiding data skew.
 
-  ```sql
-  set enable_local_shuffle = true;
-  ```
+In Doris, Local Exchange is planned based on a series of rules. For example, 
when a query involves time-consuming operators like Join, Aggregation, or 
Window Functions, Local Exchange is used to minimize data skew as much as 
possible.
 
-4. ignore_storage_data_distribution
-
-  Settings `ignore_storage_data_distribution` is true, it means ignoring the 
data distribution of the storage layer. When used in conjunction with local 
shuffle, the concurrency capability of the pipelineX engine will no longer be 
constrained by the number of storage layer tables, thus fully utilizing machine 
resources.
-
-  ```sql
-  set ignore_storage_data_distribution = true;
-  ```
-
-### Load
-
-The engine selected for import are detailed in the 
[Load](../data-operate/import/load-manual) documentation.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to