This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 37cef80e0cc [opt](profile) change the node name format of 
MultiCastDataStreamer (#59422)
37cef80e0cc is described below

commit 37cef80e0cc60b3d9c5adfa781660b729112df83
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 29 09:35:04 2025 +0800

    [opt](profile) change the node name format of MultiCastDataStreamer (#59422)
    
    Before:
    ```
    MULTI_CAST_DATA_STREAM_SINK_OPERATOR(id=-1)
    MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-1)
    MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-1)
    ```
    
    After:
    ```
    MULTI_CAST_DATA_STREAM_SINK_OPERATOR(dest_id=-2, dest_id=-3)(id=-1)
    MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-2)
    MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-3)
    ```
    
    This lets us see the relation between the sink node and its source nodes.
---
 be/src/pipeline/exec/multi_cast_data_stream_sink.cpp | 11 ++++++++++-
 be/src/pipeline/pipeline_fragment_context.cpp        |  5 +++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index ee3c3c8055e..362de4fb42b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -26,7 +26,16 @@ namespace doris::pipeline {
 
 std::string MultiCastDataStreamSinkLocalState::name_suffix() {
     auto* parent = static_cast<MultiCastDataStreamSinkOperatorX*>(_parent);
-    return fmt::format(operator_name_suffix, parent->operator_id());
+    auto& dest_ids = parent->dests_id();
+    std::string result = "(";
+    for (size_t i = 0; i < dest_ids.size(); ++i) {
+        if (i > 0) {
+            result += ", ";
+        }
+        result += fmt::format("dest_id={}", dest_ids[i]);
+    }
+    result += ")";
+    return fmt::format(result + operator_name_suffix, parent->operator_id());
 }
 
 std::shared_ptr<BasicSharedState> 
MultiCastDataStreamSinkOperatorX::create_shared_state() const {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 59d557fd0d8..ed6f3708d09 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1139,8 +1139,9 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
             OperatorPtr source_op;
             // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
-                    multi_cast_node_id, i, pool, 
thrift_sink.multi_cast_stream_sink.sinks[i],
-                    row_desc, /*operator_id=*/source_id);
+                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
+                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
+                    /*operator_id=*/source_id);
             RETURN_IF_ERROR(new_pipeline->add_operator(
                     source_op, params.__isset.parallel_instances ? 
params.parallel_instances : 0));
             // 2. create and set sink operator of data stream sender for new 
pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to