This is an automated email from the ASF dual-hosted git repository. lichaoyong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1cc78fe [Enhancement] Convert metric to Json format (#3635) 1cc78fe is described below commit 1cc78fe69b3b07487d97592de78eefdf140bde83 Author: lichaoyong <lichaoyong...@gmail.com> AuthorDate: Wed May 27 08:49:30 2020 +0800 [Enhancement] Convert metric to Json format (#3635) Add a JSON format for existing metrics like this. ``` { "tags": { "metric":"thread_pool", "name":"thrift-server-pool", "type":"active_thread_num" }, "unit":"number", "value":3 } ``` I add a new JsonMetricVisitor to handle the transformation. It's not to modify existing PrometheusMetricVisitor and SimpleCoreMetricVisitor. Also I add 1. A unit item to indicate the metric better 2. Cloning tablet statistics divided by database. 3. Use white space to replace newline in audit.log --- be/src/exec/tablet_sink.cpp | 3 + be/src/http/action/metrics_action.cpp | 80 ++++++- be/src/http/action/stream_load.cpp | 28 +-- be/src/runtime/client_cache.cpp | 4 +- be/src/runtime/memory/chunk_allocator.cpp | 12 +- be/src/runtime/runtime_state.h | 14 ++ be/src/runtime/tmp_file_mgr.cc | 2 +- be/src/util/doris_metrics.cpp | 13 +- be/src/util/doris_metrics.h | 264 +++++++++++---------- be/src/util/metrics.cpp | 26 +- be/src/util/metrics.h | 92 +++++-- be/src/util/runtime_profile.cpp | 2 +- be/src/util/system_metrics.cpp | 55 +++-- be/src/util/thrift_server.cpp | 4 +- be/test/http/metrics_action_test.cpp | 8 +- be/test/util/doris_metrics_test.cpp | 54 ++--- be/test/util/new_metrics_test.cpp | 30 +-- be/test/util/system_metrics_test.cpp | 58 ++--- .../java/org/apache/doris/analysis/InsertStmt.java | 2 + .../org/apache/doris/common/ThreadPoolManager.java | 3 +- .../common/proc/IncompleteTabletsProcNode.java | 11 +- .../apache/doris/common/proc/StatisticProcDir.java | 18 +- .../org/apache/doris/http/rest/MetricsAction.java | 3 + .../apache/doris/load/loadv2/BrokerLoadJob.java | 3 + .../load/routineload/RoutineLoadTaskInfo.java | 2 + .../org/apache/doris/master/ReportHandler.java | 3 +- .../org/apache/doris/metric/CounterMetric.java | 4 +- .../apache/doris/metric/DoubleCounterMetric.java | 4 +- .../java/org/apache/doris/metric/GaugeMetric.java | 4 +- .../org/apache/doris/metric/GaugeMetricImpl.java | 4 +- .../org/apache/doris/metric/JsonMetricVisitor.java | 86 +++++++ .../org/apache/doris/metric/LongCounterMetric.java | 4 +- .../main/java/org/apache/doris/metric/Metric.java | 20 +- .../java/org/apache/doris/metric/MetricRepo.java | 58 ++--- .../org/apache/doris/metric/MetricVisitor.java | 2 + .../doris/metric/PrometheusMetricVisitor.java | 8 + .../doris/metric/SimpleCoreMetricVisitor.java | 10 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 4 +- .../java/org/apache/doris/qe/StmtExecutor.java | 2 + .../apache/doris/service/FrontendServiceImpl.java | 15 +- .../java/org/apache/doris/task/AgentTaskQueue.java | 15 ++ .../doris/load/loadv2/BrokerLoadJobTest.java | 8 + .../org/apache/doris/load/loadv2/LoadJobTest.java | 7 + 43 files changed, 711 insertions(+), 338 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 4941bb3..7060a62 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -598,6 +598,9 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. state->update_num_rows_load_total(input_batch->num_rows()); + state->update_num_bytes_load_total(input_batch->total_byte_size()); + DorisMetrics::instance()->load_rows_total.increment(input_batch->num_rows()); + DorisMetrics::instance()->load_bytes_total.increment(input_batch->total_byte_size()); RowBatch* batch = input_batch; if (!_output_expr_ctxs.empty()) { SCOPED_RAW_TIMER(&_convert_batch_ns); diff --git a/be/src/http/action/metrics_action.cpp b/be/src/http/action/metrics_action.cpp index e37e2d1..b487820 100644 --- a/be/src/http/action/metrics_action.cpp +++ b/be/src/http/action/metrics_action.cpp @@ -17,6 +17,10 @@ #include "http/action/metrics_action.h" +#include <rapidjson/rapidjson.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/document.h> +#include <rapidjson/writer.h> #include <string> #include "http/http_request.h" @@ -36,7 +40,7 @@ public: std::string to_string() const { return _ss.str(); } private: void _visit_simple_metric( - const std::string& name, const MetricLabels& labels, SimpleMetric* metric); + const std::string& name, const MetricLabels& labels, Metric* metric); private: std::stringstream _ss; }; @@ -88,7 +92,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix, case MetricType::COUNTER: case MetricType::GAUGE: for (auto& it : collector->metrics()) { - _visit_simple_metric(metric_name, it.first, (SimpleMetric*) it.second); + _visit_simple_metric(metric_name, it.first, (Metric*) it.second); } break; default: @@ -97,7 +101,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix, } void PrometheusMetricsVisitor::_visit_simple_metric( - const std::string& name, const MetricLabels& labels, SimpleMetric* metric) { + const std::string& name, const MetricLabels& labels, Metric* metric) { _ss << name; // labels if (!labels.empty()) { @@ -138,20 +142,80 @@ void SimpleCoreMetricsVisitor::visit(const std::string& prefix, } for (auto& it : collector->metrics()) { - _ss << metric_name << " LONG " << ((SimpleMetric*) it.second)->to_string() + _ss << metric_name << " LONG " << ((Metric*) it.second)->to_string() << "\n"; } } +class JsonMetricsVisitor : public MetricsVisitor { +public: + JsonMetricsVisitor() { + } + virtual ~JsonMetricsVisitor() {} + void visit(const std::string& prefix, const std::string& name, + MetricCollector* collector) override; + std::string to_string() { + rapidjson::StringBuffer strBuf; + rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf); + doc.Accept(writer); + return strBuf.GetString(); + } + +private: + rapidjson::Document doc{rapidjson::kArrayType}; +}; + +void JsonMetricsVisitor::visit(const std::string& prefix, + const std::string& name, + MetricCollector* collector) { + if (collector->empty() || name.empty()) { + return; + } + + rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); + switch (collector->type()) { + case MetricType::COUNTER: + case MetricType::GAUGE: + for (auto& it : collector->metrics()) { + const MetricLabels& labels = it.first; + Metric* metric = reinterpret_cast<Metric*>(it.second); + rapidjson::Value metric_obj(rapidjson::kObjectType); + rapidjson::Value tag_obj(rapidjson::kObjectType); + tag_obj.AddMember("metric", rapidjson::Value(name.c_str(), allocator), allocator); + // labels + if (!labels.empty()) { + for (auto& label : labels.labels) { + tag_obj.AddMember( + rapidjson::Value(label.name.c_str(), allocator), + rapidjson::Value(label.value.c_str(), allocator), + allocator); + } + } + metric_obj.AddMember("tags", tag_obj, allocator); + rapidjson::Value unit_val(unit_name(metric->unit()), allocator); + metric_obj.AddMember("unit", unit_val, allocator); + metric->write_value(metric_obj, allocator); + doc.PushBack(metric_obj, allocator); + } + break; + default: + break; + } +} + void MetricsAction::handle(HttpRequest* req) { const std::string& type = req->param("type"); std::string str; - if (type != "core") { - PrometheusMetricsVisitor visitor; + if (type == "core") { + SimpleCoreMetricsVisitor visitor; + _metrics->collect(&visitor); + str.assign(visitor.to_string()); + } else if (type == "agent") { + JsonMetricsVisitor visitor; _metrics->collect(&visitor); str.assign(visitor.to_string()); } else { - SimpleCoreMetricsVisitor visitor; + PrometheusMetricsVisitor visitor; _metrics->collect(&visitor); str.assign(visitor.to_string()); } @@ -160,4 +224,4 @@ void MetricsAction::handle(HttpRequest* req) { HttpChannel::send_reply(req, str); } -} +} // namespace doris diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 7d490fe..15979ea 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -59,10 +59,10 @@ namespace doris { -IntCounter k_streaming_load_requests_total; -IntCounter k_streaming_load_bytes; -IntCounter k_streaming_load_duration_ms; -static IntGauge k_streaming_load_current_processing; +METRIC_DEFINE_INT_COUNTER(streaming_load_requests_total, MetricUnit::NUMBER); +METRIC_DEFINE_INT_COUNTER(streaming_load_bytes, MetricUnit::BYTES); +METRIC_DEFINE_INT_COUNTER(streaming_load_duration_ms, MetricUnit::MILLISECONDS); +METRIC_DEFINE_INT_GAUGE(streaming_load_current_processing, MetricUnit::NUMBER); #ifdef BE_TEST TStreamLoadPutResult k_stream_load_put_result; @@ -89,13 +89,13 @@ static bool is_format_support_streaming(TFileFormatType::type format) { StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { DorisMetrics::instance()->metrics()->register_metric("streaming_load_requests_total", - &k_streaming_load_requests_total); + &streaming_load_requests_total); DorisMetrics::instance()->metrics()->register_metric("streaming_load_bytes", - &k_streaming_load_bytes); + &streaming_load_bytes); DorisMetrics::instance()->metrics()->register_metric("streaming_load_duration_ms", - &k_streaming_load_duration_ms); + &streaming_load_duration_ms); DorisMetrics::instance()->metrics()->register_metric("streaming_load_current_processing", - &k_streaming_load_current_processing); + &streaming_load_current_processing); } StreamLoadAction::~StreamLoadAction() { @@ -131,10 +131,10 @@ void StreamLoadAction::handle(HttpRequest* req) { HttpChannel::send_reply(req, str); // update statstics - k_streaming_load_requests_total.increment(1); - k_streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000); - k_streaming_load_bytes.increment(ctx->receive_bytes); - k_streaming_load_current_processing.increment(-1); + streaming_load_requests_total.increment(1); + streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000); + streaming_load_bytes.increment(ctx->receive_bytes); + streaming_load_current_processing.increment(-1); } Status StreamLoadAction::_handle(StreamLoadContext* ctx) { @@ -164,7 +164,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { } int StreamLoadAction::on_header(HttpRequest* req) { - k_streaming_load_current_processing.increment(1); + streaming_load_current_processing.increment(1); StreamLoadContext* ctx = new StreamLoadContext(_exec_env); ctx->ref(); @@ -195,7 +195,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { } auto str = ctx->to_json(); HttpChannel::send_reply(req, str); - k_streaming_load_current_processing.increment(-1); + streaming_load_current_processing.increment(-1); return -1; } return 0; diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index 3a46ab0..7144684 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -216,12 +216,12 @@ void ClientCacheHelper::init_metrics(MetricRegistry* metrics, const std::string& // usage, but ensures that _metrics_enabled is published. boost::lock_guard<boost::mutex> lock(_lock); - _used_clients.reset(new IntGauge()); + _used_clients.reset(new IntGauge(MetricUnit::NUMBER)); metrics->register_metric("thrift_used_clients", MetricLabels().add("name", key_prefix), _used_clients.get()); - _opened_clients.reset(new IntGauge()); + _opened_clients.reset(new IntGauge(MetricUnit::NUMBER)); metrics->register_metric("thrift_opened_clients", MetricLabels().add("name", key_prefix), _opened_clients.get()); diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index e331ef3..c5e68f2 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -34,12 +34,12 @@ namespace doris { ChunkAllocator* ChunkAllocator::_s_instance = nullptr; -static IntCounter local_core_alloc_count; -static IntCounter other_core_alloc_count; -static IntCounter system_alloc_count; -static IntCounter system_free_count; -static IntCounter system_alloc_cost_ns; -static IntCounter system_free_cost_ns; +static IntCounter local_core_alloc_count(MetricUnit::NUMBER); +static IntCounter other_core_alloc_count(MetricUnit::NUMBER); +static IntCounter system_alloc_count(MetricUnit::NUMBER); +static IntCounter system_free_count(MetricUnit::NUMBER); +static IntCounter system_alloc_cost_ns(MetricUnit::NANOSECONDS); +static IntCounter system_free_cost_ns(MetricUnit::NANOSECONDS); #ifdef BE_TEST static std::mutex s_mutex; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 876bcc3..c3c4873 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -389,6 +389,10 @@ public: void append_error_msg_to_file(const std::string& line, const std::string& error_msg, bool is_summary = false); + int64_t num_bytes_load_total() { + return _num_bytes_load_total.load(); + } + int64_t num_rows_load_total() { return _num_rows_load_total.load(); } @@ -413,6 +417,14 @@ public: _num_rows_load_total.store(num_rows); } + void update_num_bytes_load_total(int64_t bytes_load) { + _num_bytes_load_total.fetch_add(bytes_load); + } + + void set_update_num_bytes_load_total(int64_t bytes_load) { + _num_bytes_load_total.store(bytes_load); + } + void update_num_rows_load_filtered(int64_t num_rows) { _num_rows_load_filtered.fetch_add(num_rows); } @@ -587,6 +599,8 @@ private: std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by predicates std::atomic<int64_t> _num_print_error_rows; + std::atomic<int64_t> _num_bytes_load_total; // total bytes read from source + std::vector<std::string> _export_output_files; std::string _import_label; diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc index 87ed68b..badc903 100644 --- a/be/src/runtime/tmp_file_mgr.cc +++ b/be/src/runtime/tmp_file_mgr.cc @@ -119,7 +119,7 @@ Status TmpFileMgr::init_custom( } DCHECK(metrics != NULL); - _num_active_scratch_dirs_metric.reset(new IntGauge()); + _num_active_scratch_dirs_metric.reset(new IntGauge(MetricUnit::NUMBER)); metrics->register_metric("active_scratch_dirs", _num_active_scratch_dirs_metric.get()); //_active_scratch_dirs_metric = metrics->register_metric(new SetMetric<std::string>( // TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 36c3e3b..2c466e9 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -150,6 +150,11 @@ DorisMetrics::DorisMetrics() : _name("doris_be"), _hook_name("doris_metrics"), _ _metrics.register_metric( "stream_load", MetricLabels().add("type", "load_rows"), &stream_load_rows_total); + _metrics.register_metric( + "load", MetricLabels().add("type", "receive_bytes"), + &stream_receive_bytes_total); + _metrics.register_metric("load_rows", &load_rows_total); + _metrics.register_metric("load_bytes", &load_bytes_total); // Gauge REGISTER_DORIS_METRIC(memory_pool_bytes_total); @@ -188,13 +193,13 @@ void DorisMetrics::initialize( const std::vector<std::string>& network_interfaces) { // disk usage for (auto& path : paths) { - IntGauge* gauge = disks_total_capacity.set_key(path); + IntGauge* gauge = disks_total_capacity.set_key(path, MetricUnit::BYTES); _metrics.register_metric("disks_total_capacity", MetricLabels().add("path", path), gauge); - gauge = disks_avail_capacity.set_key(path); + gauge = disks_avail_capacity.set_key(path, MetricUnit::BYTES); _metrics.register_metric("disks_avail_capacity", MetricLabels().add("path", path), gauge); - gauge = disks_data_used_capacity.set_key(path); + gauge = disks_data_used_capacity.set_key(path, MetricUnit::BYTES); _metrics.register_metric("disks_data_used_capacity", MetricLabels().add("path", path), gauge); - gauge = disks_state.set_key(path); + gauge = disks_state.set_key(path, MetricUnit::BYTES); _metrics.register_metric("disks_state", MetricLabels().add("path", path), gauge); } diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 7a3c0bc..e6775b5 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -37,8 +37,8 @@ public: } } - IntGauge* set_key(const std::string& key) { - metrics.emplace(key, IntGauge()); + IntGauge* set_key(const std::string& key, const MetricUnit unit) { + metrics.emplace(key, IntGauge(unit)); return &metrics.find(key)->second; } @@ -54,139 +54,141 @@ private: class DorisMetrics { public: - // counters - IntCounter fragment_requests_total; - IntCounter fragment_request_duration_us; - IntCounter http_requests_total; - IntCounter http_request_duration_us; - IntCounter http_request_send_bytes; - IntCounter query_scan_bytes; - IntCounter query_scan_rows; - IntCounter ranges_processed_total; - IntCounter push_requests_success_total; - IntCounter push_requests_fail_total; - IntCounter push_request_duration_us; - IntCounter push_request_write_bytes; - IntCounter push_request_write_rows; - IntCounter create_tablet_requests_total; - IntCounter create_tablet_requests_failed; - IntCounter drop_tablet_requests_total; - - IntCounter report_all_tablets_requests_total; - IntCounter report_all_tablets_requests_failed; - IntCounter report_tablet_requests_total; - IntCounter report_tablet_requests_failed; - IntCounter report_disk_requests_total; - IntCounter report_disk_requests_failed; - IntCounter report_task_requests_total; - IntCounter report_task_requests_failed; - - IntCounter schema_change_requests_total; - IntCounter schema_change_requests_failed; - IntCounter create_rollup_requests_total; - IntCounter create_rollup_requests_failed; - IntCounter storage_migrate_requests_total; - IntCounter delete_requests_total; - IntCounter delete_requests_failed; - IntCounter clone_requests_total; - IntCounter clone_requests_failed; - - IntCounter finish_task_requests_total; - IntCounter finish_task_requests_failed; - - IntCounter base_compaction_request_total; - IntCounter base_compaction_request_failed; - IntCounter cumulative_compaction_request_total; - IntCounter cumulative_compaction_request_failed; - - IntCounter base_compaction_deltas_total; - IntCounter base_compaction_bytes_total; - IntCounter cumulative_compaction_deltas_total; - IntCounter cumulative_compaction_bytes_total; - - IntCounter publish_task_request_total; - IntCounter publish_task_failed_total; - - IntCounter meta_write_request_total; - IntCounter meta_write_request_duration_us; - IntCounter meta_read_request_total; - IntCounter meta_read_request_duration_us; - - // Counters for segment_v2 - // ----------------------- - // total number of segments read - IntCounter segment_read_total; - // total number of rows in queried segments (before index pruning) - IntCounter segment_row_total; - // total number of rows selected by short key index - IntCounter segment_rows_by_short_key; - // total number of rows selected by zone map index - IntCounter segment_rows_read_by_zone_map; - - IntCounter txn_begin_request_total; - IntCounter txn_commit_request_total; - IntCounter txn_rollback_request_total; - IntCounter txn_exec_plan_total; - IntCounter stream_receive_bytes_total; - IntCounter stream_load_rows_total; - - IntCounter memtable_flush_total; - IntCounter memtable_flush_duration_us; - - // Gauges - IntGauge memory_pool_bytes_total; - IntGauge process_thread_num; - IntGauge process_fd_num_used; - IntGauge process_fd_num_limit_soft; - IntGauge process_fd_num_limit_hard; + // counters + METRIC_DEFINE_INT_COUNTER(fragment_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(fragment_request_duration_us, MetricUnit::MICROSECONDS); + METRIC_DEFINE_INT_COUNTER(http_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(http_request_duration_us, MetricUnit::MICROSECONDS); + METRIC_DEFINE_INT_COUNTER(http_request_send_bytes, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(query_scan_bytes, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(query_scan_rows, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(ranges_processed_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(push_requests_success_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(push_requests_fail_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(push_request_duration_us, MetricUnit::MICROSECONDS); + METRIC_DEFINE_INT_COUNTER(push_request_write_bytes, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(push_request_write_rows, MetricUnit::ROWS); + METRIC_DEFINE_INT_COUNTER(create_tablet_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(create_tablet_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(drop_tablet_requests_total, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_tablet_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_tablet_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_disk_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_disk_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_task_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(report_task_requests_failed, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(schema_change_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(schema_change_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(create_rollup_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(create_rollup_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(storage_migrate_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(delete_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(delete_requests_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(clone_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(clone_requests_failed, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(finish_task_requests_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(finish_task_requests_failed, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(base_compaction_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(base_compaction_request_failed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_failed, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(base_compaction_deltas_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(base_compaction_bytes_total, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(cumulative_compaction_deltas_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(cumulative_compaction_bytes_total, MetricUnit::BYTES); + + METRIC_DEFINE_INT_COUNTER(publish_task_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(publish_task_failed_total, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(meta_write_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(meta_write_request_duration_us, MetricUnit::MICROSECONDS); + METRIC_DEFINE_INT_COUNTER(meta_read_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(meta_read_request_duration_us, MetricUnit::MICROSECONDS); + + // Counters for segment_v2 + // ----------------------- + // total number of segments read + METRIC_DEFINE_INT_COUNTER(segment_read_total, MetricUnit::NUMBER); + // total number of rows in queried segments (before index pruning) + METRIC_DEFINE_INT_COUNTER(segment_row_total, MetricUnit::ROWS); + // total number of rows selected by short key index + METRIC_DEFINE_INT_COUNTER(segment_rows_by_short_key, MetricUnit::ROWS); + // total number of rows selected by zone map index + METRIC_DEFINE_INT_COUNTER(segment_rows_read_by_zone_map, MetricUnit::ROWS); + + METRIC_DEFINE_INT_COUNTER(txn_begin_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(txn_commit_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(txn_rollback_request_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(txn_exec_plan_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(stream_receive_bytes_total, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(stream_load_rows_total, MetricUnit::ROWS); + METRIC_DEFINE_INT_COUNTER(load_rows_total, MetricUnit::ROWS); + METRIC_DEFINE_INT_COUNTER(load_bytes_total, MetricUnit::BYTES); + + METRIC_DEFINE_INT_COUNTER(memtable_flush_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(memtable_flush_duration_us, MetricUnit::MICROSECONDS); + + // Gauges + METRIC_DEFINE_INT_GAUGE(memory_pool_bytes_total, MetricUnit::BYTES); + METRIC_DEFINE_INT_GAUGE(process_thread_num, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(process_fd_num_used, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_soft, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_hard, MetricUnit::NUMBER); IntGaugeMetricsMap disks_total_capacity; IntGaugeMetricsMap disks_avail_capacity; IntGaugeMetricsMap disks_data_used_capacity; IntGaugeMetricsMap disks_state; - - // the max compaction score of all tablets. - // Record base and cumulative scores separately, because - // we need to get the larger of the two. - IntGauge tablet_cumulative_max_compaction_score; - IntGauge tablet_base_max_compaction_score; - - // The following metrics will be calculated - // by metric calculator - IntGauge push_request_write_bytes_per_second; - IntGauge query_scan_bytes_per_second; - IntGauge max_disk_io_util_percent; - IntGauge max_network_send_bytes_rate; - IntGauge max_network_receive_bytes_rate; - - // Metrics related with BlockManager - IntCounter readable_blocks_total; - IntCounter writable_blocks_total; - IntCounter blocks_created_total; - IntCounter blocks_deleted_total; - IntCounter bytes_read_total; - IntCounter bytes_written_total; - IntCounter disk_sync_total; - IntGauge blocks_open_reading; - IntGauge blocks_open_writing; - - IntCounter blocks_push_remote_duration_us; - - // Size of some global containers - UIntGauge rowset_count_generated_and_in_use; - UIntGauge unused_rowsets_count; - UIntGauge broker_count; - UIntGauge data_stream_receiver_count; - UIntGauge fragment_endpoint_count; - UIntGauge active_scan_context_count; - UIntGauge plan_fragment_count; - UIntGauge load_channel_count; - UIntGauge result_buffer_block_count; - UIntGauge result_block_queue_count; - UIntGauge routine_load_task_count; - UIntGauge small_file_cache_count; - UIntGauge stream_load_pipe_count; - UIntGauge brpc_endpoint_stub_count; - UIntGauge tablet_writer_count; + + // the max compaction score of all tablets. + // Record base and cumulative scores separately, because + // we need to get the larger of the two. + METRIC_DEFINE_INT_GAUGE(tablet_cumulative_max_compaction_score, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(tablet_base_max_compaction_score, MetricUnit::NUMBER); + + // The following metrics will be calculated + // by metric calculator + METRIC_DEFINE_INT_GAUGE(push_request_write_bytes_per_second, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(query_scan_bytes_per_second, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(max_disk_io_util_percent, MetricUnit::PERCENT); + METRIC_DEFINE_INT_GAUGE(max_network_send_bytes_rate, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(max_network_receive_bytes_rate, MetricUnit::NUMBER); + + // Metrics related with BlockManager + METRIC_DEFINE_INT_COUNTER(readable_blocks_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(writable_blocks_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(blocks_created_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(blocks_deleted_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_COUNTER(bytes_read_total, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(bytes_written_total, MetricUnit::BYTES); + METRIC_DEFINE_INT_COUNTER(disk_sync_total, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(blocks_open_reading, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(blocks_open_writing, MetricUnit::NUMBER); + + METRIC_DEFINE_INT_COUNTER(blocks_push_remote_duration_us, MetricUnit::MICROSECONDS); + + // Size of some global containers + METRIC_DEFINE_UINT_GAUGE(rowset_count_generated_and_in_use, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(unused_rowsets_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(broker_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(data_stream_receiver_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(fragment_endpoint_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(active_scan_context_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(plan_fragment_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(load_channel_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(result_buffer_block_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(result_block_queue_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(routine_load_task_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(small_file_cache_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(stream_load_pipe_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NUMBER); + METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NUMBER); static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 3b37353..cbd5900 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -45,6 +45,29 @@ std::ostream& operator<<(std::ostream& os, MetricType type) { return os; } +const char* unit_name(MetricUnit unit) { + switch (unit) { + case MetricUnit::NANOSECONDS: + return "nanoseconds"; + case MetricUnit::MICROSECONDS: + return "microseconds"; + case MetricUnit::MILLISECONDS: + return "milliseconds"; + case MetricUnit::SECONDS: + return "seconds"; + case MetricUnit::BYTES: + return "bytes"; + case MetricUnit::ROWS: + return "rows"; + case MetricUnit::NUMBER: + return "number"; + case MetricUnit::PERCENT: + return "percent"; + default: + return "nounit"; + } +} + void Metric::hide() { if (_registry == nullptr) { return; @@ -56,8 +79,9 @@ void Metric::hide() { bool MetricCollector::add_metic(const MetricLabels& labels, Metric* metric) { if (empty()) { _type = metric->type(); + _unit = metric->unit(); } else { - if (metric->type() != _type) { + if (metric->type() != _type || metric->unit() != _unit) { return false; } } diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index 331c5c3..46450c4 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -26,12 +26,17 @@ #include <mutex> #include <iomanip> +#include <rapidjson/rapidjson.h> +#include <rapidjson/document.h> + #include "common/config.h" #include "util/spinlock.h" #include "util/core_local.h" namespace doris { +namespace rj = RAPIDJSON_NAMESPACE; + class MetricRegistry; enum class MetricType { @@ -42,33 +47,49 @@ enum class MetricType { UNTYPED }; +enum class MetricUnit { + NANOSECONDS, + MICROSECONDS, + MILLISECONDS, + SECONDS, + BYTES, + ROWS, + NUMBER, + PERCENT, + NOUNIT +}; + std::ostream& operator<<(std::ostream& os, MetricType type); +const char* unit_name(MetricUnit unit); class Metric { public: - Metric(MetricType type) :_type(type), _registry(nullptr) { } + Metric(MetricType type, MetricUnit unit) + : _type(type), + _unit(unit), + _registry(nullptr) {} virtual ~Metric() { hide(); } + virtual std::string to_string() const = 0; MetricType type() const { return _type; } + MetricUnit unit() const { return _unit; } void hide(); + virtual void write_value(rj::Value& metric_obj, + rj::Document::AllocatorType& allocator) = 0; private: friend class MetricRegistry; - MetricType _type; + MetricType _type = MetricType::UNTYPED; + MetricUnit _unit = MetricUnit::NOUNIT; MetricRegistry* _registry; }; -class SimpleMetric : public Metric { -public: - SimpleMetric(MetricType type) :Metric(type) { } - virtual ~SimpleMetric() { } - virtual std::string to_string() const = 0; -}; - // Metric that only can increment template<typename T> -class LockSimpleMetric : public SimpleMetric { +class LockSimpleMetric : public Metric { public: - LockSimpleMetric(MetricType type) :SimpleMetric(type), _value(T()) { } + LockSimpleMetric(MetricType type, MetricUnit unit) + : Metric(type, unit), + _value(T()) {} virtual ~LockSimpleMetric() { } std::string to_string() const override { @@ -76,6 +97,11 @@ public: ss << value(); return ss.str(); } + + void write_value(rj::Value& metric_obj, + rj::Document::AllocatorType& allocator) override { + metric_obj.AddMember("value", rj::Value(value()), allocator); + } T value() const { std::lock_guard<SpinLock> l(_lock); @@ -103,9 +129,12 @@ protected: }; template<typename T> -class CoreLocalCounter : public SimpleMetric { +class CoreLocalCounter : public Metric { public: - CoreLocalCounter() :SimpleMetric(MetricType::COUNTER), _value() { } + CoreLocalCounter(MetricUnit unit) + : Metric(MetricType::COUNTER, unit), + _value() {} + virtual ~CoreLocalCounter() { } std::string to_string() const override { @@ -113,6 +142,11 @@ public: ss << value(); return ss.str(); } + + void write_value(rj::Value& metric_obj, + rj::Document::AllocatorType& allocator) override { + metric_obj.AddMember("value", rj::Value(value()), allocator); + } T value() const { T sum = 0; @@ -132,7 +166,8 @@ protected: template<typename T> class LockCounter : public LockSimpleMetric<T> { public: - LockCounter() :LockSimpleMetric<T>(MetricType::COUNTER) { } + LockCounter(MetricUnit unit) + : LockSimpleMetric<T>(MetricType::COUNTER, unit) {} virtual ~LockCounter() { } }; @@ -140,7 +175,8 @@ public: template<typename T> class LockGauge : public LockSimpleMetric<T> { public: - LockGauge() :LockSimpleMetric<T>(MetricType::GAUGE) { } + LockGauge(MetricUnit unit) + : LockSimpleMetric<T>(MetricType::GAUGE, unit) {} virtual ~LockGauge() { } }; @@ -274,8 +310,10 @@ public: return _metrics; } MetricType type() const { return _type; } + MetricUnit unit() const { return _unit; } private: MetricType _type = MetricType::UNTYPED; + MetricUnit _unit = MetricUnit::NOUNIT; std::map<MetricLabels, Metric*> _metrics; }; @@ -343,4 +381,26 @@ using IntGauge = LockGauge<int64_t>; using UIntGauge = LockGauge<uint64_t>; using DoubleGauge = LockGauge<double>; -} +} // namespace doris + +// Convenience macros to metric +#define METRIC_DEFINE_INT_COUNTER(metric_name, unit) \ + doris::IntCounter metric_name{unit} + +#define METRIC_DEFINE_INT_LOCK_COUNTER(metric_name, unit) \ + doris::IntLockCounter metric_name{unit} + +#define METRIC_DEFINE_UINT_COUNTER(metric_name, unit) \ + doris::UIntCounter metric_name{unit} + +#define METRIC_DEFINE_DOUBLE_COUNTER(metric_name, unit) \ + doris::DoubleCounter metric_name{unit} + +#define METRIC_DEFINE_INT_GAUGE(metric_name, unit) \ + doris::IntGauge metric_name{unit} + +#define METRIC_DEFINE_UINT_GAUGE(metric_name, unit) \ + doris::UIntGauge metric_name{unit} + +#define METRIC_DEFINE_DOUBLE_GAUGE(metric_name, unit) \ + doris::DoubleGauge metric_name{unit} diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 7b69153..07f4658 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -529,7 +529,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co { boost::lock_guard<boost::mutex> l(_info_strings_lock); BOOST_FOREACH (const std::string& key, _info_strings_display_order) { - stream << prefix << " " << key << ": " << _info_strings.find(key)->second << std::endl; + stream << prefix << " - " << key << ": " << _info_strings.find(key)->second << std::endl; } } diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index ee43b20..26358e8 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -28,40 +28,47 @@ const char* SystemMetrics::_s_hook_name = "system_metrics"; // /proc/stat: http://www.linuxhowtos.org/System/procstat.htm struct CpuMetrics { - static constexpr int k_num_metrics = 10; - static const char* k_names[k_num_metrics]; - IntLockCounter metrics[k_num_metrics]; + static constexpr int cpu_num_metrics = 10; + IntLockCounter metrics[cpu_num_metrics] = { + {MetricUnit::PERCENT}, {MetricUnit::PERCENT}, + {MetricUnit::PERCENT}, {MetricUnit::PERCENT}, + {MetricUnit::PERCENT}, {MetricUnit::PERCENT}, + {MetricUnit::PERCENT}, {MetricUnit::PERCENT}, + {MetricUnit::PERCENT}, {MetricUnit::PERCENT} + }; + static const char* cpu_metrics[cpu_num_metrics]; }; -const char* CpuMetrics::k_names[] = { +const char* CpuMetrics::cpu_metrics[] = { "user", "nice", "system", "idle", "iowait", - "irq", "soft_irq", "steal", "guest", "guest_nice"}; + "irq", "soft_irq", "steal", "guest", "guest_nice" +}; struct MemoryMetrics { - IntGauge allocated_bytes; + METRIC_DEFINE_INT_GAUGE(allocated_bytes, MetricUnit::BYTES); }; struct DiskMetrics { - IntLockCounter reads_completed; - IntLockCounter bytes_read; - IntLockCounter read_time_ms; - IntLockCounter writes_completed; - IntLockCounter bytes_written; - IntLockCounter write_time_ms; - IntLockCounter io_time_ms; - IntLockCounter io_time_weigthed; + METRIC_DEFINE_INT_LOCK_COUNTER(reads_completed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_LOCK_COUNTER(bytes_read, MetricUnit::BYTES); + METRIC_DEFINE_INT_LOCK_COUNTER(read_time_ms, MetricUnit::MILLISECONDS); + METRIC_DEFINE_INT_LOCK_COUNTER(writes_completed, MetricUnit::NUMBER); + METRIC_DEFINE_INT_LOCK_COUNTER(bytes_written, MetricUnit::BYTES); + METRIC_DEFINE_INT_LOCK_COUNTER(write_time_ms, MetricUnit::MILLISECONDS); + METRIC_DEFINE_INT_LOCK_COUNTER(io_time_ms, MetricUnit::MILLISECONDS); + METRIC_DEFINE_INT_LOCK_COUNTER(io_time_weigthed, MetricUnit::MILLISECONDS); }; struct NetMetrics { - IntLockCounter receive_bytes; - IntLockCounter receive_packets; - IntLockCounter send_bytes; - IntLockCounter send_packets; + METRIC_DEFINE_INT_LOCK_COUNTER(receive_bytes, MetricUnit::BYTES); + METRIC_DEFINE_INT_LOCK_COUNTER(receive_packets, MetricUnit::NUMBER); + METRIC_DEFINE_INT_LOCK_COUNTER(send_bytes, MetricUnit::BYTES); + METRIC_DEFINE_INT_LOCK_COUNTER(send_packets, MetricUnit::NUMBER); }; struct FileDescriptorMetrics { - IntGauge fd_num_limit; - IntGauge fd_num_used; + METRIC_DEFINE_INT_GAUGE(fd_num_limit, MetricUnit::NUMBER); + METRIC_DEFINE_INT_GAUGE(fd_num_used, MetricUnit::NUMBER); }; SystemMetrics::SystemMetrics() { @@ -110,9 +117,9 @@ void SystemMetrics::update() { void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) { _cpu_total.reset(new CpuMetrics()); - for (int i = 0; i < CpuMetrics::k_num_metrics; ++i) { + for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) { registry->register_metric("cpu", - MetricLabels().add("mode", CpuMetrics::k_names[i]), + MetricLabels().add("mode", CpuMetrics::cpu_metrics[i]), &_cpu_total->metrics[i]); } } @@ -146,7 +153,7 @@ void SystemMetrics::_update_cpu_metrics() { } char cpu[16]; - int64_t values[CpuMetrics::k_num_metrics]; + int64_t values[CpuMetrics::cpu_num_metrics]; memset(values, 0, sizeof(values)); sscanf(_line_ptr, "%15s" " %" PRId64 " %" PRId64 " %" PRId64 @@ -159,7 +166,7 @@ void SystemMetrics::_update_cpu_metrics() { &values[6], &values[7], &values[8], &values[9]); - for (int i = 0; i < CpuMetrics::k_num_metrics; ++i) { + for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) { _cpu_total->metrics[i].set_value(values[i]); } diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index c9b69d8..9985732 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -276,12 +276,12 @@ ThriftServer::ThriftServer( _session_handler(NULL) { if (metrics != NULL) { _metrics_enabled = true; - _current_connections.reset(new IntGauge()); + _current_connections.reset(new IntGauge(MetricUnit::NUMBER)); metrics->register_metric("thrift_current_connections", MetricLabels().add("name", name), _current_connections.get()); - _connections_total.reset(new IntCounter()); + _connections_total.reset(new IntCounter(MetricUnit::NUMBER)); metrics->register_metric("thrift_connections_total", MetricLabels().add("name", name), _connections_total.get()); diff --git a/be/test/http/metrics_action_test.cpp b/be/test/http/metrics_action_test.cpp index 4c63761..6bbab74 100644 --- a/be/test/http/metrics_action_test.cpp +++ b/be/test/http/metrics_action_test.cpp @@ -53,10 +53,10 @@ private: TEST_F(MetricsActionTest, prometheus_output) { MetricRegistry registry("test"); - IntGauge cpu_idle; + IntGauge cpu_idle(MetricUnit::PERCENT); cpu_idle.set_value(50); registry.register_metric("cpu_idle", &cpu_idle); - IntCounter put_requests_total; + IntCounter put_requests_total(MetricUnit::NUMBER); put_requests_total.increment(2345); registry.register_metric("requests_total", MetricLabels().add("type", "put").add("path", "/sports"), @@ -73,7 +73,7 @@ TEST_F(MetricsActionTest, prometheus_output) { TEST_F(MetricsActionTest, prometheus_no_prefix) { MetricRegistry registry(""); - IntGauge cpu_idle; + IntGauge cpu_idle(MetricUnit::PERCENT); cpu_idle.set_value(50); registry.register_metric("cpu_idle", &cpu_idle); s_expect_response = @@ -86,7 +86,7 @@ TEST_F(MetricsActionTest, prometheus_no_prefix) { TEST_F(MetricsActionTest, prometheus_no_name) { MetricRegistry registry("test"); - IntGauge cpu_idle; + IntGauge cpu_idle(MetricUnit::PERCENT); cpu_idle.set_value(50); registry.register_metric("", &cpu_idle); s_expect_response = ""; diff --git a/be/test/util/doris_metrics_test.cpp b/be/test/util/doris_metrics_test.cpp index e308f45..76a6a2e 100644 --- a/be/test/util/doris_metrics_test.cpp +++ b/be/test/util/doris_metrics_test.cpp @@ -60,7 +60,7 @@ public: _ss << "}"; } } - _ss << " " << ((SimpleMetric*)metric)->to_string() << std::endl; + _ss << " " << metric->to_string() << std::endl; break; } default: @@ -85,81 +85,81 @@ TEST_F(DorisMetricsTest, Normal) { DorisMetrics::instance()->fragment_requests_total.increment(12); auto metric = metrics->get_metric("fragment_requests_total"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("12", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("12", metric->to_string().c_str()); } { DorisMetrics::instance()->fragment_request_duration_us.increment(101); auto metric = metrics->get_metric("fragment_request_duration_us"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("101", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("101", metric->to_string().c_str()); } { DorisMetrics::instance()->http_requests_total.increment(102); auto metric = metrics->get_metric("http_requests_total"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("102", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("102", metric->to_string().c_str()); } { DorisMetrics::instance()->http_request_duration_us.increment(103); auto metric = metrics->get_metric("http_request_duration_us"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("103", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("103", metric->to_string().c_str()); } { DorisMetrics::instance()->http_request_send_bytes.increment(104); auto metric = metrics->get_metric("http_request_send_bytes"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("104", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("104", metric->to_string().c_str()); } { DorisMetrics::instance()->query_scan_bytes.increment(104); auto metric = metrics->get_metric("query_scan_bytes"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("104", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("104", metric->to_string().c_str()); } { DorisMetrics::instance()->query_scan_rows.increment(105); auto metric = metrics->get_metric("query_scan_rows"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("105", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("105", metric->to_string().c_str()); } { DorisMetrics::instance()->ranges_processed_total.increment(13); auto metric = metrics->get_metric("ranges_processed_total"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("13", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("13", metric->to_string().c_str()); } { DorisMetrics::instance()->push_requests_success_total.increment(106); auto metric = metrics->get_metric("push_requests_total", MetricLabels().add("status", "SUCCESS")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("106", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("106", metric->to_string().c_str()); } { DorisMetrics::instance()->push_requests_fail_total.increment(107); auto metric = metrics->get_metric("push_requests_total", MetricLabels().add("status", "FAIL")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("107", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("107", metric->to_string().c_str()); } { DorisMetrics::instance()->push_request_duration_us.increment(108); auto metric = metrics->get_metric("push_request_duration_us"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("108", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("108", metric->to_string().c_str()); } { DorisMetrics::instance()->push_request_write_bytes.increment(109); auto metric = metrics->get_metric("push_request_write_bytes"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("109", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("109", metric->to_string().c_str()); } { DorisMetrics::instance()->push_request_write_rows.increment(110); auto metric = metrics->get_metric("push_request_write_rows"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("110", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("110", metric->to_string().c_str()); } // engine request { @@ -168,7 +168,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "create_tablet") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("15", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("15", metric->to_string().c_str()); } { DorisMetrics::instance()->drop_tablet_requests_total.increment(16); @@ -176,7 +176,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "drop_tablet") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("16", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("16", metric->to_string().c_str()); } { DorisMetrics::instance()->report_all_tablets_requests_total.increment(17); @@ -184,7 +184,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "report_all_tablets") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("17", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("17", metric->to_string().c_str()); } { DorisMetrics::instance()->report_tablet_requests_total.increment(18); @@ -192,7 +192,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "report_tablet") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("18", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("18", metric->to_string().c_str()); } { DorisMetrics::instance()->schema_change_requests_total.increment(19); @@ -200,7 +200,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "schema_change") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("19", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("19", metric->to_string().c_str()); } { DorisMetrics::instance()->create_rollup_requests_total.increment(20); @@ -208,7 +208,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "create_rollup") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("20", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("20", metric->to_string().c_str()); } { DorisMetrics::instance()->storage_migrate_requests_total.increment(21); @@ -216,7 +216,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "storage_migrate") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("21", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("21", metric->to_string().c_str()); } { DorisMetrics::instance()->delete_requests_total.increment(22); @@ -224,7 +224,7 @@ TEST_F(DorisMetricsTest, Normal) { MetricLabels().add("type", "delete") .add("status", "total")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("22", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("22", metric->to_string().c_str()); } // comapction { @@ -232,35 +232,35 @@ TEST_F(DorisMetricsTest, Normal) { auto metric = metrics->get_metric("compaction_deltas_total", MetricLabels().add("type", "base")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("30", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("30", metric->to_string().c_str()); } { DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(31); auto metric = metrics->get_metric("compaction_deltas_total", MetricLabels().add("type", "cumulative")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("31", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("31", metric->to_string().c_str()); } { DorisMetrics::instance()->base_compaction_bytes_total.increment(32); auto metric = metrics->get_metric("compaction_bytes_total", MetricLabels().add("type", "base")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("32", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("32", metric->to_string().c_str()); } { DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(33); auto metric = metrics->get_metric("compaction_bytes_total", MetricLabels().add("type", "cumulative")); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("33", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("33", metric->to_string().c_str()); } // Gauge { DorisMetrics::instance()->memory_pool_bytes_total.increment(40); auto metric = metrics->get_metric("memory_pool_bytes_total"); ASSERT_TRUE(metric != nullptr); - ASSERT_STREQ("40", ((SimpleMetric*)metric)->to_string().c_str()); + ASSERT_STREQ("40", metric->to_string().c_str()); } } diff --git a/be/test/util/new_metrics_test.cpp b/be/test/util/new_metrics_test.cpp index d35d6cf..649593b 100644 --- a/be/test/util/new_metrics_test.cpp +++ b/be/test/util/new_metrics_test.cpp @@ -36,7 +36,7 @@ public: TEST_F(MetricsTest, Counter) { { - IntCounter counter; + IntCounter counter(MetricUnit::NUMBER); ASSERT_EQ(0, counter.value()); counter.increment(100); ASSERT_EQ(100, counter.value()); @@ -44,7 +44,7 @@ TEST_F(MetricsTest, Counter) { ASSERT_STREQ("100", counter.to_string().c_str()); } { - DoubleCounter counter; + DoubleCounter counter(MetricUnit::NUMBER); ASSERT_EQ(0.0, counter.value()); counter.increment(1.23); ASSERT_EQ(1.23, counter.value()); @@ -65,7 +65,7 @@ void mt_updater(IntCounter* counter, std::atomic<uint64_t>* used_time) { } TEST_F(MetricsTest, CounterPerf) { - IntCounter counter; + IntCounter counter(MetricUnit::NUMBER); volatile int64_t sum = 0; { @@ -91,7 +91,7 @@ TEST_F(MetricsTest, CounterPerf) { ASSERT_EQ(100000000, counter.value()); ASSERT_EQ(100000000, sum); { - IntCounter mt_counter; + IntCounter mt_counter(MetricUnit::NUMBER); std::vector<std::thread> updaters; std::atomic<uint64_t> used_time(0); for (int i = 0; i < 8; ++i) { @@ -108,7 +108,7 @@ TEST_F(MetricsTest, CounterPerf) { TEST_F(MetricsTest, Gauge) { { - IntGauge gauge; + IntGauge gauge(MetricUnit::NUMBER); ASSERT_EQ(0, gauge.value()); gauge.set_value(100); ASSERT_EQ(100, gauge.value()); @@ -116,7 +116,7 @@ TEST_F(MetricsTest, Gauge) { ASSERT_STREQ("100", gauge.to_string().c_str()); } { - DoubleGauge gauge; + DoubleGauge gauge(MetricUnit::NUMBER); ASSERT_EQ(0.0, gauge.value()); gauge.set_value(1.23); ASSERT_EQ(1.23, gauge.value()); @@ -189,7 +189,7 @@ public: } _ss << labels.to_string(); } - _ss << " " << ((SimpleMetric*)metric)->to_string() << std::endl; + _ss << " " << metric->to_string() << std::endl; break; } default: @@ -205,9 +205,9 @@ private: }; TEST_F(MetricsTest, MetricCollector) { - IntCounter puts; + IntCounter puts(MetricUnit::NUMBER); puts.increment(101); - IntCounter gets; + IntCounter gets(MetricUnit::NUMBER); gets.increment(201); MetricCollector collector; ASSERT_TRUE(collector.add_metic(MetricLabels().add("type", "put"), &puts)); @@ -216,7 +216,7 @@ TEST_F(MetricsTest, MetricCollector) { { // Can't add different type to one collector - IntGauge post; + IntGauge post(MetricUnit::NUMBER); ASSERT_FALSE(collector.add_metic(MetricLabels().add("type", "post"), &post)); } @@ -241,13 +241,13 @@ TEST_F(MetricsTest, MetricCollector) { TEST_F(MetricsTest, MetricRegistry) { MetricRegistry registry("test"); - IntCounter cpu_idle; + IntCounter cpu_idle(MetricUnit::PERCENT); cpu_idle.increment(12); ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle)); // registry failed - IntCounter dummy; + IntCounter dummy(MetricUnit::PERCENT); ASSERT_FALSE(registry.register_metric("cpu_idle", &dummy)); - IntCounter memory_usage; + IntCounter memory_usage(MetricUnit::BYTES); memory_usage.increment(24); ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage)); { @@ -268,13 +268,13 @@ TEST_F(MetricsTest, MetricRegistry) { TEST_F(MetricsTest, MetricRegistry2) { MetricRegistry registry("test"); - IntCounter cpu_idle; + IntCounter cpu_idle(MetricUnit::PERCENT); cpu_idle.increment(12); ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle)); { // memory_usage will deregister after this block - IntCounter memory_usage; + IntCounter memory_usage(MetricUnit::BYTES); memory_usage.increment(24); ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage)); TestMetricsVisitor visitor; diff --git a/be/test/util/system_metrics_test.cpp b/be/test/util/system_metrics_test.cpp index 70e1d3e..4ec253e 100644 --- a/be/test/util/system_metrics_test.cpp +++ b/be/test/util/system_metrics_test.cpp @@ -65,7 +65,7 @@ public: _ss << "}"; } } - _ss << " " << ((SimpleMetric*)metric)->to_string() << std::endl; + _ss << " " << metric->to_string() << std::endl; break; } default: @@ -118,104 +118,104 @@ TEST_F(SystemMetricsTest, normal) { LOG(INFO) << "\n" << visitor.to_string(); // cpu - SimpleMetric* cpu_user = (SimpleMetric*)registry.get_metric( + Metric* cpu_user = registry.get_metric( "cpu", MetricLabels().add("mode", "user")); ASSERT_TRUE(cpu_user != nullptr); // ASSERT_STREQ("57199151", cpu_user->to_string().c_str()); - SimpleMetric* cpu_nice = (SimpleMetric*)registry.get_metric( + Metric* cpu_nice = registry.get_metric( "cpu", MetricLabels().add("mode", "nice")); ASSERT_TRUE(cpu_nice != nullptr); ASSERT_STREQ("2616310", cpu_nice->to_string().c_str()); - SimpleMetric* cpu_system = (SimpleMetric*)registry.get_metric( + Metric* cpu_system = registry.get_metric( "cpu", MetricLabels().add("mode", "system")); ASSERT_TRUE(cpu_system != nullptr); ASSERT_STREQ("10600935", cpu_system->to_string().c_str()); - SimpleMetric* cpu_idle = (SimpleMetric*)registry.get_metric( + Metric* cpu_idle = registry.get_metric( "cpu", MetricLabels().add("mode", "idle")); ASSERT_TRUE(cpu_idle != nullptr); ASSERT_STREQ("1517505423", cpu_idle->to_string().c_str()); - SimpleMetric* cpu_iowait = (SimpleMetric*)registry.get_metric( + Metric* cpu_iowait = registry.get_metric( "cpu", MetricLabels().add("mode", "iowait")); ASSERT_TRUE(cpu_iowait != nullptr); ASSERT_STREQ("2137148", cpu_iowait->to_string().c_str()); - SimpleMetric* cpu_irq = (SimpleMetric*)registry.get_metric( + Metric* cpu_irq = registry.get_metric( "cpu", MetricLabels().add("mode", "irq")); ASSERT_TRUE(cpu_irq != nullptr); ASSERT_STREQ("0", cpu_irq->to_string().c_str()); - SimpleMetric* cpu_softirq = (SimpleMetric*)registry.get_metric( + Metric* cpu_softirq = registry.get_metric( "cpu", MetricLabels().add("mode", "soft_irq")); ASSERT_TRUE(cpu_softirq != nullptr); ASSERT_STREQ("108277", cpu_softirq->to_string().c_str()); - SimpleMetric* cpu_steal = (SimpleMetric*)registry.get_metric( + Metric* cpu_steal = registry.get_metric( "cpu", MetricLabels().add("mode", "steal")); ASSERT_TRUE(cpu_steal != nullptr); ASSERT_STREQ("0", cpu_steal->to_string().c_str()); - SimpleMetric* cpu_guest = (SimpleMetric*)registry.get_metric( + Metric* cpu_guest = registry.get_metric( "cpu", MetricLabels().add("mode", "guest")); ASSERT_TRUE(cpu_guest != nullptr); ASSERT_STREQ("0", cpu_guest->to_string().c_str()); // memroy - SimpleMetric* memory_allocated_bytes = (SimpleMetric*)registry.get_metric( + Metric* memory_allocated_bytes = registry.get_metric( "memory_allocated_bytes"); ASSERT_TRUE(memory_allocated_bytes != nullptr); // network - SimpleMetric* receive_bytes = (SimpleMetric*)registry.get_metric( + Metric* receive_bytes = registry.get_metric( "network_receive_bytes", MetricLabels().add("device", "xgbe0")); ASSERT_TRUE(receive_bytes != nullptr); ASSERT_STREQ("52567436039", receive_bytes->to_string().c_str()); - SimpleMetric* receive_packets = (SimpleMetric*)registry.get_metric( + Metric* receive_packets = registry.get_metric( "network_receive_packets", MetricLabels().add("device", "xgbe0")); ASSERT_TRUE(receive_packets != nullptr); ASSERT_STREQ("65066152", receive_packets->to_string().c_str()); - SimpleMetric* send_bytes = (SimpleMetric*)registry.get_metric( + Metric* send_bytes = registry.get_metric( "network_send_bytes", MetricLabels().add("device", "xgbe0")); ASSERT_TRUE(send_bytes != nullptr); ASSERT_STREQ("45480856156", send_bytes->to_string().c_str()); - SimpleMetric* send_packets = (SimpleMetric*)registry.get_metric( + Metric* send_packets = registry.get_metric( "network_send_packets", MetricLabels().add("device", "xgbe0")); ASSERT_TRUE(send_packets != nullptr); ASSERT_STREQ("88277614", send_packets->to_string().c_str()); // disk - SimpleMetric* bytes_read = (SimpleMetric*)registry.get_metric( + Metric* bytes_read = registry.get_metric( "disk_bytes_read", MetricLabels().add("device", "sda")); ASSERT_TRUE(bytes_read != nullptr); ASSERT_STREQ("20142745600", bytes_read->to_string().c_str()); - SimpleMetric* reads_completed = (SimpleMetric*)registry.get_metric( + Metric* reads_completed = registry.get_metric( "disk_reads_completed", MetricLabels().add("device", "sda")); ASSERT_TRUE(reads_completed != nullptr); ASSERT_STREQ("759548", reads_completed->to_string().c_str()); - SimpleMetric* read_time_ms = (SimpleMetric*)registry.get_metric( + Metric* read_time_ms = registry.get_metric( "disk_read_time_ms", MetricLabels().add("device", "sda")); ASSERT_TRUE(read_time_ms != nullptr); ASSERT_STREQ("4308146", read_time_ms->to_string().c_str()); - SimpleMetric* bytes_written = (SimpleMetric*)registry.get_metric( + Metric* bytes_written = registry.get_metric( "disk_bytes_written", MetricLabels().add("device", "sda")); ASSERT_TRUE(bytes_written != nullptr); ASSERT_STREQ("1624753500160", bytes_written->to_string().c_str()); - SimpleMetric* writes_completed = (SimpleMetric*)registry.get_metric( + Metric* writes_completed = registry.get_metric( "disk_writes_completed", MetricLabels().add("device", "sda")); ASSERT_TRUE(writes_completed != nullptr); ASSERT_STREQ("18282936", writes_completed->to_string().c_str()); - SimpleMetric* write_time_ms = (SimpleMetric*)registry.get_metric( + Metric* write_time_ms = registry.get_metric( "disk_write_time_ms", MetricLabels().add("device", "sda")); ASSERT_TRUE(write_time_ms != nullptr); ASSERT_STREQ("1907755230", write_time_ms->to_string().c_str()); - SimpleMetric* io_time_ms = (SimpleMetric*)registry.get_metric( + Metric* io_time_ms = registry.get_metric( "disk_io_time_ms", MetricLabels().add("device", "sda")); ASSERT_TRUE(io_time_ms != nullptr); ASSERT_STREQ("19003350", io_time_ms->to_string().c_str()); - SimpleMetric* io_time_weigthed = (SimpleMetric*)registry.get_metric( + Metric* io_time_weigthed = registry.get_metric( "disk_io_time_weigthed", MetricLabels().add("device", "sda")); ASSERT_TRUE(write_time_ms != nullptr); ASSERT_STREQ("1912122964", io_time_weigthed->to_string().c_str()); // fd - SimpleMetric* fd_metric = (SimpleMetric*)registry.get_metric( + Metric* fd_metric = registry.get_metric( "fd_num_limit"); ASSERT_TRUE(fd_metric != nullptr); ASSERT_STREQ("13052138", fd_metric->to_string().c_str()); - fd_metric = (SimpleMetric*)registry.get_metric( + fd_metric = registry.get_metric( "fd_num_used"); ASSERT_TRUE(fd_metric != nullptr); ASSERT_STREQ("19520", fd_metric->to_string().c_str()); @@ -263,21 +263,21 @@ TEST_F(SystemMetricsTest, no_proc_file) { LOG(INFO) << "\n" << visitor.to_string(); // cpu - SimpleMetric* cpu_user = (SimpleMetric*)registry.get_metric( + Metric* cpu_user = registry.get_metric( "cpu", MetricLabels().add("mode", "user")); ASSERT_TRUE(cpu_user != nullptr); ASSERT_STREQ("0", cpu_user->to_string().c_str()); // memroy - SimpleMetric* memory_allocated_bytes = (SimpleMetric*)registry.get_metric( + Metric* memory_allocated_bytes = registry.get_metric( "memory_allocated_bytes"); ASSERT_TRUE(memory_allocated_bytes != nullptr); // network - SimpleMetric* receive_bytes = (SimpleMetric*)registry.get_metric( + Metric* receive_bytes = registry.get_metric( "network_receive_bytes", MetricLabels().add("device", "xgbe0")); ASSERT_TRUE(receive_bytes != nullptr); ASSERT_STREQ("0", receive_bytes->to_string().c_str()); // disk - SimpleMetric* bytes_read = (SimpleMetric*)registry.get_metric( + Metric* bytes_read = registry.get_metric( "disk_bytes_read", MetricLabels().add("device", "sda")); ASSERT_TRUE(bytes_read != nullptr); ASSERT_STREQ("0", bytes_read->to_string().c_str()); diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index f7c67c7..814dc49 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -34,6 +34,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; @@ -294,6 +295,7 @@ public class InsertStmt extends DdlStmt { if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; + MetricRepo.COUNTER_LOAD_ADD.increase(1L); transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(targetTable.getId()), label, new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 20d2fc2..ef9b088 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -20,6 +20,7 @@ package org.apache.doris.common; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.doris.metric.GaugeMetric; +import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; import org.apache.logging.log4j.LogManager; @@ -68,7 +69,7 @@ public class ThreadPoolManager { public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) { for (String poolMetricType : poolMerticTypes) { - GaugeMetric<Integer> gauge = new GaugeMetric<Integer>("thread_pool", "thread_pool statistics") { + GaugeMetric<Integer> gauge = new GaugeMetric<Integer>("thread_pool", MetricUnit.NUMBER, "thread_pool statistics") { @Override public Integer getValue() { String metricType = this.getLabels().get(1).getValue(); diff --git a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java index f0f89a3..b278c47 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java @@ -29,17 +29,20 @@ import java.util.List; public class IncompleteTabletsProcNode implements ProcNodeInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("UnhealthyTablets").add("InconsistentTablets") + .add("UnhealthyTablets").add("InconsistentTablets").add("CloningTablets") .build(); - private static final Joiner JOINER = Joiner.on(","); Collection<Long> unhealthyTabletIds; Collection<Long> inconsistentTabletIds; + Collection<Long> cloningTabletIds; - public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds, Collection<Long> inconsistentTabletIds) { + public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds, + Collection<Long> inconsistentTabletIds, + Collection<Long> cloningTabletIds) { this.unhealthyTabletIds = unhealthyTabletIds; this.inconsistentTabletIds = inconsistentTabletIds; + this.cloningTabletIds = cloningTabletIds; } @Override @@ -52,8 +55,10 @@ public class IncompleteTabletsProcNode implements ProcNodeInterface { String incompleteTablets = JOINER.join(Arrays.asList(unhealthyTabletIds)); String inconsistentTablets = JOINER.join(Arrays.asList(inconsistentTabletIds)); + String cloningTablets = JOINER.join(Arrays.asList(cloningTabletIds)); row.add(incompleteTablets); row.add(inconsistentTablets); + row.add(cloningTablets); result.addRow(row); diff --git a/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java index 72ca897..e75a3fe 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java @@ -32,6 +32,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.ListComparator; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TTaskType; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -42,12 +44,16 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class StatisticProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("DbId").add("DbName").add("TableNum").add("PartitionNum") .add("IndexNum").add("TabletNum").add("ReplicaNum").add("UnhealthyTabletNum") - .add("InconsistentTabletNum") + .add("InconsistentTabletNum").add("CloningTabletNum") .build(); + private static final Logger LOG = LogManager.getLogger(StatisticProcDir.class); private Catalog catalog; @@ -55,11 +61,14 @@ public class StatisticProcDir implements ProcDirInterface { Multimap<Long, Long> unhealthyTabletIds; // db id -> set(tablet id) Multimap<Long, Long> inconsistentTabletIds; + // db id -> set(tablet id) + Multimap<Long, Long> cloningTabletIds; public StatisticProcDir(Catalog catalog) { this.catalog = catalog; unhealthyTabletIds = HashMultimap.create(); inconsistentTabletIds = HashMultimap.create(); + cloningTabletIds = HashMultimap.create(); } @Override @@ -86,6 +95,7 @@ public class StatisticProcDir implements ProcDirInterface { unhealthyTabletIds.clear(); inconsistentTabletIds.clear(); + cloningTabletIds = AgentTaskQueue.getTabletIdsByType(TTaskType.CLONE); List<List<Comparable>> lines = new ArrayList<List<Comparable>>(); for (Long dbId : dbIds) { if (dbId == 0) { @@ -153,6 +163,7 @@ public class StatisticProcDir implements ProcDirInterface { oneLine.add(dbReplicaNum); oneLine.add(unhealthyTabletIds.get(dbId).size()); oneLine.add(inconsistentTabletIds.get(dbId).size()); + oneLine.add(cloningTabletIds.get(dbId).size()); lines.add(oneLine); @@ -181,6 +192,7 @@ public class StatisticProcDir implements ProcDirInterface { finalLine.add(totalReplicaNum); finalLine.add(unhealthyTabletIds.size()); finalLine.add(inconsistentTabletIds.size()); + finalLine.add(cloningTabletIds.size()); lines.add(finalLine); // add result @@ -209,6 +221,8 @@ public class StatisticProcDir implements ProcDirInterface { throw new AnalysisException("Invalid db id format: " + dbIdStr); } - return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId), inconsistentTabletIds.get(dbId)); + return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId), + inconsistentTabletIds.get(dbId), + cloningTabletIds.get(dbId)); } } diff --git a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java index 10e1874..24a6224 100644 --- a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java +++ b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java @@ -23,6 +23,7 @@ import org.apache.doris.http.BaseResponse; import org.apache.doris.http.IllegalArgException; import org.apache.doris.metric.MetricRepo; import org.apache.doris.metric.MetricVisitor; +import org.apache.doris.metric.JsonMetricVisitor; import org.apache.doris.metric.PrometheusMetricVisitor; import org.apache.doris.metric.SimpleCoreMetricVisitor; @@ -50,6 +51,8 @@ public class MetricsAction extends RestBaseAction { MetricVisitor visitor = null; if (!Strings.isNullOrEmpty(type) && type.equalsIgnoreCase("core")) { visitor = new SimpleCoreMetricVisitor("doris_fe"); + } else if (!Strings.isNullOrEmpty(type) && type.equalsIgnoreCase("agent")) { + visitor = new JsonMetricVisitor("doris_fe"); } else { visitor = new PrometheusMetricVisitor("doris_fe"); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 7222dd8..c4d7d2c 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -45,6 +45,7 @@ import org.apache.doris.load.BrokerFileGroupAggInfo; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; @@ -209,6 +210,7 @@ public class BrokerLoadJob extends LoadJob { @Override public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + MetricRepo.COUNTER_LOAD_ADD.increase(1L); transactionId = Catalog.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), @@ -468,6 +470,7 @@ public class BrokerLoadJob extends LoadJob { .add("txn_id", transactionId) .add("msg", "Load job try to commit txn") .build()); + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); Catalog.getCurrentGlobalTransactionMgr().commitTransaction( dbId, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 218bfd1..28469fc 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; @@ -165,6 +166,7 @@ public abstract class RoutineLoadTaskInfo { // begin a txn for task RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); try { + MetricRepo.COUNTER_LOAD_ADD.increase(1L); txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null, new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 8dbe044..83ed5b7 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -38,6 +38,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.Daemon; import org.apache.doris.metric.GaugeMetric; +import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.ReplicaPersistInfo; @@ -96,7 +97,7 @@ public class ReportHandler extends Daemon { public ReportHandler() { GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>( - "report_queue_size", "report queue size") { + "report_queue_size", MetricUnit.NUMBER, "report queue size") { @Override public Long getValue() { return (long) reportQueue.size(); diff --git a/fe/src/main/java/org/apache/doris/metric/CounterMetric.java b/fe/src/main/java/org/apache/doris/metric/CounterMetric.java index 9cdd452..ecc96a4 100644 --- a/fe/src/main/java/org/apache/doris/metric/CounterMetric.java +++ b/fe/src/main/java/org/apache/doris/metric/CounterMetric.java @@ -22,8 +22,8 @@ package org.apache.doris.metric; */ public abstract class CounterMetric<T> extends Metric<T> { - public CounterMetric(String name, String description) { - super(name, MetricType.COUNTER, description); + public CounterMetric(String name, MetricUnit unit, String description) { + super(name, MetricType.COUNTER, unit, description); } abstract public void increase(T delta); diff --git a/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java b/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java index b4b9240..c3cf3b8 100644 --- a/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java +++ b/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java @@ -21,8 +21,8 @@ import com.google.common.util.concurrent.AtomicDouble; public class DoubleCounterMetric extends CounterMetric<Double> { - public DoubleCounterMetric(String name, String description) { - super(name, description); + public DoubleCounterMetric(String name, MetricUnit unit, String description) { + super(name, unit, description); } private AtomicDouble value = new AtomicDouble(0.0); diff --git a/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java b/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java index 581da72..2e8d819 100644 --- a/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java +++ b/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java @@ -22,7 +22,7 @@ package org.apache.doris.metric; */ public abstract class GaugeMetric<T> extends Metric<T> { - public GaugeMetric(String name, String description) { - super(name, MetricType.GAUGE, description); + public GaugeMetric(String name, MetricUnit unit, String description) { + super(name, MetricType.GAUGE, unit, description); } } diff --git a/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java b/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java index b4cbe4c..a66bc4f 100644 --- a/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java +++ b/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java @@ -19,8 +19,8 @@ package org.apache.doris.metric; public class GaugeMetricImpl<T> extends GaugeMetric<T> { - public GaugeMetricImpl(String name, String description) { - super(name, description); + public GaugeMetricImpl(String name, MetricUnit unit, String description) { + super(name, unit, description); } private T value; diff --git a/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java b/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java new file mode 100644 index 0000000..2463a7a --- /dev/null +++ b/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.metric; + +import org.apache.doris.monitor.jvm.JvmStats; +import com.codahale.metrics.Histogram; +import java.util.List; + +public class JsonMetricVisitor extends MetricVisitor { + private int ordinal = 0; + private int metricNumber = 0; + + public JsonMetricVisitor(String prefix) { + super(prefix); + } + + @Override + public void setMetricNumber(int metricNumber) { + this.metricNumber = metricNumber; + } + + @Override + public void visitJvm(StringBuilder sb, JvmStats jvmStats) { + return; + } + + @Override + public void visit(StringBuilder sb, @SuppressWarnings("rawtypes") Metric metric) { + if (ordinal++ == 0) { + sb.append("[\n"); + } + sb.append("{\n\t\"tags\":\n\t{\n"); + sb.append("\t\t\"metric\":\"").append(metric.getName()).append("\""); + + // name + @SuppressWarnings("unchecked") + List<MetricLabel> labels = metric.getLabels(); + if (!labels.isEmpty()) { + sb.append(",\n"); + int i = 0; + for (MetricLabel label : labels) { + if (i++ > 0) { + sb.append(",\n"); + } + sb.append("\t\t\"").append(label.getKey()).append("\":\"").append(label.getValue()).append("\""); + } + } + sb.append("\n\t},\n"); + sb.append("\t\"unit\":\"").append(metric.getUnit().name().toLowerCase()).append( "\",\n"); + + // value + sb.append("\t\"value\":").append(metric.getValue().toString()).append("\n}"); + if (ordinal < metricNumber) { + sb.append(",\n"); + } else { + sb.append("\n]"); + } + return; + } + + @Override + public void visitHistogram(StringBuilder sb, String name, Histogram histogram) { + return; + } + + @Override + public void getNodeInfo(StringBuilder sb) { + return; + } +} + diff --git a/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java b/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java index cf4c652..c56a616 100644 --- a/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java +++ b/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong; public class LongCounterMetric extends CounterMetric<Long> { - public LongCounterMetric(String name, String description) { - super(name, description); + public LongCounterMetric(String name, MetricUnit unit, String description) { + super(name, unit, description); } private AtomicLong value = new AtomicLong(0L); diff --git a/fe/src/main/java/org/apache/doris/metric/Metric.java b/fe/src/main/java/org/apache/doris/metric/Metric.java index c4e6302..e5e029c 100644 --- a/fe/src/main/java/org/apache/doris/metric/Metric.java +++ b/fe/src/main/java/org/apache/doris/metric/Metric.java @@ -26,14 +26,28 @@ public abstract class Metric<T> { GAUGE, COUNTER } + public enum MetricUnit { + NANOSECONDS, + MICROSECONDS, + MILLISECONDS, + SECONDS, + BYTES, + ROWS, + NUMBER, + PERCENT, + NOUNIT + }; + protected String name; protected MetricType type; + protected MetricUnit unit; protected List<MetricLabel> labels = Lists.newArrayList(); protected String description; - public Metric(String name, MetricType type, String description) { + public Metric(String name, MetricType type, MetricUnit unit, String description) { this.name = name; this.type = type; + this.unit = unit; this.description = description; } @@ -45,6 +59,10 @@ public abstract class Metric<T> { return type; } + public MetricUnit getUnit() { + return unit; + } + public String getDescription() { return description; } diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 8985f95..bbd3a7d 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -25,6 +25,7 @@ import org.apache.doris.common.Config; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.loadv2.JobState; import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.monitor.jvm.JvmService; import org.apache.doris.monitor.jvm.JvmStats; import org.apache.doris.persist.EditLog; @@ -96,7 +97,7 @@ public final class MetricRepo { for (EtlJobType jobType : EtlJobType.values()) { for (JobState state : JobState.values()) { GaugeMetric<Long> gauge = (GaugeMetric<Long>) new GaugeMetric<Long>("job", - "job statistics") { + MetricUnit.NUMBER, "job statistics") { @Override public Long getValue() { if (!Catalog.getInstance().isMaster()) { @@ -120,7 +121,7 @@ public final class MetricRepo { } GaugeMetric<Long> gauge = (GaugeMetric<Long>) new GaugeMetric<Long>("job", - "job statistics") { + MetricUnit.NUMBER, "job statistics") { @Override public Long getValue() { if (!Catalog.getInstance().isMaster()) { @@ -144,7 +145,7 @@ public final class MetricRepo { // connections GaugeMetric<Integer> conections = (GaugeMetric<Integer>) new GaugeMetric<Integer>( - "connection_total", "total connections") { + "connection_total", MetricUnit.NUMBER, "total connections") { @Override public Integer getValue() { return ExecuteEnv.getInstance().getScheduler().getConnectionNum(); @@ -154,7 +155,7 @@ public final class MetricRepo { // journal id GaugeMetric<Long> maxJournalId = (GaugeMetric<Long>) new GaugeMetric<Long>( - "max_journal_id", "max journal id of this frontends") { + "max_journal_id", MetricUnit.NUMBER, "max journal id of this frontends") { @Override public Long getValue() { EditLog editLog = Catalog.getInstance().getEditLog(); @@ -168,7 +169,7 @@ public final class MetricRepo { // scheduled tablet num GaugeMetric<Long> scheduledTabletNum = (GaugeMetric<Long>) new GaugeMetric<Long>( - "scheduled_tablet_num", "number of tablets being scheduled") { + "scheduled_tablet_num", MetricUnit.NUMBER, "number of tablets being scheduled") { @Override public Long getValue() { if (!Catalog.getInstance().isMaster()) { @@ -181,58 +182,58 @@ public final class MetricRepo { // qps, rps and error rate // these metrics should be set an init value, in case that metric calculator is not running - GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", "query per second"); + GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", MetricUnit.NUMBER, "query per second"); GAUGE_QUERY_PER_SECOND.setValue(0.0); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_PER_SECOND); - GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", "request per second"); + GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", MetricUnit.NUMBER, "request per second"); GAUGE_REQUEST_PER_SECOND.setValue(0.0); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_REQUEST_PER_SECOND); - GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", "query error rate"); + GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", MetricUnit.NUMBER, "query error rate"); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_ERR_RATE); GAUGE_QUERY_ERR_RATE.setValue(0.0); GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetricImpl<>("max_tablet_compaction_score", - "max tablet compaction score of all backends"); + MetricUnit.NUMBER, "max tablet compaction score of all backends"); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE); GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L); // 2. counter - COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", "total request"); + COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", MetricUnit.NUMBER, "total request"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_REQUEST_ALL); - COUNTER_QUERY_ALL = new LongCounterMetric("query_total", "total query"); + COUNTER_QUERY_ALL = new LongCounterMetric("query_total", MetricUnit.NUMBER, "total query"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ALL); - COUNTER_QUERY_ERR = new LongCounterMetric("query_err", "total error query"); + COUNTER_QUERY_ERR = new LongCounterMetric("query_err", MetricUnit.NUMBER, "total error query"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ERR); - COUNTER_LOAD_ADD = new LongCounterMetric("load_add", "total load submit"); + COUNTER_LOAD_ADD = new LongCounterMetric("load_add", MetricUnit.NUMBER, "total load submit"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_ADD); - COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", "total load finished"); + COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", MetricUnit.NUMBER, "total load finished"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_FINISHED); - COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write", "counter of edit log write into bdbje"); + COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write", MetricUnit.NUMBER, "counter of edit log write into bdbje"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_WRITE); - COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read", "counter of edit log read from bdbje"); + COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read", MetricUnit.NUMBER, "counter of edit log read from bdbje"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_READ); - COUNTER_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log_size_bytes", "size of edit log"); + COUNTER_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log_size_bytes", MetricUnit.BYTES, "size of edit log"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_SIZE_BYTES); - COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", "counter of image generated"); + COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", MetricUnit.NUMBER, "counter of image generated"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE); - COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push", + COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push", MetricUnit.NUMBER, "counter of image succeeded in pushing to other frontends"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH); - COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", "counter of rejected transactions"); + COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", MetricUnit.NUMBER, "counter of rejected transactions"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_REJECT); - COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", "counter of begining transactions"); + COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", MetricUnit.NUMBER, "counter of begining transactions"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_BEGIN); - COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", "counter of success transactions"); + COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", MetricUnit.NUMBER, "counter of success transactions"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS); - COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of failed transactions"); + COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", MetricUnit.NUMBER, "counter of failed transactions"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED); - COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", "total rows of routine load"); + COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", MetricUnit.ROWS, "total rows of routine load"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ROWS); - COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new LongCounterMetric("routine_load_receive_bytes", + COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new LongCounterMetric("routine_load_receive_bytes", MetricUnit.BYTES, "total received bytes of routine load"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_RECEIVED_BYTES); - COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", + COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS, "total error rows of routine load"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS); @@ -266,7 +267,7 @@ public final class MetricRepo { // tablet number of each backends GaugeMetric<Long> tabletNum = (GaugeMetric<Long>) new GaugeMetric<Long>(TABLET_NUM, - "tablet number") { + MetricUnit.NUMBER, "tablet number") { @Override public Long getValue() { if (!Catalog.getInstance().isMaster()) { @@ -280,7 +281,7 @@ public final class MetricRepo { // max compaction score of tablets on each backends GaugeMetric<Long> tabletMaxCompactionScore = (GaugeMetric<Long>) new GaugeMetric<Long>( - TABLET_MAX_COMPACTION_SCORE, + TABLET_MAX_COMPACTION_SCORE, MetricUnit.NUMBER, "tablet max compaction score") { @Override public Long getValue() { @@ -306,6 +307,7 @@ public final class MetricRepo { JvmStats jvmStats = jvmService.stats(); visitor.visitJvm(sb, jvmStats); + visitor.setMetricNumber(PALO_METRIC_REGISTER.getPaloMetrics().size()); // doris metrics for (Metric metric : PALO_METRIC_REGISTER.getPaloMetrics()) { visitor.visit(sb, metric); diff --git a/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java b/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java index 7abb998..681b6df 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java @@ -32,6 +32,8 @@ public abstract class MetricVisitor { this.prefix = prefix; } + public abstract void setMetricNumber(int metricNumber); + public abstract void visitJvm(StringBuilder sb, JvmStats jvmStats); public abstract void visit(StringBuilder sb, Metric metric); diff --git a/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java b/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java index 6a98b94..d4a638d 100644 --- a/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java +++ b/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java @@ -52,11 +52,19 @@ public class PrometheusMetricVisitor extends MetricVisitor { private static final String HELP = "# HELP "; private static final String TYPE = "# TYPE "; + private int ordinal = 0; + private int metricNumber = 0; + public PrometheusMetricVisitor(String prefix) { super(prefix); } @Override + public void setMetricNumber(int metricNumber) { + this.metricNumber = metricNumber; + } + + @Override public void visitJvm(StringBuilder sb, JvmStats jvmStats) { // heap sb.append(Joiner.on(" ").join(HELP, JVM_HEAP_SIZE_BYTES, "jvm heap stat\n")); diff --git a/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java b/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java index cf832ad..a65f742 100644 --- a/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java +++ b/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java @@ -56,6 +56,9 @@ public class SimpleCoreMetricVisitor extends MetricVisitor { public static final String MAX_TABLET_COMPACTION_SCORE = "max_tablet_compaction_score"; + private int ordinal = 0; + private int metricNumber = 0; + private static final Map<String, String> CORE_METRICS = Maps.newHashMap(); static { CORE_METRICS.put(MAX_JOURMAL_ID, TYPE_LONG); @@ -72,6 +75,11 @@ public class SimpleCoreMetricVisitor extends MetricVisitor { } @Override + public void setMetricNumber(int metricNumber) { + this.metricNumber = metricNumber; + } + + @Override public void visitJvm(StringBuilder sb, JvmStats jvmStats) { Iterator<MemoryPool> memIter = jvmStats.getMem().iterator(); while (memIter.hasNext()) { @@ -134,4 +142,4 @@ public class SimpleCoreMetricVisitor extends MetricVisitor { sb.append(prefix + "_backend_dead_num").append(" ").append(String.valueOf(beDeadNum)).append("\n"); sb.append(prefix + "_broker_dead_num").append(" ").append(String.valueOf(brokerDeadNum)).append("\n"); } -} \ No newline at end of file +} diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index fcbf6f3..d94733a 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -211,10 +211,10 @@ public class ConnectProcessor { // TODO(cmy): when user send multi-statement, the executor is the last statement's executor. // We may need to find some way to resolve this. if (executor != null) { - auditAfterExec(originStmt.replace("\n", " \\n"), executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog()); + auditAfterExec(originStmt.replace("\n", " "), executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog()); } else { // executor can be null if we encounter analysis error. - auditAfterExec(originStmt.replace("\n", " \\n"), null, null); + auditAfterExec(originStmt.replace("\n", " "), null, null); } } diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 90133e2..b5ccb65 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -56,6 +56,7 @@ import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.load.EtlJobType; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlEofPacket; @@ -707,6 +708,7 @@ public class StmtExecutor { TabletCommitInfo.fromThrift(coord.getCommitInfos()), 10000)) { txnStatus = TransactionStatus.VISIBLE; + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); } else { txnStatus = TransactionStatus.COMMITTED; } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6677050..3ec7032 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -42,6 +42,7 @@ import org.apache.doris.load.EtlStatus; import org.apache.doris.load.LoadJob; import org.apache.doris.load.MiniEtlTaskInfo; import org.apache.doris.master.MasterImpl; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.plugin.AuditEvent; @@ -696,6 +697,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + MetricRepo.COUNTER_LOAD_ADD.increase(1L); return Catalog.getCurrentGlobalTransactionMgr().beginTransaction( db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequest_id(), new TxnCoordinator(TxnSourceType.BE, clientIp), @@ -757,10 +759,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new UserException("unknown database, database=" + dbName); } - return Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( - db, request.getTxnId(), - TabletCommitInfo.fromThrift(request.getCommitInfos()), - 5000, TxnCommitAttachment.fromThrift(request.txnCommitAttachment)); + boolean ret = Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( + db, request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), + 5000, TxnCommitAttachment.fromThrift(request.txnCommitAttachment)); + if (ret) { + // if commit and publish is success, load can be regarded as success + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + } + return ret; } @Override diff --git a/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java index 36c0ef6..96aba8e 100644 --- a/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java +++ b/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java @@ -21,8 +21,10 @@ import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TTaskType; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; @@ -215,6 +217,19 @@ public class AgentTaskQueue { return taskNum; } + public static synchronized Multimap<Long, Long> getTabletIdsByType(TTaskType type) { + Multimap<Long, Long> tabletIds = HashMultimap.create(); + Map<Long, Map<Long, AgentTask>> taskMap = tasks.column(type); + if (taskMap != null) { + for (Map<Long, AgentTask> signatureMap : taskMap.values()) { + for (AgentTask task : signatureMap.values()) { + tabletIds.put(task.getDbId(), task.getTabletId()); + } + } + } + return tabletIds; + } + public static synchronized int getTaskNum(long backendId, TTaskType type, boolean isFailed) { int taskNum = 0; if (backendId != -1) { diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index da73c8c..9c7192a 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -35,6 +35,7 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.Load; import org.apache.doris.load.Source; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.qe.OriginStatement; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.transaction.TransactionState; @@ -44,6 +45,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.util.List; @@ -58,6 +61,11 @@ import mockit.Mocked; public class BrokerLoadJobTest { + @BeforeClass + public static void start() { + MetricRepo.init(); + } + @Test public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable LabelName labelName, diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java index d99a4c1..dd69932 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java @@ -39,6 +39,8 @@ import org.apache.doris.transaction.TransactionState; import com.google.common.collect.Maps; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.util.Map; @@ -48,6 +50,11 @@ import mockit.Mocked; public class LoadJobTest { + @BeforeClass + public static void start() { + MetricRepo.init(); + } + @Test public void testGetDbNotExists(@Mocked Catalog catalog) { LoadJob loadJob = new BrokerLoadJob(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org