This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 42eee25cfb9 [Feature](pipeline) Trace pipeline schedule (part II) - visualization demo (#31301)
42eee25cfb9 is described below

commit 42eee25cfb964d16a83d4238386555219a8842c3
Author: zclllyybb <zhaochan...@selectdb.com>
AuthorDate: Sun Apr 28 20:18:15 2024 +0800

    [Feature](pipeline) Trace pipeline schedule (part II) - visualization demo (#31301)
---
 be/src/pipeline/pipeline_tracing.cpp     | 119 ++++++++++++++++---------------
 be/src/pipeline/pipeline_tracing.h       |  13 ++--
 be/src/pipeline/task_scheduler.cpp       |   3 +-
 tools/pipeline-tracing/README.md         |  36 ++++++++++
 tools/pipeline-tracing/origin-to-show.py |  85 ++++++++++++++++++++++
 5 files changed, 191 insertions(+), 65 deletions(-)

diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp
index efe9667f2b5..abedc4ff4f4 100644
--- a/be/src/pipeline/pipeline_tracing.cpp
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -19,6 +19,7 @@
 
 #include <absl/time/clock.h>
 #include <fcntl.h>
+#include <sys/stat.h>
 
 #include <boost/algorithm/string/predicate.hpp>
 #include <chrono>
@@ -34,6 +35,8 @@
 
 namespace doris::pipeline {
 
+std::filesystem::path log_dir = fmt::format("{}/pipe_tracing", getenv("LOG_DIR"));
+
 void PipelineTracerContext::record(ScheduleRecord record) {
     if (_dump_type == RecordType::None) [[unlikely]] {
         return;
@@ -41,6 +44,7 @@ void PipelineTracerContext::record(ScheduleRecord record) {
     if (_datas.contains(record.query_id)) {
         _datas[record.query_id].enqueue(record);
     } else {
+        // dumping per timeslice may cause this; it can leave previous records broken, which is acceptable
         std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash
         _datas[record.query_id].enqueue(record);
     }
@@ -52,12 +56,12 @@ void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t workload_grou
         _id_to_workload_group[query_id] = workload_group;
     }
     if (_dump_type == RecordType::PerQuery) {
-        _dump(query_id);
+        _dump_query(query_id);
     } else if (_dump_type == RecordType::Periodic) {
         auto now = MonotonicSeconds();
         auto interval = now - _last_dump_time;
         if (interval > _dump_interval_s) {
-            _dump(query_id);
+            _dump_timeslice();
         }
     }
 }
@@ -90,75 +94,76 @@ Status PipelineTracerContext::change_record_params(
                                "No qualified param in changing tracing record 
method");
 }
 
-void PipelineTracerContext::_dump(TUniqueId query_id) {
-    if (_dump_type == RecordType::None) {
-        return;
-    }
-
-    std::filesystem::path log_dir = fmt::format("{}/pipe_tracing", getenv("LOG_DIR"));
+void PipelineTracerContext::_dump_query(TUniqueId query_id) {
     //TODO: while dumping, records can still be appended but new queries cannot be added. Try finer-grained locks.
     std::unique_lock<std::mutex> l(_data_lock); // can't rehash
-    if (_dump_type == RecordType::PerQuery) {
-        auto path = log_dir / fmt::format("query{}", to_string(query_id));
-        int fd = ::open(
-                path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
-                S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
-        if (fd < 0) [[unlikely]] {
-            throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
-                    "create tracing log file {} failed", path.c_str()));
+    auto path = log_dir / fmt::format("query{}", to_string(query_id));
+    int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+                    S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
+    if (fd < 0) [[unlikely]] {
+        throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+                "create tracing log file {} failed", path.c_str()));
+    }
+    auto writer = io::LocalFileWriter {path, fd};
+
+    ScheduleRecord record;
+    while (_datas[query_id].try_dequeue(record)) {
+        uint64_t v = 0;
+        {
+            std::unique_lock<std::mutex> l(_tg_lock);
+            v = _id_to_workload_group.at(query_id);
         }
-        auto writer = io::LocalFileWriter {path, fd};
+        auto tmp_str = record.to_string(v);
+        auto text = Slice {tmp_str};
+        THROW_IF_ERROR(writer.appendv(&text, 1));
+    }
+
+    THROW_IF_ERROR(writer.finalize());
+    THROW_IF_ERROR(writer.close());
+
+    _last_dump_time = MonotonicSeconds();
+
+    _datas.erase(query_id);
+    {
+        std::unique_lock<std::mutex> l(_tg_lock);
+        _id_to_workload_group.erase(query_id);
+    }
+}
+
+void PipelineTracerContext::_dump_timeslice() {
+    std::unique_lock<std::mutex> l(_data_lock); // can't rehash
 
+    //TODO: if long time, per timeslice per file
+    auto path = log_dir /
+                fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
+    int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+                    S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
+    if (fd < 0) [[unlikely]] {
+        throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+                "create tracing log file {} failed", path.c_str()));
+    }
+    auto writer = io::LocalFileWriter {path, fd};
+
+    // dump all query traces in this time window to one file.
+    for (auto& [query_id, trace] : _datas) {
         ScheduleRecord record;
-        while (_datas[query_id].try_dequeue(record)) {
+        while (trace.try_dequeue(record)) {
             uint64_t v = 0;
             {
                 std::unique_lock<std::mutex> l(_tg_lock);
-                v = _id_to_workload_group[query_id];
+                v = _id_to_workload_group.at(query_id);
             }
             auto tmp_str = record.to_string(v);
             auto text = Slice {tmp_str};
             THROW_IF_ERROR(writer.appendv(&text, 1));
         }
-
-        THROW_IF_ERROR(writer.finalize());
-        THROW_IF_ERROR(writer.close());
-    } else if (_dump_type == RecordType::Periodic) {
-        auto path =
-                log_dir /
-                fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
-        int fd = ::open(
-                path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
-                S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
-        if (fd < 0) [[unlikely]] {
-            throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
-                    "create tracing log file {} failed", path.c_str()));
-        }
-        auto writer = io::LocalFileWriter {path, fd};
-
-        for (auto& [id, trace] : _datas) {
-            ScheduleRecord record;
-            while (trace.try_dequeue(record)) {
-                uint64_t v = 0;
-                {
-                    std::unique_lock<std::mutex> l(_tg_lock);
-                    v = _id_to_workload_group[query_id];
-                }
-                auto tmp_str = record.to_string(v);
-                auto text = Slice {tmp_str};
-                THROW_IF_ERROR(writer.appendv(&text, 1));
-            }
-        }
-        THROW_IF_ERROR(writer.finalize());
-        THROW_IF_ERROR(writer.close());
-
-        _last_dump_time = MonotonicSeconds();
     }
+    THROW_IF_ERROR(writer.finalize());
+    THROW_IF_ERROR(writer.close());
 
-    _datas.erase(query_id);
-    {
-        std::unique_lock<std::mutex> l(_tg_lock);
-        _id_to_workload_group.erase(query_id);
-    }
+    _last_dump_time = MonotonicSeconds();
+
+    _datas.clear();
+    _id_to_workload_group.clear();
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index e4e4d4e4e7d..eb0f2ac684a 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -18,12 +18,11 @@
 #pragma once
 
 #include <concurrentqueue.h>
-#include <fmt/core.h>
+#include <fmt/format.h>
 #include <gen_cpp/Types_types.h>
 #include <parallel_hashmap/phmap.h>
 
 #include <cstdint>
-#include <filesystem>
 
 #include "common/config.h"
 #include "util/hash_util.hpp" // IWYU pragma: keep
@@ -51,8 +50,7 @@ struct ScheduleRecord {
 // all tracing datas of ONE specific query
 using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>;
 
-// belongs to exec_env, for all query, if enable
-// curl http://{host}:{web_server_port}/api/running_pipeline_tasks
+// belongs to exec_env, for all query, if enabled
 class PipelineTracerContext {
 public:
     enum class RecordType {
@@ -68,12 +66,15 @@ public:
     bool enabled() const { return !(_dump_type == RecordType::None); }
 
 private:
-    void _dump(TUniqueId query_id); // dump data to disk. one query or all.
+    // dump data to disk. one query or all.
+    void _dump_query(TUniqueId query_id);
+    void _dump_timeslice();
 
     std::mutex _data_lock; // lock for map, not map items.
     phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
     std::mutex _tg_lock; //TODO: use an lockfree DS
-    phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_workload_group;
+    phmap::flat_hash_map<TUniqueId, uint64_t>
+            _id_to_workload_group; // save query's workload group number
 
     RecordType _dump_type = RecordType::None;
     decltype(MonotonicSeconds()) _last_dump_time;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 0461999d185..78a728faaed 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -172,8 +172,7 @@ void TaskScheduler::_do_work(size_t index) {
 
                 uint64_t end_time = MonotonicMicros();
                 auto state = task->get_state();
-                std::string state_name =
-                        state == PipelineTaskState::RUNNABLE ? get_state_name(state) : "";
+                std::string state_name = get_state_name(state);
                 ExecEnv::GetInstance()->pipeline_tracer_context()->record(
                         {query_id, task_name, core_id, thread_id, start_time, end_time,
                          state_name});
diff --git a/tools/pipeline-tracing/README.md b/tools/pipeline-tracing/README.md
new file mode 100644
index 00000000000..f047d61a452
--- /dev/null
+++ b/tools/pipeline-tracing/README.md
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Pipeline Tracing and Display Tool
+
+In the Pipeline execution engine, we split the execution plan tree of each Instance into multiple small Pipeline Tasks and execute them under our custom Pipeline scheduler. Therefore, in an environment where a large number of Pipeline Tasks are executing, how these Tasks are scheduled across threads and CPU cores is an important factor for execution performance. We have developed a specialised tool to observe the scheduling process for a particular query or time period, which we call "Pipeline Tracing".
+
+This tool converts record files to proper JSON format for visualization.
+
+## How to Use
+
+```shell
+python3 origin-to-show.py -s <SOURCE_FILE> -d <DEST>.json
+```
+to convert the record file `<SOURCE_FILE>` to `<DEST>.json`. The result can then be visualized.
+
+```shell
+python3 origin-to-show.py --help
+```
+for help details.
\ No newline at end of file
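
As a quick orientation, the sketch below walks one record line through the same mapping that `origin-to-show.py` applies. The sample line and its values are invented for illustration; only the pipe-separated field order and the output keys follow the `Record` class in the script, and the timestamps are microseconds because the backend records them with `MonotonicMicros()`.

```python
# Minimal sketch of the per-line conversion done by origin-to-show.py.
# The record values here are hypothetical; the field order and output
# keys mirror the Record class in the script.
import json

line = "9a1b2c3d4e5f|AGG_SINK_OPERATOR|3|181|1000000|1004200|RUNNABLE|0"
(query_id, task_name, core_id, thread_id,
 start_time, end_time, state_name, group_id) = line.strip().split('|')

event = {
    "name": task_name,        # slice name shown in the viewer
    "cat": state_name,        # task state, used as the category
    "ph": "X",                # "complete" event: carries both ts and dur
    "ts": int(start_time),    # start time in microseconds
    "dur": int(end_time) - int(start_time),
    "pid": int(core_id),      # forced to 1 for every record with --no-core
    "tid": int(thread_id),
}
print(json.dumps(event, sort_keys=True))
```

The generated file wraps a list of such events in a top-level `traceEvents` array, which matches the Chrome Trace Event format, so the result can typically be opened in `chrome://tracing` or in Perfetto's trace viewer.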
diff --git a/tools/pipeline-tracing/origin-to-show.py b/tools/pipeline-tracing/origin-to-show.py
new file mode 100644
index 00000000000..cbe68545e36
--- /dev/null
+++ b/tools/pipeline-tracing/origin-to-show.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import sys
+from typing import List
+import json
+
+class Record:
+    def __init__(self, query_id, task_name, core_id, thread_id, start_time, end_time, state_name, group_id) -> None:
+        self.query_id : str = query_id
+        self.task_name : str = task_name
+        self.core_id : int = int(core_id)
+        self.thread_id : int = int(thread_id)
+        self.start_time : int = int(start_time)
+        self.end_time : int = int(end_time)
+        self.state_name : str = state_name
+        self.group_id : int = int(group_id)
+
+    def print(self) :
+        print(f"query_id = {self.query_id}, task_name = {self.task_name}, 
start_time={self.start_time}")
+
+    def get_core(self) :
+        return 1 if same_core else self.core_id
+
+    def to_json(self) :
+        json = {"name": self.task_name, "cat": self.state_name, "ph": "X", 
"ts": self.start_time, "dur": self.end_time - self.start_time,
+                "pid": self.get_core(), "tid": self.thread_id}
+        return json
+
+def get_key(record : Record) -> int:
+    return record.start_time
+
+def print_header(file):
+    print(r'{"traceEvents":[', file=file)
+
+def print_footer(file):
+    print(r"]}", file=file)
+
+parser = argparse.ArgumentParser(description='Accept a file to analyse. Use parameters to specify how to illustrate it.')
+parser.add_argument('-s', '--source', help='SOURCE as the path of tracing record file to analyze')
+parser.add_argument('-d', '--dest', help='DEST as the path of json result file you want to save')
+parser.add_argument('-n', '--no-core', action='store_true', help='combine all threads into one core group for display')
+args = parser.parse_args()
+
+records : List[Record] = []
+same_core : bool = args.no_core
+
+### get records
+try:
+    with open(args.source, "r") as file:
+        for line in file :
+            record = Record(*line.strip().split('|'))
+            records.append(record)
+except FileNotFoundError:
+    sys.exit(f"File '{args.source}' doesn't exist. Please check the path.")
+except Exception as e:
+    sys.exit(f"Error occured! {e}")
+
+records.sort(key=get_key)
+
+### print json
+try:
+    with open(args.dest, "w") as file: # overwrite file
+        print_header(file)
+        for record in records:
+            print(json.dumps(record.to_json(), sort_keys=True, indent=4, separators=(',', ':')), end=',\n', file=file)
+        print_footer(file)
+    print(f"Generate json to {args.dest} succeed!")
+except Exception as e:
+    print(f"Error occured! {e}")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
