This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 90dd8716ed [refactor](multicast) change the way multicast do filter, 
project and shuffle (#21412)
90dd8716ed is described below

commit 90dd8716edc514abad87f99c36ef0e8fe5024989
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Tue Jul 4 16:51:07 2023 +0800

    [refactor](multicast) change the way multicast do filter, project and 
shuffle (#21412)
    
    Co-authored-by: Jerry Hu <mrh...@gmail.com>
    
    1. Filtering is done at the sending end rather than the receiving end
    2. Projection is done at the sending end rather than the receiving end
    3. Each sender can use different shuffle policies to send data
---
 .../exec/multi_cast_data_stream_source.cpp         |  60 +++++++--
 .../pipeline/exec/multi_cast_data_stream_source.h  |  21 +++-
 be/src/pipeline/pipeline_fragment_context.cpp      |  13 +-
 be/src/runtime/runtime_filter_mgr.cpp              |   1 -
 ...nsumer_node.cpp => runtime_filter_consumer.cpp} |  49 ++++----
 ...r_consumer_node.h => runtime_filter_consumer.h} |  30 +++--
 be/src/vec/exec/scan/vscan_node.cpp                |   4 +-
 be/src/vec/exec/scan/vscan_node.h                  |  10 +-
 be/src/vec/exec/vselect_node.cpp                   |  20 +--
 be/src/vec/exec/vselect_node.h                     |   7 +-
 .../glue/translator/PhysicalPlanTranslator.java    | 135 ++++++++++-----------
 .../properties/ChildOutputPropertyDeriver.java     |   2 +-
 .../properties/ChildrenPropertiesRegulator.java    |  34 ++++++
 .../properties/DistributionSpecMustShuffle.java    |  35 ++++++
 .../nereids/properties/PhysicalProperties.java     |   2 +
 .../org/apache/doris/planner/DataStreamSink.java   |  87 ++++++++++++-
 .../org/apache/doris/planner/ExchangeNode.java     |  10 +-
 .../doris/planner/MultiCastPlanFragment.java       |   5 +-
 .../org/apache/doris/planner/PlanFragment.java     |   2 +-
 .../java/org/apache/doris/planner/PlanNode.java    |   4 +-
 gensrc/thrift/DataSinks.thrift                     |  12 ++
 .../nereids_tpcds_shape_sf100_p0/shape/query1.out  |  13 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query2.out  |  18 +--
 .../nereids_tpcds_shape_sf100_p0/shape/query24.out |  12 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query30.out |   5 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query47.out |  10 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query57.out |  10 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query81.out |   8 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query95.out |  14 +--
 29 files changed, 434 insertions(+), 199 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 06211faf52..924db8a729 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -27,14 +27,16 @@
 namespace doris::pipeline {
 
 
MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder(
-        int32_t id, const int consumer_id, 
std::shared_ptr<MultiCastDataStreamer>& data_streamer)
+        int32_t id, const int consumer_id, 
std::shared_ptr<MultiCastDataStreamer>& data_streamer,
+        const TDataStreamSink& sink)
         : OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"),
           _consumer_id(consumer_id),
-          _multi_cast_data_streamer(data_streamer) {};
+          _multi_cast_data_streamer(data_streamer),
+          _t_data_stream_sink(sink) {}
 
 OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() {
-    return std::make_shared<MultiCastDataStreamerSourceOperator>(this, 
_consumer_id,
-                                                                 
_multi_cast_data_streamer);
+    return std::make_shared<MultiCastDataStreamerSourceOperator>(
+            this, _consumer_id, _multi_cast_data_streamer, 
_t_data_stream_sink);
 }
 
 const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
@@ -43,10 +45,38 @@ const RowDescriptor& 
MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
 
 MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
         OperatorBuilderBase* operator_builder, const int consumer_id,
-        std::shared_ptr<MultiCastDataStreamer>& data_streamer)
+        std::shared_ptr<MultiCastDataStreamer>& data_streamer, const 
TDataStreamSink& sink)
         : OperatorBase(operator_builder),
+          vectorized::RuntimeFilterConsumer(sink.dest_node_id, 
sink.runtime_filters,
+                                            data_streamer->row_desc(), 
_conjuncts),
           _consumer_id(consumer_id),
-          _multi_cast_data_streamer(data_streamer) {};
+          _multi_cast_data_streamer(data_streamer),
+          _t_data_stream_sink(sink) {}
+
+Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* 
state) {
+    RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
+    _register_runtime_filter();
+    if (_t_data_stream_sink.__isset.output_exprs) {
+        
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs,
+                                                             
_output_expr_contexts));
+        RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, 
state, row_desc()));
+    }
+
+    if (_t_data_stream_sink.__isset.conjuncts) {
+        RETURN_IF_ERROR(
+                
vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, 
_conjuncts));
+        RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, 
row_desc()));
+    }
+    return Status::OK();
+}
+
+Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
+    return _acquire_runtime_filter();
+}
+
+bool 
MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
+    return 
vectorized::RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout();
+}
 
 bool MultiCastDataStreamerSourceOperator::can_read() {
     return _multi_cast_data_streamer->can_read(_consumer_id);
@@ -55,7 +85,23 @@ bool MultiCastDataStreamerSourceOperator::can_read() {
 Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                       SourceState& 
source_state) {
     bool eos = false;
-    _multi_cast_data_streamer->pull(_consumer_id, block, &eos);
+    vectorized::Block tmp_block;
+    vectorized::Block* output_block = block;
+    if (!_output_expr_contexts.empty()) {
+        output_block = &tmp_block;
+    }
+    _multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
+
+    if (!_conjuncts.empty()) {
+        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
+                                                               
output_block->columns()));
+    }
+
+    if (!_output_expr_contexts.empty() && output_block->rows() > 0) {
+        
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+                _output_expr_contexts, *output_block, block));
+        materialize_block_inplace(*block);
+    }
     if (eos) {
         source_state = SourceState::FINISHED;
     }
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 15bd320b89..8c9cb99f82 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -22,6 +22,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "vec/exec/runtime_filter_consumer.h"
 
 namespace doris {
 class ExecNode;
@@ -37,7 +38,8 @@ class MultiCastDataStreamer;
 class MultiCastDataStreamerSourceOperatorBuilder final : public 
OperatorBuilderBase {
 public:
     MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int 
consumer_id,
-                                               
std::shared_ptr<MultiCastDataStreamer>&);
+                                               
std::shared_ptr<MultiCastDataStreamer>&,
+                                               const TDataStreamSink&);
 
     bool is_source() const override { return true; }
 
@@ -48,20 +50,25 @@ public:
 private:
     const int _consumer_id;
     std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
+    TDataStreamSink _t_data_stream_sink;
 };
 
-class MultiCastDataStreamerSourceOperator final : public OperatorBase {
+class MultiCastDataStreamerSourceOperator final : public OperatorBase,
+                                                  public 
vectorized::RuntimeFilterConsumer {
 public:
     MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder,
                                         const int consumer_id,
-                                        
std::shared_ptr<MultiCastDataStreamer>& data_streamer);
+                                        
std::shared_ptr<MultiCastDataStreamer>& data_streamer,
+                                        const TDataStreamSink& sink);
 
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
 
-    Status prepare(RuntimeState* state) override { return Status::OK(); };
+    Status prepare(RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override { return Status::OK(); };
+    Status open(RuntimeState* state) override;
+
+    bool runtime_filters_are_ready_or_timeout() override;
 
     Status sink(RuntimeState* state, vectorized::Block* block, SourceState 
source_state) override {
         return Status::OK();
@@ -76,6 +83,10 @@ public:
 private:
     const int _consumer_id;
     std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
+    TDataStreamSink _t_data_stream_sink;
+
+    vectorized::VExprContextSPtrs _output_expr_contexts;
+    vectorized::VExprContextSPtrs _conjuncts;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 4aaaf7a7ec..0d9745255f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -768,16 +768,25 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
         _multi_cast_stream_sink_senders.resize(sender_size);
         for (int i = 0; i < sender_size; ++i) {
             auto new_pipeline = add_pipeline();
+
+            auto row_desc =
+                    
!thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
+                            ? RowDescriptor(
+                                      _runtime_state->desc_tbl(),
+                                      
{thrift_sink.multi_cast_stream_sink.sinks[i].output_tuple_id},
+                                      {false})
+                            : sink_->row_desc();
             // 1. create the data stream sender sink
             _multi_cast_stream_sink_senders[i].reset(new 
vectorized::VDataStreamSender(
-                    _runtime_state.get(), _runtime_state->obj_pool(), 
sender_id, sink_->row_desc(),
+                    _runtime_state.get(), _runtime_state->obj_pool(), 
sender_id, row_desc,
                     thrift_sink.multi_cast_stream_sink.sinks[i],
                     thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 
1024, false));
 
             // 2. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             OperatorBuilderPtr source_op =
                     
std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>(
-                            next_operator_builder_id(), i, 
multi_cast_data_streamer);
+                            next_operator_builder_id(), i, 
multi_cast_data_streamer,
+                            thrift_sink.multi_cast_stream_sink.sinks[i]);
             new_pipeline->add_operator(source_op);
 
             // 3. create and set sink operator of data stream sender for new 
pipeline
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index e2b4c525ac..14ee165684 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -132,7 +132,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
             }
             IRuntimeFilter* filter;
             RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, 
&_query_ctx->obj_pool, &desc,
-
                                                    &options, 
RuntimeFilterRole::CONSUMER, node_id,
                                                    &filter, build_bf_exactly));
             _consumer_map[key].emplace_back(node_id, filter);
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
similarity index 75%
rename from be/src/vec/exec/runtime_filter_consumer_node.cpp
rename to be/src/vec/exec/runtime_filter_consumer.cpp
index dd631ce66e..4f51d99ccc 100644
--- a/be/src/vec/exec/runtime_filter_consumer_node.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -15,22 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "vec/exec/runtime_filter_consumer.h"
 
 namespace doris::vectorized {
 
-RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const 
TPlanNode& tnode,
-                                                     const DescriptorTbl& 
descs)
-        : ExecNode(pool, tnode, descs), 
_runtime_filter_descs(tnode.runtime_filters) {}
+RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
+                                             const 
std::vector<TRuntimeFilterDesc>& runtime_filters,
+                                             const RowDescriptor& 
row_descriptor,
+                                             VExprContextSPtrs& conjuncts)
+        : _filter_id(filter_id),
+          _runtime_filter_descs(runtime_filters),
+          _row_descriptor_ref(row_descriptor),
+          _conjuncts_ref(conjuncts) {}
 
-Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
-    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+Status RuntimeFilterConsumer::init(RuntimeState* state) {
     _state = state;
     RETURN_IF_ERROR(_register_runtime_filter());
     return Status::OK();
 }
 
-Status RuntimeFilterConsumerNode::_register_runtime_filter() {
+Status RuntimeFilterConsumer::_register_runtime_filter() {
     int filter_size = _runtime_filter_descs.size();
     _runtime_filter_ctxs.reserve(filter_size);
     _runtime_filter_ready_flag.reserve(filter_size);
@@ -43,14 +47,14 @@ Status 
RuntimeFilterConsumerNode::_register_runtime_filter() {
             // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
             // 2. This filter is bloom filter (only bloom filter should be 
used for merging)
             
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), id(), false));
+                    filter_desc, _state->query_options(), _filter_id, false));
             
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, id(), &runtime_filter));
+                    filter_desc.filter_id, _filter_id, &runtime_filter));
         } else {
             
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), id(), false));
+                    filter_desc, _state->query_options(), _filter_id, false));
             RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, id(), &runtime_filter));
+                    filter_desc.filter_id, _filter_id, &runtime_filter));
         }
         _runtime_filter_ctxs.emplace_back(runtime_filter);
         _runtime_filter_ready_flag.emplace_back(false);
@@ -58,7 +62,7 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() {
     return Status::OK();
 }
 
-bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
+bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
     if (!_blocked_by_rf) {
         return true;
     }
@@ -72,20 +76,19 @@ bool 
RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
     return true;
 }
 
-Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
+Status RuntimeFilterConsumer::_acquire_runtime_filter() {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     VExprSPtrs vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
         bool ready = runtime_filter->is_ready();
-        if (!ready && wait) {
+        if (!ready) {
             ready = runtime_filter->await();
         }
         if (ready && !_runtime_filter_ctxs[i].apply_mark) {
             RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
             _runtime_filter_ctxs[i].apply_mark = true;
-        } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
-                   runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
+        } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
                    !_runtime_filter_ctxs[i].apply_mark) {
             _blocked_by_rf = true;
         } else if (!_runtime_filter_ctxs[i].apply_mark) {
@@ -101,23 +104,23 @@ Status 
RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
     return Status::OK();
 }
 
-Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& 
vexprs) {
+Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& 
vexprs) {
     if (vexprs.empty()) {
         return Status::OK();
     }
 
     for (auto& expr : vexprs) {
         VExprContextSPtr conjunct = VExprContext::create_shared(expr);
-        RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
+        RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
         RETURN_IF_ERROR(conjunct->open(_state));
         _rf_vexpr_set.insert(expr);
-        _conjuncts.emplace_back(conjunct);
+        _conjuncts_ref.emplace_back(conjunct);
     }
 
     return Status::OK();
 }
 
-Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* 
arrived_rf_num) {
+Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* 
arrived_rf_num) {
     if (_is_all_rf_applied) {
         *arrived_rf_num = _runtime_filter_descs.size();
         return Status::OK();
@@ -140,12 +143,12 @@ Status 
RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
             continue;
         } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
             
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
-                    &exprs, _row_descriptor, _state));
+                    &exprs, _row_descriptor_ref, _state));
             ++current_arrived_rf_num;
             _runtime_filter_ctxs[i].apply_mark = true;
         }
     }
-    // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
+    // 2. Append unapplied runtime filters to _conjuncts
     if (!exprs.empty()) {
         RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
     }
@@ -157,7 +160,7 @@ Status 
RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
     return Status::OK();
 }
 
-void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) {
+void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) {
     _acquire_runtime_filter_timer = ADD_TIMER(profile, 
"AcquireRuntimeFilterTime");
 }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h 
b/be/src/vec/exec/runtime_filter_consumer.h
similarity index 80%
rename from be/src/vec/exec/runtime_filter_consumer_node.h
rename to be/src/vec/exec/runtime_filter_consumer.h
index 518e0e865c..c938e8510b 100644
--- a/be/src/vec/exec/runtime_filter_consumer_node.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -22,14 +22,16 @@
 
 namespace doris::vectorized {
 
-class RuntimeFilterConsumerNode : public ExecNode {
+class RuntimeFilterConsumer {
 public:
-    RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
-    ~RuntimeFilterConsumerNode() override = default;
+    RuntimeFilterConsumer(const int32_t filter_id,
+                          const std::vector<TRuntimeFilterDesc>& 
runtime_filters,
+                          const RowDescriptor& row_descriptor, 
VExprContextSPtrs& conjuncts);
+    ~RuntimeFilterConsumer() = default;
 
-    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
+    Status init(RuntimeState* state);
 
-    // Try append late arrived runtime filters.
+    // Try to append late arrived runtime filters.
     // Return num of filters which are applied already.
     Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
 
@@ -39,7 +41,7 @@ protected:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter();
     // Get all arrived runtime filters at Open phase.
-    Status _acquire_runtime_filter(bool wait = true);
+    Status _acquire_runtime_filter();
     // Append late-arrival runtime filters to the vconjunct_ctx.
     Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
 
@@ -54,15 +56,23 @@ protected:
         IRuntimeFilter* runtime_filter;
     };
 
-    RuntimeState* _state;
-
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
-
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     // Set to true if the runtime filter is ready.
     std::vector<bool> _runtime_filter_ready_flag;
     doris::Mutex _rf_locks;
     phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+
+private:
+    RuntimeState* _state;
+
+    int32_t _filter_id;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+
+    const RowDescriptor& _row_descriptor_ref;
+
+    VExprContextSPtrs& _conjuncts_ref;
+
     // True means all runtime filters are applied to scanners
     bool _is_all_rf_applied = true;
     bool _blocked_by_rf = false;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index aaefef32ee..e9a14aac71 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -94,7 +94,9 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
 }
 
 Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
+    _state = state;
     _is_pipeline_scan = state->enable_pipeline_exec();
 
     const TQueryOptions& query_options = state->query_options();
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index ee0dadefdc..481e3480dc 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -43,7 +43,7 @@
 #include "runtime/runtime_state.h"
 #include "util/lock.h"
 #include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "vec/exec/runtime_filter_consumer.h"
 #include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vscanner.h"
 #include "vec/runtime/shared_scanner_controller.h"
@@ -88,10 +88,12 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> 
in_filters;
 };
 
-class VScanNode : public RuntimeFilterConsumerNode {
+class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs)
-            : RuntimeFilterConsumerNode(pool, tnode, descs) {
+            : ExecNode(pool, tnode, descs),
+              RuntimeFilterConsumer(id(), tnode.runtime_filters, 
ExecNode::_row_descriptor,
+                                    ExecNode::_conjuncts) {
         if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
             // Which means the request could be fullfilled in a single segment 
iterator request.
             if (tnode.limit > 0 && tnode.limit < 1024) {
@@ -304,6 +306,8 @@ protected:
     VExprContextSPtrs _stale_expr_ctxs;
     VExprContextSPtrs _common_expr_ctxs_push_down;
 
+    RuntimeState* _state;
+
     // If sort info is set, push limit to each scanner;
     int64_t _limit_per_scanner = -1;
 
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index 626fd5ce96..ee1628cd19 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -37,34 +37,22 @@ class TPlanNode;
 namespace vectorized {
 
 VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
-        : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {}
+        : ExecNode(pool, tnode, descs), _child_eos(false) {}
 
 Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    return RuntimeFilterConsumerNode::init(tnode, state);
+    return ExecNode::init(tnode, state);
 }
 
 Status VSelectNode::prepare(RuntimeState* state) {
-    return RuntimeFilterConsumerNode::prepare(state);
+    return ExecNode::prepare(state);
 }
 
 Status VSelectNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state));
+    RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(child(0)->open(state));
     return Status::OK();
 }
 
-Status VSelectNode::alloc_resource(RuntimeState* state) {
-    if (_opened) {
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state));
-    RETURN_IF_ERROR(_acquire_runtime_filter());
-    RETURN_IF_CANCELLED(state);
-    _opened = true;
-    return Status::OK();
-}
-
 Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, 
bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index 140009e4b3..d6783d7237 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -17,7 +17,7 @@
 
 #pragma once
 #include "common/status.h"
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "exec/exec_node.h"
 
 namespace doris {
 class DescriptorTbl;
@@ -28,7 +28,7 @@ class TPlanNode;
 namespace vectorized {
 class Block;
 
-class VSelectNode final : public RuntimeFilterConsumerNode {
+class VSelectNode final : public ExecNode {
 public:
     VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs);
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
@@ -38,12 +38,9 @@ public:
     Status close(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* 
eos) override;
 
-    Status alloc_resource(RuntimeState* state) override;
-
 private:
     // true if last get_next() call on child signalled eos
     bool _child_eos;
-    bool _opened = false;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6aff68e6b4..650bcc43a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -247,27 +247,21 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends 
Plan> distribute,
             PlanTranslatorContext context) {
-        PlanFragment childFragment = distribute.child().accept(this, context);
+        PlanFragment inputFragment = distribute.child().accept(this, context);
         // TODO: why need set streaming here? should remove this.
-        if (childFragment.getPlanRoot() instanceof AggregationNode
+        if (inputFragment.getPlanRoot() instanceof AggregationNode
                 && distribute.child() instanceof PhysicalHashAggregate
-                && context.getFirstAggregateInFragment(childFragment) == 
distribute.child()) {
+                && context.getFirstAggregateInFragment(inputFragment) == 
distribute.child()) {
             PhysicalHashAggregate<?> hashAggregate = 
(PhysicalHashAggregate<?>) distribute.child();
             if (hashAggregate.getAggPhase() == AggPhase.LOCAL
                     && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) {
-                AggregationNode aggregationNode = (AggregationNode) 
childFragment.getPlanRoot();
+                AggregationNode aggregationNode = (AggregationNode) 
inputFragment.getPlanRoot();
                 
aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream());
             }
         }
 
-        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), 
childFragment.getPlanRoot());
+        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), 
inputFragment.getPlanRoot());
         updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute);
-        
exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances());
-        if (distribute.getDistributionSpec() instanceof 
DistributionSpecGather) {
-            // gather to one instance
-            exchangeNode.setNumInstances(1);
-        }
-
         List<ExprId> validOutputIds = distribute.getOutputExprIds();
         if (distribute.child() instanceof PhysicalHashAggregate) {
             // we must add group by keys to output list,
@@ -282,8 +276,28 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         DataPartition dataPartition = 
toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
         PlanFragment parentFragment = new 
PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
-        childFragment.setDestination(exchangeNode);
-        childFragment.setOutputPartition(dataPartition);
+        
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
+        if (distribute.getDistributionSpec() instanceof 
DistributionSpecGather) {
+            // gather to one instance
+            exchangeNode.setNumInstances(1);
+        }
+
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
+            DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            TupleDescriptor tupleDescriptor = 
generateTupleDesc(distribute.getOutput(), null, context);
+            exchangeNode.updateTupleIds(tupleDescriptor);
+            dataStreamSink.setExchNodeId(exchangeNode.getId());
+            dataStreamSink.setOutputPartition(dataPartition);
+            parentFragment.addChild(inputFragment);
+            ((MultiCastPlanFragment) inputFragment).addToDest(exchangeNode);
+        } else {
+            inputFragment.setDestination(exchangeNode);
+            inputFragment.setOutputPartition(dataPartition);
+        }
+
         context.addPlanFragment(parentFragment);
         return parentFragment;
     }
@@ -760,71 +774,23 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
multiCastFragment.getSink();
         Preconditions.checkState(multiCastDataSink != null, "invalid 
multiCastDataSink");
 
-        PhysicalCTEProducer cteProducer = 
context.getCteProduceMap().get(cteId);
+        PhysicalCTEProducer<?> cteProducer = 
context.getCteProduceMap().get(cteId);
         Preconditions.checkState(cteProducer != null, "invalid cteProducer");
 
-        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), 
multiCastFragment.getPlanRoot());
-
-        DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
-        streamSink.setPartition(DataPartition.RANDOM);
+        // set datasink to multicast data sink but do not set target now
+        // target will be set when translate distribute
+        DataStreamSink streamSink = new DataStreamSink();
         streamSink.setFragment(multiCastFragment);
-
         multiCastDataSink.getDataStreamSinks().add(streamSink);
         multiCastDataSink.getDestinations().add(Lists.newArrayList());
 
-        
exchangeNode.setNumInstances(multiCastFragment.getPlanRoot().getNumInstances());
-
-        PlanFragment consumeFragment = new 
PlanFragment(context.nextFragmentId(), exchangeNode,
-                multiCastFragment.getDataPartition());
-
-        Map<Slot, Slot> projectMap = Maps.newHashMap();
-        projectMap.putAll(cteConsumer.getProducerToConsumerSlotMap());
-
-        List<NamedExpression> execList = new ArrayList<>();
-        PlanNode inputPlanNode = consumeFragment.getPlanRoot();
-        List<Slot> cteProjects = cteProducer.getProjects();
-        for (Slot slot : cteProjects) {
-            if (projectMap.containsKey(slot)) {
-                execList.add(projectMap.get(slot));
-            } else {
-                throw new RuntimeException("could not find slot in cte 
producer consumer projectMap");
-            }
+        // update expr to slot mapping
+        for (Slot producerSlot : cteProducer.getProjects()) {
+            Slot consumerSlot = 
cteConsumer.getProducerToConsumerSlotMap().get(producerSlot);
+            SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
+            context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
         }
-
-        List<Slot> slotList = execList
-                .stream()
-                .map(NamedExpression::toSlot)
-                .collect(Collectors.toList());
-
-        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, 
context);
-
-        // update tuple list and tblTupleList
-        inputPlanNode.getTupleIds().clear();
-        inputPlanNode.getTupleIds().add(tupleDescriptor.getId());
-        inputPlanNode.getTblRefIds().clear();
-        inputPlanNode.getTblRefIds().add(tupleDescriptor.getId());
-        inputPlanNode.getNullableTupleIds().clear();
-        inputPlanNode.getNullableTupleIds().add(tupleDescriptor.getId());
-
-        List<Expr> execExprList = execList
-                .stream()
-                .map(e -> ExpressionTranslator.translate(e, context))
-                .collect(Collectors.toList());
-
-        inputPlanNode.setProjectList(execExprList);
-        inputPlanNode.setOutputTupleDesc(tupleDescriptor);
-
-        // update data partition
-        consumeFragment.setDataPartition(DataPartition.RANDOM);
-
-        SelectNode projectNode = new SelectNode(context.nextPlanNodeId(), 
inputPlanNode);
-        consumeFragment.setPlanRoot(projectNode);
-
-        multiCastFragment.getDestNodeList().add(exchangeNode);
-        consumeFragment.addChild(multiCastFragment);
-        context.getPlanFragments().add(consumeFragment);
-
-        return consumeFragment;
+        return multiCastFragment;
     }
 
     @Override
@@ -859,6 +825,17 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         PlanFragment inputFragment = filter.child(0).accept(this, context);
 
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
+            DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            filter.getConjuncts().stream()
+                    .map(e -> ExpressionTranslator.translate(e, context))
+                    .forEach(dataStreamSink::addConjunct);
+            return inputFragment;
+        }
+
         PlanNode planNode = inputFragment.getPlanRoot();
         if (planNode instanceof ExchangeNode || planNode instanceof SortNode 
|| planNode instanceof UnionNode) {
             // the three nodes don't support conjuncts, need create a 
SelectNode to filter data
@@ -1397,19 +1374,31 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 ((AbstractPhysicalJoin<?, ?>) 
project.child(0).child(0)).setShouldTranslateOutput(false);
             }
         }
+
         PlanFragment inputFragment = project.child(0).accept(this, context);
+
         List<Expr> execExprList = project.getProjects()
                 .stream()
                 .map(e -> ExpressionTranslator.translate(e, context))
                 .collect(Collectors.toList());
         // TODO: fix the project alias of an aliased relation.
-
-        PlanNode inputPlanNode = inputFragment.getPlanRoot();
         List<Slot> slotList = project.getProjects()
                 .stream()
                 .map(NamedExpression::toSlot)
                 .collect(Collectors.toList());
 
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
+            DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, 
null, context);
+            dataStreamSink.setProjections(execExprList);
+            dataStreamSink.setOutputTupleDesc(tupleDescriptor);
+            return inputFragment;
+        }
+
+        PlanNode inputPlanNode = inputFragment.getPlanRoot();
         List<Expr> predicateList = inputPlanNode.getConjuncts();
         Set<SlotId> requiredSlotIdSet = Sets.newHashSet();
         for (Expr expr : execExprList) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 2695b67a93..158eaa67f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -114,7 +114,7 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
     public PhysicalProperties visitPhysicalCTEConsumer(
             PhysicalCTEConsumer cteConsumer, PlanContext context) {
         Preconditions.checkState(childrenOutputProperties.size() == 0);
-        return PhysicalProperties.ANY;
+        return PhysicalProperties.MUST_SHUFFLE;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index bb5ce1eec6..811c26569a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -27,9 +27,11 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.plans.AggMode;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -74,16 +76,33 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
 
     @Override
     public Boolean visit(Plan plan, Void context) {
+        // process must shuffle
+        for (int i = 0; i < children.size(); i++) {
+            DistributionSpec distributionSpec = 
childrenProperties.get(i).getDistributionSpec();
+            if (distributionSpec instanceof DistributionSpecMustShuffle) {
+                updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY);
+            }
+        }
         return true;
     }
 
     @Override
     public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends 
Plan> agg, Void context) {
+        // forbid one phase agg on distribute
         if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
                 && children.get(0).getPlan() instanceof PhysicalDistribute) {
             // this means one stage gather agg, usually bad pattern
             return false;
         }
+        // process must shuffle
+        visit(agg, context);
+        // process agg
+        return true;
+    }
+
+    @Override
+    public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, 
Void context) {
+        // do not process must shuffle
         return true;
     }
 
@@ -93,6 +112,9 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         Preconditions.checkArgument(children.size() == 2, "children.size() != 
2");
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
+        // process must shuffle
+        visit(hashJoin, context);
+        // process hash join
         DistributionSpec leftDistributionSpec = 
childrenProperties.get(0).getDistributionSpec();
         DistributionSpec rightDistributionSpec = 
childrenProperties.get(1).getDistributionSpec();
 
@@ -229,6 +251,9 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         Preconditions.checkArgument(children.size() == 2, 
String.format("children.size() is %d", children.size()));
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
+        // process must shuffle
+        visit(nestedLoopJoin, context);
+        // process nlj
         DistributionSpec rightDistributionSpec = 
childrenProperties.get(1).getDistributionSpec();
         if (rightDistributionSpec instanceof DistributionSpecStorageGather) {
             updateChildEnforceAndCost(1, PhysicalProperties.GATHER);
@@ -236,8 +261,17 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         return true;
     }
 
+    @Override
+    public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> 
project, Void context) {
+        // do not process must shuffle
+        return true;
+    }
+
     @Override
     public Boolean visitPhysicalSetOperation(PhysicalSetOperation 
setOperation, Void context) {
+        // process must shuffle
+        visit(setOperation, context);
+        // process set operation
         if (children.isEmpty()) {
             return true;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
new file mode 100644
index 0000000000..8f718fbb9d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+/**
+ * present data must use after shuffle
+ */
+public class DistributionSpecMustShuffle extends DistributionSpec {
+
+    public static final DistributionSpecMustShuffle INSTANCE = new 
DistributionSpecMustShuffle();
+
+    public DistributionSpecMustShuffle() {
+        super();
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecAny;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 28bf347977..e3b5151af7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -44,6 +44,8 @@ public class PhysicalProperties {
 
     public static PhysicalProperties STORAGE_GATHER = new 
PhysicalProperties(DistributionSpecStorageGather.INSTANCE);
 
+    public static PhysicalProperties MUST_SHUFFLE = new 
PhysicalProperties(DistributionSpecMustShuffle.INSTANCE);
+
     private final OrderSpec orderSpec;
 
     private final DistributionSpec distributionSpec;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 0f903d69d1..6a43694527 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -20,19 +20,38 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.BitmapFilterPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDataStreamSink;
 import org.apache.doris.thrift.TExplainLevel;
 
+import com.google.common.collect.Lists;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
 /**
  * Data sink that forwards data to an exchange node.
  */
 public class DataStreamSink extends DataSink {
-    private final PlanNodeId exchNodeId;
+
+    private PlanNodeId exchNodeId;
 
     private DataPartition outputPartition;
 
+    protected TupleDescriptor outputTupleDesc;
+
+    protected List<Expr> projections;
+
+    protected List<Expr> conjuncts = Lists.newArrayList();
+
+    public DataStreamSink() {
+
+    }
+
     public DataStreamSink(PlanNodeId exchNodeId) {
         this.exchNodeId = exchNodeId;
     }
@@ -42,23 +61,66 @@ public class DataStreamSink extends DataSink {
         return exchNodeId;
     }
 
+    public void setExchNodeId(PlanNodeId exchNodeId) {
+        this.exchNodeId = exchNodeId;
+    }
+
     @Override
     public DataPartition getOutputPartition() {
         return outputPartition;
     }
 
-    public void setPartition(DataPartition partition) {
-        outputPartition = partition;
+    public void setOutputPartition(DataPartition outputPartition) {
+        this.outputPartition = outputPartition;
+    }
+
+    public TupleDescriptor getOutputTupleDesc() {
+        return outputTupleDesc;
+    }
+
+    public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) {
+        this.outputTupleDesc = outputTupleDesc;
+    }
+
+    public List<Expr> getProjections() {
+        return projections;
+    }
+
+    public void setProjections(List<Expr> projections) {
+        this.projections = projections;
+    }
+
+    public List<Expr> getConjuncts() {
+        return conjuncts;
+    }
+
+    public void setConjuncts(List<Expr> conjuncts) {
+        this.conjuncts = conjuncts;
+    }
+
+    public void addConjunct(Expr conjunct) {
+        this.conjuncts.add(conjunct);
     }
 
     @Override
     public String getExplainString(String prefix, TExplainLevel explainLevel) {
         StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append(prefix + "STREAM DATA SINK\n");
-        strBuilder.append(prefix + "  EXCHANGE ID: " + exchNodeId + "\n");
+        strBuilder.append(prefix).append("STREAM DATA SINK\n");
+        strBuilder.append(prefix).append("  EXCHANGE ID: ").append(exchNodeId);
         if (outputPartition != null) {
-            strBuilder.append(prefix + "  " + 
outputPartition.getExplainString(explainLevel));
+            strBuilder.append("\n").append(prefix).append("  
").append(outputPartition.getExplainString(explainLevel));
+        }
+        if (!conjuncts.isEmpty()) {
+            Expr expr = 
PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts);
+            strBuilder.append(prefix).append("  CONJUNCTS: 
").append(expr.toSql()).append("\n");
+        }
+        if (!CollectionUtils.isEmpty(projections)) {
+            strBuilder.append(prefix).append("  PROJECTIONS: ")
+                    
.append(PlanNode.getExplainString(projections)).append("\n");
+            strBuilder.append(prefix).append("  PROJECTION TUPLE: 
").append(outputTupleDesc.getId());
+            strBuilder.append("\n");
         }
+
         return strBuilder.toString();
     }
 
@@ -67,6 +129,19 @@ public class DataStreamSink extends DataSink {
         TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK);
         TDataStreamSink tStreamSink =
                 new TDataStreamSink(exchNodeId.asInt(), 
outputPartition.toThrift());
+        for (Expr e : conjuncts) {
+            if  (!(e instanceof BitmapFilterPredicate)) {
+                tStreamSink.addToConjuncts(e.treeToThrift());
+            }
+        }
+        if (projections != null) {
+            for (Expr expr : projections) {
+                tStreamSink.addToOutputExprs(expr.treeToThrift());
+            }
+        }
+        if (outputTupleDesc != null) {
+            tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt());
+        }
         result.setStreamSink(tStreamSink);
         return result;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 30bf9eb45d..6694a05219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -108,15 +108,21 @@ public class ExchangeNode extends PlanNode {
     public final void computeTupleIds() {
         PlanNode inputNode = getChild(0);
         TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
+        updateTupleIds(outputTupleDesc);
+    }
+
+    public void updateTupleIds(TupleDescriptor outputTupleDesc) {
         if (outputTupleDesc != null) {
             tupleIds.clear();
             tupleIds.add(outputTupleDesc.getId());
+            tblRefIds.add(outputTupleDesc.getId());
+            nullableTupleIds.add(outputTupleDesc.getId());
         } else {
             clearTupleIds();
             tupleIds.addAll(getChild(0).getTupleIds());
+            tblRefIds.addAll(getChild(0).getTblRefIds());
+            nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
         }
-        tblRefIds.addAll(getChild(0).getTblRefIds());
-        nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java
index 0d5b54b269..e52bbb0557 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java
@@ -35,10 +35,11 @@ public class MultiCastPlanFragment extends PlanFragment {
         this.children.addAll(planFragment.getChildren());
     }
 
-    public List<ExchangeNode> getDestNodeList() {
-        return destNodeList;
+    public void addToDest(ExchangeNode exchangeNode) {
+        destNodeList.add(exchangeNode);
     }
 
+
     public List<PlanFragment> getDestFragmentList() {
         return 
destNodeList.stream().map(PlanNode::getFragment).collect(Collectors.toList());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 54903beae5..64ac4c3051 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -256,7 +256,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
             Preconditions.checkState(sink == null);
             // we're streaming to an exchange node
             DataStreamSink streamSink = new DataStreamSink(destNode.getId());
-            streamSink.setPartition(outputPartition);
+            streamSink.setOutputPartition(outputPartition);
             streamSink.setFragment(this);
             sink = streamSink;
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index ab17c0acec..cf5d14cb5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -415,7 +415,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         }
     }
 
-    protected Expr convertConjunctsToAndCompoundPredicate(List<Expr> 
conjuncts) {
+    public static Expr convertConjunctsToAndCompoundPredicate(List<Expr> 
conjuncts) {
         List<Expr> targetConjuncts = Lists.newArrayList(conjuncts);
         while (targetConjuncts.size() > 1) {
             List<Expr> newTargetConjuncts = Lists.newArrayList();
@@ -824,7 +824,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         return output.toString();
     }
 
-    protected String getExplainString(List<? extends Expr> exprs) {
+    public static String getExplainString(List<? extends Expr> exprs) {
         if (exprs == null) {
             return "";
         }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index af5d4d26a3..c78a7900a9 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -153,6 +153,18 @@ struct TDataStreamSink {
   2: required Partitions.TDataPartition output_partition
 
   3: optional bool ignore_not_found
+
+    // per-destination projections
+    4: optional list<Exprs.TExpr> output_exprs
+
+    // project output tuple id
+    5: optional Types.TTupleId output_tuple_id
+
+    // per-destination filters
+    6: optional list<Exprs.TExpr> conjuncts
+
+    // per-destination runtime filters
+    7: optional list<PlanNodes.TRuntimeFilterDesc> runtime_filters
 }
 
 struct TMultiCastDataStreamSink {
diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out
index 16ab4a00e2..838ba573be 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out
@@ -25,16 +25,17 @@ CteAnchor[cteId= ( CTEId#2=] )
 --------------PhysicalProject
 ----------------hashJoin[LEFT_SEMI_JOIN](ctr1.ctr_store_sk = 
ctr2.ctr_store_sk)(cast(ctr_total_return as DOUBLE) > 
cast((avg(ctr_total_return) * 1.2) as DOUBLE))
 ------------------hashJoin[INNER_JOIN](store.s_store_sk = ctr1.ctr_store_sk)
---------------------CteConsumer[cteId= ( CTEId#2=] )
+--------------------PhysicalDistribute
+----------------------CteConsumer[cteId= ( CTEId#2=] )
 --------------------PhysicalDistribute
 ----------------------PhysicalProject
 ------------------------filter((cast(s_state as VARCHAR(*)) = 'SD'))
 --------------------------PhysicalOlapScan[store]
-------------------PhysicalDistribute
---------------------PhysicalProject
-----------------------hashAgg[GLOBAL]
-------------------------PhysicalDistribute
---------------------------hashAgg[LOCAL]
+------------------PhysicalProject
+--------------------hashAgg[GLOBAL]
+----------------------PhysicalDistribute
+------------------------hashAgg[LOCAL]
+--------------------------PhysicalDistribute
 ----------------------------PhysicalProject
 ------------------------------CteConsumer[cteId= ( CTEId#2=] )
 
diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out
index e970e12f97..13e0f2f0e9 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out
@@ -24,20 +24,22 @@ CteAnchor[cteId= ( CTEId#4=] )
 ----------hashJoin[INNER_JOIN](expr_cast(d_week_seq1 as BIGINT) = 
expr_(d_week_seq2 - 53))
 ------------PhysicalDistribute
 --------------PhysicalProject
-----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq1)
-------------------PhysicalProject
---------------------CteConsumer[cteId= ( CTEId#4=] )
+----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq2)
 ------------------PhysicalDistribute
 --------------------PhysicalProject
-----------------------filter((date_dim.d_year = 1998))
+----------------------CteConsumer[cteId= ( CTEId#4=] )
+------------------PhysicalDistribute
+--------------------PhysicalProject
+----------------------filter((date_dim.d_year = 1999))
 ------------------------PhysicalOlapScan[date_dim]
 ------------PhysicalDistribute
 --------------PhysicalProject
-----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq2)
-------------------PhysicalProject
---------------------CteConsumer[cteId= ( CTEId#4=] )
+----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq1)
 ------------------PhysicalDistribute
 --------------------PhysicalProject
-----------------------filter((date_dim.d_year = 1999))
+----------------------CteConsumer[cteId= ( CTEId#4=] )
+------------------PhysicalDistribute
+--------------------PhysicalProject
+----------------------filter((date_dim.d_year = 1998))
 ------------------------PhysicalOlapScan[date_dim]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out
index c887e96371..56bad7457e 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out
@@ -39,15 +39,17 @@ CteAnchor[cteId= ( CTEId#0=] )
 ------------hashAgg[GLOBAL]
 --------------PhysicalDistribute
 ----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------filter((cast(i_color as VARCHAR(*)) = 'beige'))
-----------------------CteConsumer[cteId= ( CTEId#0=] )
+------------------PhysicalDistribute
+--------------------PhysicalProject
+----------------------filter((cast(i_color as VARCHAR(*)) = 'beige'))
+------------------------CteConsumer[cteId= ( CTEId#0=] )
 ------------PhysicalDistribute
 --------------PhysicalAssertNumRows
 ----------------PhysicalProject
 ------------------hashAgg[GLOBAL]
 --------------------PhysicalDistribute
 ----------------------hashAgg[LOCAL]
-------------------------PhysicalProject
---------------------------CteConsumer[cteId= ( CTEId#0=] )
+------------------------PhysicalDistribute
+--------------------------PhysicalProject
+----------------------------CteConsumer[cteId= ( CTEId#0=] )
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out
index ed1c8ba9cb..7338d341dc 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out
@@ -40,6 +40,7 @@ CteAnchor[cteId= ( CTEId#2=] )
 ----------------hashAgg[GLOBAL]
 ------------------PhysicalDistribute
 --------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------CteConsumer[cteId= ( CTEId#2=] )
+----------------------PhysicalDistribute
+------------------------PhysicalProject
+--------------------------CteConsumer[cteId= ( CTEId#2=] )
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out
index 5af21c50d8..49fc532871 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out
@@ -35,13 +35,15 @@ CteAnchor[cteId= ( CTEId#0=] )
 --------PhysicalTopN
 ----------PhysicalProject
 ------------hashJoin[INNER_JOIN](s_store_name = 
v1_lead.s_store_name)(v1.i_category = v1_lead.i_category)(v1.i_brand = 
v1_lead.i_brand)(v1.s_company_name = v1_lead.s_company_name)(v1.rn = expr_(rn - 
1))
---------------PhysicalProject
-----------------CteConsumer[cteId= ( CTEId#0=] )
+--------------PhysicalDistribute
+----------------PhysicalProject
+------------------CteConsumer[cteId= ( CTEId#0=] )
 --------------PhysicalDistribute
 ----------------PhysicalProject
 ------------------hashJoin[INNER_JOIN](s_store_name = 
v1_lag.s_store_name)(v1.i_category = v1_lag.i_category)(v1.i_brand = 
v1_lag.i_brand)(v1.s_company_name = v1_lag.s_company_name)(v1.rn = expr_(rn + 
1))
---------------------PhysicalProject
-----------------------CteConsumer[cteId= ( CTEId#0=] )
+--------------------PhysicalDistribute
+----------------------PhysicalProject
+------------------------CteConsumer[cteId= ( CTEId#0=] )
 --------------------PhysicalDistribute
 ----------------------PhysicalProject
 ------------------------filter((CASE WHEN (avg_monthly_sales > 0.0000) THEN 
(abs((cast(sum_sales as DOUBLE) - cast(avg_monthly_sales as DOUBLE))) / 
cast(avg_monthly_sales as DOUBLE)) ELSE NULL END > 0.1)(v2.d_year = 
2001)(v2.avg_monthly_sales > 0.0000))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out
index 6138c0d491..1b3a0610a4 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out
@@ -35,13 +35,15 @@ CteAnchor[cteId= ( CTEId#0=] )
 --------PhysicalTopN
 ----------PhysicalProject
 ------------hashJoin[INNER_JOIN](i_brand = v1_lead.i_brand)(v1.i_category = 
v1_lead.i_category)(v1.cc_name = v1_lead.cc_name)(v1.rn = expr_(rn - 1))
---------------PhysicalProject
-----------------CteConsumer[cteId= ( CTEId#0=] )
+--------------PhysicalDistribute
+----------------PhysicalProject
+------------------CteConsumer[cteId= ( CTEId#0=] )
 --------------PhysicalDistribute
 ----------------PhysicalProject
 ------------------hashJoin[INNER_JOIN](i_brand = v1_lag.i_brand)(v1.i_category 
= v1_lag.i_category)(v1.cc_name = v1_lag.cc_name)(v1.rn = expr_(rn + 1))
---------------------PhysicalProject
-----------------------CteConsumer[cteId= ( CTEId#0=] )
+--------------------PhysicalDistribute
+----------------------PhysicalProject
+------------------------CteConsumer[cteId= ( CTEId#0=] )
 --------------------PhysicalDistribute
 ----------------------PhysicalProject
 ------------------------filter((CASE WHEN (avg_monthly_sales > 0.0000) THEN 
(abs((cast(sum_sales as DOUBLE) - cast(avg_monthly_sales as DOUBLE))) / 
cast(avg_monthly_sales as DOUBLE)) ELSE NULL END > 0.1)(v2.d_year = 
1999)(v2.avg_monthly_sales > 0.0000))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out
index 44ad4b9321..bb72d81784 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out
@@ -25,7 +25,8 @@ CteAnchor[cteId= ( CTEId#2=] )
 --------PhysicalProject
 ----------hashJoin[LEFT_SEMI_JOIN](ctr1.ctr_state = 
ctr2.ctr_state)(cast(ctr_total_return as DOUBLE) > cast((avg(ctr_total_return) 
* 1.2) as DOUBLE))
 ------------hashJoin[INNER_JOIN](ctr1.ctr_customer_sk = customer.c_customer_sk)
---------------CteConsumer[cteId= ( CTEId#2=] )
+--------------PhysicalDistribute
+----------------CteConsumer[cteId= ( CTEId#2=] )
 --------------PhysicalDistribute
 ----------------hashJoin[INNER_JOIN](customer_address.ca_address_sk = 
customer.c_current_addr_sk)
 ------------------PhysicalProject
@@ -39,6 +40,7 @@ CteAnchor[cteId= ( CTEId#2=] )
 ----------------hashAgg[GLOBAL]
 ------------------PhysicalDistribute
 --------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------CteConsumer[cteId= ( CTEId#2=] )
+----------------------PhysicalDistribute
+------------------------PhysicalProject
+--------------------------CteConsumer[cteId= ( CTEId#2=] )
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
index d6ec3488d9..aa1194fe22 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
@@ -18,19 +18,19 @@ CteAnchor[cteId= ( CTEId#3=] )
 ------------hashAgg[LOCAL]
 --------------PhysicalProject
 ----------------hashJoin[INNER_JOIN](ws1.ws_ship_date_sk = date_dim.d_date_sk)
-------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = 
web_returns.wr_order_number)
+------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = 
ws_wh.ws_order_number)
 --------------------PhysicalDistribute
 ----------------------PhysicalProject
+------------------------CteConsumer[cteId= ( CTEId#3=] )
+--------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = 
web_returns.wr_order_number)
+----------------------PhysicalProject
 ------------------------hashJoin[INNER_JOIN](web_returns.wr_order_number = 
ws_wh.ws_order_number)
---------------------------PhysicalProject
-----------------------------CteConsumer[cteId= ( CTEId#3=] )
+--------------------------PhysicalDistribute
+----------------------------PhysicalProject
+------------------------------CteConsumer[cteId= ( CTEId#3=] )
 --------------------------PhysicalDistribute
 ----------------------------PhysicalProject
 ------------------------------PhysicalOlapScan[web_returns]
---------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = 
ws_wh.ws_order_number)
-----------------------PhysicalDistribute
-------------------------PhysicalProject
---------------------------CteConsumer[cteId= ( CTEId#3=] )
 ----------------------PhysicalDistribute
 ------------------------hashJoin[INNER_JOIN](ws1.ws_web_site_sk = 
web_site.web_site_sk)
 --------------------------hashJoin[INNER_JOIN](ws1.ws_ship_addr_sk = 
customer_address.ca_address_sk)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to