This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37cef80e0cc [opt](profile) change the node name format of
MultiCastDataStreamer (#59422)
37cef80e0cc is described below
commit 37cef80e0cc60b3d9c5adfa781660b729112df83
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 29 09:35:04 2025 +0800
[opt](profile) change the node name format of MultiCastDataStreamer (#59422)
before:
```
MULTI_CAST_DATA_STREAM_SINK_OPERATOR(id=-1)
MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-1)
MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-1)
```
After:
```
MULTI_CAST_DATA_STREAM_SINK_OPERATOR(dest_id=-2, dest_id=-3)(id=-1)
MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-2)
MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR(id=-3)
```
So that we can find the relation between sink and source node
---
be/src/pipeline/exec/multi_cast_data_stream_sink.cpp | 11 ++++++++++-
be/src/pipeline/pipeline_fragment_context.cpp | 5 +++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index ee3c3c8055e..362de4fb42b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -26,7 +26,16 @@ namespace doris::pipeline {
std::string MultiCastDataStreamSinkLocalState::name_suffix() {
auto* parent = static_cast<MultiCastDataStreamSinkOperatorX*>(_parent);
- return fmt::format(operator_name_suffix, parent->operator_id());
+ auto& dest_ids = parent->dests_id();
+ std::string result = "(";
+ for (size_t i = 0; i < dest_ids.size(); ++i) {
+ if (i > 0) {
+ result += ", ";
+ }
+ result += fmt::format("dest_id={}", dest_ids[i]);
+ }
+ result += ")";
+ return fmt::format(result + operator_name_suffix, parent->operator_id());
}
std::shared_ptr<BasicSharedState>
MultiCastDataStreamSinkOperatorX::create_shared_state() const {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 59d557fd0d8..ed6f3708d09 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1139,8 +1139,9 @@ Status
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
OperatorPtr source_op;
// 1. create and set the source operator of
multi_cast_data_stream_source for new pipeline
source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
- multi_cast_node_id, i, pool,
thrift_sink.multi_cast_stream_sink.sinks[i],
- row_desc, /*operator_id=*/source_id);
+ /*node_id*/ source_id, /*consumer_id*/ i, pool,
+ thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
+ /*operator_id=*/source_id);
RETURN_IF_ERROR(new_pipeline->add_operator(
source_op, params.__isset.parallel_instances ?
params.parallel_instances : 0));
// 2. create and set sink operator of data stream sender for new
pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]