This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc78fe  [Enhancement] Convert metric to Json format (#3635)
1cc78fe is described below

commit 1cc78fe69b3b07487d97592de78eefdf140bde83
Author: lichaoyong <lichaoyong...@gmail.com>
AuthorDate: Wed May 27 08:49:30 2020 +0800

    [Enhancement] Convert metric to Json format (#3635)
    
    Add a JSON format for existing metrics like this.
    ```
    {
        "tags":
        {
            "metric":"thread_pool",
            "name":"thrift-server-pool",
            "type":"active_thread_num"
        },
        "unit":"number",
        "value":3
    }
    ```
    I add a new JsonMetricVisitor to handle the transformation.
    It's not to modify existing PrometheusMetricVisitor and 
SimpleCoreMetricVisitor.
    Also I add
    1.  A unit item to indicate the metric better
    2. Cloning tablet statistics divided by database.
    3. Use white space to replace newline in audit.log
---
 be/src/exec/tablet_sink.cpp                        |   3 +
 be/src/http/action/metrics_action.cpp              |  80 ++++++-
 be/src/http/action/stream_load.cpp                 |  28 +--
 be/src/runtime/client_cache.cpp                    |   4 +-
 be/src/runtime/memory/chunk_allocator.cpp          |  12 +-
 be/src/runtime/runtime_state.h                     |  14 ++
 be/src/runtime/tmp_file_mgr.cc                     |   2 +-
 be/src/util/doris_metrics.cpp                      |  13 +-
 be/src/util/doris_metrics.h                        | 264 +++++++++++----------
 be/src/util/metrics.cpp                            |  26 +-
 be/src/util/metrics.h                              |  92 +++++--
 be/src/util/runtime_profile.cpp                    |   2 +-
 be/src/util/system_metrics.cpp                     |  55 +++--
 be/src/util/thrift_server.cpp                      |   4 +-
 be/test/http/metrics_action_test.cpp               |   8 +-
 be/test/util/doris_metrics_test.cpp                |  54 ++---
 be/test/util/new_metrics_test.cpp                  |  30 +--
 be/test/util/system_metrics_test.cpp               |  58 ++---
 .../java/org/apache/doris/analysis/InsertStmt.java |   2 +
 .../org/apache/doris/common/ThreadPoolManager.java |   3 +-
 .../common/proc/IncompleteTabletsProcNode.java     |  11 +-
 .../apache/doris/common/proc/StatisticProcDir.java |  18 +-
 .../org/apache/doris/http/rest/MetricsAction.java  |   3 +
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   3 +
 .../load/routineload/RoutineLoadTaskInfo.java      |   2 +
 .../org/apache/doris/master/ReportHandler.java     |   3 +-
 .../org/apache/doris/metric/CounterMetric.java     |   4 +-
 .../apache/doris/metric/DoubleCounterMetric.java   |   4 +-
 .../java/org/apache/doris/metric/GaugeMetric.java  |   4 +-
 .../org/apache/doris/metric/GaugeMetricImpl.java   |   4 +-
 .../org/apache/doris/metric/JsonMetricVisitor.java |  86 +++++++
 .../org/apache/doris/metric/LongCounterMetric.java |   4 +-
 .../main/java/org/apache/doris/metric/Metric.java  |  20 +-
 .../java/org/apache/doris/metric/MetricRepo.java   |  58 ++---
 .../org/apache/doris/metric/MetricVisitor.java     |   2 +
 .../doris/metric/PrometheusMetricVisitor.java      |   8 +
 .../doris/metric/SimpleCoreMetricVisitor.java      |  10 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |   4 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +
 .../apache/doris/service/FrontendServiceImpl.java  |  15 +-
 .../java/org/apache/doris/task/AgentTaskQueue.java |  15 ++
 .../doris/load/loadv2/BrokerLoadJobTest.java       |   8 +
 .../org/apache/doris/load/loadv2/LoadJobTest.java  |   7 +
 43 files changed, 711 insertions(+), 338 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 4941bb3..7060a62 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -598,6 +598,9 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* 
input_batch) {
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
     state->update_num_rows_load_total(input_batch->num_rows());
+    state->update_num_bytes_load_total(input_batch->total_byte_size());
+    
DorisMetrics::instance()->load_rows_total.increment(input_batch->num_rows());
+    
DorisMetrics::instance()->load_bytes_total.increment(input_batch->total_byte_size());
     RowBatch* batch = input_batch;
     if (!_output_expr_ctxs.empty()) {
         SCOPED_RAW_TIMER(&_convert_batch_ns);
diff --git a/be/src/http/action/metrics_action.cpp 
b/be/src/http/action/metrics_action.cpp
index e37e2d1..b487820 100644
--- a/be/src/http/action/metrics_action.cpp
+++ b/be/src/http/action/metrics_action.cpp
@@ -17,6 +17,10 @@
 
 #include "http/action/metrics_action.h"
 
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
 #include <string>
 
 #include "http/http_request.h"
@@ -36,7 +40,7 @@ public:
     std::string to_string() const { return _ss.str(); }
 private:
     void _visit_simple_metric(
-        const std::string& name, const MetricLabels& labels, SimpleMetric* 
metric);
+        const std::string& name, const MetricLabels& labels, Metric* metric);
 private:
     std::stringstream _ss;
 };
@@ -88,7 +92,7 @@ void PrometheusMetricsVisitor::visit(const std::string& 
prefix,
     case MetricType::COUNTER:
     case MetricType::GAUGE:
         for (auto& it : collector->metrics()) {
-            _visit_simple_metric(metric_name, it.first, (SimpleMetric*) 
it.second);
+            _visit_simple_metric(metric_name, it.first, (Metric*) it.second);
         }
         break;
     default:
@@ -97,7 +101,7 @@ void PrometheusMetricsVisitor::visit(const std::string& 
prefix,
 }
 
 void PrometheusMetricsVisitor::_visit_simple_metric(
-        const std::string& name, const MetricLabels& labels, SimpleMetric* 
metric) {
+        const std::string& name, const MetricLabels& labels, Metric* metric) {
     _ss << name;
     // labels
     if (!labels.empty()) {
@@ -138,20 +142,80 @@ void SimpleCoreMetricsVisitor::visit(const std::string& 
prefix,
     }
 
     for (auto& it : collector->metrics()) {
-        _ss << metric_name << " LONG " << ((SimpleMetric*) 
it.second)->to_string()
+        _ss << metric_name << " LONG " << ((Metric*) it.second)->to_string()
             << "\n";
     }
 }
 
+class JsonMetricsVisitor : public MetricsVisitor {
+public:
+    JsonMetricsVisitor() {
+    }
+    virtual ~JsonMetricsVisitor() {}
+    void visit(const std::string& prefix, const std::string& name,
+               MetricCollector* collector) override;
+    std::string to_string() { 
+        rapidjson::StringBuffer strBuf;
+        rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf);
+        doc.Accept(writer);
+        return strBuf.GetString();
+    }
+
+private:
+    rapidjson::Document doc{rapidjson::kArrayType};
+};
+
+void JsonMetricsVisitor::visit(const std::string& prefix,
+                               const std::string& name,
+                               MetricCollector* collector) {
+    if (collector->empty() || name.empty()) {
+        return;
+    }
+
+    rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+    switch (collector->type()) {
+    case MetricType::COUNTER:
+    case MetricType::GAUGE:
+        for (auto& it : collector->metrics()) {
+            const MetricLabels& labels = it.first;
+            Metric* metric = reinterpret_cast<Metric*>(it.second);
+            rapidjson::Value metric_obj(rapidjson::kObjectType);
+            rapidjson::Value tag_obj(rapidjson::kObjectType);
+            tag_obj.AddMember("metric", rapidjson::Value(name.c_str(), 
allocator), allocator);
+            // labels
+            if (!labels.empty()) {
+                for (auto& label : labels.labels) {
+                    tag_obj.AddMember(
+                                rapidjson::Value(label.name.c_str(), 
allocator),
+                                rapidjson::Value(label.value.c_str(), 
allocator),
+                                allocator);
+                }
+            }
+            metric_obj.AddMember("tags", tag_obj, allocator);
+            rapidjson::Value unit_val(unit_name(metric->unit()), allocator); 
+            metric_obj.AddMember("unit", unit_val, allocator);
+            metric->write_value(metric_obj, allocator);
+            doc.PushBack(metric_obj, allocator);
+        }
+        break;
+    default:
+        break;
+    }
+}
+
 void MetricsAction::handle(HttpRequest* req) {
     const std::string& type = req->param("type");
     std::string str;
-    if (type != "core") {
-        PrometheusMetricsVisitor visitor;
+    if (type == "core") {
+        SimpleCoreMetricsVisitor visitor;
+        _metrics->collect(&visitor);
+        str.assign(visitor.to_string());
+    } else if (type == "agent") {
+        JsonMetricsVisitor visitor;
         _metrics->collect(&visitor);
         str.assign(visitor.to_string());
     } else {
-        SimpleCoreMetricsVisitor visitor;
+        PrometheusMetricsVisitor visitor;
         _metrics->collect(&visitor);
         str.assign(visitor.to_string());
     }
@@ -160,4 +224,4 @@ void MetricsAction::handle(HttpRequest* req) {
     HttpChannel::send_reply(req, str);
 }
 
-}
+} // namespace doris
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 7d490fe..15979ea 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -59,10 +59,10 @@
 
 namespace doris {
 
-IntCounter k_streaming_load_requests_total;
-IntCounter k_streaming_load_bytes;
-IntCounter k_streaming_load_duration_ms;
-static IntGauge k_streaming_load_current_processing;
+METRIC_DEFINE_INT_COUNTER(streaming_load_requests_total, MetricUnit::NUMBER);
+METRIC_DEFINE_INT_COUNTER(streaming_load_bytes, MetricUnit::BYTES);
+METRIC_DEFINE_INT_COUNTER(streaming_load_duration_ms, 
MetricUnit::MILLISECONDS);
+METRIC_DEFINE_INT_GAUGE(streaming_load_current_processing, MetricUnit::NUMBER);
 
 #ifdef BE_TEST
 TStreamLoadPutResult k_stream_load_put_result;
@@ -89,13 +89,13 @@ static bool 
is_format_support_streaming(TFileFormatType::type format) {
 
 StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
     
DorisMetrics::instance()->metrics()->register_metric("streaming_load_requests_total",
-                                            &k_streaming_load_requests_total);
+                                            &streaming_load_requests_total);
     
DorisMetrics::instance()->metrics()->register_metric("streaming_load_bytes",
-                                            &k_streaming_load_bytes);
+                                            &streaming_load_bytes);
     
DorisMetrics::instance()->metrics()->register_metric("streaming_load_duration_ms",
-                                            &k_streaming_load_duration_ms);
+                                            &streaming_load_duration_ms);
     
DorisMetrics::instance()->metrics()->register_metric("streaming_load_current_processing",
-                                            
&k_streaming_load_current_processing);
+                                            
&streaming_load_current_processing);
 }
 
 StreamLoadAction::~StreamLoadAction() {
@@ -131,10 +131,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
     HttpChannel::send_reply(req, str);
 
     // update statstics
-    k_streaming_load_requests_total.increment(1);
-    k_streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
-    k_streaming_load_bytes.increment(ctx->receive_bytes);
-    k_streaming_load_current_processing.increment(-1);
+    streaming_load_requests_total.increment(1);
+    streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
+    streaming_load_bytes.increment(ctx->receive_bytes);
+    streaming_load_current_processing.increment(-1);
 }
 
 Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
@@ -164,7 +164,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
 }
 
 int StreamLoadAction::on_header(HttpRequest* req) {
-    k_streaming_load_current_processing.increment(1);
+    streaming_load_current_processing.increment(1);
 
     StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
     ctx->ref();
@@ -195,7 +195,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         }
         auto str = ctx->to_json();
         HttpChannel::send_reply(req, str);
-        k_streaming_load_current_processing.increment(-1);
+        streaming_load_current_processing.increment(-1);
         return -1;
     }
     return 0;
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index 3a46ab0..7144684 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -216,12 +216,12 @@ void ClientCacheHelper::init_metrics(MetricRegistry* 
metrics, const std::string&
     // usage, but ensures that _metrics_enabled is published.
     boost::lock_guard<boost::mutex> lock(_lock);
 
-    _used_clients.reset(new IntGauge());
+    _used_clients.reset(new IntGauge(MetricUnit::NUMBER));
     metrics->register_metric("thrift_used_clients",
                              MetricLabels().add("name", key_prefix),
                              _used_clients.get());
 
-    _opened_clients.reset(new IntGauge());
+    _opened_clients.reset(new IntGauge(MetricUnit::NUMBER));
     metrics->register_metric("thrift_opened_clients",
                              MetricLabels().add("name", key_prefix),
                              _opened_clients.get());
diff --git a/be/src/runtime/memory/chunk_allocator.cpp 
b/be/src/runtime/memory/chunk_allocator.cpp
index e331ef3..c5e68f2 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -34,12 +34,12 @@ namespace doris {
 
 ChunkAllocator* ChunkAllocator::_s_instance = nullptr;
 
-static IntCounter local_core_alloc_count;
-static IntCounter other_core_alloc_count;
-static IntCounter system_alloc_count;
-static IntCounter system_free_count;
-static IntCounter system_alloc_cost_ns;
-static IntCounter system_free_cost_ns;
+static IntCounter local_core_alloc_count(MetricUnit::NUMBER);
+static IntCounter other_core_alloc_count(MetricUnit::NUMBER);
+static IntCounter system_alloc_count(MetricUnit::NUMBER);
+static IntCounter system_free_count(MetricUnit::NUMBER);
+static IntCounter system_alloc_cost_ns(MetricUnit::NANOSECONDS);
+static IntCounter system_free_cost_ns(MetricUnit::NANOSECONDS);
 
 #ifdef BE_TEST
 static std::mutex s_mutex;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 876bcc3..c3c4873 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -389,6 +389,10 @@ public:
     void append_error_msg_to_file(const std::string& line, const std::string& 
error_msg,
         bool is_summary = false);
 
+    int64_t num_bytes_load_total() {
+        return _num_bytes_load_total.load();
+    }
+
     int64_t num_rows_load_total() {
         return _num_rows_load_total.load();
     }
@@ -413,6 +417,14 @@ public:
         _num_rows_load_total.store(num_rows);
     }
 
+    void update_num_bytes_load_total(int64_t bytes_load) {
+        _num_bytes_load_total.fetch_add(bytes_load);
+    }
+
+    void set_update_num_bytes_load_total(int64_t bytes_load) {
+        _num_bytes_load_total.store(bytes_load);
+    }
+
     void update_num_rows_load_filtered(int64_t num_rows) {
         _num_rows_load_filtered.fetch_add(num_rows);
     }
@@ -587,6 +599,8 @@ private:
     std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by 
predicates
     std::atomic<int64_t> _num_print_error_rows;
 
+    std::atomic<int64_t> _num_bytes_load_total;  // total bytes read from 
source
+
     std::vector<std::string> _export_output_files;
 
     std::string _import_label;
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index 87ed68b..badc903 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -119,7 +119,7 @@ Status TmpFileMgr::init_custom(
     }
 
     DCHECK(metrics != NULL);
-    _num_active_scratch_dirs_metric.reset(new IntGauge());
+    _num_active_scratch_dirs_metric.reset(new IntGauge(MetricUnit::NUMBER));
     metrics->register_metric("active_scratch_dirs", 
_num_active_scratch_dirs_metric.get());
     //_active_scratch_dirs_metric = metrics->register_metric(new 
SetMetric<std::string>(
     //        TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST,
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 36c3e3b..2c466e9 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -150,6 +150,11 @@ DorisMetrics::DorisMetrics() : _name("doris_be"), 
_hook_name("doris_metrics"), _
     _metrics.register_metric(
         "stream_load", MetricLabels().add("type", "load_rows"),
         &stream_load_rows_total);
+    _metrics.register_metric(
+        "load", MetricLabels().add("type", "receive_bytes"),
+        &stream_receive_bytes_total);
+    _metrics.register_metric("load_rows", &load_rows_total);
+    _metrics.register_metric("load_bytes", &load_bytes_total);
 
     // Gauge
     REGISTER_DORIS_METRIC(memory_pool_bytes_total);
@@ -188,13 +193,13 @@ void DorisMetrics::initialize(
         const std::vector<std::string>& network_interfaces) {
     // disk usage
     for (auto& path : paths) {
-        IntGauge* gauge = disks_total_capacity.set_key(path);
+        IntGauge* gauge = disks_total_capacity.set_key(path, 
MetricUnit::BYTES);
         _metrics.register_metric("disks_total_capacity", 
MetricLabels().add("path", path), gauge);
-        gauge = disks_avail_capacity.set_key(path);
+        gauge = disks_avail_capacity.set_key(path, MetricUnit::BYTES);
         _metrics.register_metric("disks_avail_capacity", 
MetricLabels().add("path", path), gauge);
-        gauge = disks_data_used_capacity.set_key(path);
+        gauge = disks_data_used_capacity.set_key(path, MetricUnit::BYTES);
         _metrics.register_metric("disks_data_used_capacity", 
MetricLabels().add("path", path), gauge);
-        gauge = disks_state.set_key(path);
+        gauge = disks_state.set_key(path, MetricUnit::BYTES);
         _metrics.register_metric("disks_state", MetricLabels().add("path", 
path), gauge);
     }
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 7a3c0bc..e6775b5 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -37,8 +37,8 @@ public:
         }
     }
 
-    IntGauge* set_key(const std::string& key) {
-        metrics.emplace(key, IntGauge());
+    IntGauge* set_key(const std::string& key, const MetricUnit unit) {
+        metrics.emplace(key, IntGauge(unit));
         return &metrics.find(key)->second;
     }
 
@@ -54,139 +54,141 @@ private:
 
 class DorisMetrics {
 public:
-    // counters
-    IntCounter fragment_requests_total;
-    IntCounter fragment_request_duration_us;
-    IntCounter http_requests_total;
-    IntCounter http_request_duration_us;
-    IntCounter http_request_send_bytes;
-    IntCounter query_scan_bytes;
-    IntCounter query_scan_rows;
-    IntCounter ranges_processed_total;
-    IntCounter push_requests_success_total;
-    IntCounter push_requests_fail_total;
-    IntCounter push_request_duration_us;
-    IntCounter push_request_write_bytes;
-    IntCounter push_request_write_rows;
-    IntCounter create_tablet_requests_total;
-    IntCounter create_tablet_requests_failed;
-    IntCounter drop_tablet_requests_total;
-
-    IntCounter report_all_tablets_requests_total;
-    IntCounter report_all_tablets_requests_failed;
-    IntCounter report_tablet_requests_total;
-    IntCounter report_tablet_requests_failed;
-    IntCounter report_disk_requests_total;
-    IntCounter report_disk_requests_failed;
-    IntCounter report_task_requests_total;
-    IntCounter report_task_requests_failed;
-
-    IntCounter schema_change_requests_total;
-    IntCounter schema_change_requests_failed;
-    IntCounter create_rollup_requests_total;
-    IntCounter create_rollup_requests_failed;
-    IntCounter storage_migrate_requests_total;
-    IntCounter delete_requests_total;
-    IntCounter delete_requests_failed;
-    IntCounter clone_requests_total;
-    IntCounter clone_requests_failed;
-
-    IntCounter finish_task_requests_total;
-    IntCounter finish_task_requests_failed;
-
-    IntCounter base_compaction_request_total;
-    IntCounter base_compaction_request_failed;
-    IntCounter cumulative_compaction_request_total;
-    IntCounter cumulative_compaction_request_failed;
-
-    IntCounter base_compaction_deltas_total;
-    IntCounter base_compaction_bytes_total;
-    IntCounter cumulative_compaction_deltas_total;
-    IntCounter cumulative_compaction_bytes_total;
-
-    IntCounter publish_task_request_total;
-    IntCounter publish_task_failed_total;
-
-    IntCounter meta_write_request_total;
-    IntCounter meta_write_request_duration_us;
-    IntCounter meta_read_request_total;
-    IntCounter meta_read_request_duration_us;
-
-    // Counters for segment_v2
-    // -----------------------
-    // total number of segments read
-    IntCounter segment_read_total;
-    // total number of rows in queried segments (before index pruning)
-    IntCounter segment_row_total;
-    // total number of rows selected by short key index
-    IntCounter segment_rows_by_short_key;
-    // total number of rows selected by zone map index
-    IntCounter segment_rows_read_by_zone_map;
-
-    IntCounter txn_begin_request_total;
-    IntCounter txn_commit_request_total;
-    IntCounter txn_rollback_request_total;
-    IntCounter txn_exec_plan_total;
-    IntCounter stream_receive_bytes_total;
-    IntCounter stream_load_rows_total;
-
-    IntCounter memtable_flush_total;
-    IntCounter memtable_flush_duration_us;
-
-    // Gauges
-    IntGauge memory_pool_bytes_total;
-    IntGauge process_thread_num;
-    IntGauge process_fd_num_used;
-    IntGauge process_fd_num_limit_soft;
-    IntGauge process_fd_num_limit_hard;
+       // counters
+       METRIC_DEFINE_INT_COUNTER(fragment_requests_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(fragment_request_duration_us, 
MetricUnit::MICROSECONDS);
+       METRIC_DEFINE_INT_COUNTER(http_requests_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(http_request_duration_us, 
MetricUnit::MICROSECONDS);
+       METRIC_DEFINE_INT_COUNTER(http_request_send_bytes, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(query_scan_bytes, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(query_scan_rows, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(ranges_processed_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(push_requests_success_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(push_requests_fail_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(push_request_duration_us, 
MetricUnit::MICROSECONDS);
+       METRIC_DEFINE_INT_COUNTER(push_request_write_bytes, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(push_request_write_rows, MetricUnit::ROWS);
+       METRIC_DEFINE_INT_COUNTER(create_tablet_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(create_tablet_requests_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(drop_tablet_requests_total, 
MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_tablet_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_tablet_requests_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_disk_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_disk_requests_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_task_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(report_task_requests_failed, 
MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(schema_change_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(schema_change_requests_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(create_rollup_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(create_rollup_requests_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(storage_migrate_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(delete_requests_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(delete_requests_failed, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(clone_requests_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(clone_requests_failed, MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(finish_task_requests_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(finish_task_requests_failed, 
MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(base_compaction_request_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(base_compaction_request_failed, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_failed, 
MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(base_compaction_deltas_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(base_compaction_bytes_total, 
MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(cumulative_compaction_deltas_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(cumulative_compaction_bytes_total, 
MetricUnit::BYTES);
+       
+       METRIC_DEFINE_INT_COUNTER(publish_task_request_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(publish_task_failed_total, 
MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(meta_write_request_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(meta_write_request_duration_us, 
MetricUnit::MICROSECONDS);
+       METRIC_DEFINE_INT_COUNTER(meta_read_request_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(meta_read_request_duration_us, 
MetricUnit::MICROSECONDS);
+       
+       // Counters for segment_v2
+       // -----------------------
+       // total number of segments read
+       METRIC_DEFINE_INT_COUNTER(segment_read_total, MetricUnit::NUMBER);
+       // total number of rows in queried segments (before index pruning)
+       METRIC_DEFINE_INT_COUNTER(segment_row_total, MetricUnit::ROWS);
+       // total number of rows selected by short key index
+       METRIC_DEFINE_INT_COUNTER(segment_rows_by_short_key, MetricUnit::ROWS);
+       // total number of rows selected by zone map index
+       METRIC_DEFINE_INT_COUNTER(segment_rows_read_by_zone_map, 
MetricUnit::ROWS);
+       
+       METRIC_DEFINE_INT_COUNTER(txn_begin_request_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(txn_commit_request_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(txn_rollback_request_total, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(txn_exec_plan_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(stream_receive_bytes_total, 
MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(stream_load_rows_total, MetricUnit::ROWS);
+       METRIC_DEFINE_INT_COUNTER(load_rows_total, MetricUnit::ROWS);
+       METRIC_DEFINE_INT_COUNTER(load_bytes_total, MetricUnit::BYTES);
+       
+       METRIC_DEFINE_INT_COUNTER(memtable_flush_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(memtable_flush_duration_us, 
MetricUnit::MICROSECONDS);
+       
+       // Gauges
+       METRIC_DEFINE_INT_GAUGE(memory_pool_bytes_total, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_GAUGE(process_thread_num, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(process_fd_num_used, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_soft, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_hard, MetricUnit::NUMBER);
     IntGaugeMetricsMap disks_total_capacity;
     IntGaugeMetricsMap disks_avail_capacity;
     IntGaugeMetricsMap disks_data_used_capacity;
     IntGaugeMetricsMap disks_state;
-
-    // the max compaction score of all tablets.
-    // Record base and cumulative scores separately, because
-    // we need to get the larger of the two.
-    IntGauge tablet_cumulative_max_compaction_score;
-    IntGauge tablet_base_max_compaction_score;
-
-    // The following metrics will be calculated
-    // by metric calculator
-    IntGauge push_request_write_bytes_per_second;
-    IntGauge query_scan_bytes_per_second;
-    IntGauge max_disk_io_util_percent;
-    IntGauge max_network_send_bytes_rate;
-    IntGauge max_network_receive_bytes_rate;
-
-    // Metrics related with BlockManager
-    IntCounter readable_blocks_total;
-    IntCounter writable_blocks_total;
-    IntCounter blocks_created_total;
-    IntCounter blocks_deleted_total;
-    IntCounter bytes_read_total;
-    IntCounter bytes_written_total;
-    IntCounter disk_sync_total;
-    IntGauge blocks_open_reading;
-    IntGauge blocks_open_writing;
-
-    IntCounter blocks_push_remote_duration_us;
-
-    // Size of some global containers
-    UIntGauge rowset_count_generated_and_in_use;
-    UIntGauge unused_rowsets_count;
-    UIntGauge broker_count;
-    UIntGauge data_stream_receiver_count;
-    UIntGauge fragment_endpoint_count;
-    UIntGauge active_scan_context_count;
-    UIntGauge plan_fragment_count;
-    UIntGauge load_channel_count;
-    UIntGauge result_buffer_block_count;
-    UIntGauge result_block_queue_count;
-    UIntGauge routine_load_task_count;
-    UIntGauge small_file_cache_count;
-    UIntGauge stream_load_pipe_count;
-    UIntGauge brpc_endpoint_stub_count;
-    UIntGauge tablet_writer_count;
+       
+       // the max compaction score of all tablets.
+       // Record base and cumulative scores separately, because
+       // we need to get the larger of the two.
+       METRIC_DEFINE_INT_GAUGE(tablet_cumulative_max_compaction_score, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(tablet_base_max_compaction_score, 
MetricUnit::NUMBER);
+       
+       // The following metrics will be calculated
+       // by metric calculator
+       METRIC_DEFINE_INT_GAUGE(push_request_write_bytes_per_second, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(query_scan_bytes_per_second, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(max_disk_io_util_percent, MetricUnit::PERCENT);
+       METRIC_DEFINE_INT_GAUGE(max_network_send_bytes_rate, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(max_network_receive_bytes_rate, 
MetricUnit::NUMBER);
+       
+       // Metrics related with BlockManager
+       METRIC_DEFINE_INT_COUNTER(readable_blocks_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(writable_blocks_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(blocks_created_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(blocks_deleted_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_COUNTER(bytes_read_total, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(bytes_written_total, MetricUnit::BYTES);
+       METRIC_DEFINE_INT_COUNTER(disk_sync_total, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(blocks_open_reading, MetricUnit::NUMBER);
+       METRIC_DEFINE_INT_GAUGE(blocks_open_writing, MetricUnit::NUMBER);
+       
+       METRIC_DEFINE_INT_COUNTER(blocks_push_remote_duration_us, 
MetricUnit::MICROSECONDS);
+       
+       // Size of some global containers
+       METRIC_DEFINE_UINT_GAUGE(rowset_count_generated_and_in_use, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(unused_rowsets_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(broker_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(data_stream_receiver_count, 
MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(fragment_endpoint_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(active_scan_context_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(plan_fragment_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(load_channel_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(result_buffer_block_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(result_block_queue_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(routine_load_task_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(small_file_cache_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(stream_load_pipe_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NUMBER);
+       METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NUMBER);
 
     static DorisMetrics* instance() {
         static DorisMetrics instance;
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index 3b37353..cbd5900 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -45,6 +45,29 @@ std::ostream& operator<<(std::ostream& os, MetricType type) {
     return os;
 }
 
+const char* unit_name(MetricUnit unit) {
+    switch (unit) {
+    case MetricUnit::NANOSECONDS:
+        return "nanoseconds";
+    case MetricUnit::MICROSECONDS:
+        return "microseconds";
+    case MetricUnit::MILLISECONDS:
+        return "milliseconds";
+    case MetricUnit::SECONDS:
+        return "seconds";
+    case MetricUnit::BYTES:
+        return "bytes";
+    case MetricUnit::ROWS:
+        return "rows";
+    case MetricUnit::NUMBER:
+        return "number";
+    case MetricUnit::PERCENT:
+        return "percent";
+    default:
+        return "nounit";
+    }
+}
+
 void Metric::hide() {
     if (_registry == nullptr) {
         return;
@@ -56,8 +79,9 @@ void Metric::hide() {
 bool MetricCollector::add_metic(const MetricLabels& labels, Metric* metric) {
     if (empty()) {
         _type = metric->type();
+        _unit = metric->unit();
     } else {
-        if (metric->type() != _type) {
+        if (metric->type() != _type || metric->unit() != _unit) {
             return false;
         }
     }
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 331c5c3..46450c4 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -26,12 +26,17 @@
 #include <mutex>
 #include <iomanip>
 
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/document.h>
+
 #include "common/config.h"
 #include "util/spinlock.h"
 #include "util/core_local.h"
 
 namespace doris {
 
+namespace rj = RAPIDJSON_NAMESPACE;
+
 class MetricRegistry;
 
 enum class MetricType {
@@ -42,33 +47,49 @@ enum class MetricType {
     UNTYPED
 };
 
+enum class MetricUnit {
+    NANOSECONDS,
+    MICROSECONDS,
+    MILLISECONDS,
+    SECONDS,
+    BYTES,
+    ROWS,
+    NUMBER,
+    PERCENT,
+    NOUNIT
+};
+
 std::ostream& operator<<(std::ostream& os, MetricType type);
+const char* unit_name(MetricUnit unit);
 
 class Metric {
 public:
-    Metric(MetricType type) :_type(type), _registry(nullptr) { }
+    Metric(MetricType type, MetricUnit unit)
+      : _type(type),
+        _unit(unit),
+        _registry(nullptr) {}
     virtual ~Metric() { hide(); }
+    virtual std::string to_string() const = 0;
     MetricType type() const { return _type; }
+    MetricUnit unit() const { return _unit; }
     void hide();
+    virtual void write_value(rj::Value& metric_obj,
+                             rj::Document::AllocatorType& allocator) = 0;
 private:
     friend class MetricRegistry;
 
-    MetricType _type;
+    MetricType _type = MetricType::UNTYPED;
+    MetricUnit _unit = MetricUnit::NOUNIT;
     MetricRegistry* _registry;
 };
 
-class SimpleMetric : public Metric {
-public:
-    SimpleMetric(MetricType type) :Metric(type) { }
-    virtual ~SimpleMetric() { }
-    virtual std::string to_string() const = 0;
-};
-
 // Metric that only can increment
 template<typename T>
-class LockSimpleMetric : public SimpleMetric {
+class LockSimpleMetric : public Metric {
 public:
-    LockSimpleMetric(MetricType type) :SimpleMetric(type), _value(T()) { }
+    LockSimpleMetric(MetricType type, MetricUnit unit)
+      : Metric(type, unit),
+        _value(T()) {}
     virtual ~LockSimpleMetric() { }
 
     std::string to_string() const override {
@@ -76,6 +97,11 @@ public:
         ss << value();
         return ss.str();
     }
+
+    void write_value(rj::Value& metric_obj,
+                     rj::Document::AllocatorType& allocator) override {
+        metric_obj.AddMember("value", rj::Value(value()), allocator);
+    }
     
     T value() const {
         std::lock_guard<SpinLock> l(_lock);
@@ -103,9 +129,12 @@ protected:
 };
 
 template<typename T>
-class CoreLocalCounter : public SimpleMetric {
+class CoreLocalCounter : public Metric {
 public:
-    CoreLocalCounter() :SimpleMetric(MetricType::COUNTER), _value() { }
+    CoreLocalCounter(MetricUnit unit)
+      : Metric(MetricType::COUNTER, unit), 
+        _value() {}
+
     virtual ~CoreLocalCounter() { }
 
     std::string to_string() const override {
@@ -113,6 +142,11 @@ public:
         ss << value();
         return ss.str();
     }
+
+    void write_value(rj::Value& metric_obj,
+                     rj::Document::AllocatorType& allocator) override {
+        metric_obj.AddMember("value", rj::Value(value()), allocator);
+    }
     
     T value() const {
         T sum = 0;
@@ -132,7 +166,8 @@ protected:
 template<typename T>
 class LockCounter : public LockSimpleMetric<T> {
 public:
-    LockCounter() :LockSimpleMetric<T>(MetricType::COUNTER) { }
+    LockCounter(MetricUnit unit)
+      : LockSimpleMetric<T>(MetricType::COUNTER, unit) {}
     virtual ~LockCounter() { }
 };
 
@@ -140,7 +175,8 @@ public:
 template<typename T>
 class LockGauge : public LockSimpleMetric<T> {
 public:
-    LockGauge() :LockSimpleMetric<T>(MetricType::GAUGE) { }
+    LockGauge(MetricUnit unit)
+      : LockSimpleMetric<T>(MetricType::GAUGE, unit) {}
     virtual ~LockGauge() { }
 };
 
@@ -274,8 +310,10 @@ public:
         return _metrics;
     }
     MetricType type() const { return _type; }
+    MetricUnit unit() const { return _unit; }
 private:
     MetricType _type = MetricType::UNTYPED;
+    MetricUnit _unit = MetricUnit::NOUNIT;
     std::map<MetricLabels, Metric*> _metrics;
 };
 
@@ -343,4 +381,26 @@ using IntGauge = LockGauge<int64_t>;
 using UIntGauge = LockGauge<uint64_t>;
 using DoubleGauge = LockGauge<double>;
 
-}
+} // namespace doris
+
+// Convenience macros to metric
+#define METRIC_DEFINE_INT_COUNTER(metric_name, unit)        \
+    doris::IntCounter metric_name{unit}
+
+#define METRIC_DEFINE_INT_LOCK_COUNTER(metric_name, unit)   \
+    doris::IntLockCounter metric_name{unit}
+
+#define METRIC_DEFINE_UINT_COUNTER(metric_name, unit)       \
+    doris::UIntCounter metric_name{unit}
+
+#define METRIC_DEFINE_DOUBLE_COUNTER(metric_name, unit)     \
+    doris::DoubleCounter metric_name{unit}
+
+#define METRIC_DEFINE_INT_GAUGE(metric_name, unit)          \
+    doris::IntGauge metric_name{unit}
+
+#define METRIC_DEFINE_UINT_GAUGE(metric_name, unit)         \
+    doris::UIntGauge metric_name{unit}
+
+#define METRIC_DEFINE_DOUBLE_GAUGE(metric_name, unit)       \
+    doris::DoubleGauge metric_name{unit}
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 7b69153..07f4658 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -529,7 +529,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const 
std::string& prefix) co
     {
         boost::lock_guard<boost::mutex> l(_info_strings_lock);
         BOOST_FOREACH (const std::string& key, _info_strings_display_order) {
-            stream << prefix << "  " << key << ": " << 
_info_strings.find(key)->second << std::endl;
+            stream << prefix << "   - " << key << ": " << 
_info_strings.find(key)->second << std::endl;
         }
     }
 
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index ee43b20..26358e8 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -28,40 +28,47 @@ const char* SystemMetrics::_s_hook_name = "system_metrics";
 
 // /proc/stat: http://www.linuxhowtos.org/System/procstat.htm
 struct CpuMetrics {
-    static constexpr int k_num_metrics = 10;
-    static const char* k_names[k_num_metrics];
-    IntLockCounter metrics[k_num_metrics];
+    static constexpr int cpu_num_metrics = 10;
+    IntLockCounter metrics[cpu_num_metrics] = {
+        {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+        {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+        {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+        {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+        {MetricUnit::PERCENT}, {MetricUnit::PERCENT}
+    };
+    static const char* cpu_metrics[cpu_num_metrics];
 };
 
-const char* CpuMetrics::k_names[] = {
+const char* CpuMetrics::cpu_metrics[] = {
     "user", "nice", "system", "idle", "iowait",
-    "irq", "soft_irq", "steal", "guest", "guest_nice"};
+    "irq", "soft_irq", "steal", "guest", "guest_nice"
+};
 
 struct MemoryMetrics {
-    IntGauge allocated_bytes;
+    METRIC_DEFINE_INT_GAUGE(allocated_bytes, MetricUnit::BYTES);
 };
 
 struct DiskMetrics {
-    IntLockCounter reads_completed;
-    IntLockCounter bytes_read;
-    IntLockCounter read_time_ms;
-    IntLockCounter writes_completed;
-    IntLockCounter bytes_written;
-    IntLockCounter write_time_ms;
-    IntLockCounter io_time_ms;
-    IntLockCounter io_time_weigthed;
+    METRIC_DEFINE_INT_LOCK_COUNTER(reads_completed, MetricUnit::NUMBER);
+    METRIC_DEFINE_INT_LOCK_COUNTER(bytes_read, MetricUnit::BYTES);
+    METRIC_DEFINE_INT_LOCK_COUNTER(read_time_ms, MetricUnit::MILLISECONDS);
+    METRIC_DEFINE_INT_LOCK_COUNTER(writes_completed, MetricUnit::NUMBER);
+    METRIC_DEFINE_INT_LOCK_COUNTER(bytes_written, MetricUnit::BYTES);
+    METRIC_DEFINE_INT_LOCK_COUNTER(write_time_ms, MetricUnit::MILLISECONDS);
+    METRIC_DEFINE_INT_LOCK_COUNTER(io_time_ms, MetricUnit::MILLISECONDS);
+    METRIC_DEFINE_INT_LOCK_COUNTER(io_time_weigthed, MetricUnit::MILLISECONDS);
 };
 
 struct NetMetrics {
-    IntLockCounter receive_bytes;
-    IntLockCounter receive_packets;
-    IntLockCounter send_bytes;
-    IntLockCounter send_packets;
+    METRIC_DEFINE_INT_LOCK_COUNTER(receive_bytes, MetricUnit::BYTES);
+    METRIC_DEFINE_INT_LOCK_COUNTER(receive_packets, MetricUnit::NUMBER);
+    METRIC_DEFINE_INT_LOCK_COUNTER(send_bytes, MetricUnit::BYTES);
+    METRIC_DEFINE_INT_LOCK_COUNTER(send_packets, MetricUnit::NUMBER);
 };
 
 struct FileDescriptorMetrics {
-    IntGauge fd_num_limit;
-    IntGauge fd_num_used;
+    METRIC_DEFINE_INT_GAUGE(fd_num_limit, MetricUnit::NUMBER);
+    METRIC_DEFINE_INT_GAUGE(fd_num_used, MetricUnit::NUMBER);
 };
 
 SystemMetrics::SystemMetrics() {
@@ -110,9 +117,9 @@ void SystemMetrics::update() {
 void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
     _cpu_total.reset(new CpuMetrics());
 
-    for (int i = 0; i < CpuMetrics::k_num_metrics; ++i) {
+    for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
         registry->register_metric("cpu",
-                                  MetricLabels().add("mode", 
CpuMetrics::k_names[i]),
+                                  MetricLabels().add("mode", 
CpuMetrics::cpu_metrics[i]),
                                   &_cpu_total->metrics[i]);
     }
 }
@@ -146,7 +153,7 @@ void SystemMetrics::_update_cpu_metrics() {
     }
 
     char cpu[16];
-    int64_t values[CpuMetrics::k_num_metrics];
+    int64_t values[CpuMetrics::cpu_num_metrics];
     memset(values, 0, sizeof(values));
     sscanf(_line_ptr, "%15s"
            " %" PRId64 " %" PRId64 " %" PRId64
@@ -159,7 +166,7 @@ void SystemMetrics::_update_cpu_metrics() {
            &values[6], &values[7], &values[8],
            &values[9]);
 
-    for (int i = 0; i < CpuMetrics::k_num_metrics; ++i) {
+    for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
         _cpu_total->metrics[i].set_value(values[i]);
     }
 
diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp
index c9b69d8..9985732 100644
--- a/be/src/util/thrift_server.cpp
+++ b/be/src/util/thrift_server.cpp
@@ -276,12 +276,12 @@ ThriftServer::ThriftServer(
             _session_handler(NULL) {
     if (metrics != NULL) {
         _metrics_enabled = true;
-        _current_connections.reset(new IntGauge());
+        _current_connections.reset(new IntGauge(MetricUnit::NUMBER));
         metrics->register_metric("thrift_current_connections", 
                                  MetricLabels().add("name", name),
                                  _current_connections.get());
 
-        _connections_total.reset(new IntCounter());
+        _connections_total.reset(new IntCounter(MetricUnit::NUMBER));
         metrics->register_metric("thrift_connections_total",
                                  MetricLabels().add("name", name),
                                  _connections_total.get());
diff --git a/be/test/http/metrics_action_test.cpp 
b/be/test/http/metrics_action_test.cpp
index 4c63761..6bbab74 100644
--- a/be/test/http/metrics_action_test.cpp
+++ b/be/test/http/metrics_action_test.cpp
@@ -53,10 +53,10 @@ private:
 
 TEST_F(MetricsActionTest, prometheus_output) {
     MetricRegistry registry("test");
-    IntGauge cpu_idle;
+    IntGauge cpu_idle(MetricUnit::PERCENT);
     cpu_idle.set_value(50);
     registry.register_metric("cpu_idle", &cpu_idle);
-    IntCounter put_requests_total;
+    IntCounter put_requests_total(MetricUnit::NUMBER);
     put_requests_total.increment(2345);
     registry.register_metric("requests_total",
                              MetricLabels().add("type", "put").add("path", 
"/sports"),
@@ -73,7 +73,7 @@ TEST_F(MetricsActionTest, prometheus_output) {
 
 TEST_F(MetricsActionTest, prometheus_no_prefix) {
     MetricRegistry registry("");
-    IntGauge cpu_idle;
+    IntGauge cpu_idle(MetricUnit::PERCENT);
     cpu_idle.set_value(50);
     registry.register_metric("cpu_idle", &cpu_idle);
     s_expect_response =
@@ -86,7 +86,7 @@ TEST_F(MetricsActionTest, prometheus_no_prefix) {
 
 TEST_F(MetricsActionTest, prometheus_no_name) {
     MetricRegistry registry("test");
-    IntGauge cpu_idle;
+    IntGauge cpu_idle(MetricUnit::PERCENT);
     cpu_idle.set_value(50);
     registry.register_metric("", &cpu_idle);
     s_expect_response = "";
diff --git a/be/test/util/doris_metrics_test.cpp 
b/be/test/util/doris_metrics_test.cpp
index e308f45..76a6a2e 100644
--- a/be/test/util/doris_metrics_test.cpp
+++ b/be/test/util/doris_metrics_test.cpp
@@ -60,7 +60,7 @@ public:
                         _ss << "}";
                     }
                 }
-                _ss << " " << ((SimpleMetric*)metric)->to_string() << 
std::endl;
+                _ss << " " << metric->to_string() << std::endl;
                 break;
             }
             default:
@@ -85,81 +85,81 @@ TEST_F(DorisMetricsTest, Normal) {
         DorisMetrics::instance()->fragment_requests_total.increment(12);
         auto metric = metrics->get_metric("fragment_requests_total");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("12", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("12", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->fragment_request_duration_us.increment(101);
         auto metric = metrics->get_metric("fragment_request_duration_us");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("101", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("101", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->http_requests_total.increment(102);
         auto metric = metrics->get_metric("http_requests_total");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("102", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("102", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->http_request_duration_us.increment(103);
         auto metric = metrics->get_metric("http_request_duration_us");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("103", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("103", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->http_request_send_bytes.increment(104);
         auto metric = metrics->get_metric("http_request_send_bytes");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("104", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("104", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->query_scan_bytes.increment(104);
         auto metric = metrics->get_metric("query_scan_bytes");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("104", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("104", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->query_scan_rows.increment(105);
         auto metric = metrics->get_metric("query_scan_rows");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("105", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("105", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->ranges_processed_total.increment(13);
         auto metric = metrics->get_metric("ranges_processed_total");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("13", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("13", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->push_requests_success_total.increment(106);
         auto metric = metrics->get_metric("push_requests_total",
                                           MetricLabels().add("status", 
"SUCCESS"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("106", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("106", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->push_requests_fail_total.increment(107);
         auto metric = metrics->get_metric("push_requests_total",
                                           MetricLabels().add("status", 
"FAIL"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("107", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("107", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->push_request_duration_us.increment(108);
         auto metric = metrics->get_metric("push_request_duration_us");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("108", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("108", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->push_request_write_bytes.increment(109);
         auto metric = metrics->get_metric("push_request_write_bytes");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("109", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("109", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->push_request_write_rows.increment(110);
         auto metric = metrics->get_metric("push_request_write_rows");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("110", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("110", metric->to_string().c_str());
     }
     // engine request
     {
@@ -168,7 +168,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"create_tablet")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("15", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("15", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->drop_tablet_requests_total.increment(16);
@@ -176,7 +176,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"drop_tablet")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("16", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("16", metric->to_string().c_str());
     }
     {
         
DorisMetrics::instance()->report_all_tablets_requests_total.increment(17);
@@ -184,7 +184,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"report_all_tablets")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("17", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("17", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->report_tablet_requests_total.increment(18);
@@ -192,7 +192,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"report_tablet")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("18", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("18", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->schema_change_requests_total.increment(19);
@@ -200,7 +200,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"schema_change")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("19", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("19", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->create_rollup_requests_total.increment(20);
@@ -208,7 +208,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"create_rollup")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("20", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("20", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->storage_migrate_requests_total.increment(21);
@@ -216,7 +216,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", 
"storage_migrate")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("21", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("21", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->delete_requests_total.increment(22);
@@ -224,7 +224,7 @@ TEST_F(DorisMetricsTest, Normal) {
                                           MetricLabels().add("type", "delete")
                                           .add("status", "total"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("22", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("22", metric->to_string().c_str());
     }
     //  comapction
     {
@@ -232,35 +232,35 @@ TEST_F(DorisMetricsTest, Normal) {
         auto metric = metrics->get_metric("compaction_deltas_total",
                                           MetricLabels().add("type", "base"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("30", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("30", metric->to_string().c_str());
     }
     {
         
DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(31);
         auto metric = metrics->get_metric("compaction_deltas_total",
                                           MetricLabels().add("type", 
"cumulative"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("31", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("31", metric->to_string().c_str());
     }
     {
         DorisMetrics::instance()->base_compaction_bytes_total.increment(32);
         auto metric = metrics->get_metric("compaction_bytes_total",
                                           MetricLabels().add("type", "base"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("32", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("32", metric->to_string().c_str());
     }
     {
         
DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(33);
         auto metric = metrics->get_metric("compaction_bytes_total",
                                           MetricLabels().add("type", 
"cumulative"));
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("33", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("33", metric->to_string().c_str());
     }
     // Gauge
     {
         DorisMetrics::instance()->memory_pool_bytes_total.increment(40);
         auto metric = metrics->get_metric("memory_pool_bytes_total");
         ASSERT_TRUE(metric != nullptr);
-        ASSERT_STREQ("40", ((SimpleMetric*)metric)->to_string().c_str());
+        ASSERT_STREQ("40", metric->to_string().c_str());
     }
 }
 
diff --git a/be/test/util/new_metrics_test.cpp 
b/be/test/util/new_metrics_test.cpp
index d35d6cf..649593b 100644
--- a/be/test/util/new_metrics_test.cpp
+++ b/be/test/util/new_metrics_test.cpp
@@ -36,7 +36,7 @@ public:
 
 TEST_F(MetricsTest, Counter) {
     {
-        IntCounter counter;
+        IntCounter counter(MetricUnit::NUMBER);
         ASSERT_EQ(0, counter.value());
         counter.increment(100);
         ASSERT_EQ(100, counter.value());
@@ -44,7 +44,7 @@ TEST_F(MetricsTest, Counter) {
         ASSERT_STREQ("100", counter.to_string().c_str());
     }
     {
-        DoubleCounter counter;
+        DoubleCounter counter(MetricUnit::NUMBER);
         ASSERT_EQ(0.0, counter.value());
         counter.increment(1.23);
         ASSERT_EQ(1.23, counter.value());
@@ -65,7 +65,7 @@ void mt_updater(IntCounter* counter, std::atomic<uint64_t>* 
used_time) {
 }
 
 TEST_F(MetricsTest, CounterPerf) {
-    IntCounter counter;
+    IntCounter counter(MetricUnit::NUMBER);
     volatile int64_t sum = 0;
 
     {
@@ -91,7 +91,7 @@ TEST_F(MetricsTest, CounterPerf) {
     ASSERT_EQ(100000000, counter.value());
     ASSERT_EQ(100000000, sum);
     {
-        IntCounter mt_counter;
+        IntCounter mt_counter(MetricUnit::NUMBER);
         std::vector<std::thread> updaters;
         std::atomic<uint64_t> used_time(0);
         for (int i = 0; i < 8; ++i) {
@@ -108,7 +108,7 @@ TEST_F(MetricsTest, CounterPerf) {
 
 TEST_F(MetricsTest, Gauge) {
     {
-        IntGauge gauge;
+        IntGauge gauge(MetricUnit::NUMBER);
         ASSERT_EQ(0, gauge.value());
         gauge.set_value(100);
         ASSERT_EQ(100, gauge.value());
@@ -116,7 +116,7 @@ TEST_F(MetricsTest, Gauge) {
         ASSERT_STREQ("100", gauge.to_string().c_str());
     }
     {
-        DoubleGauge gauge;
+        DoubleGauge gauge(MetricUnit::NUMBER);
         ASSERT_EQ(0.0, gauge.value());
         gauge.set_value(1.23);
         ASSERT_EQ(1.23, gauge.value());
@@ -189,7 +189,7 @@ public:
                     }
                     _ss << labels.to_string();
                 }
-                _ss << " " << ((SimpleMetric*)metric)->to_string() << 
std::endl;
+                _ss << " " << metric->to_string() << std::endl;
                 break;
             }
             default:
@@ -205,9 +205,9 @@ private:
 };
 
 TEST_F(MetricsTest, MetricCollector) {
-    IntCounter puts;
+    IntCounter puts(MetricUnit::NUMBER);
     puts.increment(101);
-    IntCounter gets;
+    IntCounter gets(MetricUnit::NUMBER);
     gets.increment(201);
     MetricCollector collector;
     ASSERT_TRUE(collector.add_metic(MetricLabels().add("type", "put"), &puts));
@@ -216,7 +216,7 @@ TEST_F(MetricsTest, MetricCollector) {
 
     {
         // Can't add different type to one collector
-        IntGauge post;
+        IntGauge post(MetricUnit::NUMBER);
         ASSERT_FALSE(collector.add_metic(MetricLabels().add("type", "post"), 
&post));
     }
 
@@ -241,13 +241,13 @@ TEST_F(MetricsTest, MetricCollector) {
 
 TEST_F(MetricsTest, MetricRegistry) {
     MetricRegistry registry("test");
-    IntCounter cpu_idle;
+    IntCounter cpu_idle(MetricUnit::PERCENT);
     cpu_idle.increment(12);
     ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle));
     // registry failed
-    IntCounter dummy;
+    IntCounter dummy(MetricUnit::PERCENT);
     ASSERT_FALSE(registry.register_metric("cpu_idle", &dummy));
-    IntCounter memory_usage;
+    IntCounter memory_usage(MetricUnit::BYTES);
     memory_usage.increment(24);
     ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage));
     {
@@ -268,13 +268,13 @@ TEST_F(MetricsTest, MetricRegistry) {
 
 TEST_F(MetricsTest, MetricRegistry2) {
     MetricRegistry registry("test");
-    IntCounter cpu_idle;
+    IntCounter cpu_idle(MetricUnit::PERCENT);
     cpu_idle.increment(12);
     ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle));
 
     {
         // memory_usage will deregister after this block
-        IntCounter memory_usage;
+        IntCounter memory_usage(MetricUnit::BYTES);
         memory_usage.increment(24);
         ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage));
         TestMetricsVisitor visitor;
diff --git a/be/test/util/system_metrics_test.cpp 
b/be/test/util/system_metrics_test.cpp
index 70e1d3e..4ec253e 100644
--- a/be/test/util/system_metrics_test.cpp
+++ b/be/test/util/system_metrics_test.cpp
@@ -65,7 +65,7 @@ public:
                         _ss << "}";
                     }
                 }
-                _ss << " " << ((SimpleMetric*)metric)->to_string() << 
std::endl;
+                _ss << " " << metric->to_string() << std::endl;
                 break;
             }
             default:
@@ -118,104 +118,104 @@ TEST_F(SystemMetricsTest, normal) {
         LOG(INFO) << "\n" << visitor.to_string();
 
         // cpu
-        SimpleMetric* cpu_user = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_user = registry.get_metric(
             "cpu", MetricLabels().add("mode", "user"));
         ASSERT_TRUE(cpu_user != nullptr);
         // ASSERT_STREQ("57199151", cpu_user->to_string().c_str());
-        SimpleMetric* cpu_nice = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_nice = registry.get_metric(
             "cpu", MetricLabels().add("mode", "nice"));
         ASSERT_TRUE(cpu_nice != nullptr);
         ASSERT_STREQ("2616310", cpu_nice->to_string().c_str());
-        SimpleMetric* cpu_system = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_system = registry.get_metric(
             "cpu", MetricLabels().add("mode", "system"));
         ASSERT_TRUE(cpu_system != nullptr);
         ASSERT_STREQ("10600935", cpu_system->to_string().c_str());
-        SimpleMetric* cpu_idle = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_idle = registry.get_metric(
             "cpu", MetricLabels().add("mode", "idle"));
         ASSERT_TRUE(cpu_idle != nullptr);
         ASSERT_STREQ("1517505423", cpu_idle->to_string().c_str());
-        SimpleMetric* cpu_iowait = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_iowait = registry.get_metric(
             "cpu", MetricLabels().add("mode", "iowait"));
         ASSERT_TRUE(cpu_iowait != nullptr);
         ASSERT_STREQ("2137148", cpu_iowait->to_string().c_str());
-        SimpleMetric* cpu_irq = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_irq = registry.get_metric(
             "cpu", MetricLabels().add("mode", "irq"));
         ASSERT_TRUE(cpu_irq != nullptr);
         ASSERT_STREQ("0", cpu_irq->to_string().c_str());
-        SimpleMetric* cpu_softirq = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_softirq = registry.get_metric(
             "cpu", MetricLabels().add("mode", "soft_irq"));
         ASSERT_TRUE(cpu_softirq != nullptr);
         ASSERT_STREQ("108277", cpu_softirq->to_string().c_str());
-        SimpleMetric* cpu_steal = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_steal = registry.get_metric(
             "cpu", MetricLabels().add("mode", "steal"));
         ASSERT_TRUE(cpu_steal != nullptr);
         ASSERT_STREQ("0", cpu_steal->to_string().c_str());
-        SimpleMetric* cpu_guest = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_guest = registry.get_metric(
             "cpu", MetricLabels().add("mode", "guest"));
         ASSERT_TRUE(cpu_guest != nullptr);
         ASSERT_STREQ("0", cpu_guest->to_string().c_str());
         // memroy
-        SimpleMetric* memory_allocated_bytes = 
(SimpleMetric*)registry.get_metric(
+        Metric* memory_allocated_bytes = registry.get_metric(
             "memory_allocated_bytes");
         ASSERT_TRUE(memory_allocated_bytes != nullptr);
         // network
-        SimpleMetric* receive_bytes = (SimpleMetric*)registry.get_metric(
+        Metric* receive_bytes = registry.get_metric(
             "network_receive_bytes", MetricLabels().add("device", "xgbe0"));
         ASSERT_TRUE(receive_bytes != nullptr);
         ASSERT_STREQ("52567436039", receive_bytes->to_string().c_str());
-        SimpleMetric* receive_packets = (SimpleMetric*)registry.get_metric(
+        Metric* receive_packets = registry.get_metric(
             "network_receive_packets", MetricLabels().add("device", "xgbe0"));
         ASSERT_TRUE(receive_packets != nullptr);
         ASSERT_STREQ("65066152", receive_packets->to_string().c_str());
-        SimpleMetric* send_bytes = (SimpleMetric*)registry.get_metric(
+        Metric* send_bytes = registry.get_metric(
             "network_send_bytes", MetricLabels().add("device", "xgbe0"));
         ASSERT_TRUE(send_bytes != nullptr);
         ASSERT_STREQ("45480856156", send_bytes->to_string().c_str());
-        SimpleMetric* send_packets = (SimpleMetric*)registry.get_metric(
+        Metric* send_packets = registry.get_metric(
             "network_send_packets", MetricLabels().add("device", "xgbe0"));
         ASSERT_TRUE(send_packets != nullptr);
         ASSERT_STREQ("88277614", send_packets->to_string().c_str());
         // disk
-        SimpleMetric* bytes_read = (SimpleMetric*)registry.get_metric(
+        Metric* bytes_read = registry.get_metric(
             "disk_bytes_read", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(bytes_read != nullptr);
         ASSERT_STREQ("20142745600", bytes_read->to_string().c_str());
-        SimpleMetric* reads_completed = (SimpleMetric*)registry.get_metric(
+        Metric* reads_completed = registry.get_metric(
             "disk_reads_completed", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(reads_completed != nullptr);
         ASSERT_STREQ("759548", reads_completed->to_string().c_str());
-        SimpleMetric* read_time_ms = (SimpleMetric*)registry.get_metric(
+        Metric* read_time_ms = registry.get_metric(
             "disk_read_time_ms", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(read_time_ms != nullptr);
         ASSERT_STREQ("4308146", read_time_ms->to_string().c_str());
 
-        SimpleMetric* bytes_written = (SimpleMetric*)registry.get_metric(
+        Metric* bytes_written = registry.get_metric(
             "disk_bytes_written", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(bytes_written != nullptr);
         ASSERT_STREQ("1624753500160", bytes_written->to_string().c_str());
-        SimpleMetric* writes_completed = (SimpleMetric*)registry.get_metric(
+        Metric* writes_completed = registry.get_metric(
             "disk_writes_completed", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(writes_completed != nullptr);
         ASSERT_STREQ("18282936", writes_completed->to_string().c_str());
-        SimpleMetric* write_time_ms = (SimpleMetric*)registry.get_metric(
+        Metric* write_time_ms = registry.get_metric(
             "disk_write_time_ms", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(write_time_ms != nullptr);
         ASSERT_STREQ("1907755230", write_time_ms->to_string().c_str());
-        SimpleMetric* io_time_ms = (SimpleMetric*)registry.get_metric(
+        Metric* io_time_ms = registry.get_metric(
             "disk_io_time_ms", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(io_time_ms != nullptr);
         ASSERT_STREQ("19003350", io_time_ms->to_string().c_str());
-        SimpleMetric* io_time_weigthed = (SimpleMetric*)registry.get_metric(
+        Metric* io_time_weigthed = registry.get_metric(
             "disk_io_time_weigthed", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(write_time_ms != nullptr);
         ASSERT_STREQ("1912122964", io_time_weigthed->to_string().c_str());
 
         // fd
-        SimpleMetric* fd_metric = (SimpleMetric*)registry.get_metric(
+        Metric* fd_metric = registry.get_metric(
             "fd_num_limit");
         ASSERT_TRUE(fd_metric != nullptr);
         ASSERT_STREQ("13052138", fd_metric->to_string().c_str());
-        fd_metric = (SimpleMetric*)registry.get_metric(
+        fd_metric = registry.get_metric(
             "fd_num_used");
         ASSERT_TRUE(fd_metric != nullptr);
         ASSERT_STREQ("19520", fd_metric->to_string().c_str());
@@ -263,21 +263,21 @@ TEST_F(SystemMetricsTest, no_proc_file) {
         LOG(INFO) << "\n" << visitor.to_string();
 
         // cpu
-        SimpleMetric* cpu_user = (SimpleMetric*)registry.get_metric(
+        Metric* cpu_user = registry.get_metric(
             "cpu", MetricLabels().add("mode", "user"));
         ASSERT_TRUE(cpu_user != nullptr);
         ASSERT_STREQ("0", cpu_user->to_string().c_str());
         // memroy
-        SimpleMetric* memory_allocated_bytes = 
(SimpleMetric*)registry.get_metric(
+        Metric* memory_allocated_bytes = registry.get_metric(
             "memory_allocated_bytes");
         ASSERT_TRUE(memory_allocated_bytes != nullptr);
         // network
-        SimpleMetric* receive_bytes = (SimpleMetric*)registry.get_metric(
+        Metric* receive_bytes = registry.get_metric(
             "network_receive_bytes", MetricLabels().add("device", "xgbe0"));
         ASSERT_TRUE(receive_bytes != nullptr);
         ASSERT_STREQ("0", receive_bytes->to_string().c_str());
         // disk
-        SimpleMetric* bytes_read = (SimpleMetric*)registry.get_metric(
+        Metric* bytes_read = registry.get_metric(
             "disk_bytes_read", MetricLabels().add("device", "sda"));
         ASSERT_TRUE(bytes_read != nullptr);
         ASSERT_STREQ("0", bytes_read->to_string().c_str());
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index f7c67c7..814dc49 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSink;
@@ -294,6 +295,7 @@ public class InsertStmt extends DdlStmt {
 
             if (targetTable instanceof OlapTable) {
                 LoadJobSourceType sourceType = 
LoadJobSourceType.INSERT_STREAMING;
+                MetricRepo.COUNTER_LOAD_ADD.increase(1L);
                 transactionId = 
Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
                         Lists.newArrayList(targetTable.getId()), label,
                         new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 20d2fc2..ef9b088 100644
--- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.common;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricLabel;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.logging.log4j.LogManager;
@@ -68,7 +69,7 @@ public class ThreadPoolManager {
 
     public static void registerThreadPoolMetric(String poolName, 
ThreadPoolExecutor threadPool) {
         for (String poolMetricType : poolMerticTypes) {
-            GaugeMetric<Integer> gauge = new 
GaugeMetric<Integer>("thread_pool", "thread_pool statistics") {
+            GaugeMetric<Integer> gauge = new 
GaugeMetric<Integer>("thread_pool", MetricUnit.NUMBER, "thread_pool 
statistics") {
                 @Override
                 public Integer getValue() {
                     String metricType = this.getLabels().get(1).getValue();
diff --git 
a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java 
b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
index f0f89a3..b278c47 100644
--- 
a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
+++ 
b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
@@ -29,17 +29,20 @@ import java.util.List;
 
 public class IncompleteTabletsProcNode implements ProcNodeInterface {
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            .add("UnhealthyTablets").add("InconsistentTablets")
+            
.add("UnhealthyTablets").add("InconsistentTablets").add("CloningTablets")
             .build();
-
     private static final Joiner JOINER = Joiner.on(",");
 
     Collection<Long> unhealthyTabletIds;
     Collection<Long> inconsistentTabletIds;
+    Collection<Long> cloningTabletIds;
 
-    public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds, 
Collection<Long> inconsistentTabletIds) {
+    public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds,
+                                     Collection<Long> inconsistentTabletIds,
+                                     Collection<Long> cloningTabletIds) {
         this.unhealthyTabletIds = unhealthyTabletIds;
         this.inconsistentTabletIds = inconsistentTabletIds;
+        this.cloningTabletIds = cloningTabletIds;
     }
 
     @Override
@@ -52,8 +55,10 @@ public class IncompleteTabletsProcNode implements 
ProcNodeInterface {
 
         String incompleteTablets = 
JOINER.join(Arrays.asList(unhealthyTabletIds));
         String inconsistentTablets = 
JOINER.join(Arrays.asList(inconsistentTabletIds));
+        String cloningTablets = JOINER.join(Arrays.asList(cloningTabletIds));
         row.add(incompleteTablets);
         row.add(inconsistentTablets);
+        row.add(cloningTablets);
 
         result.addRow(row);
 
diff --git 
a/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java 
b/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
index 72ca897..e75a3fe 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
@@ -32,6 +32,8 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
@@ -42,12 +44,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 public class StatisticProcDir implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
             .add("DbId").add("DbName").add("TableNum").add("PartitionNum")
             
.add("IndexNum").add("TabletNum").add("ReplicaNum").add("UnhealthyTabletNum")
-            .add("InconsistentTabletNum")
+            .add("InconsistentTabletNum").add("CloningTabletNum")
             .build();
+    private static final Logger LOG = 
LogManager.getLogger(StatisticProcDir.class);
 
     private Catalog catalog;
 
@@ -55,11 +61,14 @@ public class StatisticProcDir implements ProcDirInterface {
     Multimap<Long, Long> unhealthyTabletIds;
     // db id -> set(tablet id)
     Multimap<Long, Long> inconsistentTabletIds;
+    // db id -> set(tablet id)
+    Multimap<Long, Long> cloningTabletIds;
 
     public StatisticProcDir(Catalog catalog) {
         this.catalog = catalog;
         unhealthyTabletIds = HashMultimap.create();
         inconsistentTabletIds = HashMultimap.create();
+        cloningTabletIds = HashMultimap.create();
     }
 
     @Override
@@ -86,6 +95,7 @@ public class StatisticProcDir implements ProcDirInterface {
 
         unhealthyTabletIds.clear();
         inconsistentTabletIds.clear();
+        cloningTabletIds = AgentTaskQueue.getTabletIdsByType(TTaskType.CLONE);
         List<List<Comparable>> lines = new ArrayList<List<Comparable>>();
         for (Long dbId : dbIds) {
             if (dbId == 0) {
@@ -153,6 +163,7 @@ public class StatisticProcDir implements ProcDirInterface {
                 oneLine.add(dbReplicaNum);
                 oneLine.add(unhealthyTabletIds.get(dbId).size());
                 oneLine.add(inconsistentTabletIds.get(dbId).size());
+                oneLine.add(cloningTabletIds.get(dbId).size());
 
                 lines.add(oneLine);
 
@@ -181,6 +192,7 @@ public class StatisticProcDir implements ProcDirInterface {
         finalLine.add(totalReplicaNum);
         finalLine.add(unhealthyTabletIds.size());
         finalLine.add(inconsistentTabletIds.size());
+        finalLine.add(cloningTabletIds.size());
         lines.add(finalLine);
 
         // add result
@@ -209,6 +221,8 @@ public class StatisticProcDir implements ProcDirInterface {
             throw new AnalysisException("Invalid db id format: " + dbIdStr);
         }
 
-        return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId), 
inconsistentTabletIds.get(dbId));
+        return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId),
+                                             inconsistentTabletIds.get(dbId),
+                                             cloningTabletIds.get(dbId));
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java 
b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
index 10e1874..24a6224 100644
--- a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
+++ b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
@@ -23,6 +23,7 @@ import org.apache.doris.http.BaseResponse;
 import org.apache.doris.http.IllegalArgException;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.metric.MetricVisitor;
+import org.apache.doris.metric.JsonMetricVisitor;
 import org.apache.doris.metric.PrometheusMetricVisitor;
 import org.apache.doris.metric.SimpleCoreMetricVisitor;
 
@@ -50,6 +51,8 @@ public class MetricsAction extends RestBaseAction {
         MetricVisitor visitor = null;
         if (!Strings.isNullOrEmpty(type) && type.equalsIgnoreCase("core")) {
             visitor = new SimpleCoreMetricVisitor("doris_fe");
+        } else if (!Strings.isNullOrEmpty(type) && 
type.equalsIgnoreCase("agent")) {
+            visitor = new JsonMetricVisitor("doris_fe");
         } else {
             visitor = new PrometheusMetricVisitor("doris_fe");
         }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 7222dd8..c4d7d2c 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -45,6 +45,7 @@ import org.apache.doris.load.BrokerFileGroupAggInfo;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
@@ -209,6 +210,7 @@ public class BrokerLoadJob extends LoadJob {
     @Override
     public void beginTxn()
             throws LabelAlreadyUsedException, BeginTransactionException, 
AnalysisException, DuplicatedRequestException {
+        MetricRepo.COUNTER_LOAD_ADD.increase(1L);
         transactionId = Catalog.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, 
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
                                   new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
@@ -468,6 +470,7 @@ public class BrokerLoadJob extends LoadJob {
                              .add("txn_id", transactionId)
                              .add("msg", "Load job try to commit txn")
                              .build());
+            MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
             Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
                     dbId, transactionId, commitInfos,
                     new LoadJobFinalOperation(id, loadingStatus, progress, 
loadStartTimestamp,
diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 218bfd1..28469fc 100644
--- 
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
@@ -165,6 +166,7 @@ public abstract class RoutineLoadTaskInfo {
         // begin a txn for task
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         try {
+            MetricRepo.COUNTER_LOAD_ADD.increase(1L);
             txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
                     routineLoadJob.getDbId(), 
Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
                     new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index 8dbe044..83ed5b7 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.BackendTabletsInfo;
 import org.apache.doris.persist.ReplicaPersistInfo;
@@ -96,7 +97,7 @@ public class ReportHandler extends Daemon {
 
     public ReportHandler() {
         GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>(
-                "report_queue_size", "report queue size") {
+                "report_queue_size", MetricUnit.NUMBER, "report queue size") {
             @Override
             public Long getValue() {
                 return (long) reportQueue.size();
diff --git a/fe/src/main/java/org/apache/doris/metric/CounterMetric.java 
b/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
index 9cdd452..ecc96a4 100644
--- a/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
@@ -22,8 +22,8 @@ package org.apache.doris.metric;
  */
 public abstract class CounterMetric<T> extends Metric<T> {
 
-    public CounterMetric(String name, String description) {
-        super(name, MetricType.COUNTER, description);
+    public CounterMetric(String name, MetricUnit unit, String description) {
+        super(name, MetricType.COUNTER, unit, description);
     }
 
     abstract public void increase(T delta);
diff --git a/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java 
b/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
index b4b9240..c3cf3b8 100644
--- a/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
@@ -21,8 +21,8 @@ import com.google.common.util.concurrent.AtomicDouble;
 
 public class DoubleCounterMetric extends CounterMetric<Double> {
 
-    public DoubleCounterMetric(String name, String description) {
-        super(name, description);
+    public DoubleCounterMetric(String name, MetricUnit unit, String 
description) {
+        super(name, unit, description);
     }
 
     private AtomicDouble value = new AtomicDouble(0.0);
diff --git a/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java 
b/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
index 581da72..2e8d819 100644
--- a/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
@@ -22,7 +22,7 @@ package org.apache.doris.metric;
  */
 public abstract class GaugeMetric<T> extends Metric<T> {
 
-    public GaugeMetric(String name, String description) {
-        super(name, MetricType.GAUGE, description);
+    public GaugeMetric(String name, MetricUnit unit, String description) {
+        super(name, MetricType.GAUGE, unit, description);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java 
b/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
index b4cbe4c..a66bc4f 100644
--- a/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
+++ b/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
@@ -19,8 +19,8 @@ package org.apache.doris.metric;
 
 public class GaugeMetricImpl<T> extends GaugeMetric<T> {
 
-    public GaugeMetricImpl(String name, String description) {
-        super(name, description);
+    public GaugeMetricImpl(String name, MetricUnit unit, String description) {
+        super(name, unit, description);
     }
 
     private T value;
diff --git a/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java 
b/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java
new file mode 100644
index 0000000..2463a7a
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.metric;
+
+import org.apache.doris.monitor.jvm.JvmStats;
+import com.codahale.metrics.Histogram;
+import java.util.List;
+
+public class JsonMetricVisitor extends MetricVisitor {
+    private int ordinal = 0;
+    private int metricNumber = 0;
+
+    public JsonMetricVisitor(String prefix) {
+        super(prefix);
+    }
+
+    @Override
+    public void setMetricNumber(int metricNumber) {
+        this.metricNumber = metricNumber;
+    }
+
+    @Override
+    public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
+        return;
+    }
+
+    @Override
+    public void visit(StringBuilder sb, @SuppressWarnings("rawtypes") Metric 
metric) {
+        if (ordinal++ == 0) {
+            sb.append("[\n");
+        }
+        sb.append("{\n\t\"tags\":\n\t{\n");
+        sb.append("\t\t\"metric\":\"").append(metric.getName()).append("\"");
+
+        // name
+        @SuppressWarnings("unchecked")
+        List<MetricLabel> labels = metric.getLabels();
+        if (!labels.isEmpty()) {
+            sb.append(",\n");
+            int i = 0;
+            for (MetricLabel label : labels) {
+                if (i++ > 0) {
+                    sb.append(",\n");
+                }
+                
sb.append("\t\t\"").append(label.getKey()).append("\":\"").append(label.getValue()).append("\"");
+            }
+        }
+        sb.append("\n\t},\n");
+        
sb.append("\t\"unit\":\"").append(metric.getUnit().name().toLowerCase()).append(
 "\",\n");
+
+        // value
+        
sb.append("\t\"value\":").append(metric.getValue().toString()).append("\n}");
+        if (ordinal < metricNumber) {
+            sb.append(",\n");
+        } else {
+            sb.append("\n]");
+        }
+        return;
+    }
+
+    @Override
+    public void visitHistogram(StringBuilder sb, String name, Histogram 
histogram) {
+        return;
+    }
+
+    @Override
+    public void getNodeInfo(StringBuilder sb) {
+        return;
+    }
+}
+
diff --git a/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java 
b/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
index cf4c652..c56a616 100644
--- a/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class LongCounterMetric extends CounterMetric<Long> {
 
-    public LongCounterMetric(String name, String description) {
-        super(name, description);
+    public LongCounterMetric(String name, MetricUnit unit, String description) 
{
+        super(name, unit, description);
     }
 
     private AtomicLong value = new AtomicLong(0L);
diff --git a/fe/src/main/java/org/apache/doris/metric/Metric.java 
b/fe/src/main/java/org/apache/doris/metric/Metric.java
index c4e6302..e5e029c 100644
--- a/fe/src/main/java/org/apache/doris/metric/Metric.java
+++ b/fe/src/main/java/org/apache/doris/metric/Metric.java
@@ -26,14 +26,28 @@ public abstract class Metric<T> {
         GAUGE, COUNTER
     }
 
+    public enum MetricUnit {
+        NANOSECONDS,
+        MICROSECONDS,
+        MILLISECONDS,
+        SECONDS,
+        BYTES,
+        ROWS,
+        NUMBER,
+        PERCENT,
+        NOUNIT
+    };
+
     protected String name;
     protected MetricType type;
+    protected MetricUnit unit;
     protected List<MetricLabel> labels = Lists.newArrayList();
     protected String description;
 
-    public Metric(String name, MetricType type, String description) {
+    public Metric(String name, MetricType type, MetricUnit unit, String 
description) {
         this.name = name;
         this.type = type;
+        this.unit = unit;
         this.description = description;
     }
 
@@ -45,6 +59,10 @@ public abstract class Metric<T> {
         return type;
     }
 
+    public MetricUnit getUnit() {
+        return unit;
+    }
+
     public String getDescription() {
         return description;
     }
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
index 8985f95..bbd3a7d 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.monitor.jvm.JvmService;
 import org.apache.doris.monitor.jvm.JvmStats;
 import org.apache.doris.persist.EditLog;
@@ -96,7 +97,7 @@ public final class MetricRepo {
         for (EtlJobType jobType : EtlJobType.values()) {
             for (JobState state : JobState.values()) {
                 GaugeMetric<Long> gauge = (GaugeMetric<Long>) new 
GaugeMetric<Long>("job",
-                        "job statistics") {
+                        MetricUnit.NUMBER, "job statistics") {
                     @Override
                     public Long getValue() {
                         if (!Catalog.getInstance().isMaster()) {
@@ -120,7 +121,7 @@ public final class MetricRepo {
             }
             
             GaugeMetric<Long> gauge = (GaugeMetric<Long>) new 
GaugeMetric<Long>("job",
-                    "job statistics") {
+                    MetricUnit.NUMBER, "job statistics") {
                 @Override
                 public Long getValue() {
                     if (!Catalog.getInstance().isMaster()) {
@@ -144,7 +145,7 @@ public final class MetricRepo {
 
         // connections
         GaugeMetric<Integer> conections = (GaugeMetric<Integer>) new 
GaugeMetric<Integer>(
-                "connection_total", "total connections") {
+                "connection_total", MetricUnit.NUMBER, "total connections") {
             @Override
             public Integer getValue() {
                 return 
ExecuteEnv.getInstance().getScheduler().getConnectionNum();
@@ -154,7 +155,7 @@ public final class MetricRepo {
 
         // journal id
         GaugeMetric<Long> maxJournalId = (GaugeMetric<Long>) new 
GaugeMetric<Long>(
-                "max_journal_id", "max journal id of this frontends") {
+                "max_journal_id", MetricUnit.NUMBER, "max journal id of this 
frontends") {
             @Override
             public Long getValue() {
                 EditLog editLog = Catalog.getInstance().getEditLog();
@@ -168,7 +169,7 @@ public final class MetricRepo {
 
         // scheduled tablet num
         GaugeMetric<Long> scheduledTabletNum = (GaugeMetric<Long>) new 
GaugeMetric<Long>(
-                "scheduled_tablet_num", "number of tablets being scheduled") {
+                "scheduled_tablet_num", MetricUnit.NUMBER, "number of tablets 
being scheduled") {
             @Override
             public Long getValue() {
                 if (!Catalog.getInstance().isMaster()) {
@@ -181,58 +182,58 @@ public final class MetricRepo {
 
         // qps, rps and error rate
         // these metrics should be set an init value, in case that metric 
calculator is not running
-        GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", "query per 
second");
+        GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", 
MetricUnit.NUMBER, "query per second");
         GAUGE_QUERY_PER_SECOND.setValue(0.0);
         PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_PER_SECOND);
-        GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", "request per 
second");
+        GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", 
MetricUnit.NUMBER, "request per second");
         GAUGE_REQUEST_PER_SECOND.setValue(0.0);
         PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_REQUEST_PER_SECOND);
-        GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", "query 
error rate");
+        GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", 
MetricUnit.NUMBER, "query error rate");
         PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_ERR_RATE);
         GAUGE_QUERY_ERR_RATE.setValue(0.0);
         GAUGE_MAX_TABLET_COMPACTION_SCORE = new 
GaugeMetricImpl<>("max_tablet_compaction_score",
-                "max tablet compaction score of all backends");
+                MetricUnit.NUMBER, "max tablet compaction score of all 
backends");
         PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);
         GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
 
         // 2. counter
-        COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", "total 
request");
+        COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", 
MetricUnit.NUMBER, "total request");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_REQUEST_ALL);
-        COUNTER_QUERY_ALL = new LongCounterMetric("query_total", "total 
query");
+        COUNTER_QUERY_ALL = new LongCounterMetric("query_total", 
MetricUnit.NUMBER, "total query");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ALL);
-        COUNTER_QUERY_ERR = new LongCounterMetric("query_err", "total error 
query");
+        COUNTER_QUERY_ERR = new LongCounterMetric("query_err", 
MetricUnit.NUMBER, "total error query");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ERR);
-        COUNTER_LOAD_ADD = new LongCounterMetric("load_add", "total load 
submit");
+        COUNTER_LOAD_ADD = new LongCounterMetric("load_add", 
MetricUnit.NUMBER, "total load submit");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_ADD);
-        COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", "total 
load finished");
+        COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", 
MetricUnit.NUMBER, "total load finished");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_FINISHED);
-        COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write", 
"counter of edit log write into bdbje");
+        COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write", 
MetricUnit.NUMBER, "counter of edit log write into bdbje");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_WRITE);
-        COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read", 
"counter of edit log read from bdbje");
+        COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read", 
MetricUnit.NUMBER, "counter of edit log read from bdbje");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_READ);
-        COUNTER_EDIT_LOG_SIZE_BYTES = new 
LongCounterMetric("edit_log_size_bytes", "size of edit log");
+        COUNTER_EDIT_LOG_SIZE_BYTES = new 
LongCounterMetric("edit_log_size_bytes", MetricUnit.BYTES, "size of edit log");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
-        COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", "counter of 
image generated");
+        COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", 
MetricUnit.NUMBER, "counter of image generated");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE);
-        COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
+        COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push", 
MetricUnit.NUMBER,
                 "counter of image succeeded in pushing to other frontends");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
 
-        COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", "counter of 
rejected transactions");
+        COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", 
MetricUnit.NUMBER, "counter of rejected transactions");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_REJECT);
-        COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", "counter of 
begining transactions");
+        COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", 
MetricUnit.NUMBER, "counter of begining transactions");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_BEGIN);
-        COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", "counter of 
success transactions");
+        COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", 
MetricUnit.NUMBER, "counter of success transactions");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
-        COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of 
failed transactions");
+        COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", 
MetricUnit.NUMBER, "counter of failed transactions");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED);
 
-        COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", 
"total rows of routine load");
+        COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", 
MetricUnit.ROWS, "total rows of routine load");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ROWS);
-        COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new 
LongCounterMetric("routine_load_receive_bytes",
+        COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new 
LongCounterMetric("routine_load_receive_bytes", MetricUnit.BYTES,
                 "total received bytes of routine load");
         
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_RECEIVED_BYTES);
-        COUNTER_ROUTINE_LOAD_ERROR_ROWS = new 
LongCounterMetric("routine_load_error_rows",
+        COUNTER_ROUTINE_LOAD_ERROR_ROWS = new 
LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
                 "total error rows of routine load");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
 
@@ -266,7 +267,7 @@ public final class MetricRepo {
 
             // tablet number of each backends
             GaugeMetric<Long> tabletNum = (GaugeMetric<Long>) new 
GaugeMetric<Long>(TABLET_NUM,
-                    "tablet number") {
+                    MetricUnit.NUMBER, "tablet number") {
                 @Override
                 public Long getValue() {
                     if (!Catalog.getInstance().isMaster()) {
@@ -280,7 +281,7 @@ public final class MetricRepo {
 
             // max compaction score of tablets on each backends
             GaugeMetric<Long> tabletMaxCompactionScore = (GaugeMetric<Long>) 
new GaugeMetric<Long>(
-                    TABLET_MAX_COMPACTION_SCORE,
+                    TABLET_MAX_COMPACTION_SCORE, MetricUnit.NUMBER,
                     "tablet max compaction score") {
                 @Override
                 public Long getValue() {
@@ -306,6 +307,7 @@ public final class MetricRepo {
         JvmStats jvmStats = jvmService.stats();
         visitor.visitJvm(sb, jvmStats);
 
+        visitor.setMetricNumber(PALO_METRIC_REGISTER.getPaloMetrics().size());
         // doris metrics
         for (Metric metric : PALO_METRIC_REGISTER.getPaloMetrics()) {
             visitor.visit(sb, metric);
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java 
b/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
index 7abb998..681b6df 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
@@ -32,6 +32,8 @@ public abstract class MetricVisitor {
         this.prefix = prefix;
     }
 
+    public abstract void setMetricNumber(int metricNumber);
+
     public abstract void visitJvm(StringBuilder sb, JvmStats jvmStats);
 
     public abstract void visit(StringBuilder sb, Metric metric);
diff --git 
a/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java 
b/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
index 6a98b94..d4a638d 100644
--- a/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
+++ b/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
@@ -52,11 +52,19 @@ public class PrometheusMetricVisitor extends MetricVisitor {
     private static final String HELP = "# HELP ";
     private static final String TYPE = "# TYPE ";
 
+    private int ordinal = 0;
+    private int metricNumber = 0;
+
     public PrometheusMetricVisitor(String prefix) {
         super(prefix);
     }
 
     @Override
+    public void setMetricNumber(int metricNumber) {
+        this.metricNumber = metricNumber;
+    }
+
+    @Override
     public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
         // heap
         sb.append(Joiner.on(" ").join(HELP, JVM_HEAP_SIZE_BYTES, "jvm heap 
stat\n"));
diff --git 
a/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java 
b/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
index cf832ad..a65f742 100644
--- a/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
+++ b/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
@@ -56,6 +56,9 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
 
     public static final String MAX_TABLET_COMPACTION_SCORE = 
"max_tablet_compaction_score";
 
+    private int ordinal = 0;
+    private int metricNumber = 0;
+
     private static final Map<String, String> CORE_METRICS = Maps.newHashMap();
     static {
         CORE_METRICS.put(MAX_JOURMAL_ID, TYPE_LONG);
@@ -72,6 +75,11 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
     }
 
     @Override
+    public void setMetricNumber(int metricNumber) {
+        this.metricNumber = metricNumber;
+    }
+
+    @Override
     public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
         Iterator<MemoryPool> memIter = jvmStats.getMem().iterator();
         while (memIter.hasNext()) {
@@ -134,4 +142,4 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
         sb.append(prefix + "_backend_dead_num").append(" 
").append(String.valueOf(beDeadNum)).append("\n");
         sb.append(prefix + "_broker_dead_num").append(" 
").append(String.valueOf(brokerDeadNum)).append("\n");
     }
-}
\ No newline at end of file
+}
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index fcbf6f3..d94733a 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -211,10 +211,10 @@ public class ConnectProcessor {
         // TODO(cmy): when user send multi-statement, the executor is the last 
statement's executor.
         // We may need to find some way to resolve this.
         if (executor != null) {
-            auditAfterExec(originStmt.replace("\n", " \\n"), 
executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog());
+            auditAfterExec(originStmt.replace("\n", " "), 
executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog());
         } else {
             // executor can be null if we encounter analysis error.
-            auditAfterExec(originStmt.replace("\n", " \\n"), null, null);
+            auditAfterExec(originStmt.replace("\n", " "), null, null);
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 90133e2..b5ccb65 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -56,6 +56,7 @@ import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlEofPacket;
@@ -707,6 +708,7 @@ public class StmtExecutor {
                     TabletCommitInfo.fromThrift(coord.getCommitInfos()),
                     10000)) {
                 txnStatus = TransactionStatus.VISIBLE;
+                MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
             } else {
                 txnStatus = TransactionStatus.COMMITTED;
             }
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 6677050..3ec7032 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -42,6 +42,7 @@ import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.MiniEtlTaskInfo;
 import org.apache.doris.master.MasterImpl;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.plugin.AuditEvent;
@@ -696,6 +697,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
+        MetricRepo.COUNTER_LOAD_ADD.increase(1L);
         return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
                 db.getId(), Lists.newArrayList(table.getId()), 
request.getLabel(), request.getRequest_id(),
                 new TxnCoordinator(TxnSourceType.BE, clientIp),
@@ -757,10 +759,15 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             throw new UserException("unknown database, database=" + dbName);
         }
 
-        return 
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
-                db, request.getTxnId(),
-                TabletCommitInfo.fromThrift(request.getCommitInfos()),
-                5000, 
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
+        boolean ret = 
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
+                        db, request.getTxnId(),
+                        TabletCommitInfo.fromThrift(request.getCommitInfos()),
+                        5000, 
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
+        if (ret) {
+            // if commit and publish is success, load can be regarded as 
success
+            MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
+        }
+        return ret;
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java 
b/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 36c0ef6..96aba8e 100644
--- a/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -21,8 +21,10 @@ import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Table;
 
 import org.apache.logging.log4j.LogManager;
@@ -215,6 +217,19 @@ public class AgentTaskQueue {
         return taskNum;
     }
 
+    public static synchronized Multimap<Long, Long> 
getTabletIdsByType(TTaskType type) {
+        Multimap<Long, Long> tabletIds = HashMultimap.create();
+        Map<Long, Map<Long, AgentTask>> taskMap = tasks.column(type);
+        if (taskMap != null) {
+            for (Map<Long, AgentTask> signatureMap : taskMap.values()) {
+                for (AgentTask task : signatureMap.values()) {
+                    tabletIds.put(task.getDbId(), task.getTabletId());
+                }
+            }
+        }
+        return tabletIds;
+    }
+
     public static synchronized int getTaskNum(long backendId, TTaskType type, 
boolean isFailed) {
         int taskNum = 0;
         if (backendId != -1) {
diff --git 
a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java 
b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index da73c8c..9c7192a 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -35,6 +35,7 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.Source;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.transaction.TransactionState;
@@ -44,6 +45,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.List;
@@ -58,6 +61,11 @@ import mockit.Mocked;
 
 public class BrokerLoadJobTest {
 
+    @BeforeClass
+    public static void start() {
+        MetricRepo.init();
+    }
+
     @Test
     public void testFromLoadStmt(@Injectable LoadStmt loadStmt,
                                  @Injectable LabelName labelName,
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java 
b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index d99a4c1..dd69932 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -39,6 +39,8 @@ import org.apache.doris.transaction.TransactionState;
 import com.google.common.collect.Maps;
 
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Map;
@@ -48,6 +50,11 @@ import mockit.Mocked;
 
 public class LoadJobTest {
 
+    @BeforeClass
+    public static void start() {
+        MetricRepo.init();
+    }
+
     @Test
     public void testGetDbNotExists(@Mocked Catalog catalog) {
         LoadJob loadJob = new BrokerLoadJob();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to