This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e18ea534d9a [improve](information schema) introduce routine load job system table (#48963)
e18ea534d9a is described below

commit e18ea534d9a009bc3c128515fa56c07d9c538b74
Author: hui lai <lai...@selectdb.com>
AuthorDate: Mon Mar 17 11:06:53 2025 +0800

    [improve](information schema) introduce routine load job system table (#48963)
    
    ### What problem does this PR solve?
    
    Part IV of https://github.com/apache/doris/issues/48511
    
    doc https://github.com/apache/doris-website/pull/2196
    
    **Introduce a routine load job statistics system table:**
    ```
    mysql> show create table information_schema.routine_load_job\G
    *************************** 1. row ***************************
           Table: routine_load_job
    Create Table: CREATE TABLE `routine_load_job` (
      `JOB_ID` text NULL,
      `JOB_NAME` text NULL,
      `CREATE_TIME` text NULL,
      `PAUSE_TIME` text NULL,
      `END_TIME` text NULL,
      `DB_NAME` text NULL,
      `TABLE_NAME` text NULL,
      `STATE` text NULL,
      `CURRENT_TASK_NUM` text NULL,
      `JOB_PROPERTIES` text NULL,
      `DATA_SOURCE_PROPERTIES` text NULL,
      `CUSTOM_PROPERTIES` text NULL,
      `STATISTIC` text NULL,
      `PROGRESS` text NULL,
      `LAG` text NULL,
      `REASON_OF_STATE_CHANGED` text NULL,
      `ERROR_LOG_URLS` text NULL,
      `USER_NAME` text NULL,
      `CURRENT_ABORT_TASK_NUM` int NULL,
      `IS_ABNORMAL_PAUSE` boolean NULL
    ) ENGINE=SCHEMA;
    1 row in set (0.00 sec)
    ```
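
    For example, a minimal illustrative query against the new table
    (assuming at least one job is in the RUNNING state; all columns
    except the last two are returned as strings):

    ```
    SELECT JOB_NAME, STATE, PROGRESS, LAG
    FROM information_schema.routine_load_job
    WHERE STATE = 'RUNNING';
    ```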
    
    **Empowering jobs with SQL query capability over statistical information brings several benefits:**

    - It can be used in conjunction with the metrics added in
    https://github.com/apache/doris/pull/48209 to roughly locate abnormal
    jobs when a Grafana alarm fires, using the following SQL:
    
    ```
    SELECT JOB_NAME
    FROM information_schema.routine_load_job
    WHERE CURRENT_ABORT_TASK_NUM > 0
       OR IS_ABNORMAL_PAUSE = TRUE;
    ```
    
    - Users can run `select * from information_schema.routine_load_job`
    instead of `show routine load`. The advantage is that `show routine load`
    can only filter by job name, while SQL is far more flexible for locating
    jobs, as the example below shows.
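
    For example (an illustrative query; `example_db` is a placeholder for a
    real database name), jobs can be filtered by database and state at once:

    ```
    SELECT JOB_NAME, STATE, REASON_OF_STATE_CHANGED
    FROM information_schema.routine_load_job
    WHERE DB_NAME = 'example_db'
      AND STATE IN ('PAUSED', 'CANCELLED');
    ```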
---
 be/src/exec/schema_scanner.cpp                     |   3 +
 be/src/exec/schema_scanner/schema_helper.cpp       |   9 +
 be/src/exec/schema_scanner/schema_helper.h         |   6 +
 .../schema_routine_load_job_scanner.cpp            | 199 +++++++++++++++++++++
 .../schema_routine_load_job_scanner.h              |  50 ++++++
 .../org/apache/doris/analysis/SchemaTableType.java |   4 +-
 .../java/org/apache/doris/catalog/SchemaTable.java |  24 +++
 .../load/routineload/KafkaRoutineLoadJob.java      |  12 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  63 +++++--
 .../doris/load/routineload/RoutineLoadManager.java |   4 +
 .../load/routineload/RoutineLoadProgress.java      |   2 +-
 .../load/routineload/RoutineLoadStatistic.java     |   1 +
 .../apache/doris/service/FrontendServiceImpl.java  |  56 ++++++
 gensrc/thrift/Descriptors.thrift                   |   3 +-
 gensrc/thrift/FrontendService.thrift               |  32 ++++
 .../test_routine_load_job_info_system_table.groovy | 142 +++++++++++++++
 16 files changed, 584 insertions(+), 26 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 3c3e57a8522..40b0616c9a5 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -40,6 +40,7 @@
 #include "exec/schema_scanner/schema_partitions_scanner.h"
 #include "exec/schema_scanner/schema_processlist_scanner.h"
 #include "exec/schema_scanner/schema_profiling_scanner.h"
+#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
 #include "exec/schema_scanner/schema_routine_scanner.h"
 #include "exec/schema_scanner/schema_rowsets_scanner.h"
 #include "exec/schema_scanner/schema_schema_privileges_scanner.h"
@@ -228,6 +229,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
         return SchemaCatalogMetaCacheStatsScanner::create_unique();
     case TSchemaTableType::SCH_BACKEND_KERBEROS_TICKET_CACHE:
         return SchemaBackendKerberosTicketCacheScanner::create_unique();
+    case TSchemaTableType::SCH_ROUTINE_LOAD_JOB:
+        return SchemaRoutineLoadJobScanner::create_unique();
     default:
         return SchemaDummyScanner::create_unique();
         break;
diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp
index 7cf95187b02..13670444e53 100644
--- a/be/src/exec/schema_scanner/schema_helper.cpp
+++ b/be/src/exec/schema_scanner/schema_helper.cpp
@@ -142,4 +142,13 @@ Status SchemaHelper::show_user(const std::string& ip, const int32_t port,
             });
 }
 
+Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t port,
+                                            const TFetchRoutineLoadJobRequest& request,
+                                            TFetchRoutineLoadJobResult* result) {
+    return ThriftRpcHelper::rpc<FrontendServiceClient>(
+            ip, port, [&request, &result](FrontendServiceConnection& client) {
+                client->fetchRoutineLoadJob(*result, request);
+            });
+}
+
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h
index bc794093128..53d37389371 100644
--- a/be/src/exec/schema_scanner/schema_helper.h
+++ b/be/src/exec/schema_scanner/schema_helper.h
@@ -26,6 +26,8 @@
 namespace doris {
 class TDescribeTablesParams;
 class TDescribeTablesResult;
+class TFetchRoutineLoadJobRequest;
+class TFetchRoutineLoadJobResult;
 class TGetDbsParams;
 class TGetDbsResult;
 class TGetTablesParams;
@@ -84,6 +86,10 @@ public:
                                     TShowProcessListResult* result);
     static Status show_user(const std::string& ip, const int32_t port,
                             const TShowUserRequest& request, TShowUserResult* result);
+
+    static Status fetch_routine_load_job(const std::string& ip, const int32_t port,
+                                         const TFetchRoutineLoadJobRequest& request,
+                                         TFetchRoutineLoadJobResult* result);
 };
 
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
new file mode 100644
index 00000000000..e061ab790cf
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include <string>
+
+#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+std::vector<SchemaScanner::ColumnDesc> SchemaRoutineLoadJobScanner::_s_tbls_columns = {
+        //   name,       type,          size,     is_null
+        {"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
+        {"JOB_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"PAUSE_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"END_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"DB_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"TABLE_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"STATE", TYPE_STRING, sizeof(StringRef), true},
+        {"CURRENT_TASK_NUM", TYPE_STRING, sizeof(StringRef), true},
+        {"JOB_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+        {"DATA_SOURCE_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+        {"CUSTOM_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+        {"STATISTIC", TYPE_STRING, sizeof(StringRef), true},
+        {"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
+        {"LAG", TYPE_STRING, sizeof(StringRef), true},
+        {"REASON_OF_STATE_CHANGED", TYPE_STRING, sizeof(StringRef), true},
+        {"ERROR_LOG_URLS", TYPE_STRING, sizeof(StringRef), true},
+        {"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
+        {"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
+};
+
+SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROUTINE_LOAD_JOB) {}
+
+SchemaRoutineLoadJobScanner::~SchemaRoutineLoadJobScanner() {}
+
+Status SchemaRoutineLoadJobScanner::start(RuntimeState* state) {
+    if (!_is_init) {
+        return Status::InternalError("used before initialized.");
+    }
+    TFetchRoutineLoadJobRequest request;
+    RETURN_IF_ERROR(SchemaHelper::fetch_routine_load_job(
+            *(_param->common_param->ip), _param->common_param->port, request, &_result));
+    return Status::OK();
+}
+
+Status SchemaRoutineLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("call this before initial.");
+    }
+    if (block == nullptr || eos == nullptr) {
+        return Status::InternalError("invalid parameter.");
+    }
+
+    *eos = true;
+    if (_result.routineLoadJobs.empty()) {
+        return Status::OK();
+    }
+
+    return _fill_block_impl(block);
+}
+
+Status SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
+    SCOPED_TIMER(_fill_block_timer);
+
+    const auto& jobs_info = _result.routineLoadJobs;
+    size_t row_num = jobs_info.size();
+    if (row_num == 0) {
+        return Status::OK();
+    }
+
+    for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
+        const auto& col_desc = _s_tbls_columns[col_idx];
+
+        std::vector<StringRef> str_refs(row_num);
+        std::vector<int32_t> int_vals(row_num);
+        std::vector<int8_t> bool_vals(row_num);
+        std::vector<void*> datas(row_num);
+        std::vector<std::string> column_values(row_num);
+
+        for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
+            const auto& job_info = jobs_info[row_idx];
+            std::string& column_value = column_values[row_idx];
+
+            if (col_desc.type == TYPE_STRING) {
+                switch (col_idx) {
+                case 0: // JOB_ID
+                    column_value = job_info.__isset.job_id ? job_info.job_id : "";
+                    break;
+                case 1: // JOB_NAME
+                    column_value = job_info.__isset.job_name ? job_info.job_name : "";
+                    break;
+                case 2: // CREATE_TIME
+                    column_value = job_info.__isset.create_time ? job_info.create_time : "";
+                    break;
+                case 3: // PAUSE_TIME
+                    column_value = job_info.__isset.pause_time ? job_info.pause_time : "";
+                    break;
+                case 4: // END_TIME
+                    column_value = job_info.__isset.end_time ? job_info.end_time : "";
+                    break;
+                case 5: // DB_NAME
+                    column_value = job_info.__isset.db_name ? job_info.db_name : "";
+                    break;
+                case 6: // TABLE_NAME
+                    column_value = job_info.__isset.table_name ? job_info.table_name : "";
+                    break;
+                case 7: // STATE
+                    column_value = job_info.__isset.state ? job_info.state : "";
+                    break;
+                case 8: // CURRENT_TASK_NUM
+                    column_value =
+                            job_info.__isset.current_task_num ? job_info.current_task_num : "";
+                    break;
+                case 9: // JOB_PROPERTIES
+                    column_value = job_info.__isset.job_properties ? job_info.job_properties : "";
+                    break;
+                case 10: // DATA_SOURCE_PROPERTIES
+                    column_value = job_info.__isset.data_source_properties
+                                           ? job_info.data_source_properties
+                                           : "";
+                    break;
+                case 11: // CUSTOM_PROPERTIES
+                    column_value =
+                            job_info.__isset.custom_properties ? job_info.custom_properties : "";
+                    break;
+                case 12: // STATISTIC
+                    column_value = job_info.__isset.statistic ? job_info.statistic : "";
+                    break;
+                case 13: // PROGRESS
+                    column_value = job_info.__isset.progress ? job_info.progress : "";
+                    break;
+                case 14: // LAG
+                    column_value = job_info.__isset.lag ? job_info.lag : "";
+                    break;
+                case 15: // REASON_OF_STATE_CHANGED
+                    column_value = job_info.__isset.reason_of_state_changed
+                                           ? job_info.reason_of_state_changed
+                                           : "";
+                    break;
+                case 16: // ERROR_LOG_URLS
+                    column_value = job_info.__isset.error_log_urls ? job_info.error_log_urls : "";
+                    break;
+                case 17: // USER_NAME
+                    column_value = job_info.__isset.user_name ? job_info.user_name : "";
+                    break;
+                }
+
+                str_refs[row_idx] =
+                        StringRef(column_values[row_idx].data(), column_values[row_idx].size());
+                datas[row_idx] = &str_refs[row_idx];
+            } else if (col_desc.type == TYPE_INT) {
+                int_vals[row_idx] = job_info.__isset.current_abort_task_num
+                                            ? job_info.current_abort_task_num
+                                            : 0;
+                datas[row_idx] = &int_vals[row_idx];
+            } else if (col_desc.type == TYPE_BOOLEAN) {
+                bool_vals[row_idx] =
+                        job_info.__isset.is_abnormal_pause ? job_info.is_abnormal_pause : false;
+                datas[row_idx] = &bool_vals[row_idx];
+            }
+        }
+
+        RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h
new file mode 100644
index 00000000000..1105328776a
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/FrontendService_types.h>
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaRoutineLoadJobScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaRoutineLoadJobScanner);
+
+public:
+    SchemaRoutineLoadJobScanner();
+    ~SchemaRoutineLoadJobScanner() override;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
+
+private:
+    Status _fill_block_impl(vectorized::Block* block);
+
+    TFetchRoutineLoadJobResult _result;
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 7ec9e5cab37..5c5bdeb832c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -90,7 +90,9 @@ public enum SchemaTableType {
     SCH_CATALOG_META_CACHE_STATISTICS("CATALOG_META_CACHE_STATISTICS", "CATALOG_META_CACHE_STATISTICS",
             TSchemaTableType.SCH_CATALOG_META_CACHE_STATISTICS),
     SCH_BACKEND_KERBEROS_TICKET_CACHE("BACKEND_KERBEROS_TICKET_CACHE", "BACKEND_KERBEROS_TICKET_CACHE",
-            TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE);
+            TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE),
+    SCH_ROUTINE_LOAD_JOB("ROUTINE_LOAD_JOB", "ROUTINE_LOAD_JOB",
+            TSchemaTableType.SCH_ROUTINE_LOAD_JOB);
 
     private static final String dbName = "INFORMATION_SCHEMA";
     private static SelectList fullSelectLists;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 75072390821..36992833639 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -597,6 +597,30 @@ public class SchemaTable extends Table {
                                     .column("REFRESH_INTERVAL_SECOND", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .build())
             )
+            .put("routine_load_job",
+                    new SchemaTable(SystemIdGenerator.getNextId(), "routine_load_job", TableType.SCHEMA,
+                            builder().column("JOB_ID", ScalarType.createStringType())
+                                    .column("JOB_NAME", ScalarType.createStringType())
+                                    .column("CREATE_TIME", ScalarType.createStringType())
+                                    .column("PAUSE_TIME", ScalarType.createStringType())
+                                    .column("END_TIME", ScalarType.createStringType())
+                                    .column("DB_NAME", ScalarType.createStringType())
+                                    .column("TABLE_NAME", ScalarType.createStringType())
+                                    .column("STATE", ScalarType.createStringType())
+                                    .column("CURRENT_TASK_NUM", ScalarType.createStringType())
+                                    .column("JOB_PROPERTIES", ScalarType.createStringType())
+                                    .column("DATA_SOURCE_PROPERTIES", ScalarType.createStringType())
+                                    .column("CUSTOM_PROPERTIES", ScalarType.createStringType())
+                                    .column("STATISTIC", ScalarType.createStringType())
+                                    .column("PROGRESS", ScalarType.createStringType())
+                                    .column("LAG", ScalarType.createStringType())
+                                    .column("REASON_OF_STATE_CHANGED", ScalarType.createStringType())
+                                    .column("ERROR_LOG_URLS", ScalarType.createStringType())
+                                    .column("USER_NAME", ScalarType.createStringType())
+                                    .column("CURRENT_ABORT_TASK_NUM", ScalarType.createType(PrimitiveType.INT))
+                                    .column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN))
+                                    .build())
+            )
             .build();
 
     private boolean fetchAllFe = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 4904b59b5b6..bbf34b93258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -474,7 +474,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String getStatistic() {
+    public String getStatistic() {
         Map<String, Object> summary = this.jobStatistic.summary();
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(summary);
@@ -635,7 +635,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String dataSourcePropertiesJsonToString() {
+    public String dataSourcePropertiesJsonToString() {
         Map<String, String> dataSourceProperties = Maps.newHashMap();
         dataSourceProperties.put("brokerList", brokerList);
         dataSourceProperties.put("topic", topic);
@@ -647,13 +647,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String customPropertiesJsonToString() {
+    public String customPropertiesJsonToString() {
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(customProperties);
     }
 
     @Override
-    protected Map<String, String> getDataSourceProperties() {
+    public Map<String, String> getDataSourceProperties() {
         Map<String, String> dataSourceProperties = Maps.newHashMap();
         dataSourceProperties.put("kafka_broker_list", brokerList);
         dataSourceProperties.put("kafka_topic", topic);
@@ -661,7 +661,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected Map<String, String> getCustomProperties() {
+    public Map<String, String> getCustomProperties() {
         Map<String, String> ret = new HashMap<>();
         customProperties.forEach((k, v) -> ret.put("property." + k, v));
         return ret;
@@ -910,7 +910,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String getLag() {
+    public String getLag() {
         Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) progress).getLag(cachedPartitionWithLatestOffsets);
         Gson gson = new Gson();
         return gson.toJson(partitionIdToOffsetLag);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5dbca308f6c..859d5df43e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -502,6 +502,18 @@ public abstract class RoutineLoadJob
         return dbId;
     }
 
+    public String getCreateTimestampString() {
+        return TimeUtils.longToTimeString(createTimestamp);
+    }
+
+    public String getPauseTimestampString() {
+        return TimeUtils.longToTimeString(pauseTimestamp);
+    }
+
+    public String getEndTimestampString() {
+        return TimeUtils.longToTimeString(endTimestamp);
+    }
+
     public void setOtherMsg(String otherMsg) {
         writeLock();
         try {
@@ -555,6 +567,10 @@ public abstract class RoutineLoadJob
         return endTimestamp;
     }
 
+    public RoutineLoadStatistic getJobStatistic() {
+        return jobStatistic;
+    }
+
     public PartitionNames getPartitions() {
         return partitions;
     }
@@ -790,6 +806,10 @@ public abstract class RoutineLoadJob
         }
     }
 
+    public Queue<String> getErrorLogUrls() {
+        return errorLogUrls;
+    }
+
     // RoutineLoadScheduler will run this method at fixed interval, and renew the timeout tasks
     public void processTimeoutTasks() {
         writeLock();
@@ -845,6 +865,11 @@ public abstract class RoutineLoadJob
         }
     }
 
+    public boolean isAbnormalPause() {
+        return this.state == JobState.PAUSED && this.pauseReason != null
+                    && this.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR;
+    }
+
     // All of private method could not be call without lock
     private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UserException {
         switch (state) {
@@ -939,6 +964,7 @@ public abstract class RoutineLoadJob
             this.jobStatistic.currentErrorRows = 0;
             this.jobStatistic.currentTotalRows = 0;
             this.otherMsg = "";
+            this.jobStatistic.currentAbortedTaskNum = 0;
         } else if (this.jobStatistic.currentErrorRows > maxErrorNum
                 || (this.jobStatistic.currentTotalRows > 0
                     && ((double) this.jobStatistic.currentErrorRows
@@ -1246,6 +1272,7 @@ public abstract class RoutineLoadJob
                             .build());
                 }
                 ++this.jobStatistic.abortedTaskNum;
+                ++this.jobStatistic.currentAbortedTaskNum;
                 TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
                 if (txnStatusChangeReasonString != null) {
                     txnStatusChangeReason =
@@ -1615,9 +1642,20 @@ public abstract class RoutineLoadJob
                                                TransactionState txnState,
                                                TransactionState.TxnStatusChangeReason txnStatusChangeReason);
 
-    protected abstract String getStatistic();
+    public abstract String getStatistic();
+
+    public abstract String getLag();
 
-    protected abstract String getLag();
+    public String getStateReason() {
+        switch (state) {
+            case PAUSED:
+                return pauseReason == null ? "" : pauseReason.toString();
+            case CANCELLED:
+                return cancelReason == null ? "" : cancelReason.toString();
+            default:
+                return "";
+        }
+    }
 
     public List<String> getShowInfo() {
         Optional<Database> database = Env.getCurrentInternalCatalog().getDb(dbId);
@@ -1647,16 +1685,7 @@ public abstract class RoutineLoadJob
             row.add(getStatistic());
             row.add(getProgress().toJsonString());
             row.add(getLag());
-            switch (state) {
-                case PAUSED:
-                    row.add(pauseReason == null ? "" : pauseReason.toString());
-                    break;
-                case CANCELLED:
-                    row.add(cancelReason == null ? "" : cancelReason.toString());
-                    break;
-                default:
-                    row.add("");
-            }
+            row.add(getStateReason());
             row.add(Joiner.on(", ").join(errorLogUrls));
             row.add(otherMsg);
             row.add(userIdentity.getQualifiedUser());
@@ -1819,7 +1848,7 @@ public abstract class RoutineLoadJob
         }
     }
 
-    private String jobPropertiesToJsonString() {
+    public String jobPropertiesToJsonString() {
         Map<String, String> jobProperties = Maps.newHashMap();
         jobProperties.put("partitions", partitions == null
                 ? STAR_STRING : Joiner.on(",").join(partitions.getPartitionNames()));
@@ -1853,13 +1882,13 @@ public abstract class RoutineLoadJob
         return gson.toJson(jobProperties);
     }
 
-    abstract String dataSourcePropertiesJsonToString();
+    public abstract String dataSourcePropertiesJsonToString();
 
-    abstract String customPropertiesJsonToString();
+    public abstract String customPropertiesJsonToString();
 
-    abstract Map<String, String> getDataSourceProperties();
+    public abstract Map<String, String> getDataSourceProperties();
 
-    abstract Map<String, String> getCustomProperties();
+    public abstract Map<String, String> getCustomProperties();
 
     public boolean isExpired() {
         if (!isFinal()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index a984a672d34..bfe42ad7695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -110,6 +110,10 @@ public class RoutineLoadManager implements Writable {
     public RoutineLoadManager() {
     }
 
+    public List<RoutineLoadJob> getAllRoutineLoadJobs() {
+        return new ArrayList<>(idToRoutineLoadJob.values());
+    }
+
     public List<RoutineLoadJob> getActiveRoutineLoadJobs() {
         return idToRoutineLoadJob.values().stream()
                 .filter(job -> !job.state.isFinalState())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index bb3b3e88daa..cf16b45fa13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -39,7 +39,7 @@ public abstract class RoutineLoadProgress {
 
     abstract void update(RLTaskTxnCommitAttachment attachment);
 
-    abstract String toJsonString();
+    public abstract String toJsonString();
 
     public static RoutineLoadProgress read(DataInput in) throws IOException {
         RoutineLoadProgress progress = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index ad10367f982..e5b01c50e26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -61,6 +61,7 @@ public class RoutineLoadStatistic {
     public long committedTaskNum = 0;
     @SerializedName(value = "abortedTaskNum")
     public long abortedTaskNum = 0;
+    public int currentAbortedTaskNum = 0;
 
     // Save all transactions current running. Including PREPARE, COMMITTED.
     // No need to persist, only for tracing txn of routine load job.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2928d84e549..9c9b2eedc75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -85,6 +85,7 @@ import org.apache.doris.load.StreamLoadHandler;
 import org.apache.doris.load.routineload.ErrorReason;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
+import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -147,6 +148,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest;
 import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchRoutineLoadJobRequest;
+import org.apache.doris.thrift.TFetchRoutineLoadJobResult;
 import org.apache.doris.thrift.TFetchRunningQueriesRequest;
 import org.apache.doris.thrift.TFetchRunningQueriesResult;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
@@ -228,6 +231,7 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
 import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TRoutineLoadJob;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TSchemaTableName;
 import org.apache.doris.thrift.TShowProcessListRequest;
@@ -266,6 +270,7 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -4211,4 +4216,55 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         result.setRunningQueries(runningQueries);
         return result;
     }
+
+    @Override
+    public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobRequest request) {
+        TFetchRoutineLoadJobResult result = new TFetchRoutineLoadJobResult();
+
+        if (!Env.getCurrentEnv().isReady()) {
+            return result;
+        }
+
+        RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
+        List<TRoutineLoadJob> jobInfos = Lists.newArrayList();
+        List<RoutineLoadJob> routineLoadJobs = routineLoadManager.getAllRoutineLoadJobs();
+        for (RoutineLoadJob job : routineLoadJobs) {
+            TRoutineLoadJob jobInfo = new TRoutineLoadJob();
+            jobInfo.setJobId(String.valueOf(job.getId()));
+            jobInfo.setJobName(job.getName());
+            jobInfo.setCreateTime(job.getCreateTimestampString());
+            jobInfo.setPauseTime(job.getPauseTimestampString());
+            jobInfo.setEndTime(job.getEndTimestampString());
+            String dbName = "";
+            String tableName = "";
+            try {
+                dbName = job.getDbFullName();
+                tableName = job.getTableName();
+            } catch (MetaNotFoundException e) {
+                LOG.warn("Failed to get db or table name for routine load job: 
{}", job.getId(), e);
+            }
+            jobInfo.setDbName(dbName);
+            jobInfo.setTableName(tableName);
+            jobInfo.setState(job.getState().name());
+            jobInfo.setCurrentTaskNum(String.valueOf(job.getSizeOfRoutineLoadTaskInfoList()));
+            jobInfo.setJobProperties(job.jobPropertiesToJsonString());
+            jobInfo.setDataSourceProperties(job.dataSourcePropertiesJsonToString());
+            jobInfo.setCustomProperties(job.customPropertiesJsonToString());
+            jobInfo.setStatistic(job.getStatistic());
+            jobInfo.setProgress(job.getProgress().toJsonString());
+            jobInfo.setLag(job.getLag());
+            jobInfo.setReasonOfStateChanged(job.getStateReason());
+            jobInfo.setErrorLogUrls(Joiner.on(", 
").join(job.getErrorLogUrls()));
+            jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
+            jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
+            jobInfo.setIsAbnormalPause(job.isAbnormalPause());
+            jobInfos.add(jobInfo);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("routine load job infos: {}", jobInfos);
+        }
+        result.setRoutineLoadJobs(jobInfos);
+
+        return result;
+    }
 }
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index f4070114617..502246052da 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -139,7 +139,8 @@ enum TSchemaTableType {
     SCH_TABLE_PROPERTIES = 50,
     SCH_FILE_CACHE_STATISTICS = 51,
     SCH_CATALOG_META_CACHE_STATISTICS = 52,
-    SCH_BACKEND_KERBEROS_TICKET_CACHE = 53;
+    SCH_BACKEND_KERBEROS_TICKET_CACHE = 53,
+    SCH_ROUTINE_LOAD_JOB = 54;
 }
 
 enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 7a13bd504cb..83774eda5de 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1557,6 +1557,36 @@ struct TFetchRunningQueriesResult {
 struct TFetchRunningQueriesRequest {
 }
 
+struct TFetchRoutineLoadJobRequest {
+}
+
+struct TRoutineLoadJob {
+    1: optional string job_id
+    2: optional string job_name
+    3: optional string create_time
+    4: optional string pause_time
+    5: optional string end_time
+    6: optional string db_name
+    7: optional string table_name
+    8: optional string state
+    9: optional string current_task_num
+    10: optional string job_properties
+    11: optional string data_source_properties
+    12: optional string custom_properties
+    13: optional string statistic
+    14: optional string progress
+    15: optional string lag
+    16: optional string reason_of_state_changed
+    17: optional string error_log_urls
+    18: optional string user_name
+    19: optional i32 current_abort_task_num
+    20: optional bool is_abnormal_pause
+}
+
+struct TFetchRoutineLoadJobResult {
+    1: optional list<TRoutineLoadJob> routineLoadJobs
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1655,4 +1685,6 @@ service FrontendService {
     Status.TStatus updatePartitionStatsCache(1: TUpdateFollowerPartitionStatsCacheRequest request)
 
     TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request)
+
+    TFetchRoutineLoadJobResult fetchRoutineLoadJob(1: TFetchRoutineLoadJobRequest request)
 }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
new file mode 100644
index 00000000000..3fa360bf281
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import groovy.json.JsonSlurper
+
+suite("test_routine_load_job_info_system_table","p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    def jobName = "test_job_info_system_table_invaild"
+    def tableName = "test_job_info_system_table"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+                
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+                
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "test_job_info_system_table_invaild",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (state[0][8] == "PAUSED") {
+                    break
+                }
+                if (count >= 30) {
+                    assertEquals(1, 2)
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            def res = sql "SELECT JOB_NAME FROM 
information_schema.routine_load_job WHERE CURRENT_ABORT_TASK_NUM > 0 OR 
IS_ABNORMAL_PAUSE = TRUE"
+            log.info("res: ${res}".toString())
+            assertTrue(res.toString().contains("${jobName}"))
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

