This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6344bda53b0 branch-3.0: [improve](routine load) introduce routine load abnormal job monitor metrics #48171 (#49045)
6344bda53b0 is described below

commit 6344bda53b0a0758a43503115bc1222375dece05
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 14 09:54:38 2025 +0800

    branch-3.0: [improve](routine load) introduce routine load abnormal job monitor metrics #48171 (#49045)
    
    Cherry-picked from #48171
    
    Co-authored-by: hui lai <lai...@selectdb.com>
---
 .../doris/load/routineload/KafkaProgress.java      |   3 +
 .../load/routineload/KafkaRoutineLoadJob.java      |  14 ++
 .../doris/load/routineload/RoutineLoadJob.java     |  16 ++
 .../doris/load/routineload/RoutineLoadManager.java |   6 +
 .../java/org/apache/doris/metric/MetricRepo.java   | 110 ++++++++--
 .../data/test_abnormal_job_monitor.csv             |  20 ++
 .../test_routin_load_abnormal_job_monitor.groovy   | 225 +++++++++++++++++++++
 7 files changed, 373 insertions(+), 21 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 0619188ea98..60715e45fbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -246,4 +246,7 @@ public class KafkaProgress extends RoutineLoadProgress {
         }
     }
 
+    public Long totalProgress() {
+        return partitionIdToOffset.values().stream().reduce(0L, Long::sum);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 6952a32b8b3..4904b59b5b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -925,4 +925,18 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public double getMaxFilterRatio() {
         return maxFilterRatio;
     }
+
+    @Override
+    public Long totalProgress() {
+        return ((KafkaProgress) progress).totalProgress();
+    }
+
+    @Override
+    public Long totalLag() {
+        Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) 
progress).getLag(cachedPartitionWithLatestOffsets);
+        return partitionIdToOffsetLag.values().stream()
+                .filter(lag -> lag >= 0)
+                .mapToLong(v -> v)
+                .sum();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 67bebee04f0..688174a1afb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -511,6 +511,14 @@ public abstract class RoutineLoadJob
         }
     }
 
+    public ErrorReason getPauseReason() {
+        return pauseReason;
+    }
+
+    public RoutineLoadStatistic getRoutineLoadStatistic() {
+        return jobStatistic;
+    }
+
     public String getDbFullName() throws MetaNotFoundException {
         return 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId).getFullName();
     }
@@ -965,6 +973,14 @@ public abstract class RoutineLoadJob
         }
     }
 
+    public Long totalProgress() {
+        return 0L;
+    }
+
+    public Long totalLag() {
+        return 0L;
+    }
+
     abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo 
routineLoadTaskInfo);
 
     // call before first scheduling
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 97f6aba8c58..a984a672d34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -110,6 +110,12 @@ public class RoutineLoadManager implements Writable {
     public RoutineLoadManager() {
     }
 
+    public List<RoutineLoadJob> getActiveRoutineLoadJobs() {
+        return idToRoutineLoadJob.values().stream()
+                .filter(job -> !job.state.isFinalState())
+                .collect(Collectors.toList());
+    }
+
     public void addMultiLoadTaskTxnIdToRoutineLoadJobId(long txnId, long 
routineLoadJobId) {
         multiLoadTaskTxnIdToRoutineLoadJobId.put(txnId, routineLoadJobId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 40930edd052..a0515cc87d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.Version;
@@ -46,17 +47,17 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 public final class MetricRepo {
@@ -137,11 +138,16 @@ public final class MetricRepo {
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
+
+    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_PROGRESS;
+    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_LAG;
+    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_ABORT_TASK_NUM;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_LANTENCY;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_COUNT;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
+
     public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
 
     public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
@@ -212,25 +218,7 @@ public final class MetricRepo {
             }
         }
 
-        //  routine load jobs
-        RoutineLoadManager routineLoadManager = 
Env.getCurrentEnv().getRoutineLoadManager();
-        for (RoutineLoadJob.JobState jobState : 
RoutineLoadJob.JobState.values()) {
-            GaugeMetric<Long> gauge = new GaugeMetric<Long>("job", 
MetricUnit.NOUNIT, "routine load job statistics") {
-                @Override
-                public Long getValue() {
-                    if (!Env.getCurrentEnv().isMaster()) {
-                        return 0L;
-                    }
-                    Set<RoutineLoadJob.JobState> states = Sets.newHashSet();
-                    states.add(jobState);
-                    List<RoutineLoadJob> jobs = 
routineLoadManager.getRoutineLoadJobByState(states);
-                    return Long.valueOf(jobs.size());
-                }
-            };
-            gauge.addLabel(new MetricLabel("job", "load")).addLabel(new 
MetricLabel("type", "ROUTINE_LOAD"))
-                    .addLabel(new MetricLabel("state", jobState.name()));
-            DORIS_METRIC_REGISTER.addMetrics(gauge);
-        }
+        initRoutineLoadJobMetrics();
 
         // running alter job
         Alter alter = Env.getCurrentEnv().getAlterInstance();
@@ -626,6 +614,86 @@ public final class MetricRepo {
         }
     }
 
+    private static void initRoutineLoadJobMetrics() {
+        //  routine load jobs
+        RoutineLoadManager routineLoadManager = 
Env.getCurrentEnv().getRoutineLoadManager();
+        for (RoutineLoadJob.JobState jobState : 
RoutineLoadJob.JobState.values()) {
+            if (jobState == RoutineLoadJob.JobState.PAUSED) {
+                addRoutineLoadJobStateGaugeMetric(routineLoadManager, 
jobState, "USER_PAUSED",
+                        job -> job.getPauseReason() != null
+                            && job.getPauseReason().getCode() == 
InternalErrorCode.MANUAL_PAUSE_ERR);
+                addRoutineLoadJobStateGaugeMetric(routineLoadManager, 
jobState, "ABNORMAL_PAUSED",
+                        job -> job.getPauseReason() != null
+                            && job.getPauseReason().getCode() != 
InternalErrorCode.MANUAL_PAUSE_ERR);
+            }
+            addRoutineLoadJobStateGaugeMetric(routineLoadManager, jobState, 
jobState.name(), job -> true);
+        }
+        GAUGE_ROUTINE_LOAD_PROGRESS = new 
GaugeMetric<Long>("routine_load_progress",
+                MetricUnit.NOUNIT, "total routine load progress") {
+            @Override
+            public Long getValue() {
+                if (!Env.getCurrentEnv().isMaster()) {
+                    return 0L;
+                }
+                return routineLoadManager
+                        .getActiveRoutineLoadJobs().stream()
+                        .mapToLong(RoutineLoadJob::totalProgress)
+                        .sum();
+            }
+        };
+        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_PROGRESS);
+        GAUGE_ROUTINE_LOAD_LAG = new GaugeMetric<Long>("routine_load_lag",
+                MetricUnit.NOUNIT, "total routine load lag") {
+            @Override
+            public Long getValue() {
+                if (!Env.getCurrentEnv().isMaster()) {
+                    return 0L;
+                }
+                return routineLoadManager
+                        .getActiveRoutineLoadJobs().stream()
+                        .mapToLong(RoutineLoadJob::totalLag)
+                        .sum();
+            }
+        };
+        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_LAG);
+        GAUGE_ROUTINE_LOAD_ABORT_TASK_NUM = new 
GaugeMetric<Long>("routine_load_abort_task_num",
+                MetricUnit.NOUNIT, "total number of aborted tasks in active 
routine load jobs") {
+            @Override
+            public Long getValue() {
+                if (!Env.getCurrentEnv().isMaster()) {
+                    return 0L;
+                }
+                return routineLoadManager
+                        .getActiveRoutineLoadJobs().stream()
+                        .mapToLong(job -> 
job.getRoutineLoadStatistic().abortedTaskNum)
+                        .sum();
+            }
+        };
+        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_ABORT_TASK_NUM);
+    }
+
+    private static void addRoutineLoadJobStateGaugeMetric(RoutineLoadManager 
routineLoadManager,
+                                            RoutineLoadJob.JobState jobState,
+                                            String stateLabel, 
Predicate<RoutineLoadJob> filter) {
+        GaugeMetric<Long> gauge = new GaugeMetric<Long>("job", 
MetricUnit.NOUNIT, "routine load job statistics") {
+            @Override
+            public Long getValue() {
+                if (!Env.getCurrentEnv().isMaster()) {
+                    return 0L;
+                }
+                return routineLoadManager
+                        
.getRoutineLoadJobByState(Collections.singleton(jobState))
+                        .stream()
+                        .filter(filter)
+                        .count();
+            }
+        };
+        gauge.addLabel(new MetricLabel("job", "load"))
+                .addLabel(new MetricLabel("type", "ROUTINE_LOAD"))
+                .addLabel(new MetricLabel("state", stateLabel));
+        DORIS_METRIC_REGISTER.addMetrics(gauge);
+    }
+
     private static void initSystemMetrics() {
         // TCP retransSegs
         GaugeMetric<Long> tcpRetransSegs = (GaugeMetric<Long>) new 
GaugeMetric<Long>(
diff --git a/regression-test/suites/load_p0/routine_load/data/test_abnormal_job_monitor.csv b/regression-test/suites/load_p0/routine_load/data/test_abnormal_job_monitor.csv
new file mode 100644
index 00000000000..b58285ed575
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_abnormal_job_monitor.csv
@@ -0,0 +1,20 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10
 18:39:10|2023-02-12|2023-01-27 
07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": 
"New York"}
+49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, 
"name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}
+66|2023-08-15|TRUE|-91|28378|609923317|4872185586197131212|1207709464099378591|\N|-1863683325.985123|-783792012.0|-708986976.0|2022-09-24
 10:39:23|2022-09-24|2022-10-16 
18:36:43|Y|z|AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6|{"book":
 {"title": "The Great Gatsby", "author": "F. Scott Fitzgerald"}, "year": 1925}
+91|2023-08-27|TRUE|90|2465|702240964|6373830997821598984|305860046137409400|15991.356445|1599972327.386147|-165530947.0|\N|2023-04-26
 
19:31:10|2023-07-21|\N|2||B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK|{"fruit":
 "apple", "color": "red", "qty": 5, "price": 2.5}
+80|2023-08-18|FALSE|-18|-8971|679027874|6535956962935330265|3960889045799757165|-13219.759766|1187161924.505394|-526615878.0|-947410627.0|2023-03-11
 07:40:00|2022-11-29|2023-01-14 
07:24:07|\N|D|3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve|{"car":
 "BMW", "model": "X5", "year": 2020, "color": "black"}
+85|2023-08-11|TRUE|-7|24304|-2043877415|-2024144417867729183|\N|5363.024414|-578615669.042831|-378574346.0|-810302932.0|2023-07-15
 01:07:41|2023-08-13|2023-01-20 11:57:48|i||WQ9dh9ajPu0y|{"country": "France", 
"capital": "Paris", "population": 67081000}
+31|2023-08-27|FALSE|17|-18849|1728109133|3266501886640700374|527195452623418935|-24062.328125|-1514348021.262435|-322205854.0|-278237157.0|2022-10-07
 
03:24:23|2022-09-25|\N|0|8|yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4|{"team":
 "Manchester United", "players": ["Ronaldo", "Rooney", "Giggs"], "coach": "Ole 
Gunnar Solskjaer"}
+20|2023-08-17|FALSE|-5|18158|784479801|1485484354598941738|-6632681928222776815|9708.430664|-330432620.706069|-816424174.0|571112646.0|2022-09-15
 21:40:55|2023-02-23|2023-08-13 
21:31:54|O|X|2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn|{"name":
 "Sarah", "age": 30, "city": "London", "isMarried": false}
+90|2023-08-27|TRUE|22|16456|-1476824962|-3279894870153540825|8990195191470116763|26651.906250|206860148.942546|-580959198.0|-210329147.0|2022-10-07
 03:11:03|2023-03-18|2023-04-15 
00:38:33|T|L|QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD|{"company":
 "Apple", "products": [{"name": "iPhone", "price": 1000}, {"name": "MacBook", 
"price": 1500}]}
+8|2023-08-14|TRUE|109|-31573|-1362465190|3990845741226497177|2732763251146840270|-25698.552734|1312831962.567818|771983879.0|173937916.0|2023-03-07
 14:13:19|2022-10-18|2023-07-16 
05:03:13|D||PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme|{"animal":
 "lion", "weight": 200, "habitat": ["savannah", "grassland"]}
+65|2023-08-09|FALSE|94|31514|814994517|-297697460695940343|734910652450318597|-13061.891602|62750847.041706|-9808654.0|\N|2023-08-14
 22:01:27|2023-05-19|2022-11-13 
13:44:28|V||aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf|{"language":
 "Python", "version": 3.9, "frameworks": ["Django", "Flask"]}
+62|2023-08-21|FALSE|81|20302|-200761532|6365479976421007608|\N|-29916.533203|1709141750.828478|549873536.0|-119205359.0|2023-05-04
 01:14:51|2022-09-17|2022-12-04 
19:30:09|d|v|BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD|{"username":
 "user123", "password": "pass123", "email": "user...@example.com"}
+50|2023-08-06|TRUE|109|-6330|1479023892|-8630800697573159428|-1645095773540208759|17880.960938|-1453844792.013949|-158871820.0|-862940384.0|2022-09-22
 02:03:21|2023-05-14|2023-03-25 
02:18:34|m||JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl|{"city":
 "Tokyo", "temperature": 20.5, "humidity": 75}
+58|2023-08-22|\N|0|-18231|1832867360|6997858407575297145|2480714305422728023|-5450.488770|1475901032.138386|-893480655.0|-607891858.0|2023-02-02
 05:13:24|2022-09-18|2023-04-23 
10:51:15|k||LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja|{"restaurant":
 "Pizza Hut", "menu": ["pizza", "pasta", "salad"]}
+60|2023-08-27|FALSE|-52|-2338|-757056972|1047567408607120856|6541476642780646552|6614.089355|-1204448798.517855|236657733.0|731515433.0|2022-12-29
 14:47:30|2022-09-24|2023-08-01 
12:41:59|O|F|RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU|{"game":
 "Chess", "players": 2, "time": "1 hour"}
+68|2023-08-23|TRUE|-73|20117|1737338128|795638676048937749|-5551546237562433901|-30627.039062|68589475.684545|585022347.0|513722420.0|2022-12-28
 20:26:51|2022-10-04|2023-07-30 
00:20:06|y||keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM|{"country":
 "Brazil", "continent": "South America", "population": 211049527}
+50|2023-08-24|TRUE|15|14403|\N|-6418906115745394180|9205303779366462513|-4331.548828|-615112179.557648|367305015.0|-551652958.0|2022-12-29
 02:27:20|2023-06-01|2023-08-12 
04:50:04|a||eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM|{"band":
 "The Beatles", "members": ["John Lennon", "Paul McCartney", "George Harrison", 
"Ringo Starr"]}
+81|2023-08-23|FALSE|106|11492|-667795397|4480250461471356146|-5346660566234294101|9082.750000|385167225.902608|-717553011.0|649146853.0|2023-03-20
 03:33:16|2022-11-24|2023-02-16 
18:29:41|G|9|Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag|{"flower": "rose", 
"color": "red", "fragrance": true}
+41|2023-08-27|TRUE|-104|22750|\N|8527773271030840740|5554497317268279215|-5296.828125|-1715646888.013040|-306075962.0|897769189.0|2022-12-02
 17:56:44|2022-10-12|2023-02-19 
07:02:54|V|\N|E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V|{"food":
 "Sushi", "price": 10, "restaurant": "Sushi King"}
+21|2023-08-18|FALSE|63|-27847|-35409596|8638201997392767650|4919963231735304178|-23382.541016|-1803403621.426313|-22009767.0|661750756.0|2023-03-31
 10:56:14|2023-01-20|2023-02-18 
13:37:52|N|T|PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi|{"city":
 "Sydney", "population": 5312000, "area": 2058.7}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
new file mode 100644
index 00000000000..5b5f124408d
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import groovy.json.JsonSlurper
+
+suite("test_routine_load_abnormal_job_monitor","p0") {
+    def kafkaCsvTpoics = [
+                  "test_abnormal_job_monitor",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def jobName1 = "test_abnormal_job_monitor1"
+    def jobName2 = "test_abnormal_job_monitor2"
+    def tableName = "test_abnormal_job_monitor"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+                
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED 
PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+                
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql """
+                CREATE ROUTINE LOAD ${jobName1} on ${tableName}
+                
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "test_abnormal_job_monitor_invaild",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql """
+                CREATE ROUTINE LOAD ${jobName2} on ${tableName}
+                
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "test_abnormal_job_monitor",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            sql "pause routine load for ${jobName2}"
+
+            def count = 0
+            def metricCount = 0
+            while (true) {
+                count = 0
+                metricCount = 0
+                httpTest {
+                    endpoint context.config.feHttpAddress
+                    uri "/metrics?type=json"
+                    op "get"
+                    check { code, body ->
+                        def jsonSlurper = new JsonSlurper()
+                        def result = jsonSlurper.parseText(body)
+
+                        def entry = result.find { it.tags?.metric == 
"doris_fe_job" && it.tags?.state == "ABNORMAL_PAUSED"}
+                        def value = entry ? entry.value : null
+                        log.info("Contains ABNORMAL_PAUSE: ${entry != 
null}".toString())
+                        log.info("Value of ABNORMAL_PAUSE: 
${value}".toString())
+                        if (value > 0) {
+                            metricCount++
+                        }
+
+                        entry = result.find { it.tags?.metric == 
"doris_fe_job" && it.tags?.state == "USER_PAUSED"}
+                        value = entry ? entry.value : null
+                        log.info("Contains USER_PAUSE: ${entry != 
null}".toString())
+                        log.info("Value of USER_PAUSE: ${value}".toString())
+                        if (value > 0) {
+                            metricCount++
+                        }
+
+                        if (body.contains("doris_fe_routine_load_progress")){
+                            log.info("contain doris_fe_routine_load_progress")
+                            metricCount++
+                        }
+
+                        if (body.contains("doris_fe_routine_load_lag")){
+                            log.info("contain doris_fe_routine_load_lag")
+                            metricCount++
+                        }
+
+                        if 
(body.contains("doris_fe_routine_load_abort_task_num")){
+                            log.info("contain 
doris_fe_routine_load_abort_task_num")
+                            metricCount++
+                        }
+                    }
+                }
+                if (metricCount == 5) {
+                    break
+                }
+                count++
+                sleep(1000)
+                if (count > 60) {
+                    assertEquals(1, 2)
+                }
+            }
+        } finally {
+            sql "stop routine load for ${jobName1}"
+            sql "stop routine load for ${jobName2}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to