github-actions[bot] commented on code in PR #30875:
URL: https://github.com/apache/doris/pull/30875#discussion_r1478343348


##########
be/src/util/system_bvar_metrics.h:
##########
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "util/bvar_metrics.h"
+
+namespace doris {
+
+struct CpuBvarMetrics;
+struct MemoryBvarMetrics;
+struct DiskBvarMetrics;
+struct NetworkBvarMetrics;
+struct FileDescriptorBvarMetrics;
+struct SnmpBvarMetrics;
+struct LoadAverageBvarMetrics;
+struct ProcBvarMetrics;
+
+class SystemBvarMetrics {
+public:
+    SystemBvarMetrics(const std::set<std::string>& disk_devices,
+                      const std::vector<std::string>& network_interfaces);
+
+    ~SystemBvarMetrics();
+
+    std::string to_prometheus(const std::string& registry_name) const;
+
+    // update metrics
+    void update();
+
+    void get_disks_io_time(std::map<std::string, int64_t>* map);
+    int64_t get_max_io_util(const std::map<std::string, int64_t>& lst_value, 
int64_t interval_sec);
+
+    void get_network_traffic(std::map<std::string, int64_t>* send_map,
+                             std::map<std::string, int64_t>* rcv_map);
+    void get_max_net_traffic(const std::map<std::string, int64_t>& 
lst_send_map,
+                             const std::map<std::string, int64_t>& lst_rcv_map,
+                             int64_t interval_sec, int64_t* send_rate, 
int64_t* rcv_rate);
+
+    void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& 
lst_value,
+                                         int64_t interval_sec);
+    void update_max_network_send_bytes_rate(int64_t max_send_bytes_rate);
+    void update_max_network_receive_bytes_rate(int64_t max_receive_bytes_rate);
+    void update_allocator_metrics();
+
+    //for UT
+    CpuBvarMetrics* cpu_metrics(const std::string& name) { return 
cpu_metrics_[name]; }
+    MemoryBvarMetrics* memory_metrics() { return memory_metrics_.get(); }
+    DiskBvarMetrics* disk_metrics(const std::string& name) { return 
disk_metrics_[name]; }
+    NetworkBvarMetrics* network_metrics(const std::string& name) { return 
network_metrics_[name]; }
+    FileDescriptorBvarMetrics* fd_metrics() { return fd_metrics_.get(); }
+    SnmpBvarMetrics* snmp_metrics() { return snmp_metrics_.get(); }
+    LoadAverageBvarMetrics* load_average_metrics() { return 
load_average_metrics_.get(); }
+    ProcBvarMetrics* proc_metrics() { return proc_metrics_.get(); }
+
+private:
+    void install_cpu_metrics();
+    // On Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz;
+    // read /proc/stat would cost about 170us
+    void update_cpu_metrics();
+
+    void install_memory_metrics();
+    void update_memory_metrics();
+
+    void install_disk_metrics(const std::set<std::string>& disk_devices);
+    void update_disk_metrics();
+
+    void install_net_metrics(const std::vector<std::string>& interfaces);
+    void update_net_metrics();
+
+    void install_fd_metrics();
+    void update_fd_metrics();
+
+    void install_snmp_metrics();
+    void update_snmp_metrics();
+
+    void install_load_avg_metrics();
+    void update_load_avg_metrics();
+
+    void install_proc_metrics();
+    void update_proc_metrics();
+
+    void get_metrics_from_proc_vmstat();
+    void get_cpu_name();
+
+    void install_max_metrics();
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the 
previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/util/system_bvar_metrics.h:74:** previously declared here
   ```cpp
   private:
   ^
   ```
   
   </details>
   



##########
be/test/util/bvar_metrics_test.cpp:
##########
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/bvar_metrics.h"

Review Comment:
   warning: 'util/bvar_metrics.h' file not found [clang-diagnostic-error]
   ```cpp
   #include "util/bvar_metrics.h"
            ^
   ```
   



##########
be/test/util/bvar_metrics_test.cpp:
##########
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/bvar_metrics.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <unistd.h>
+
+#include <thread>
+
+#include "gtest/gtest_pred_impl.h"
+#include "testutil/test_util.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+
+class BvarMetricsTest : public testing::Test {
+public:
+    BvarMetricsTest() {}
+    virtual ~BvarMetricsTest() {}

Review Comment:
   warning: use '= default' to define a trivial destructor 
[modernize-use-equals-default]
   
   ```suggestion
       virtual ~BvarMetricsTest() = default;
   ```
   



##########
be/test/util/system_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/system_bvar_metrics.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include <set>
+
+#include "gtest/gtest_pred_impl.h"
+#include "testutil/test_util.h"
+#include "util/bvar_metrics.h"
+
+namespace doris {
+
+class SyetenBvarMetricsTest : public testing::Test {
+public:
+    SyetenBvarMetricsTest() {}
+    virtual ~SyetenBvarMetricsTest() {}

Review Comment:
   warning: use '= default' to define a trivial destructor 
[modernize-use-equals-default]
   
   ```suggestion
       virtual ~SyetenBvarMetricsTest() = default;
   ```
   



##########
be/src/vec/sink/group_commit_block_sink.cpp:
##########
@@ -26,271 +26,280 @@
 #include "runtime/exec_env.h"
 #include "runtime/group_commit_mgr.h"
 #include "runtime/runtime_state.h"
+<<<<<<< HEAD
 #include "util/debug_points.h"
+        =======
+#include "util/doris_bvar_metrics.h"
+        >>>>>>> 4e08424c1e (clang-format)
 #include "util/doris_metrics.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/sink/vtablet_finder.h"
 
-namespace doris {
+        namespace doris {
 
-namespace vectorized {
+    namespace vectorized {

Review Comment:
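   warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces]
   
   Note (reviewer): the include list in this hunk still contains unresolved merge-conflict markers (`<<<<<<< HEAD`, `=======`, `>>>>>>> 4e08424c1e (clang-format)`). Resolve them before applying this suggestion; they are the likely cause of the shifted indentation throughout this file.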
   
   ```suggestion
           namespace doris::vectorized {
   ```
   
   be/src/vec/sink/group_commit_block_sink.cpp:303:
   ```diff
   -     } // namespace vectorized
   - } // namespace doris
   +     } // namespace doris::vectorized
   ```
   



##########
be/src/vec/sink/group_commit_block_sink.cpp:
##########
@@ -26,271 +26,280 @@
 #include "runtime/exec_env.h"
 #include "runtime/group_commit_mgr.h"
 #include "runtime/runtime_state.h"
+<<<<<<< HEAD
 #include "util/debug_points.h"
+        =======
+#include "util/doris_bvar_metrics.h"
+        >>>>>>> 4e08424c1e (clang-format)
 #include "util/doris_metrics.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/sink/vtablet_finder.h"
 
-namespace doris {
+        namespace doris {
 
-namespace vectorized {
+    namespace vectorized {
 
-GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const 
RowDescriptor& row_desc,
-                                           const std::vector<TExpr>& texprs, 
Status* status)
-        : DataSink(row_desc), _filter_bitmap(1024) {
-    // From the thrift expressions create the real exprs.
-    *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
-    _name = "GroupCommitBlockSink";
-}
-
-GroupCommitBlockSink::~GroupCommitBlockSink() {
-    if (_load_block_queue) {
-        _load_block_queue->remove_load_id(_load_id);
+    GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const 
RowDescriptor& row_desc,
+                                               const std::vector<TExpr>& 
texprs, Status* status)
+            : DataSink(row_desc), _filter_bitmap(1024) {
+        // From the thrift expressions create the real exprs.
+        *status = vectorized::VExpr::create_expr_trees(texprs, 
_output_vexpr_ctxs);
+        _name = "GroupCommitBlockSink";
     }
-}
-
-Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
-    DCHECK(t_sink.__isset.olap_table_sink);
-    auto& table_sink = t_sink.olap_table_sink;
-    _tuple_desc_id = table_sink.tuple_id;
-    _schema.reset(new OlapTableSchemaParam());
-    RETURN_IF_ERROR(_schema->init(table_sink.schema));
-    _db_id = table_sink.db_id;
-    _table_id = table_sink.table_id;
-    _base_schema_version = table_sink.base_schema_version;
-    _group_commit_mode = table_sink.group_commit_mode;
-    _load_id = table_sink.load_id;
-    _max_filter_ratio = table_sink.max_filter_ratio;
-    _vpartition = std::make_unique<doris::VOlapTablePartitionParam>(_schema, 
table_sink.partition);
-    RETURN_IF_ERROR(_vpartition->init());
-    return Status::OK();
-}
 
-Status GroupCommitBlockSink::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSink::prepare(state));
-    _state = state;
-
-    // profile must add to state's object pool
-    _profile = state->obj_pool()->add(new 
RuntimeProfile("GroupCommitBlockSink"));
-    init_sink_common_profile();
-    _mem_tracker = std::make_shared<MemTracker>("GroupCommitBlockSink:" +
-                                                
std::to_string(state->load_job_id()));
-    SCOPED_TIMER(_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    GroupCommitBlockSink::~GroupCommitBlockSink() {
+        if (_load_block_queue) {
+            _load_block_queue->remove_load_id(_load_id);
+        }
+    }
 
-    // get table's tuple descriptor
-    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
-    if (_output_tuple_desc == nullptr) {
-        LOG(WARNING) << "unknown destination tuple descriptor, id=" << 
_tuple_desc_id;
-        return Status::InternalError("unknown destination tuple descriptor");
+    Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
+        DCHECK(t_sink.__isset.olap_table_sink);
+        auto& table_sink = t_sink.olap_table_sink;
+        _tuple_desc_id = table_sink.tuple_id;
+        _schema.reset(new OlapTableSchemaParam());
+        RETURN_IF_ERROR(_schema->init(table_sink.schema));
+        _db_id = table_sink.db_id;
+        _table_id = table_sink.table_id;
+        _base_schema_version = table_sink.base_schema_version;
+        _group_commit_mode = table_sink.group_commit_mode;
+        _load_id = table_sink.load_id;
+        _max_filter_ratio = table_sink.max_filter_ratio;
+        _vpartition =
+                std::make_unique<doris::VOlapTablePartitionParam>(_schema, 
table_sink.partition);
+        RETURN_IF_ERROR(_vpartition->init());
+        return Status::OK();
     }
 
-    _block_convertor = 
std::make_unique<vectorized::OlapTableBlockConvertor>(_output_tuple_desc);
-    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
-                                        _state->batch_size());
-    // Prepare the exprs to run.
-    return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
-}
+    Status GroupCommitBlockSink::prepare(RuntimeState* state) {
+        RETURN_IF_ERROR(DataSink::prepare(state));
+        _state = state;
 
-Status GroupCommitBlockSink::open(RuntimeState* state) {
-    // Prepare the exprs to run.
-    return vectorized::VExpr::open(_output_vexpr_ctxs, state);
-}
+        // profile must add to state's object pool
+        _profile = state->obj_pool()->add(new 
RuntimeProfile("GroupCommitBlockSink"));
+        init_sink_common_profile();
+        _mem_tracker = std::make_shared<MemTracker>("GroupCommitBlockSink:" +
+                                                    
std::to_string(state->load_job_id()));
+        SCOPED_TIMER(_profile->total_time_counter());
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
-Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
-    RETURN_IF_ERROR(DataSink::close(state, close_status));
-    RETURN_IF_ERROR(close_status);
-    int64_t total_rows = state->num_rows_load_total();
-    int64_t loaded_rows = state->num_rows_load_total();
-    state->set_num_rows_load_total(loaded_rows + 
state->num_rows_load_unselected() +
-                                   state->num_rows_load_filtered());
-    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() 
+ total_rows -
-                                         loaded_rows);
-    if (!_is_block_appended) {
-        // if not meet the max_filter_ratio, we should return error status 
directly
-        int64_t num_selected_rows =
-                state->num_rows_load_total() - 
state->num_rows_load_unselected();
-        if (num_selected_rows > 0 &&
-            (double)state->num_rows_load_filtered() / num_selected_rows > 
_max_filter_ratio) {
-            return Status::DataQualityError("too many filtered rows");
-        }
-        RETURN_IF_ERROR(_add_blocks(state, true));
-    }
-    if (_load_block_queue) {
-        _load_block_queue->remove_load_id(_load_id);
-    }
-    // wait to wal
-    auto st = Status::OK();
-    if (_load_block_queue && 
(_load_block_queue->wait_internal_group_commit_finish ||
-                              _group_commit_mode == 
TGroupCommitMode::SYNC_MODE)) {
-        std::unique_lock l(_load_block_queue->mutex);
-        if (!_load_block_queue->process_finish) {
-            _load_block_queue->internal_group_commit_finish_cv.wait(l);
+        // get table's tuple descriptor
+        _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
+        if (_output_tuple_desc == nullptr) {
+            LOG(WARNING) << "unknown destination tuple descriptor, id=" << 
_tuple_desc_id;
+            return Status::InternalError("unknown destination tuple 
descriptor");
         }
-        st = _load_block_queue->status;
-    }
-    return st;
-}
 
-Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* 
input_block, bool eos) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-    Status status = Status::OK();
-    auto rows = input_block->rows();
-    auto bytes = input_block->bytes();
-    if (UNLIKELY(rows == 0)) {
-        return status;
+        _block_convertor =
+                
std::make_unique<vectorized::OlapTableBlockConvertor>(_output_tuple_desc);
+        _block_convertor->init_autoinc_info(_schema->db_id(), 
_schema->table_id(),
+                                            _state->batch_size());
+        // Prepare the exprs to run.
+        return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc);
     }
-    SCOPED_TIMER(_profile->total_time_counter());
-    // update incrementally so that FE can get the progress.
-    // the real 'num_rows_load_total' will be set when sink being closed.
-    state->update_num_rows_load_total(rows);
-    state->update_num_bytes_load_total(bytes);
-    DorisMetrics::instance()->load_rows->increment(rows);
-    DorisMetrics::instance()->load_bytes->increment(bytes);
 
-    std::shared_ptr<vectorized::Block> block;
-    bool has_filtered_rows = false;
-    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
-            state, input_block, block, _output_vexpr_ctxs, rows, 
has_filtered_rows));
-    _has_filtered_rows = false;
-    if (!_vpartition->is_auto_partition()) {
-        //reuse vars for find_partition
-        _partitions.assign(rows, nullptr);
-        _filter_bitmap.Reset(rows);
+    Status GroupCommitBlockSink::open(RuntimeState* state) {
+        // Prepare the exprs to run.
+        return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+    }
 
-        for (int index = 0; index < rows; index++) {
-            _vpartition->find_partition(block.get(), index, 
_partitions[index]);
+    Status GroupCommitBlockSink::close(RuntimeState* state, Status 
close_status) {
+        RETURN_IF_ERROR(DataSink::close(state, close_status));
+        RETURN_IF_ERROR(close_status);
+        int64_t total_rows = state->num_rows_load_total();
+        int64_t loaded_rows = state->num_rows_load_total();
+        state->set_num_rows_load_total(loaded_rows + 
state->num_rows_load_unselected() +
+                                       state->num_rows_load_filtered());
+        
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + 
total_rows -
+                                             loaded_rows);
+        if (!_is_block_appended) {
+            // if not meet the max_filter_ratio, we should return error status 
directly
+            int64_t num_selected_rows =
+                    state->num_rows_load_total() - 
state->num_rows_load_unselected();
+            if (num_selected_rows > 0 &&
+                (double)state->num_rows_load_filtered() / num_selected_rows > 
_max_filter_ratio) {
+                return Status::DataQualityError("too many filtered rows");
+            }
+            RETURN_IF_ERROR(_add_blocks(state, true));
         }
-        for (int row_index = 0; row_index < rows; row_index++) {
-            if (_partitions[row_index] == nullptr) [[unlikely]] {
-                _filter_bitmap.Set(row_index, true);
-                LOG(WARNING) << "no partition for this tuple. tuple="
-                             << block->dump_data(row_index, 1);
+        if (_load_block_queue) {
+            _load_block_queue->remove_load_id(_load_id);
+        }
+        // wait to wal
+        auto st = Status::OK();
+        if (_load_block_queue && 
(_load_block_queue->wait_internal_group_commit_finish ||
+                                  _group_commit_mode == 
TGroupCommitMode::SYNC_MODE)) {
+            std::unique_lock l(_load_block_queue->mutex);
+            if (!_load_block_queue->process_finish) {
+                _load_block_queue->internal_group_commit_finish_cv.wait(l);
             }
-            _has_filtered_rows = true;
+            st = _load_block_queue->status;
         }
+        return st;
     }
 
-    if (_block_convertor->num_filtered_rows() > 0 || _has_filtered_rows) {
-        auto cloneBlock = block->clone_without_columns();
-        auto res_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        for (int i = 0; i < rows; ++i) {
-            if (_block_convertor->filter_map()[i]) {
-                continue;
+    Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* 
input_block,
+                                      bool eos) {
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+        Status status = Status::OK();
+        auto rows = input_block->rows();
+        auto bytes = input_block->bytes();
+        if (UNLIKELY(rows == 0)) {
+            return status;
+        }
+        SCOPED_TIMER(_profile->total_time_counter());
+        // update incrementally so that FE can get the progress.
+        // the real 'num_rows_load_total' will be set when sink being closed.
+        state->update_num_rows_load_total(rows);
+        state->update_num_bytes_load_total(bytes);
+        DorisMetrics::instance()->load_rows->increment(rows);
+        DorisMetrics::instance()->load_bytes->increment(bytes);
+        DorisBvarMetrics::instance()->load_rows->increment(rows);
+        DorisBvarMetrics::instance()->load_bytes->increment(bytes);
+        std::shared_ptr<vectorized::Block> block;
+        bool has_filtered_rows = false;
+        RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+                state, input_block, block, _output_vexpr_ctxs, rows, 
has_filtered_rows));
+        _has_filtered_rows = false;
+        if (!_vpartition->is_auto_partition()) {
+            //reuse vars for find_partition
+            _partitions.assign(rows, nullptr);
+            _filter_bitmap.Reset(rows);
+
+            for (int index = 0; index < rows; index++) {
+                _vpartition->find_partition(block.get(), index, 
_partitions[index]);
             }
-            if (_filter_bitmap.Get(i)) {
-                continue;
+            for (int row_index = 0; row_index < rows; row_index++) {
+                if (_partitions[row_index] == nullptr) [[unlikely]] {
+                    _filter_bitmap.Set(row_index, true);
+                    LOG(WARNING) << "no partition for this tuple. tuple="
+                                 << block->dump_data(row_index, 1);
+                }
+                _has_filtered_rows = true;
             }
-            res_block.add_row(block.get(), i);
         }
-        block->swap(res_block.to_block());
-    }
-    // add block into block queue
-    return _add_block(state, block);
-}
 
-Status GroupCommitBlockSink::_add_block(RuntimeState* state,
-                                        std::shared_ptr<vectorized::Block> 
block) {
-    if (block->rows() == 0) {
-        return Status::OK();
-    }
-    // the insert group commit tvf always accept nullable columns, so we 
should convert
-    // the non-nullable columns to nullable columns
-    for (int i = 0; i < block->columns(); ++i) {
-        if (block->get_by_position(i).type->is_nullable()) {
-            continue;
+        if (_block_convertor->num_filtered_rows() > 0 || _has_filtered_rows) {
+            auto cloneBlock = block->clone_without_columns();
+            auto res_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+            for (int i = 0; i < rows; ++i) {
+                if (_block_convertor->filter_map()[i]) {
+                    continue;
+                }
+                if (_filter_bitmap.Get(i)) {
+                    continue;
+                }
+                res_block.add_row(block.get(), i);
+            }
+            block->swap(res_block.to_block());
         }
-        block->get_by_position(i).column = 
make_nullable(block->get_by_position(i).column);
-        block->get_by_position(i).type = 
make_nullable(block->get_by_position(i).type);
+        // add block into block queue
+        return _add_block(state, block);
     }
-    // add block to queue
-    auto cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
-    {
-        vectorized::IColumn::Selector selector;
-        for (auto i = 0; i < block->rows(); i++) {
-            selector.emplace_back(i);
+
+    Status GroupCommitBlockSink::_add_block(RuntimeState* state,
+                                            std::shared_ptr<vectorized::Block> 
block) {
+        if (block->rows() == 0) {
+            return Status::OK();
         }
-        block->append_to_block_by_selector(cur_mutable_block.get(), selector);
-    }
-    std::shared_ptr<vectorized::Block> output_block = 
vectorized::Block::create_shared();
-    output_block->swap(cur_mutable_block->to_block());
-    if (!_is_block_appended && state->num_rows_load_total() + 
state->num_rows_load_unselected() +
-                                               state->num_rows_load_filtered() 
<=
-                                       
config::group_commit_memory_rows_for_max_filter_ratio) {
-        _blocks.emplace_back(output_block);
-    } else {
-        if (!_is_block_appended) {
-            RETURN_IF_ERROR(_add_blocks(state, false));
+        // the insert group commit tvf always accept nullable columns, so we 
should convert
+        // the non-nullable columns to nullable columns
+        for (int i = 0; i < block->columns(); ++i) {
+            if (block->get_by_position(i).type->is_nullable()) {
+                continue;
+            }
+            block->get_by_position(i).column = 
make_nullable(block->get_by_position(i).column);
+            block->get_by_position(i).type = 
make_nullable(block->get_by_position(i).type);
         }
-        RETURN_IF_ERROR(_load_block_queue->add_block(
-                state, output_block, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE));
-    }
-    return Status::OK();
-}
-
-Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
-                                         bool is_blocks_contain_all_load_data) 
{
-    DCHECK(_is_block_appended == false);
-    TUniqueId load_id;
-    load_id.__set_hi(_load_id.hi);
-    load_id.__set_lo(_load_id.lo);
-    if (_load_block_queue == nullptr) {
-        if (_state->exec_env()->wal_mgr()->is_running()) {
-            
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                    _db_id, _table_id, _base_schema_version, load_id, 
_load_block_queue,
-                    _state->be_exec_version()));
-            if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
-                size_t pre_allocated = 
_pre_allocated(is_blocks_contain_all_load_data);
-                _group_commit_mode = 
_load_block_queue->has_enough_wal_disk_space(pre_allocated)
-                                             ? TGroupCommitMode::ASYNC_MODE
-                                             : TGroupCommitMode::SYNC_MODE;
-                if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
-                    LOG(INFO) << "Load id=" << print_id(_state->query_id())
-                              << ", use group commit label=" << 
_load_block_queue->label
-                              << " will not write wal because wal disk space 
usage reach max "
-                                 "limit. Detail info: "
-                              << 
_state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
-                }
+        // add block to queue
+        auto cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
+        {
+            vectorized::IColumn::Selector selector;
+            for (auto i = 0; i < block->rows(); i++) {
+                selector.emplace_back(i);
             }
-            _state->set_import_label(_load_block_queue->label);
-            _state->set_wal_id(_load_block_queue->txn_id);
+            block->append_to_block_by_selector(cur_mutable_block.get(), 
selector);
+        }
+        std::shared_ptr<vectorized::Block> output_block = 
vectorized::Block::create_shared();
+        output_block->swap(cur_mutable_block->to_block());
+        if (!_is_block_appended && state->num_rows_load_total() +
+                                                   
state->num_rows_load_unselected() +
+                                                   
state->num_rows_load_filtered() <=
+                                           
config::group_commit_memory_rows_for_max_filter_ratio) {
+            _blocks.emplace_back(output_block);
         } else {
-            return Status::InternalError("be is stopping");
+            if (!_is_block_appended) {
+                RETURN_IF_ERROR(_add_blocks(state, false));
+            }
+            RETURN_IF_ERROR(_load_block_queue->add_block(
+                    state, output_block, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE));
         }
+        return Status::OK();
     }
-    for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
-        RETURN_IF_ERROR(_load_block_queue->add_block(
-                state, *it, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE));
-    }
-    _is_block_appended = true;
-    _blocks.clear();
-    
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 {
-        if (_load_block_queue) {
-            _load_block_queue->remove_load_id(_load_id);
+
+    Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
+                                             bool 
is_blocks_contain_all_load_data) {
+        DCHECK(_is_block_appended == false);

Review Comment:
   warning: redundant boolean literal supplied to boolean operator 
[readability-simplify-boolean-expr]
   
   ```suggestion
           DCHECK(!_is_block_appended);
   ```
   



##########
be/test/util/doris_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/doris_bvar_metrics.h"
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+
+class DorisBvarMetricsTest : public testing::Test {
+public:
+    DorisBvarMetricsTest() {}

Review Comment:
   warning: use '= default' to define a trivial default constructor 
[modernize-use-equals-default]
   
   ```suggestion
       DorisBvarMetricsTest() = default;
   ```
   



##########
be/test/util/bvar_metrics_test.cpp:
##########
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/bvar_metrics.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <unistd.h>
+
+#include <thread>
+
+#include "gtest/gtest_pred_impl.h"
+#include "testutil/test_util.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+
+class BvarMetricsTest : public testing::Test {
+public:
+    BvarMetricsTest() {}

Review Comment:
   warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]
   
   ```suggestion
       BvarMetricsTest() = default;
   ```
   



##########
be/test/util/doris_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/doris_bvar_metrics.h"
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+
+class DorisBvarMetricsTest : public testing::Test {
+public:
+    DorisBvarMetricsTest() {}
+    virtual ~DorisBvarMetricsTest() {}

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   
   ```suggestion
       virtual ~DorisBvarMetricsTest() = default;
   ```
   



##########
be/test/util/system_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/system_bvar_metrics.h"

Review Comment:
   warning: 'util/system_bvar_metrics.h' file not found [clang-diagnostic-error]
   ```cpp
   #include "util/system_bvar_metrics.h"
            ^
   ```
   



##########
be/test/util/system_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/system_bvar_metrics.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include <set>
+
+#include "gtest/gtest_pred_impl.h"
+#include "testutil/test_util.h"
+#include "util/bvar_metrics.h"
+
+namespace doris {
+
+class SyetenBvarMetricsTest : public testing::Test {
+public:
+    SyetenBvarMetricsTest() {}
+    virtual ~SyetenBvarMetricsTest() {}
+};
+
+extern const char* k_ut_stat_path;
+extern const char* k_ut_diskstats_path;
+extern const char* k_ut_net_dev_path;
+extern const char* k_ut_fd_path;
+extern const char* k_ut_net_snmp_path;
+extern const char* k_ut_load_avg_path;
+extern const char* k_ut_vmstat_path;
+
+TEST_F(SyetenBvarMetricsTest, normal) {

Review Comment:
   warning: function 'TEST_F' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   TEST_F(SyetenBvarMetricsTest, normal) {
   ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/test/util/system_bvar_metrics_test.cpp:45:** 108 lines including whitespace and comments (threshold 80)
   ```cpp
   TEST_F(SyetenBvarMetricsTest, normal) {
   ^
   ```
   
   </details>
   



##########
be/test/util/system_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/system_bvar_metrics.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include <set>
+
+#include "gtest/gtest_pred_impl.h"
+#include "testutil/test_util.h"
+#include "util/bvar_metrics.h"
+
+namespace doris {
+
+class SyetenBvarMetricsTest : public testing::Test {
+public:
+    SyetenBvarMetricsTest() {}

Review Comment:
   warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]
   
   ```suggestion
       SyetenBvarMetricsTest() = default;
   ```
   



##########
be/src/util/system_bvar_metrics.cpp:
##########
@@ -0,0 +1,1129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/system_bvar_metrics.h"
+
+#include <ctype.h>
+// IWYU pragma: no_include <bthread/errno.h>
+#include <errno.h> // IWYU pragma: keep
+#include <glog/logging.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <functional>
+#include <ostream>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "gutil/strings/split.h" // for string split
+#include "gutil/strtoint.h"      //  for atoi64
+#include "util/mem_info.h"
+#include "util/perf_counters.h"
+
+namespace doris {
+
+#define DECLARE_INT64_BVAR_METRIC(name, type, unit, description, group_name, 
labels, core) \
+    auto name = std::make_shared<BvarAdderMetric<int64_t>>(type, unit, #name, 
description, \
+                                                           group_name, labels, 
core);
+#define INIT_INT64_BVAR_METRIC(name, type, unit, description, group_name, 
labels, core)           \
+    name = std::make_shared<BvarAdderMetric<int64_t>>(type, unit, #name, 
description, group_name, \
+                                                      labels, core);
+
+#define INIT_DOUBLE_BVAR_METRIC(name, type, unit, description, group_name, 
labels, core)         \
+    name = std::make_shared<BvarAdderMetric<double>>(type, unit, #name, 
description, group_name, \
+                                                     labels, core);
+// /proc/stat: http://www.linuxhowtos.org/System/procstat.htm
+struct CpuBvarMetrics {
+    CpuBvarMetrics(std::shared_ptr<BvarMetricEntity> entity, std::string 
cpu_name) {
+        DECLARE_INT64_BVAR_METRIC(cpu_user, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "user"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_nice, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "nice"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_system, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "system"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_idle, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "idle"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_iowait, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "iowait"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_irq, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "irq"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_soft_irq, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT,
+                                  "", "cpu", Labels({{"device", cpu_name}, 
{"mode", "soft_irq"}}),
+                                  false)
+        DECLARE_INT64_BVAR_METRIC(cpu_steal, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "steal"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_guest, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT, "",
+                                  "cpu", Labels({{"device", cpu_name}, 
{"mode", "guest"}}), false)
+        DECLARE_INT64_BVAR_METRIC(cpu_guest_nice, BvarMetricType::COUNTER, 
BvarMetricUnit::PERCENT,
+                                  "", "cpu", Labels({{"device", cpu_name}, 
{"mode", "guest_nice"}}),
+                                  false)
+        entity->register_metric("cpu_user", *cpu_user);
+        entity->register_metric("cpu_nice", *cpu_nice);
+        entity->register_metric("cpu_system", *cpu_system);
+        entity->register_metric("cpu_idle", *cpu_idle);
+        entity->register_metric("cpu_iowait", *cpu_iowait);
+        entity->register_metric("cpu_irq", *cpu_irq);
+        entity->register_metric("cpu_soft_irq", *cpu_soft_irq);
+        entity->register_metric("cpu_steal", *cpu_steal);
+        entity->register_metric("cpu_guest", *cpu_guest);
+        entity->register_metric("cpu_guest_nice", *cpu_guest_nice);
+
+        metrics.emplace_back(cpu_user);
+        metrics.emplace_back(cpu_nice);
+        metrics.emplace_back(cpu_system);
+        metrics.emplace_back(cpu_idle);
+        metrics.emplace_back(cpu_iowait);
+        metrics.emplace_back(cpu_irq);
+        metrics.emplace_back(cpu_soft_irq);
+        metrics.emplace_back(cpu_steal);
+        metrics.emplace_back(cpu_guest);
+        metrics.emplace_back(cpu_guest_nice);
+    }
+
+    static constexpr int cpu_num_metrics = 10;
+    std::vector<std::shared_ptr<BvarAdderMetric<int64_t>>> metrics;
+};
+
+struct MemoryBvarMetrics {
+    MemoryBvarMetrics(std::shared_ptr<BvarMetricEntity> entity) {
+        INIT_INT64_BVAR_METRIC(memory_allocated_bytes, BvarMetricType::GAUGE, 
BvarMetricUnit::BYTES,
+                               "", "", Labels(), false)
+        INIT_INT64_BVAR_METRIC(memory_pgpgin, BvarMetricType::GAUGE, 
BvarMetricUnit::NOUNIT, "", "",
+                               Labels(), false)
+        INIT_INT64_BVAR_METRIC(memory_pgpgout, BvarMetricType::GAUGE, 
BvarMetricUnit::NOUNIT, "",
+                               "", Labels(), false)
+        INIT_INT64_BVAR_METRIC(memory_pswpin, BvarMetricType::GAUGE, 
BvarMetricUnit::NOUNIT, "", "",
+                               Labels(), false)
+        INIT_INT64_BVAR_METRIC(memory_pswpout, BvarMetricType::GAUGE, 
BvarMetricUnit::NOUNIT, "",
+                               "", Labels(), false)
+        entity->register_metric("memory_allocated_bytes", 
*memory_allocated_bytes);
+        entity->register_metric("memory_pgpgin", *memory_pgpgin);
+        entity->register_metric("memory_pgpgout", *memory_pgpgout);
+        entity->register_metric("memory_pswpin", *memory_pswpin);
+        entity->register_metric("memory_pswpout", *memory_pswpout);
+#ifndef USE_JEMALLOC
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_allocated_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_total_thread_cache_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_central_cache_free_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_transfer_cache_free_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_thread_cache_free_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_pageheap_free_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_tcmalloc_pageheap_unmapped_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        entity->register_metric("memory_tcmalloc_allocated_bytes",
+                                *memory_tcmalloc_allocated_bytes);
+        entity->register_metric("memory_tcmalloc_total_thread_cache_bytes",
+                                *memory_tcmalloc_total_thread_cache_bytes);
+        entity->register_metric("memory_tcmalloc_central_cache_free_bytes",
+                                *memory_tcmalloc_central_cache_free_bytes);
+        entity->register_metric("memory_tcmalloc_transfer_cache_free_bytes",
+                                *memory_tcmalloc_transfer_cache_free_bytes);
+        entity->register_metric("memory_tcmalloc_thread_cache_free_bytes",
+                                *memory_tcmalloc_thread_cache_free_bytes);
+        entity->register_metric("memory_tcmalloc_pageheap_free_bytes",
+                                *memory_tcmalloc_pageheap_free_bytes);
+        entity->register_metric("memory_tcmalloc_pageheap_unmapped_bytes",
+                                *memory_tcmalloc_pageheap_unmapped_bytes);
+#else
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_allocated_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_active_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_metadata_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_resident_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_mapped_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_retained_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_tcache_bytes, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_pactive_num, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_pdirty_num, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_pmuzzy_num, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_dirty_purged_num, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+        INIT_INT64_BVAR_METRIC(memory_jemalloc_muzzy_purged_num, 
BvarMetricType::GAUGE,
+                               BvarMetricUnit::BYTES, "", "", Labels(), false);
+
+        entity->register_metric("memory_jemalloc_allocated_bytes",
+                                *memory_jemalloc_allocated_bytes);
+        entity->register_metric("memory_jemalloc_active_bytes", 
*memory_jemalloc_active_bytes);
+        entity->register_metric("memory_jemalloc_metadata_bytes", 
*memory_jemalloc_metadata_bytes);
+        entity->register_metric("memory_jemalloc_resident_bytes", 
*memory_jemalloc_resident_bytes);
+        entity->register_metric("memory_jemalloc_mapped_bytes", 
*memory_jemalloc_mapped_bytes);
+        entity->register_metric("memory_jemalloc_retained_bytes", 
*memory_jemalloc_retained_bytes);
+        entity->register_metric("memory_jemalloc_tcache_bytes", 
*memory_jemalloc_tcache_bytes);
+        entity->register_metric("memory_jemalloc_pactive_num", 
*memory_jemalloc_pactive_num);
+        entity->register_metric("memory_jemalloc_pdirty_num", 
*memory_jemalloc_pdirty_num);
+        entity->register_metric("memory_jemalloc_pmuzzy_num", 
*memory_jemalloc_pmuzzy_num);
+        entity->register_metric("memory_jemalloc_dirty_purged_num",
+                                *memory_jemalloc_dirty_purged_num);
+        entity->register_metric("memory_jemalloc_muzzy_purged_num",
+                                *memory_jemalloc_muzzy_purged_num);
+
+#endif
+    }
+
+    // MetricEntity* entity = nullptr;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_allocated_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_pgpgin;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_pgpgout;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_pswpin;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_pswpout;
+
+#ifndef USE_JEMALLOC
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_tcmalloc_allocated_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> 
memory_tcmalloc_total_thread_cache_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> 
memory_tcmalloc_central_cache_free_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> 
memory_tcmalloc_transfer_cache_free_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> 
memory_tcmalloc_thread_cache_free_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> 
memory_tcmalloc_pageheap_free_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> 
memory_tcmalloc_pageheap_unmapped_bytes;
+#else
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_allocated_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_active_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_metadata_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_resident_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_mapped_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_retained_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_tcache_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_pactive_num;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_pdirty_num;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_pmuzzy_num;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_dirty_purged_num;
+    std::shared_ptr<BvarAdderMetric<int64_t>> memory_jemalloc_muzzy_purged_num;
+#endif
+};
+
+struct DiskBvarMetrics {
+    DiskBvarMetrics(std::shared_ptr<BvarMetricEntity> entity, const 
std::string& disk_device) {
+        INIT_INT64_BVAR_METRIC(disk_reads_completed, BvarMetricType::COUNTER,
+                               BvarMetricUnit::OPERATIONS, "", "",
+                               Labels({{"device", disk_device}}), false)
+        INIT_INT64_BVAR_METRIC(disk_bytes_read, BvarMetricType::COUNTER, 
BvarMetricUnit::BYTES, "",
+                               "", Labels({{"device", disk_device}}), false)
+        INIT_INT64_BVAR_METRIC(disk_read_time_ms, BvarMetricType::COUNTER,
+                               BvarMetricUnit::MILLISECONDS, "", "",
+                               Labels({{"device", disk_device}}), false)
+        INIT_INT64_BVAR_METRIC(disk_writes_completed, BvarMetricType::COUNTER,
+                               BvarMetricUnit::OPERATIONS, "", "",
+                               Labels({{"device", disk_device}}), false)
+        INIT_INT64_BVAR_METRIC(disk_bytes_written, BvarMetricType::COUNTER, 
BvarMetricUnit::BYTES,
+                               "", "", Labels({{"device", disk_device}}), 
false);
+        INIT_INT64_BVAR_METRIC(disk_write_time_ms, BvarMetricType::COUNTER,
+                               BvarMetricUnit::MILLISECONDS, "", "",
+                               Labels({{"device", disk_device}}), false)
+        INIT_INT64_BVAR_METRIC(disk_io_time_ms, BvarMetricType::COUNTER,
+                               BvarMetricUnit::MILLISECONDS, "", "",
+                               Labels({{"device", disk_device}}), false)
+        INIT_INT64_BVAR_METRIC(disk_io_time_weigthed, BvarMetricType::COUNTER,
+                               BvarMetricUnit::MILLISECONDS, "", "",
+                               Labels({{"device", disk_device}}), false)
+        entity->register_metric("disk_reads_completed", *disk_reads_completed);
+        entity->register_metric("disk_bytes_read", *disk_bytes_read);
+        entity->register_metric("disk_read_time_ms", *disk_read_time_ms);
+        entity->register_metric("disk_writes_completed", 
*disk_writes_completed);
+        entity->register_metric("disk_bytes_written", *disk_bytes_written);
+        entity->register_metric("disk_write_time_ms", *disk_write_time_ms);
+        entity->register_metric("disk_io_time_ms", *disk_io_time_ms);
+        entity->register_metric("disk_io_time_weigthed", 
*disk_io_time_weigthed);
+    }
+
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_reads_completed;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_bytes_read;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_read_time_ms;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_writes_completed;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_bytes_written;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_write_time_ms;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_io_time_ms;
+    std::shared_ptr<BvarAdderMetric<int64_t>> disk_io_time_weigthed;
+};
+
+struct NetworkBvarMetrics {
+    NetworkBvarMetrics(std::shared_ptr<BvarMetricEntity> entity, const 
std::string& interface) {
+        INIT_INT64_BVAR_METRIC(network_receive_bytes, BvarMetricType::COUNTER,
+                               BvarMetricUnit::BYTES, "", "", 
Labels({{"device", interface}}),
+                               false);
+        INIT_INT64_BVAR_METRIC(network_receive_packets, 
BvarMetricType::COUNTER,
+                               BvarMetricUnit::PACKETS, "", "", 
Labels({{"device", interface}}),
+                               false);
+        INIT_INT64_BVAR_METRIC(network_send_bytes, BvarMetricType::COUNTER, 
BvarMetricUnit::BYTES,
+                               "", "", Labels({{"device", interface}}), false);
+        INIT_INT64_BVAR_METRIC(network_send_packets, BvarMetricType::COUNTER,
+                               BvarMetricUnit::PACKETS, "", "", 
Labels({{"device", interface}}),
+                               false);
+        entity->register_metric("network_receive_bytes", 
*network_receive_bytes);
+        entity->register_metric("network_receive_packets", 
*network_receive_packets);
+        entity->register_metric("network_send_bytes", *network_send_bytes);
+        entity->register_metric("network_send_packets", *network_send_packets);
+    }
+
+    std::shared_ptr<BvarAdderMetric<int64_t>> network_receive_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> network_receive_packets;
+    std::shared_ptr<BvarAdderMetric<int64_t>> network_send_bytes;
+    std::shared_ptr<BvarAdderMetric<int64_t>> network_send_packets;
+};
+
+struct FileDescriptorBvarMetrics {
+    FileDescriptorBvarMetrics(std::shared_ptr<BvarMetricEntity> entity) {
+        INIT_INT64_BVAR_METRIC(fd_num_limit, BvarMetricType::GAUGE, 
BvarMetricUnit::NOUNIT, "", "",
+                               Labels(), false);
+        INIT_INT64_BVAR_METRIC(fd_num_used, BvarMetricType::GAUGE, 
BvarMetricUnit::NOUNIT, "", "",
+                               Labels(), false);
+        entity->register_metric("fd_num_limit", *fd_num_limit);
+        entity->register_metric("fd_num_used", *fd_num_used);
+    }
+
+    std::shared_ptr<BvarAdderMetric<int64_t>> fd_num_limit;
+    std::shared_ptr<BvarAdderMetric<int64_t>> fd_num_used;
+};
+
+// metrics read from /proc/net/snmp
+struct SnmpBvarMetrics {
+    SnmpBvarMetrics(std::shared_ptr<BvarMetricEntity> entity) {
+        INIT_INT64_BVAR_METRIC(snmp_tcp_in_errs, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT,
+                               "The number of all problematic TCP packets 
received", "", Labels(),
+                               false)
+        INIT_INT64_BVAR_METRIC(snmp_tcp_retrans_segs, BvarMetricType::COUNTER,
+                               BvarMetricUnit::NOUNIT, "All TCP packets 
retransmitted", "",
+                               Labels(), false)
+        INIT_INT64_BVAR_METRIC(snmp_tcp_in_segs, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT,
+                               "All received TCP packets", "", Labels(), false)
+        INIT_INT64_BVAR_METRIC(snmp_tcp_out_segs, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT,
+                               "All send TCP packets with RST mark", "", 
Labels(), false)
+        entity->register_metric("snmp_tcp_in_errs", *snmp_tcp_in_errs);
+        entity->register_metric("snmp_tcp_retrans_segs", 
*snmp_tcp_retrans_segs);
+        entity->register_metric("snmp_tcp_in_segs", *snmp_tcp_in_segs);
+        entity->register_metric("snmp_tcp_out_segs", *snmp_tcp_out_segs);
+    }
+
+    std::shared_ptr<BvarAdderMetric<int64_t>> snmp_tcp_in_errs;
+    std::shared_ptr<BvarAdderMetric<int64_t>> snmp_tcp_retrans_segs;
+    std::shared_ptr<BvarAdderMetric<int64_t>> snmp_tcp_in_segs;
+    std::shared_ptr<BvarAdderMetric<int64_t>> snmp_tcp_out_segs;
+};
+
+struct LoadAverageBvarMetrics {
+    LoadAverageBvarMetrics(std::shared_ptr<BvarMetricEntity> entity) {
+        INIT_DOUBLE_BVAR_METRIC(load_average_1_minutes, BvarMetricType::GAUGE,
+                                BvarMetricUnit::NOUNIT, "", "load_average",
+                                Labels({{"mode", "1_minutes"}}), false);
+        INIT_DOUBLE_BVAR_METRIC(load_average_5_minutes, BvarMetricType::GAUGE,
+                                BvarMetricUnit::NOUNIT, "", "load_average",
+                                Labels({{"mode", "5_minutes"}}), false);
+        INIT_DOUBLE_BVAR_METRIC(load_average_15_minutes, BvarMetricType::GAUGE,
+                                BvarMetricUnit::NOUNIT, "", "load_average",
+                                Labels({{"mode", "15_minutes"}}), false);
+        entity->register_metric("load_average_1_minutes", 
*load_average_1_minutes);
+        entity->register_metric("load_average_5_minutes", 
*load_average_5_minutes);
+        entity->register_metric("load_average_15_minutes", 
*load_average_15_minutes);
+    }
+
+    std::shared_ptr<BvarAdderMetric<double>> load_average_1_minutes;
+    std::shared_ptr<BvarAdderMetric<double>> load_average_5_minutes;
+    std::shared_ptr<BvarAdderMetric<double>> load_average_15_minutes;
+};
+
+struct ProcBvarMetrics {
+    ProcBvarMetrics(std::shared_ptr<BvarMetricEntity> entity) {
+        INIT_INT64_BVAR_METRIC(proc_interrupt, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT, "",
+                               "proc", Labels({{"mode", "interrupt"}}), false);
+        INIT_INT64_BVAR_METRIC(proc_ctxt_switch, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT,
+                               "", "proc", Labels({{"mode", "ctxt_switch"}}), 
false);
+        INIT_INT64_BVAR_METRIC(proc_procs_running, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT,
+                               "", "proc", Labels({{"mode", 
"procs_running"}}), false);
+        INIT_INT64_BVAR_METRIC(proc_procs_blocked, BvarMetricType::COUNTER, 
BvarMetricUnit::NOUNIT,
+                               "", "proc", Labels({{"mode", 
"procs_blocked"}}), false);
+        entity->register_metric("proc_interrupt", *proc_interrupt);
+        entity->register_metric("proc_ctxt_switch", *proc_ctxt_switch);
+        entity->register_metric("proc_procs_running", *proc_procs_running);
+        entity->register_metric("proc_procs_blocked", *proc_procs_blocked);
+    }
+
+    std::shared_ptr<BvarAdderMetric<int64_t>> proc_interrupt;
+    std::shared_ptr<BvarAdderMetric<int64_t>> proc_ctxt_switch;
+    std::shared_ptr<BvarAdderMetric<int64_t>> proc_procs_running;
+    std::shared_ptr<BvarAdderMetric<int64_t>> proc_procs_blocked;
+};
+
+std::string SystemBvarMetrics::to_prometheus(const std::string& registry_name) 
const {
+    std::stringstream ss;
+    for (auto& entities : entities_map_) {
+        if (entities.second.empty()) {
+            continue;
+        }
+        int count = 0;
+        for (auto& entity : entities.second) {
+            if (!count) {
+                ss << "# TYPE " << registry_name << "_" << entity->get_name() 
<< " "
+                   << entity->get_type() << "\n";
+                count++;
+            }
+            ss << entity->to_prometheus(registry_name);
+        }
+    }
+    return ss.str();
+}
+
+SystemBvarMetrics::SystemBvarMetrics(const std::set<std::string>& disk_devices,
+                                     const std::vector<std::string>& 
network_interfaces) {
+    install_cpu_metrics();
+    install_memory_metrics();
+    install_disk_metrics(disk_devices);
+    install_net_metrics(network_interfaces);
+    install_fd_metrics();
+    install_snmp_metrics();
+    install_load_avg_metrics();
+    install_proc_metrics();
+    install_max_metrics();
+    update();
+}
+
+SystemBvarMetrics::~SystemBvarMetrics() {
+    for (auto& it : cpu_metrics_) {
+        delete it.second;
+    }
+    for (auto& it : disk_metrics_) {
+        delete it.second;
+    }
+    for (auto& it : network_metrics_) {
+        delete it.second;
+    }
+
+    if (line_ptr_ != nullptr) {
+        free(line_ptr_);
+    }
+}
+
+void SystemBvarMetrics::update() {
+    update_cpu_metrics();
+    update_memory_metrics();
+    update_disk_metrics();
+    update_net_metrics();
+    update_fd_metrics();
+    update_snmp_metrics();
+    update_load_avg_metrics();
+    update_proc_metrics();
+}
+
+void SystemBvarMetrics::install_max_metrics() {
+    auto max_entity = std::make_shared<BvarMetricEntity>("max", 
BvarMetricType::GAUGE);
+    INIT_INT64_BVAR_METRIC(max_disk_io_util_percent, BvarMetricType::GAUGE, 
BvarMetricUnit::PERCENT,
+                           "", "", Labels(), true)
+    INIT_INT64_BVAR_METRIC(max_network_send_bytes_rate, BvarMetricType::GAUGE,
+                           BvarMetricUnit::BYTES, "", "", Labels(), true)
+    INIT_INT64_BVAR_METRIC(max_network_receive_bytes_rate, 
BvarMetricType::GAUGE,
+                           BvarMetricUnit::BYTES, "", "", Labels(), true)
+    max_entity->register_metric("max_disk_io_util_percent", 
*max_disk_io_util_percent);
+    max_entity->register_metric("max_network_send_bytes_rate", 
*max_network_send_bytes_rate);
+    max_entity->register_metric("max_network_receive_bytes_rate", 
*max_network_receive_bytes_rate);
+    entities_map_["max"].push_back(max_entity);
+}
+
+void SystemBvarMetrics::install_cpu_metrics() {
+    get_cpu_name();
+    for (auto cpu_name : cpu_names_) {
+        auto cpu_entity = std::make_shared<BvarMetricEntity>("cpu", 
BvarMetricType::COUNTER);
+        CpuBvarMetrics* metrics = new CpuBvarMetrics(cpu_entity, cpu_name);
+        cpu_metrics_.emplace(cpu_name, metrics);
+        entities_map_["cpu"].push_back(cpu_entity);
+    }
+}
+
+#ifdef BE_TEST
+const char* k_ut_stat_path;
+const char* k_ut_diskstats_path;
+const char* k_ut_net_dev_path;
+const char* k_ut_fd_path;
+const char* k_ut_net_snmp_path;
+const char* k_ut_load_avg_path;
+const char* k_ut_vmstat_path;
+#endif
+
+void SystemBvarMetrics::update_cpu_metrics() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_stat_path, "r");
+#else
+    FILE* fp = fopen("/proc/stat", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/stat failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    while (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        char cpu[16];
+        int64_t values[CpuBvarMetrics::cpu_num_metrics];
+        memset(values, 0, sizeof(values));
+        int num = sscanf(line_ptr_,
+                         "%15s"
+                         " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" 
PRId64 " %" PRId64
+                         " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+                         cpu, &values[0], &values[1], &values[2], &values[3], 
&values[4],
+                         &values[5], &values[6], &values[7], &values[8], 
&values[9]);
+        if (num < 4) {
+            continue;
+        }
+
+        std::string cpu_name(cpu);
+        auto it = cpu_metrics_.find(cpu_name);
+        if (it == cpu_metrics_.end()) {
+            continue;
+        }
+
+        for (int i = 0; i < CpuBvarMetrics::cpu_num_metrics; ++i) {
+            it->second->metrics[i]->set_value(values[i]);
+        }
+    }
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+
+    fclose(fp);
+}
+
+void SystemBvarMetrics::install_memory_metrics() {
+    auto memory_entity = std::make_shared<BvarMetricEntity>("memory", 
BvarMetricType::GAUGE);
+    memory_metrics_ = std::make_shared<MemoryBvarMetrics>(memory_entity);
+    entities_map_["memory"].push_back(memory_entity);
+}
+
+void SystemBvarMetrics::update_memory_metrics() {
+    
memory_metrics_->memory_allocated_bytes->set_value(PerfCounters::get_vm_rss());
+    get_metrics_from_proc_vmstat();
+}
+
+void SystemBvarMetrics::install_disk_metrics(const std::set<std::string>& 
disk_devices) {
+    for (auto& disk_device : disk_devices) {
+        auto disk_entity = std::make_shared<BvarMetricEntity>("disk", 
BvarMetricType::COUNTER);
+        DiskBvarMetrics* metrics = new DiskBvarMetrics(disk_entity, 
disk_device);
+        entities_map_["disk"].push_back(disk_entity);
+        disk_metrics_.emplace(disk_device, metrics);
+    }
+}
+
+void SystemBvarMetrics::update_disk_metrics() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_diskstats_path, "r");
+#else
+    FILE* fp = fopen("/proc/diskstats", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/diskstats failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    // /proc/diskstats: https://www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats
+    // 1 - major number
+    // 2 - minor number
+    // 3 - device name
+    // 4 - reads completed successfully
+    // 5 - reads merged
+    // 6 - sectors read
+    // 7 - time spent reading (ms)
+    // 8 - writes completed
+    // 9 - writes merged
+    // 10 - sectors written
+    // 11 - time spent writing (ms)
+    // 12 - I/Os currently in progress
+    // 13 - time spent doing I/Os (ms)
+    // 14 - weighted time spent doing I/Os (ms)
+    // I think 1024 is enough for device name
+    int major = 0;
+    int minor = 0;
+    char device[1024];
+    int64_t values[11];
+    while (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        memset(values, 0, sizeof(values));
+        int num = sscanf(line_ptr_,
+                         "%d %d %1023s"
+                         " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" 
PRId64 " %" PRId64
+                         " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" 
PRId64,
+                         &major, &minor, device, &values[0], &values[1], 
&values[2], &values[3],
+                         &values[4], &values[5], &values[6], &values[7], 
&values[8], &values[9],
+                         &values[10]);
+        if (num < 4) {
+            continue;
+        }
+        auto it = disk_metrics_.find(device);
+        if (it == disk_metrics_.end()) {
+            continue;
+        }
+        // update disk metrics
+        // reads_completed: 4 reads completed successfully
+        it->second->disk_reads_completed->set_value(values[0]);
+        // bytes_read: 6 sectors read * 512; 5 reads merged is ignored
+        it->second->disk_bytes_read->set_value(values[2] * 512);
+        // read_time_ms: 7 time spent reading (ms)
+        it->second->disk_read_time_ms->set_value(values[3]);
+        // writes_completed: 8 writes completed
+        it->second->disk_writes_completed->set_value(values[4]);
+        // bytes_written: 10 sectors write * 512; 9 writes merged is ignored
+        it->second->disk_bytes_written->set_value(values[6] * 512);
+        // write_time_ms: 11 time spent writing (ms)
+        it->second->disk_write_time_ms->set_value(values[7]);
+        // io_time_ms: 13 time spent doing I/Os (ms)
+        it->second->disk_io_time_ms->set_value(values[9]);
+        // io_time_weigthed: 14 - weighted time spent doing I/Os (ms)
+        it->second->disk_io_time_weigthed->set_value(values[10]);
+    }
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+    fclose(fp);
+}
+
+void SystemBvarMetrics::install_net_metrics(const std::vector<std::string>& 
interfaces) {
+    for (auto& interface : interfaces) {
+        auto interface_entity =
+                std::make_shared<BvarMetricEntity>("network", 
BvarMetricType::COUNTER);
+        NetworkBvarMetrics* metrics = new NetworkBvarMetrics(interface_entity, 
interface);
+        entities_map_["network"].push_back(interface_entity);
+        network_metrics_.emplace(interface, metrics);
+    }
+}
+
+void SystemBvarMetrics::update_net_metrics() {
+#ifdef BE_TEST
+    // to mock proc
+    FILE* fp = fopen(k_ut_net_dev_path, "r");
+#else
+    FILE* fp = fopen("/proc/net/dev", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/net/dev failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    // Ignore header
+    if (getline(&line_ptr_, &line_buf_size_, fp) < 0 ||
+        getline(&line_ptr_, &line_buf_size_, fp) < 0) {
+        char buf[64];
+        LOG(WARNING) << "read /proc/net/dev first two line failed, errno=" << 
errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        fclose(fp);
+        return;
+    }
+    if (proc_net_dev_version_ == 0) {
+        if (strstr(line_ptr_, "compressed") != nullptr) {
+            proc_net_dev_version_ = 3;
+        } else if (strstr(line_ptr_, "bytes") != nullptr) {
+            proc_net_dev_version_ = 2;
+        } else {
+            proc_net_dev_version_ = 1;
+        }
+    }
+
+    while (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        char* ptr = strrchr(line_ptr_, ':');
+        if (ptr == nullptr) {
+            continue;
+        }
+        char* start = line_ptr_;
+        while (isspace(*start)) {
+            start++;
+        }
+        std::string interface(start, ptr - start);
+        auto it = network_metrics_.find(interface);
+        if (it == network_metrics_.end()) {
+            continue;
+        }
+        ptr++;
+        int64_t receive_bytes = 0;
+        int64_t receive_packets = 0;
+        int64_t send_bytes = 0;
+        int64_t send_packets = 0;
+        switch (proc_net_dev_version_) {
+        case 3:
+            // receive: bytes packets errs drop fifo frame compressed multicast
+            // send:    bytes packets errs drop fifo colls carrier compressed
+            sscanf(ptr,
+                   " %" PRId64 " %" PRId64
+                   " %*d %*d %*d %*d %*d %*d"
+                   " %" PRId64 " %" PRId64 " %*d %*d %*d %*d %*d %*d",
+                   &receive_bytes, &receive_packets, &send_bytes, 
&send_packets);
+            break;
+        case 2:
+            // receive: bytes packets errs drop fifo frame
+            // send:    bytes packets errs drop fifo colls carrier
+            sscanf(ptr,
+                   " %" PRId64 " %" PRId64
+                   " %*d %*d %*d %*d"
+                   " %" PRId64 " %" PRId64 " %*d %*d %*d %*d %*d",
+                   &receive_bytes, &receive_packets, &send_bytes, 
&send_packets);
+            break;
+        case 1:
+            // receive: packets errs drop fifo frame
+            // send: packets errs drop fifo colls carrier
+            sscanf(ptr,
+                   " %" PRId64
+                   " %*d %*d %*d %*d"
+                   " %" PRId64 " %*d %*d %*d %*d %*d",
+                   &receive_packets, &send_packets);
+            break;
+        default:
+            break;
+        }
+        it->second->network_receive_bytes->set_value(receive_bytes);
+        it->second->network_receive_packets->set_value(receive_packets);
+        it->second->network_send_bytes->set_value(send_bytes);
+        it->second->network_send_packets->set_value(send_packets);
+    }
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+    fclose(fp);
+}
+
+void SystemBvarMetrics::install_fd_metrics() {
+    auto fd_entity = std::make_shared<BvarMetricEntity>("fd_num", 
BvarMetricType::GAUGE);
+    fd_metrics_ = std::make_shared<FileDescriptorBvarMetrics>(fd_entity);
+    entities_map_["fd_num"].push_back(fd_entity);
+}
+
+void SystemBvarMetrics::update_fd_metrics() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_fd_path, "r");
+#else
+    FILE* fp = fopen("/proc/sys/fs/file-nr", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/sys/fs/file-nr failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    // /proc/sys/fs/file-nr: https://www.kernel.org/doc/Documentation/sysctl/fs.txt
+    // 1 - the number of allocated file handles
+    // 2 - the number of allocated but unused file handles
+    // 3 - the maximum number of file handles
+
+    int64_t values[3];
+    if (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        memset(values, 0, sizeof(values));
+        int num = sscanf(line_ptr_, "%" PRId64 " %" PRId64 " %" PRId64, 
&values[0], &values[1],
+                         &values[2]);
+        if (num == 3) {
+            fd_metrics_->fd_num_limit->set_value(values[2]);
+            fd_metrics_->fd_num_used->set_value(values[0] - values[1]);
+        }
+    }
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+    fclose(fp);
+}
+
+void SystemBvarMetrics::install_snmp_metrics() {
+    auto snmp_entity = std::make_shared<BvarMetricEntity>("snmp", 
BvarMetricType::COUNTER);
+    snmp_metrics_ = std::make_shared<SnmpBvarMetrics>(snmp_entity);
+    entities_map_["snmp"].push_back(snmp_entity);
+}
+
+void SystemBvarMetrics::update_snmp_metrics() {
+#ifdef BE_TEST
+    // to mock proc
+    FILE* fp = fopen(k_ut_net_snmp_path, "r");
+#else
+    FILE* fp = fopen("/proc/net/snmp", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/net/snmp failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    // We only care about Tcp lines, so skip other lines in front of Tcp line
+    int res = 0;
+    while ((res = getline(&line_ptr_, &line_buf_size_, fp)) > 0) {
+        if (strstr(line_ptr_, "Tcp") != nullptr) {
+            break;
+        }
+    }
+    if (res <= 0) {
+        char buf[64];
+        LOG(WARNING) << "failed to skip lines of /proc/net/snmp, errno=" << 
errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        fclose(fp);
+        return;
+    }
+
+    // parse the Tcp header
+    // Tcp: RtoAlgorithm RtoMin RtoMax MaxConn ActiveOpens PassiveOpens AttemptFails EstabResets CurrEstab InSegs OutSegs RetransSegs InErrs OutRsts InCsumErrors
+    std::vector<std::string> headers = strings::Split(line_ptr_, " ");
+    std::unordered_map<std::string, int32_t> header_map;
+    int32_t pos = 0;
+    for (auto& h : headers) {
+        header_map.emplace(h, pos++);
+    }
+
+    // read the metrics of TCP
+    if (getline(&line_ptr_, &line_buf_size_, fp) < 0) {
+        char buf[64];
+        LOG(WARNING) << "failed to skip Tcp header line of /proc/net/snmp, 
errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        fclose(fp);
+        return;
+    }
+
+    // metric line looks like:
+    // Tcp: 1 200 120000 -1 47849374 38601877 3353843 2320314 276 1033354613 1166025166 825439 12694 23238924 0
+    std::vector<std::string> metrics = strings::Split(line_ptr_, " ");
+    if (metrics.size() != headers.size()) {
+        LOG(WARNING) << "invalid tcp metrics line: " << line_ptr_;
+        fclose(fp);
+        return;
+    }
+    int64_t retrans_segs = atoi64(metrics[header_map["RetransSegs"]]);
+    int64_t in_errs = atoi64(metrics[header_map["InErrs"]]);
+    int64_t in_segs = atoi64(metrics[header_map["InSegs"]]);
+    int64_t out_segs = atoi64(metrics[header_map["OutSegs"]]);
+    snmp_metrics_->snmp_tcp_retrans_segs->set_value(retrans_segs);
+    snmp_metrics_->snmp_tcp_in_errs->set_value(in_errs);
+    snmp_metrics_->snmp_tcp_in_segs->set_value(in_segs);
+    snmp_metrics_->snmp_tcp_out_segs->set_value(out_segs);
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+    fclose(fp);
+}
+
+void SystemBvarMetrics::install_load_avg_metrics() {
+    auto load_average_entity =
+            std::make_shared<BvarMetricEntity>("load_average", 
BvarMetricType::COUNTER);
+    load_average_metrics_ = 
std::make_shared<LoadAverageBvarMetrics>(load_average_entity);
+    entities_map_["load_average"].push_back(load_average_entity);
+}
+
+void SystemBvarMetrics::update_load_avg_metrics() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_load_avg_path, "r");
+#else
+    FILE* fp = fopen("/proc/loadavg", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/loadavg failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    double values[3];
+    if (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        memset(values, 0, sizeof(values));
+        int num = sscanf(line_ptr_, "%lf %lf %lf", &values[0], &values[1], 
&values[2]);
+        if (num == 3) {
+            
load_average_metrics_->load_average_1_minutes->set_value(values[0]);
+            
load_average_metrics_->load_average_5_minutes->set_value(values[1]);
+            
load_average_metrics_->load_average_15_minutes->set_value(values[2]);
+        }
+    }
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+    fclose(fp);
+}
+
+void SystemBvarMetrics::install_proc_metrics() {
+    auto proc_entity = std::make_shared<BvarMetricEntity>("proc", 
BvarMetricType::COUNTER);
+    proc_metrics_ = std::make_shared<ProcBvarMetrics>(proc_entity);
+    entities_map_["load_average"].push_back(proc_entity);
+}
+
+void SystemBvarMetrics::update_proc_metrics() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_stat_path, "r");
+#else
+    FILE* fp = fopen("/proc/stat", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/stat failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    uint64_t inter = 0, ctxt = 0, procs_r = 0, procs_b = 0;
+    while (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        char* start_pos = nullptr;
+        start_pos = strstr(line_ptr_, "intr ");
+        if (start_pos) {
+            sscanf(start_pos, "intr %" PRIu64, &inter);
+            proc_metrics_->proc_interrupt->set_value(inter);
+        }
+
+        start_pos = strstr(line_ptr_, "ctxt ");
+        if (start_pos) {
+            sscanf(start_pos, "ctxt %" PRIu64, &ctxt);
+            proc_metrics_->proc_ctxt_switch->set_value(ctxt);
+        }
+
+        start_pos = strstr(line_ptr_, "procs_running ");
+        if (start_pos) {
+            sscanf(start_pos, "procs_running %" PRIu64, &procs_r);
+            proc_metrics_->proc_procs_running->set_value(procs_r);
+        }
+
+        start_pos = strstr(line_ptr_, "procs_blocked ");
+        if (start_pos) {
+            sscanf(start_pos, "procs_blocked %" PRIu64, &procs_b);
+            proc_metrics_->proc_procs_blocked->set_value(procs_b);
+        }
+    }
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+
+    fclose(fp);
+}
+
+void SystemBvarMetrics::get_metrics_from_proc_vmstat() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_vmstat_path, "r");
+#else
+    FILE* fp = fopen("/proc/vmstat", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/vmstat failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    while (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        uint64_t value;
+        char name[64];
+        int num = sscanf(line_ptr_, "%s %" PRIu64, name, &value);
+        if (num < 2) {
+            continue;
+        }
+
+        if (strcmp(name, "pgpgin") == 0) {
+            memory_metrics_->memory_pgpgin->set_value(value);
+        } else if (strcmp(name, "pgpgout") == 0) {
+            memory_metrics_->memory_pgpgout->set_value(value);
+        } else if (strcmp(name, "pswpin") == 0) {
+            memory_metrics_->memory_pswpin->set_value(value);
+        } else if (strcmp(name, "pswpout") == 0) {
+            memory_metrics_->memory_pswpout->set_value(value);
+        }
+    }
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+
+    fclose(fp);
+}
+
+void SystemBvarMetrics::get_cpu_name() {
+#ifdef BE_TEST
+    FILE* fp = fopen(k_ut_stat_path, "r");
+#else
+    FILE* fp = fopen("/proc/stat", "r");
+#endif
+    if (fp == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "open /proc/stat failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+        return;
+    }
+
+    while (getline(&line_ptr_, &line_buf_size_, fp) > 0) {
+        char cpu[16];
+        char* start_pos = nullptr;
+        start_pos = strstr(line_ptr_, "cpu");
+        if (start_pos) {
+            sscanf(line_ptr_, "%15s", cpu);
+            std::string cpu_name(cpu);
+            cpu_names_.push_back(cpu_name);
+        }
+    }
+
+    if (ferror(fp) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getline failed, errno=" << errno
+                     << ", message=" << strerror_r(errno, buf, 64);
+    }
+
+    fclose(fp);
+}
+
+void SystemBvarMetrics::get_disks_io_time(std::map<std::string, int64_t>* map) 
{
+    map->clear();
+    for (auto& it : disk_metrics_) {
+        map->emplace(it.first, it.second->disk_io_time_ms->get_value());
+    }
+}
+
+int64_t SystemBvarMetrics::get_max_io_util(const std::map<std::string, 
int64_t>& lst_value,
+                                           int64_t interval_sec) {
+    int64_t max = 0;
+    for (auto& it : disk_metrics_) {
+        int64_t cur = it.second->disk_io_time_ms->get_value();
+        const auto find = lst_value.find(it.first);
+        if (find == lst_value.end()) {
+            continue;
+        }
+        int64_t incr = cur - find->second;
+        if (incr > max) max = incr;
+    }
+    return max / interval_sec / 10;
+}
+
+void SystemBvarMetrics::get_network_traffic(std::map<std::string, int64_t>* 
send_map,
+                                            std::map<std::string, int64_t>* 
rcv_map) {
+    send_map->clear();
+    rcv_map->clear();
+    for (auto& it : network_metrics_) {
+        if (it.first == "lo") {
+            continue;
+        }
+        send_map->emplace(it.first, 
it.second->network_send_bytes->get_value());
+        rcv_map->emplace(it.first, 
it.second->network_receive_bytes->get_value());
+    }
+}
+
+void SystemBvarMetrics::get_max_net_traffic(const std::map<std::string, 
int64_t>& lst_send_map,

Review Comment:
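   warning: method 'get_max_net_traffic' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void SystemBvarMetrics::get_max_net_traffic(const std::map<std::string, int64_t>& lst_send_map,
   ```
   Note: C++ only permits the `static` keyword on the member's declaration inside the class body, so this change has to be made on the declaration in the class definition rather than on this out-of-class definition.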
   



##########
be/test/util/doris_bvar_metrics_test.cpp:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/doris_bvar_metrics.h"

Review Comment:
   warning: 'util/doris_bvar_metrics.h' file not found [clang-diagnostic-error]
   ```cpp
   #include "util/doris_bvar_metrics.h"
            ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to