This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9536657a9d5 branch-3.1: [opt](paimon)Enhance the observability of 
split and JNI in profile #49688 (#51976)
9536657a9d5 is described below

commit 9536657a9d576eba8d04ba58a2cf72ba1930e7e2
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Jun 20 13:27:06 2025 +0800

    branch-3.1: [opt](paimon)Enhance the observability of split and JNI in 
profile #49688 (#51976)
    
    bp #49688
    
    Co-authored-by: wuwenchi <[email protected]>
---
 be/src/util/runtime_profile.cpp                    | 25 ++++++++++++
 be/src/util/runtime_profile.h                      | 47 ++++++++++++++++++++++
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  3 +-
 be/src/vec/exec/jni_connector.cpp                  | 39 ++++++++++++++++--
 be/src/vec/exec/jni_connector.h                    | 15 ++++++-
 .../org/apache/doris/common/jni/JniScanner.java    | 12 ++++++
 .../org/apache/doris/paimon/PaimonJniScanner.java  |  2 +
 .../org/apache/doris/common/profile/Profile.java   |  1 +
 .../doris/common/profile/SummaryProfile.java       | 35 +++++++++++++++-
 .../apache/doris/datasource/FileQueryScanNode.java | 17 +++++---
 .../org/apache/doris/datasource/FileSplit.java     |  4 ++
 .../datasource/paimon/source/PaimonScanNode.java   |  1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  4 ++
 .../src/main/java/org/apache/doris/spi/Split.java  |  2 +
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 15 files changed, 195 insertions(+), 13 deletions(-)

diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index d8588acfc00..92681432b00 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -461,6 +461,31 @@ RuntimeProfile::DerivedCounter* 
RuntimeProfile::add_derived_counter(
     return counter;
 }
 
+RuntimeProfile::ConditionCounter* RuntimeProfile::add_conditition_counter(
+        const std::string& name, TUnit::type type, const ConditionCounterFunction& counter_fn,
+        const std::string& parent_counter_name, int64_t level) {
+    std::lock_guard<std::mutex> l(_counter_map_lock);
+
+    // An already-registered name returns the existing counter, which must be a ConditionCounter.
+    if (auto it = _counter_map.find(name); it != _counter_map.end()) {
+        auto* existing = dynamic_cast<ConditionCounter*>(it->second);
+        if (existing == nullptr) {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Failed to add a conditition counter that is duplicate and of a "
+                                   "different type for {}.",
+                                   name);
+        }
+        return existing;
+    }
+
+    // Create the counter in the pool, then list it among the children of its parent counter.
+    ConditionCounter* counter = _pool->add(new ConditionCounter(type, counter_fn, level));
+    _counter_map[name] = counter;
+    find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>())
+            ->insert(name);
+    return counter;
+}
+
 RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) {
     std::lock_guard<std::mutex> l(_counter_map_lock);
 
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index d87fec85c8a..c518416b76a 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -272,6 +272,48 @@ public:
         DerivedCounterFunction _counter_fn;
     };
 
+    using ConditionCounterFunction = std::function<bool(int64_t, int64_t)>;
+
+    // ConditionCounter keeps a (condition, value) pair and replaces both only when
+    // condition_func(stored_condition, new_condition) returns true. This suits metrics such as
+    // "the value observed at the maximum (or minimum) of some other quantity", e.g. the weight of
+    // the slowest split. clone(), value() and conditional_update() each hold _mutex while running.
+    class ConditionCounter : public Counter {
+    public:
+        ConditionCounter(TUnit::type type, const ConditionCounterFunction& condition_func,
+                         int64_t level = 2, int64_t condition = 0, int64_t value = 0)
+                : Counter(type, value, level),
+                  _condition(condition),
+                  _value(value),
+                  _condition_func(condition_func) {}
+
+        Counter* clone() const override {
+            std::lock_guard<std::mutex> l(_mutex);
+            // Arguments follow the constructor order (level, condition, value). _value is read
+            // directly because value() would lock the non-recursive _mutex a second time.
+            return new ConditionCounter(type(), _condition_func, level(), _condition, _value);
+        }
+
+        int64_t value() const override {
+            std::lock_guard<std::mutex> l(_mutex);
+            return _value;
+        }
+
+        void conditional_update(int64_t c, int64_t v) {
+            std::lock_guard<std::mutex> l(_mutex);
+            if (_condition_func(_condition, c)) {
+                _value = v;
+                _condition = c;
+            }
+        }
+
+    private:
+        mutable std::mutex _mutex;
+        int64_t _condition;
+        int64_t _value;
+        ConditionCounterFunction _condition_func;
+    };
+
     // NonZeroCounter will not be converted to Thrift if the value is 0.
     class NonZeroCounter : public Counter {
     public:
@@ -402,6 +444,11 @@ public:
                                         const DerivedCounterFunction& 
counter_fn,
                                         const std::string& 
parent_counter_name);
 
+    ConditionCounter* add_conditition_counter(const std::string& name, 
TUnit::type type,
+                                              const ConditionCounterFunction& 
counter_fn,
+                                              const std::string& 
parent_counter_name,
+                                              int64_t level = 2);
+
     // Gets the counter object with 'name'.  Returns nullptr if there is no 
counter with
     // that name.
     Counter* get_counter(const std::string& name);
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp 
b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index e902db8bc42..71bb496d301 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -75,8 +75,9 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
             params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
         }
     }
+    int64_t self_split_weight = range.__isset.self_split_weight ? 
range.self_split_weight : -1;
     _jni_connector = 
std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
-                                                    params, column_names);
+                                                    params, column_names, 
self_split_weight);
 }
 
 Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index 9940821fcfc..47c84b72c6e 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -71,7 +71,15 @@ Status JniConnector::open(RuntimeState* state, 
RuntimeProfile* profile) {
     ADD_TIMER(_profile, _connector_name.c_str());
     _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", 
_connector_name.c_str());
     _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", 
_connector_name.c_str());
+    _java_append_data_time =
+            ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", 
_connector_name.c_str());
+    _java_create_vector_table_time =
+            ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", 
_connector_name.c_str());
     _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", 
_connector_name.c_str());
+    _max_time_split_weight_counter = _profile->add_conditition_counter(
+            "MaxTimeSplitWeight", TUnit::UNIT, [](int64_t _c, int64_t c) { 
return c > _c; },
+            _connector_name.c_str());
+    _java_scan_watcher = 0;
     // cannot put the env into fields, because frames in an env object is 
limited
     // to avoid limited frames in a thread, we should get local env in a 
method instead of in whole object.
     JNIEnv* env = nullptr;
@@ -80,7 +88,7 @@ Status JniConnector::open(RuntimeState* state, 
RuntimeProfile* profile) {
         batch_size = _state->batch_size();
     }
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-    SCOPED_TIMER(_open_scanner_time);
+    SCOPED_RAW_TIMER(&_jni_scanner_open_watcher);
     _scanner_params.emplace("time_zone", _state->timezone());
     RETURN_IF_ERROR(_init_jni_scanner(env, batch_size));
     // Call org.apache.doris.common.jni.JniScanner#open
@@ -112,7 +120,7 @@ Status JniConnector::get_next_block(Block* block, size_t* 
read_rows, bool* eof)
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     long meta_address = 0;
     {
-        SCOPED_TIMER(_java_scan_time);
+        SCOPED_RAW_TIMER(&_java_scan_watcher);
         meta_address = env->CallLongMethod(_jni_scanner_obj, 
_jni_scanner_get_next_batch);
     }
     RETURN_ERROR_IF_EXC(env);
@@ -169,6 +177,23 @@ Status JniConnector::close() {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         if (_scanner_opened && _jni_scanner_obj != nullptr) {
+            COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher);
+            COUNTER_UPDATE(_fill_block_time, _fill_block_watcher);
+
+            int64_t _append = (int64_t)env->CallLongMethod(_jni_scanner_obj,
+                                                           
_jni_scanner_get_append_data_time);
+            COUNTER_UPDATE(_java_append_data_time, _append);
+
+            int64_t _create = (int64_t)env->CallLongMethod(
+                    _jni_scanner_obj, 
_jni_scanner_get_create_vector_table_time);
+            COUNTER_UPDATE(_java_create_vector_table_time, _create);
+
+            COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - _append - 
_create);
+
+            _max_time_split_weight_counter->conditional_update(
+                    _jni_scanner_open_watcher + _fill_block_watcher + 
_java_scan_watcher,
+                    _self_split_weight);
+
             // _fill_block may be failed and returned, we should release table 
in close.
             // org.apache.doris.common.jni.JniScanner#releaseTable is 
idempotent
             env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@@ -217,7 +242,15 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int 
batch_size) {
     RETURN_ERROR_IF_EXC(env);
 
     _jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V");
+    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, 
"getNextBatchMeta", "()J");
+    RETURN_ERROR_IF_EXC(env);
+    _jni_scanner_get_append_data_time =
+            env->GetMethodID(_jni_scanner_cls, "getAppendDataTime", "()J");
+    RETURN_ERROR_IF_EXC(env);
+    _jni_scanner_get_create_vector_table_time =
+            env->GetMethodID(_jni_scanner_cls, "getCreateVectorTableTime", 
"()J");
+    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_get_table_schema =
             env->GetMethodID(_jni_scanner_cls, "getTableSchema", 
"()Ljava/lang/String;");
     RETURN_ERROR_IF_EXC(env);
@@ -268,7 +301,7 @@ Status JniConnector::fill_block(Block* block, const 
ColumnNumbers& arguments, lo
 }
 
 Status JniConnector::_fill_block(Block* block, size_t num_rows) {
-    SCOPED_TIMER(_fill_block_time);
+    SCOPED_RAW_TIMER(&_fill_block_watcher);
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     for (int i = 0; i < _column_names.size(); ++i) {
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index df4c85e7614..89168b5a551 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -187,10 +187,11 @@ public:
      * @param column_names Fields to read, also the required_fields in 
scanner_params
      */
     JniConnector(std::string connector_class, std::map<std::string, 
std::string> scanner_params,
-                 std::vector<std::string> column_names)
+                 std::vector<std::string> column_names, int64_t 
self_split_weight = -1)
             : _connector_class(std::move(connector_class)),
               _scanner_params(std::move(scanner_params)),
-              _column_names(std::move(column_names)) {
+              _column_names(std::move(column_names)),
+              _self_split_weight(self_split_weight) {
         // Use java class name as connector name
         _connector_name = split(_connector_class, "/").back();
     }
@@ -283,14 +284,22 @@ private:
     std::string _connector_class;
     std::map<std::string, std::string> _scanner_params;
     std::vector<std::string> _column_names;
+    int64_t _self_split_weight; // 64-bit, like the constructor argument and the thrift i64 field
     bool _is_table_schema = false;
 
     RuntimeState* _state = nullptr;
     RuntimeProfile* _profile = nullptr;
     RuntimeProfile::Counter* _open_scanner_time = nullptr;
     RuntimeProfile::Counter* _java_scan_time = nullptr;
+    RuntimeProfile::Counter* _java_append_data_time = nullptr;
+    RuntimeProfile::Counter* _java_create_vector_table_time = nullptr;
     RuntimeProfile::Counter* _fill_block_time = nullptr;
     std::map<std::string, RuntimeProfile::Counter*> _scanner_profile;
+    RuntimeProfile::ConditionCounter* _max_time_split_weight_counter = nullptr;
+
+    int64_t _jni_scanner_open_watcher = 0;
+    int64_t _java_scan_watcher = 0;
+    int64_t _fill_block_watcher = 0;
 
     size_t _has_read = 0;
 
@@ -299,6 +308,8 @@ private:
     jclass _jni_scanner_cls;
     jobject _jni_scanner_obj;
     jmethodID _jni_scanner_open;
+    jmethodID _jni_scanner_get_append_data_time;
+    jmethodID _jni_scanner_get_create_vector_table_time;
     jmethodID _jni_scanner_get_next_batch;
     jmethodID _jni_scanner_get_table_schema;
     jmethodID _jni_scanner_close;
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index cb191f4b038..8bb8a664ccf 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -43,6 +43,8 @@ public abstract class JniScanner {
     // to deserialize the predicate string to PaimonPredicate object.
     protected ScanPredicate[] predicates;
     protected int batchSize;
+    protected long appendDataTime = 0;
+    protected long createVectorTableTime = 0;
 
     // Initialize JniScanner
     public abstract void open() throws IOException;
@@ -87,7 +89,9 @@ public abstract class JniScanner {
 
     public long getNextBatchMeta() throws IOException {
         if (vectorTable == null) {
+            long l = System.nanoTime();
             vectorTable = VectorTable.createWritableTable(types, fields, 
batchSize);
+            createVectorTableTime += System.nanoTime() - l;
         }
         int numRows;
         try {
@@ -133,4 +137,12 @@ public abstract class JniScanner {
         }
         vectorTable = null;
     }
+
+    public long getAppendDataTime() {
+        return appendDataTime; // accumulated nanoseconds; stays 0 unless a scanner updates it
+    }
+
+    public long getCreateVectorTableTime() {
+        return createVectorTableTime; // nanoseconds spent creating vectorTable in getNextBatchMeta()
+    }
 }
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index e6c04a0a2f7..6ba4578dc6e 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -204,7 +204,9 @@ public class PaimonJniScanner extends JniScanner {
                     columnValue.setOffsetRow(record);
                     for (int i = 0; i < fields.length; i++) {
                         columnValue.setIdx(i, types[i], 
paimonDataTypeList.get(i));
+                        long l = System.nanoTime();
                         appendData(i, columnValue);
+                        appendDataTime += System.nanoTime() - l;
                     }
                     rows++;
                     if (rows >= batchSize) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index eb17d380113..c1817d194ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -293,6 +293,7 @@ public class Profile {
                     }
                     return;
                 }
+                summaryProfile.queryFinished();
             }
 
             // Nereids native insert not set planner, so it is null
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index f875de5745c..035251a4cc4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.TransactionType;
@@ -30,14 +31,18 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
  * SummaryProfile is part of a query profile.
@@ -122,6 +127,7 @@ public class SummaryProfile {
     public static final String RPC_QUEUE_TIME = "RPC Work Queue Time";
     public static final String RPC_WORK_TIME = "RPC Work Time";
     public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To 
FE";
+    public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment 
Weight";
 
     // These info will display on FE's web ui table, every one will be 
displayed as
     // a column, so that should not
@@ -178,7 +184,10 @@ public class SummaryProfile {
             TRACE_ID,
             TRANSACTION_COMMIT_TIME,
             SYSTEM_MESSAGE,
-            EXECUTED_BY_FRONTEND
+            EXECUTED_BY_FRONTEND,
+            NEREIDS_BE_FOLD_CONST_TIME,
+            NEREIDS_GARBAGE_COLLECT_TIME,
+            SPLITS_ASSIGNMENT_WEIGHT
     );
 
     // Ident of each item. Default is 0, which doesn't need to present in this 
Map.
@@ -339,6 +348,8 @@ public class SummaryProfile {
     private Map<TNetworkAddress, List<Long>> rpcPhase1Latency;
     private Map<TNetworkAddress, List<Long>> rpcPhase2Latency;
 
+    private Map<Backend, Long> assignedWeightPerBackend;
+
     public SummaryProfile() {
         init();
     }
@@ -398,6 +409,24 @@ public class SummaryProfile {
         updateExecutionSummaryProfile();
     }
 
+    // Writes end-of-query information into the execution summary. Doing it here, when the query
+    // has ended, avoids rebuilding the derived strings on every intermediate profile update.
+    public void queryFinished() {
+        if (assignedWeightPerBackend == null) {
+            return;
+        }
+        // Order backends by ascending assigned weight; LinkedHashMap preserves that insertion order.
+        Map<String, Long> weightByAddress = assignedWeightPerBackend.entrySet().stream()
+                .sorted(Entry.comparingByValue())
+                .collect(Collectors.toMap(
+                        backendAndWeight -> backendAndWeight.getKey().getAddress(),
+                        Entry::getValue,
+                        (first, second) -> first,
+                        LinkedHashMap::new));
+        String weightsJson = new GsonBuilder().create().toJson(weightByAddress);
+        executionSummaryProfile.addInfoString(SPLITS_ASSIGNMENT_WEIGHT, weightsJson);
+    }
+
     private void updateSummaryProfile(Map<String, String> infos) {
         for (String key : infos.keySet()) {
             if (SUMMARY_KEYS.contains(key)) {
@@ -950,4 +979,8 @@ public class SummaryProfile {
     public void write(DataOutput output) throws IOException {
         Text.writeString(output, GsonUtils.GSON.toJson(this));
     }
+
+    public void setAssignedWeightPerBackend(Map<Backend, Long> assignedWeightPerBackend) {
+        this.assignedWeightPerBackend = assignedWeightPerBackend; // rendered by queryFinished()
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index dedbedeafa3..107c3541577 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hive.source.HiveSplit;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -275,8 +276,9 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
     @Override
     public void createScanRangeLocations() throws UserException {
         long start = System.currentTimeMillis();
-        if (ConnectContext.get().getExecutor() != null) {
-            
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsStartTime();
+        StmtExecutor executor = ConnectContext.get().getExecutor();
+        if (executor != null) {
+            executor.getSummaryProfile().setGetSplitsStartTime();
         }
         TFileFormatType fileFormatType = getFileFormatType();
         if (fileFormatType == TFileFormatType.FORMAT_ORC) {
@@ -327,8 +329,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             splitAssignment = new SplitAssignment(
                     backendPolicy, this, this::splitToScanRange, 
locationProperties, pathPartitionKeys);
             splitAssignment.init();
-            if (ConnectContext.get().getExecutor() != null) {
-                
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
+            if (executor != null) {
+                executor.getSummaryProfile().setGetSplitsFinishTime();
             }
             if (splitAssignment.getSampleSplit() == null && 
!isFileStreamType()) {
                 return;
@@ -386,8 +388,11 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
 
         getSerializedTable().ifPresent(params::setSerializedTable);
 
-        if (ConnectContext.get().getExecutor() != null) {
-            
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
+        if (executor != null) {
+            executor.getSummaryProfile().setCreateScanRangeFinishTime();
+            if (sessionVariable.showSplitProfileInfo()) {
+                
executor.getSummaryProfile().setAssignedWeightPerBackend(backendPolicy.getAssignedWeightPerBackend());
+            }
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("create #{} ScanRangeLocations cost: {} ms",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index 37e66c7056f..da1e36c7e87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -111,4 +111,8 @@ public class FileSplit implements Split {
             return SplitWeight.standard();
         }
     }
+
+    public long getSelfSplitWeight() {
+        return selfSplitWeight; // satisfies Split#getSelfSplitWeight()
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 8df214482cc..a856139ed18 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -170,6 +170,7 @@ public class PaimonScanNode extends FileQueryScanNode {
             // use jni reader
             rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
             fileDesc.setPaimonSplit(encodeObjectToString(split));
+            rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight());
         } else {
             // use native reader
             if (fileFormat.equals("orc")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index dfa7243e3e2..61ac1cfd5e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4771,5 +4771,9 @@ public class SessionVariable implements Serializable, 
Writable {
             }
         }
     }
+
+    public boolean showSplitProfileInfo() {
+        return enableProfile() && getProfileLevel() > 1; // gates per-backend split weights in the profile
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 412e4b6792f..1212841d029 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -53,4 +53,6 @@ public interface Split {
     }
 
     void setTargetSplitSize(Long targetSplitSize);
+
+    long getSelfSplitWeight();
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 10ca220d943..c92b1c91523 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -491,6 +491,7 @@ struct TFileRangeDesc {
     // so fs_name should be with TFileRangeDesc
     12: optional string fs_name
     13: optional TFileFormatType format_type;
+    14: optional i64 self_split_weight
 }
 
 struct TSplitSource {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to