This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b0b0c2fd6e0 [opt](paimon)Enhance the observability of split and JNI in 
profile (#49688)
b0b0c2fd6e0 is described below

commit b0b0c2fd6e00d94dbe3b113f9849ce6682b4a176
Author: wuwenchi <wuwen...@selectdb.com>
AuthorDate: Thu Apr 24 04:48:21 2025 +0800

    [opt](paimon)Enhance the observability of split and JNI in profile (#49688)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    1. Display the total weight of the splits assigned to each BE.
    2. Break down the JNI scan time further into 'appendDataTime'
    and 'createVectorTableTime'.
    3. Add 'MaxTimeSplitWeight' to record the weight of the split that took
    the longest time to read.
    
    To see these split and JNI metrics in the profile, set
`profile_level=2`.
    
    ```
    fe:
          - Splits Assignment Weight: 
{"172.20.32.136:39051":4,"172.20.32.136:39052":5}
    
    
    be:
                     - FillBlockTime: 36.241us
                     - JavaAppendDataTime: 54.755us
                     - JavaCreateVectorTableTime: 191.816us
                     - JavaScanTime: 1.368ms
                     - MaxTimeSplitWeight: 460
                     - OpenScannerTime: 19.93ms
    ```
---
 be/src/util/runtime_profile.cpp                    | 25 ++++++++++++
 be/src/util/runtime_profile.h                      | 47 ++++++++++++++++++++++
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  3 +-
 be/src/vec/exec/jni_connector.cpp                  | 39 ++++++++++++++++--
 be/src/vec/exec/jni_connector.h                    | 15 ++++++-
 .../runtime_profile_counter_tree_node_test.cpp     | 23 +++++++++++
 .../org/apache/doris/common/jni/JniScanner.java    | 12 ++++++
 .../org/apache/doris/paimon/PaimonJniScanner.java  |  2 +
 .../org/apache/doris/common/profile/Profile.java   |  1 +
 .../doris/common/profile/SummaryProfile.java       | 35 +++++++++++++++-
 .../apache/doris/datasource/FileQueryScanNode.java | 17 +++++---
 .../org/apache/doris/datasource/FileSplit.java     |  4 ++
 .../datasource/paimon/source/PaimonScanNode.java   |  1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  4 ++
 .../src/main/java/org/apache/doris/spi/Split.java  |  2 +
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 16 files changed, 218 insertions(+), 13 deletions(-)

diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index d9fb7183321..c39b5853999 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -473,6 +473,31 @@ void RuntimeProfile::add_description(const std::string& 
name, const std::string&
     child_counters->insert(name);
 }
 
+RuntimeProfile::ConditionCounter* RuntimeProfile::add_conditition_counter(
+        const std::string& name, TUnit::type type, const ConditionCounterFunction& counter_fn,
+        const std::string& parent_counter_name, int64_t level) {
+    std::lock_guard<std::mutex> l(_counter_map_lock);
+    // Reuse an existing counter of the same name; one lookup instead of find() + operator[].
+    if (auto it = _counter_map.find(name); it != _counter_map.end()) {
+        auto* condition_counter = dynamic_cast<ConditionCounter*>(it->second);
+        if (condition_counter == nullptr) {
+            // The name is already taken by a counter of a different type.
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Failed to add a conditition counter that is duplicate and of a "
+                                   "different type for {}.",
+                                   name);
+        }
+        return condition_counter;
+    }
+
+    ConditionCounter* counter = _pool->add(new ConditionCounter(type, counter_fn, level));
+    _counter_map[name] = counter;
+    std::set<std::string>* child_counters =
+            find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>());
+    child_counters->insert(name);
+    return counter;
+}
+
 RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) {
     std::lock_guard<std::mutex> l(_counter_map_lock);
 
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index ee9a71ee900..14d14a1195d 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -278,6 +278,48 @@ public:
         DerivedCounterFunction _counter_fn;
     };
 
+    using ConditionCounterFunction = std::function<bool(int64_t, int64_t)>;
+
+    // ConditionCounter only stores a new value when condition_func(current_condition, new_condition)
+    // returns true; the accepted condition then becomes the current condition.
+    // E.g. `[](int64_t cur, int64_t c) { return c > cur; }` keeps the value that arrived with the
+    // largest condition seen so far (a max-tracker); `c < cur` gives a min-tracker.
+    // Every accessor locks `_mutex`, so conditional_update() may be called concurrently.
+    // std::mutex is not recursive: code that already holds it must read members, not call value().
+    class ConditionCounter : public Counter {
+    public:
+        ConditionCounter(TUnit::type type, const ConditionCounterFunction& condition_func,
+                         int64_t level = 2, int64_t condition = 0, int64_t value = 0)
+                : Counter(type, value, level),
+                  _condition(condition),
+                  _value(value),
+                  _condition_func(condition_func) {}
+
+        Counter* clone() const override {
+            std::lock_guard<std::mutex> l(_mutex); // held: read _value directly, not via value()
+            return new ConditionCounter(type(), _condition_func, level(), _condition, _value);
+        }
+
+        int64_t value() const override {
+            std::lock_guard<std::mutex> l(_mutex);
+            return _value;
+        }
+
+        void conditional_update(int64_t c, int64_t v) {
+            std::lock_guard<std::mutex> l(_mutex);
+            if (_condition_func(_condition, c)) {
+                _value = v;
+                _condition = c;
+            }
+        }
+
+    private:
+        mutable std::mutex _mutex;
+        int64_t _condition;
+        int64_t _value;
+        ConditionCounterFunction _condition_func;
+    };
+
     // NonZeroCounter will not be converted to Thrift if the value is 0.
     class NonZeroCounter : public Counter {
     public:
@@ -392,6 +434,11 @@ public:
                                         const DerivedCounterFunction& 
counter_fn,
                                         const std::string& 
parent_counter_name);
 
+    ConditionCounter* add_conditition_counter(const std::string& name, 
TUnit::type type,
+                                              const ConditionCounterFunction& 
counter_fn,
+                                              const std::string& 
parent_counter_name,
+                                              int64_t level = 2);
+
     // Gets the counter object with 'name'.  Returns nullptr if there is no 
counter with
     // that name.
     Counter* get_counter(const std::string& name);
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp 
b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index a05ea4511f4..15ece01b868 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -79,8 +79,9 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
             params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
         }
     }
+    int64_t self_split_weight = range.__isset.self_split_weight ? 
range.self_split_weight : -1;
     _jni_connector = 
std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
-                                                    params, column_names);
+                                                    params, column_names, 
self_split_weight);
 }
 
 Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index 4b5bb72e57b..932e6e2772d 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -73,7 +73,15 @@ Status JniConnector::open(RuntimeState* state, 
RuntimeProfile* profile) {
     ADD_TIMER(_profile, _connector_name.c_str());
     _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", 
_connector_name.c_str());
     _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", 
_connector_name.c_str());
+    _java_append_data_time =
+            ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", 
_connector_name.c_str());
+    _java_create_vector_table_time =
+            ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", 
_connector_name.c_str());
     _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", 
_connector_name.c_str());
+    _max_time_split_weight_counter = _profile->add_conditition_counter(
+            "MaxTimeSplitWeight", TUnit::UNIT, [](int64_t _c, int64_t c) { 
return c > _c; },
+            _connector_name.c_str());
+    _java_scan_watcher = 0;
     // cannot put the env into fields, because frames in an env object is 
limited
     // to avoid limited frames in a thread, we should get local env in a 
method instead of in whole object.
     JNIEnv* env = nullptr;
@@ -82,7 +90,7 @@ Status JniConnector::open(RuntimeState* state, 
RuntimeProfile* profile) {
         batch_size = _state->batch_size();
     }
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-    SCOPED_TIMER(_open_scanner_time);
+    SCOPED_RAW_TIMER(&_jni_scanner_open_watcher);
     _scanner_params.emplace("time_zone", _state->timezone());
     RETURN_IF_ERROR(_init_jni_scanner(env, batch_size));
     // Call org.apache.doris.common.jni.JniScanner#open
@@ -114,7 +122,7 @@ Status JniConnector::get_next_block(Block* block, size_t* 
read_rows, bool* eof)
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     long meta_address = 0;
     {
-        SCOPED_TIMER(_java_scan_time);
+        SCOPED_RAW_TIMER(&_java_scan_watcher);
         meta_address = env->CallLongMethod(_jni_scanner_obj, 
_jni_scanner_get_next_batch);
     }
     RETURN_ERROR_IF_EXC(env);
@@ -171,6 +179,23 @@ Status JniConnector::close() {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         if (_scanner_opened && _jni_scanner_obj != nullptr) {
+            COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher);
+            COUNTER_UPDATE(_fill_block_time, _fill_block_watcher);
+
+            int64_t _append = (int64_t)env->CallLongMethod(_jni_scanner_obj,
+                                                           
_jni_scanner_get_append_data_time);
+            COUNTER_UPDATE(_java_append_data_time, _append);
+
+            int64_t _create = (int64_t)env->CallLongMethod(
+                    _jni_scanner_obj, 
_jni_scanner_get_create_vector_table_time);
+            COUNTER_UPDATE(_java_create_vector_table_time, _create);
+
+            COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - _append - 
_create);
+
+            _max_time_split_weight_counter->conditional_update(
+                    _jni_scanner_open_watcher + _fill_block_watcher + 
_java_scan_watcher,
+                    _self_split_weight);
+
             // _fill_block may be failed and returned, we should release table 
in close.
             // org.apache.doris.common.jni.JniScanner#releaseTable is 
idempotent
             env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@@ -219,7 +244,15 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int 
batch_size) {
     RETURN_ERROR_IF_EXC(env);
 
     _jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V");
+    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, 
"getNextBatchMeta", "()J");
+    RETURN_ERROR_IF_EXC(env);
+    _jni_scanner_get_append_data_time =
+            env->GetMethodID(_jni_scanner_cls, "getAppendDataTime", "()J");
+    RETURN_ERROR_IF_EXC(env);
+    _jni_scanner_get_create_vector_table_time =
+            env->GetMethodID(_jni_scanner_cls, "getCreateVectorTableTime", 
"()J");
+    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_get_table_schema =
             env->GetMethodID(_jni_scanner_cls, "getTableSchema", 
"()Ljava/lang/String;");
     RETURN_ERROR_IF_EXC(env);
@@ -270,7 +303,7 @@ Status JniConnector::fill_block(Block* block, const 
ColumnNumbers& arguments, lo
 }
 
 Status JniConnector::_fill_block(Block* block, size_t num_rows) {
-    SCOPED_TIMER(_fill_block_time);
+    SCOPED_RAW_TIMER(&_fill_block_watcher);
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     for (int i = 0; i < _column_names.size(); ++i) {
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 0c8d27efc02..d48fc3e0f53 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -187,10 +187,11 @@ public:
      * @param column_names Fields to read, also the required_fields in 
scanner_params
      */
     JniConnector(std::string connector_class, std::map<std::string, 
std::string> scanner_params,
-                 std::vector<std::string> column_names)
+                 std::vector<std::string> column_names, int64_t 
self_split_weight = -1) // self_split_weight: -1 when the split weight is unknown
             : _connector_class(std::move(connector_class)),
               _scanner_params(std::move(scanner_params)),
-              _column_names(std::move(column_names)) {
+              _column_names(std::move(column_names)),
+              _self_split_weight(self_split_weight) {
         // Use java class name as connector name
         _connector_name = split(_connector_class, "/").back();
     }
@@ -282,14 +283,22 @@ private:
     std::string _connector_class;
     std::map<std::string, std::string> _scanner_params;
     std::vector<std::string> _column_names;
+    int64_t _self_split_weight; // -1 when unknown; int64_t to match the ctor arg and thrift i64
     bool _is_table_schema = false;
 
     RuntimeState* _state = nullptr;
     RuntimeProfile* _profile = nullptr;
     RuntimeProfile::Counter* _open_scanner_time = nullptr;
     RuntimeProfile::Counter* _java_scan_time = nullptr;
+    RuntimeProfile::Counter* _java_append_data_time = nullptr;
+    RuntimeProfile::Counter* _java_create_vector_table_time = nullptr;
     RuntimeProfile::Counter* _fill_block_time = nullptr;
     std::map<std::string, RuntimeProfile::Counter*> _scanner_profile;
+    RuntimeProfile::ConditionCounter* _max_time_split_weight_counter = nullptr;
+
+    int64_t _jni_scanner_open_watcher = 0;
+    int64_t _java_scan_watcher = 0;
+    int64_t _fill_block_watcher = 0;
 
     size_t _has_read = 0;
 
@@ -298,6 +307,8 @@ private:
     jclass _jni_scanner_cls;
     jobject _jni_scanner_obj;
     jmethodID _jni_scanner_open;
+    jmethodID _jni_scanner_get_append_data_time;
+    jmethodID _jni_scanner_get_create_vector_table_time;
     jmethodID _jni_scanner_get_next_batch;
     jmethodID _jni_scanner_get_table_schema;
     jmethodID _jni_scanner_close;
diff --git a/be/test/util/runtime_profile_counter_tree_node_test.cpp 
b/be/test/util/runtime_profile_counter_tree_node_test.cpp
index 5226841b4dd..9001dac5de8 100644
--- a/be/test/util/runtime_profile_counter_tree_node_test.cpp
+++ b/be/test/util/runtime_profile_counter_tree_node_test.cpp
@@ -345,4 +345,27 @@ TEST_F(RuntimeProfileCounterTreeNodeTest, 
DescriptionCounter) {
     ASSERT_EQ(*child_counter_map["root"].begin(), "description_entry");
 }
 
+TEST_F(RuntimeProfileCounterTreeNodeTest, ConditionCounterTest) {
+    // Min-tracker at level 2; initial condition 1000000 so the first smaller condition wins.
+    auto min_counter = std::make_unique<RuntimeProfile::ConditionCounter>(
+            TUnit::UNIT, [](int64_t cur, int64_t c) { return c < cur; }, 2, 1000000);
+    min_counter->conditional_update(100, 1);
+    ASSERT_EQ(min_counter->value(), 1);
+    min_counter->conditional_update(200, 2);
+    ASSERT_EQ(min_counter->value(), 1);
+    min_counter->conditional_update(10, 3);
+    ASSERT_EQ(min_counter->value(), 3);
+
+    auto max_counter = std::make_unique<RuntimeProfile::ConditionCounter>(
+            TUnit::UNIT, [](int64_t cur, int64_t c) { return c > cur; });
+
+    max_counter->conditional_update(10, 4);
+    ASSERT_EQ(max_counter->value(), 4);
+    max_counter->conditional_update(1, 5);
+    ASSERT_EQ(max_counter->value(), 4);
+    max_counter->conditional_update(100, 6);
+    ASSERT_EQ(max_counter->value(), 6);
+    max_counter->conditional_update(10, 7);
+    ASSERT_EQ(max_counter->value(), 6);
+}
 } // namespace doris
\ No newline at end of file
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index cb191f4b038..8bb8a664ccf 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -43,6 +43,8 @@ public abstract class JniScanner {
     // to deserialize the predicate string to PaimonPredicate object.
     protected ScanPredicate[] predicates;
     protected int batchSize;
+    protected long appendDataTime = 0;
+    protected long createVectorTableTime = 0;
 
     // Initialize JniScanner
     public abstract void open() throws IOException;
@@ -87,7 +89,9 @@ public abstract class JniScanner {
 
     public long getNextBatchMeta() throws IOException {
         if (vectorTable == null) {
+            long l = System.nanoTime();
             vectorTable = VectorTable.createWritableTable(types, fields, 
batchSize);
+            createVectorTableTime += System.nanoTime() - l;
         }
         int numRows;
         try {
@@ -133,4 +137,12 @@ public abstract class JniScanner {
         }
         vectorTable = null;
     }
+
+    public long getAppendDataTime() { // nanoseconds; stays 0 unless a subclass times appendData()
+        return appendDataTime;
+    }
+
+    public long getCreateVectorTableTime() { // nanoseconds spent in VectorTable.createWritableTable
+        return createVectorTableTime;
+    }
 }
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index e6c04a0a2f7..6ba4578dc6e 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -204,7 +204,9 @@ public class PaimonJniScanner extends JniScanner {
                     columnValue.setOffsetRow(record);
                     for (int i = 0; i < fields.length; i++) {
                         columnValue.setIdx(i, types[i], 
paimonDataTypeList.get(i));
+                        long l = System.nanoTime();
                         appendData(i, columnValue);
+                        appendDataTime += System.nanoTime() - l;
                     }
                     rows++;
                     if (rows >= batchSize) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 4ef5b3e897f..734f0268471 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -316,6 +316,7 @@ public class Profile {
                     }
                     return;
                 }
+                summaryProfile.queryFinished();
             }
 
             // Nereids native insert not set planner, so it is null
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 7215b8a9c65..d0416ba5f6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.TransactionType;
@@ -30,14 +31,18 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
  * SummaryProfile is part of a query profile.
@@ -122,6 +127,7 @@ public class SummaryProfile {
     public static final String RPC_QUEUE_TIME = "RPC Work Queue Time";
     public static final String RPC_WORK_TIME = "RPC Work Time";
     public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To 
FE";
+    public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment 
Weight";
 
     // These info will display on FE's web ui table, every one will be 
displayed as
     // a column, so that should not
@@ -179,7 +185,10 @@ public class SummaryProfile {
             TRACE_ID,
             TRANSACTION_COMMIT_TIME,
             SYSTEM_MESSAGE,
-            EXECUTED_BY_FRONTEND
+            EXECUTED_BY_FRONTEND,
+            NEREIDS_BE_FOLD_CONST_TIME,
+            NEREIDS_GARBAGE_COLLECT_TIME,
+            SPLITS_ASSIGNMENT_WEIGHT
     );
 
     // Ident of each item. Default is 0, which doesn't need to present in this 
Map.
@@ -340,6 +349,8 @@ public class SummaryProfile {
     private Map<TNetworkAddress, List<Long>> rpcPhase1Latency;
     private Map<TNetworkAddress, List<Long>> rpcPhase2Latency;
 
+    private Map<Backend, Long> assignedWeightPerBackend;
+
     public SummaryProfile() {
         init();
     }
@@ -399,6 +410,24 @@ public class SummaryProfile {
         updateExecutionSummaryProfile();
     }
 
+    // Called once when the whole query ends, so the per-backend split weight summary is
+    // rendered a single time rather than on every profile update.
+    public void queryFinished() {
+        if (assignedWeightPerBackend == null) {
+            return;
+        }
+        // Order backends by ascending assigned weight; LinkedHashMap keeps that order in the JSON.
+        Map<String, Long> weightByAddress = assignedWeightPerBackend.entrySet().stream()
+                .sorted(Map.Entry.comparingByValue())
+                .collect(Collectors.toMap(
+                        e -> e.getKey().getAddress(),
+                        Entry::getValue,
+                        (first, second) -> first,
+                        LinkedHashMap::new));
+        String json = new GsonBuilder().create().toJson(weightByAddress);
+        executionSummaryProfile.addInfoString(SPLITS_ASSIGNMENT_WEIGHT, json);
+    }
+
     private void updateSummaryProfile(Map<String, String> infos) {
         for (String key : infos.keySet()) {
             if (SUMMARY_KEYS.contains(key)) {
@@ -951,4 +980,8 @@ public class SummaryProfile {
     public void write(DataOutput output) throws IOException {
         Text.writeString(output, GsonUtils.GSON.toJson(this));
     }
+
+    public void setAssignedWeightPerBackend(Map<Backend, Long> 
assignedWeightPerBackend) { // rendered as JSON by queryFinished()
+        this.assignedWeightPerBackend = assignedWeightPerBackend;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 14c82c1606e..c3351600fc6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hive.source.HiveSplit;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -275,8 +276,9 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
     @Override
     public void createScanRangeLocations() throws UserException {
         long start = System.currentTimeMillis();
-        if (ConnectContext.get().getExecutor() != null) {
-            
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsStartTime();
+        StmtExecutor executor = ConnectContext.get().getExecutor();
+        if (executor != null) {
+            executor.getSummaryProfile().setGetSplitsStartTime();
         }
         TFileFormatType fileFormatType = getFileFormatType();
         if (fileFormatType == TFileFormatType.FORMAT_ORC) {
@@ -327,8 +329,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             splitAssignment = new SplitAssignment(
                     backendPolicy, this, this::splitToScanRange, 
locationProperties, pathPartitionKeys);
             splitAssignment.init();
-            if (ConnectContext.get().getExecutor() != null) {
-                
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
+            if (executor != null) {
+                executor.getSummaryProfile().setGetSplitsFinishTime();
             }
             if (splitAssignment.getSampleSplit() == null && 
!isFileStreamType()) {
                 return;
@@ -388,8 +390,11 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
 
         getSerializedTable().ifPresent(params::setSerializedTable);
 
-        if (ConnectContext.get().getExecutor() != null) {
-            
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
+        if (executor != null) {
+            executor.getSummaryProfile().setCreateScanRangeFinishTime();
+            if (sessionVariable.showSplitProfileInfo()) {
+                
executor.getSummaryProfile().setAssignedWeightPerBackend(backendPolicy.getAssignedWeightPerBackend());
+            }
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("create #{} ScanRangeLocations cost: {} ms",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index 37e66c7056f..da1e36c7e87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -111,4 +111,8 @@ public class FileSplit implements Split {
             return SplitWeight.standard();
         }
     }
+
+    public long getSelfSplitWeight() { // sent to the BE as TFileRangeDesc.self_split_weight
+        return selfSplitWeight;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 6398f6da8d3..023bbe9cd4d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -185,6 +185,7 @@ public class PaimonScanNode extends FileQueryScanNode {
             // use jni reader
             rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
             fileDesc.setPaimonSplit(encodeObjectToString(split));
+            rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight());
         } else {
             // use native reader
             if (fileFormat.equals("orc")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index dade95aac17..80a4bbee8e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4779,4 +4779,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean getEnableExternalTableBatchMode() {
         return enableExternalTableBatchMode;
     }
+
+    public boolean showSplitProfileInfo() { // true when profiling is enabled and profile_level >= 2
+        return enableProfile() && getProfileLevel() > 1;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 412e4b6792f..1212841d029 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -53,4 +53,6 @@ public interface Split {
     }
 
     void setTargetSplitSize(Long targetSplitSize);
+
+    long getSelfSplitWeight();
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 3165a6ac764..ea5da932fa6 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -482,6 +482,7 @@ struct TFileRangeDesc {
     // so fs_name should be with TFileRangeDesc
     12: optional string fs_name
     13: optional TFileFormatType format_type;
+    14: optional i64 self_split_weight
 }
 
 struct TSplitSource {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to