This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 7c7646a908f branch-4.1: [feat](job) add per-job routine load metrics
#63576 (#63953)
7c7646a908f is described below
commit 7c7646a908f44a36a9941b1ea7303b371cfea034
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 14:02:01 2026 +0800
branch-4.1: [feat](job) add per-job routine load metrics #63576 (#63953)
Cherry-picked from #63576
Co-authored-by: hui lai <[email protected]>
---
.../java/org/apache/doris/metric/MetricRepo.java | 74 ++++++++++++++++++++++
1 file changed, 74 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 1340f81f180..a55214fb061 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -39,6 +39,7 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.load.routineload.RoutineLoadStatistic;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.monitor.jvm.JvmService;
import org.apache.doris.monitor.jvm.JvmStats;
@@ -92,6 +93,16 @@ public final class MetricRepo {
public static final String STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT =
"streaming_job_per_job_succeed_task_count";
public static final String STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT =
"streaming_job_per_job_failed_task_count";
public static final String STREAMING_JOB_PER_JOB_LAG =
"streaming_job_per_job_lag";
+    // Names of the per-job routine load gauges. Each gauge registered under one of these
+    // names carries "job_id" and "job_name" labels identifying the routine load job.
+ public static final String ROUTINE_LOAD_PER_JOB_TOTAL_ROWS =
"routine_load_per_job_total_rows";
+ public static final String ROUTINE_LOAD_PER_JOB_ERROR_ROWS =
"routine_load_per_job_error_rows";
+ public static final String ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES =
"routine_load_per_job_received_bytes";
+ public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME =
+ "routine_load_per_job_task_execute_time";
+ public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT =
+ "routine_load_per_job_task_execute_count";
+ public static final String ROUTINE_LOAD_PER_JOB_PROGRESS =
"routine_load_per_job_progress";
+ public static final String ROUTINE_LOAD_PER_JOB_LAG =
"routine_load_per_job_lag";
+ public static final String ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM =
"routine_load_per_job_abort_task_num";
public static final String CLOUD_TAG = "cloud";
public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1163,6 +1174,66 @@ public final class MetricRepo {
DORIS_METRIC_REGISTER.addMetrics(gauge);
}
+ public static void updateRoutineLoadJobPerJobMetrics() {
+ // Clear previous per-job gauges before checking mastership. If this
FE loses mastership,
+ // the old gauges registered during the previous master epoch must not
be exported.
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS);
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_ERROR_ROWS);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT);
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_PROGRESS);
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_LAG);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM);
+
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+
+ try {
+ RoutineLoadManager routineLoadManager =
Env.getCurrentEnv().getRoutineLoadManager();
+ for (RoutineLoadJob job :
routineLoadManager.getActiveRoutineLoadJobs()) {
+ String jobId = String.valueOf(job.getId());
+ String jobName = job.getName();
+ RoutineLoadStatistic stat = job.getRoutineLoadStatistic();
+
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS,
MetricUnit.ROWS,
+ "per job total rows of routine load", jobId, jobName,
() -> stat.totalRows);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ERROR_ROWS,
MetricUnit.ROWS,
+ "per job error rows of routine load", jobId, jobName,
() -> stat.errorRows);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES,
MetricUnit.BYTES,
+ "per job received bytes of routine load", jobId,
jobName, () -> stat.receivedBytes);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME,
MetricUnit.MILLISECONDS,
+ "per job task execute time of routine load", jobId,
jobName,
+ () -> stat.totalTaskExcutionTimeMs);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT,
MetricUnit.NOUNIT,
+ "per job task execute count of routine load", jobId,
jobName, () -> stat.committedTaskNum);
+ addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_PROGRESS,
MetricUnit.NOUNIT,
+ "per job routine load progress", jobId, jobName,
job::totalProgress);
+ addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_LAG,
MetricUnit.NOUNIT,
+ "per job routine load lag", jobId, jobName,
job::totalLag);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM,
MetricUnit.NOUNIT,
+ "per job number of aborted tasks in routine load",
jobId, jobName,
+ () -> stat.abortedTaskNum);
+ }
+ } catch (Throwable t) {
+ LOG.warn("failed to update routine load per-job metrics", t);
+ }
+ }
+
+ private static void addRoutineLoadPerJobGaugeMetric(String metricName,
MetricUnit unit, String description,
+ String jobId, String jobName, Supplier<Long> valueSupplier) {
+ GaugeMetric<Long> gauge = new GaugeMetric<Long>(metricName, unit,
description) {
+ @Override
+ public Long getValue() {
+ return valueSupplier.get();
+ }
+ };
+ gauge.addLabel(new MetricLabel("job_id", jobId))
+ .addLabel(new MetricLabel("job_name", jobName));
+ DORIS_METRIC_REGISTER.addMetrics(gauge);
+ }
+
private static void initStreamingJobMetrics() {
// streaming insert jobs
for (JobStatus jobStatus : JobStatus.values()) {
@@ -1480,6 +1551,9 @@ public final class MetricRepo {
// update load job metrics
updateLoadJobMetrics();
+ // update per-job routine load metrics
+ updateRoutineLoadJobPerJobMetrics();
+
// update per-job streaming job metrics
updateStreamingJobPerJobMetrics();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]