This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e18ea534d9a [improve](information schema) introduce routine load job system table (#48963)
e18ea534d9a is described below

commit e18ea534d9a009bc3c128515fa56c07d9c538b74
Author: hui lai <lai...@selectdb.com>
AuthorDate: Mon Mar 17 11:06:53 2025 +0800

[improve](information schema) introduce routine load job system table (#48963)

### What problem does this PR solve?

Part IV of https://github.com/apache/doris/issues/48511
Doc: https://github.com/apache/doris-website/pull/2196

**Introduce a routine load job statistics system table:**

```
mysql> show create table information_schema.routine_load_job\G
*************************** 1. row ***************************
       Table: routine_load_job
Create Table: CREATE TABLE `routine_load_job` (
  `JOB_ID` text NULL,
  `JOB_NAME` text NULL,
  `CREATE_TIME` text NULL,
  `PAUSE_TIME` text NULL,
  `END_TIME` text NULL,
  `DB_NAME` text NULL,
  `TABLE_NAME` text NULL,
  `STATE` text NULL,
  `CURRENT_TASK_NUM` text NULL,
  `JOB_PROPERTIES` text NULL,
  `DATA_SOURCE_PROPERTIES` text NULL,
  `CUSTOM_PROPERTIES` text NULL,
  `STATISTIC` text NULL,
  `PROGRESS` text NULL,
  `LAG` text NULL,
  `REASON_OF_STATE_CHANGED` text NULL,
  `ERROR_LOG_URLS` text NULL,
  `USER_NAME` text NULL,
  `CURRENT_ABORT_TASK_NUM` int NULL,
  `IS_ABNORMAL_PAUSE` boolean NULL
) ENGINE=SCHEMA;
1 row in set (0.00 sec)
```

**Empowering jobs with SQL query capability over their statistics has several benefits:**

- It can be used in conjunction with the metrics added in https://github.com/apache/doris/pull/48209 to roughly locate abnormal jobs when a Grafana alarm fires, using SQL such as:

```
SELECT JOB_NAME FROM information_schema.routine_load_job WHERE CURRENT_ABORT_TASK_NUM > 0 OR IS_ABNORMAL_PAUSE = TRUE;
```

- Users can run `SELECT * FROM information_schema.routine_load_job` instead of `SHOW ROUTINE LOAD`. The advantage is that `SHOW ROUTINE LOAD` can only be filtered by job name, while SQL can locate jobs far more flexibly; see the sketch below.
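As one illustration of that flexibility, a minimal sketch (the database name `demo_db` is hypothetical; all columns come from the table definition above) lists every paused job in a single database together with the reason it paused:

```
-- Sketch: find paused jobs in one database and why they paused.
-- 'demo_db' is a hypothetical database name.
SELECT JOB_NAME, STATE, REASON_OF_STATE_CHANGED, ERROR_LOG_URLS
FROM information_schema.routine_load_job
WHERE DB_NAME = 'demo_db'
  AND STATE = 'PAUSED'
ORDER BY PAUSE_TIME DESC;
```

A filter like this has no equivalent in `SHOW ROUTINE LOAD`, which matches only on the job name.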
---
 be/src/exec/schema_scanner.cpp                     |   3 +
 be/src/exec/schema_scanner/schema_helper.cpp       |   9 +
 be/src/exec/schema_scanner/schema_helper.h         |   6 +
 .../schema_routine_load_job_scanner.cpp            | 199 +++++++++++++++++++++
 .../schema_routine_load_job_scanner.h              |  50 ++++++
 .../org/apache/doris/analysis/SchemaTableType.java |   4 +-
 .../java/org/apache/doris/catalog/SchemaTable.java |  24 +++
 .../load/routineload/KafkaRoutineLoadJob.java      |  12 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  63 +++++--
 .../doris/load/routineload/RoutineLoadManager.java |   4 +
 .../load/routineload/RoutineLoadProgress.java      |   2 +-
 .../load/routineload/RoutineLoadStatistic.java     |   1 +
 .../apache/doris/service/FrontendServiceImpl.java  |  56 ++++++
 gensrc/thrift/Descriptors.thrift                   |   3 +-
 gensrc/thrift/FrontendService.thrift               |  32 ++++
 .../test_routine_load_job_info_system_table.groovy | 142 +++++++++++++++
 16 files changed, 584 insertions(+), 26 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 3c3e57a8522..40b0616c9a5 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -40,6 +40,7 @@
 #include "exec/schema_scanner/schema_partitions_scanner.h"
 #include "exec/schema_scanner/schema_processlist_scanner.h"
 #include "exec/schema_scanner/schema_profiling_scanner.h"
+#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
 #include "exec/schema_scanner/schema_routine_scanner.h"
 #include "exec/schema_scanner/schema_rowsets_scanner.h"
 #include "exec/schema_scanner/schema_schema_privileges_scanner.h"
@@ -228,6 +229,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
         return SchemaCatalogMetaCacheStatsScanner::create_unique();
     case TSchemaTableType::SCH_BACKEND_KERBEROS_TICKET_CACHE:
         return SchemaBackendKerberosTicketCacheScanner::create_unique();
+    case TSchemaTableType::SCH_ROUTINE_LOAD_JOB:
+        return SchemaRoutineLoadJobScanner::create_unique();
     default:
         return SchemaDummyScanner::create_unique();
         break;
diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp
index 7cf95187b02..13670444e53 100644
--- a/be/src/exec/schema_scanner/schema_helper.cpp
+++ b/be/src/exec/schema_scanner/schema_helper.cpp
@@ -142,4 +142,13 @@ Status SchemaHelper::show_user(const std::string& ip, const int32_t port,
     });
 }
 
+Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t port,
+                                            const TFetchRoutineLoadJobRequest& request,
+                                            TFetchRoutineLoadJobResult* result) {
+    return ThriftRpcHelper::rpc<FrontendServiceClient>(
+            ip, port, [&request, &result](FrontendServiceConnection& client) {
+                client->fetchRoutineLoadJob(*result, request);
+            });
+}
+
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h
index bc794093128..53d37389371 100644
--- a/be/src/exec/schema_scanner/schema_helper.h
+++ b/be/src/exec/schema_scanner/schema_helper.h
@@ -26,6 +26,8 @@ namespace doris {
 
 class TDescribeTablesParams;
 class TDescribeTablesResult;
+class TFetchRoutineLoadJobRequest;
+class TFetchRoutineLoadJobResult;
 class TGetDbsParams;
 class TGetDbsResult;
 class TGetTablesParams;
@@ -84,6 +86,10 @@ public:
                                    TShowProcessListResult* result);
     static Status show_user(const std::string& ip, const int32_t port,
                             const TShowUserRequest& request, TShowUserResult* result);
+
+    static Status fetch_routine_load_job(const std::string& ip, const int32_t port,
+                                         const TFetchRoutineLoadJobRequest& request,
+                                         TFetchRoutineLoadJobResult* result);
 };
 
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
new file mode 100644
index 00000000000..e061ab790cf
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include <string>
+
+#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+std::vector<SchemaScanner::ColumnDesc> SchemaRoutineLoadJobScanner::_s_tbls_columns = {
+        // name, type, size, is_null
+        {"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
+        {"JOB_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"PAUSE_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"END_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"DB_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"TABLE_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"STATE", TYPE_STRING, sizeof(StringRef), true},
+        {"CURRENT_TASK_NUM", TYPE_STRING, sizeof(StringRef), true},
+        {"JOB_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+        {"DATA_SOURCE_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+        {"CUSTOM_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+        {"STATISTIC", TYPE_STRING, sizeof(StringRef), true},
+        {"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
+        {"LAG", TYPE_STRING, sizeof(StringRef), true},
+        {"REASON_OF_STATE_CHANGED", TYPE_STRING, sizeof(StringRef), true},
+        {"ERROR_LOG_URLS", TYPE_STRING, sizeof(StringRef), true},
+        {"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
+        {"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
+        {"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
+};
+
+SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROUTINE_LOAD_JOB) {}
+
+SchemaRoutineLoadJobScanner::~SchemaRoutineLoadJobScanner() {}
+
+Status SchemaRoutineLoadJobScanner::start(RuntimeState* state) {
+    if (!_is_init) {
+        return Status::InternalError("used before initialized.");
+    }
+    TFetchRoutineLoadJobRequest request;
+    RETURN_IF_ERROR(SchemaHelper::fetch_routine_load_job(
+            *(_param->common_param->ip), _param->common_param->port, request, &_result));
+    return Status::OK();
+}
+
+Status SchemaRoutineLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("call this before initial.");
+    }
+    if (block == nullptr || eos == nullptr) {
+        return Status::InternalError("invalid parameter.");
+    }
+
+    *eos = true;
+    if (_result.routineLoadJobs.empty()) {
+        return Status::OK();
+    }
+
+    return _fill_block_impl(block);
+}
+
+Status SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
+    SCOPED_TIMER(_fill_block_timer);
+
+    const auto& jobs_info = _result.routineLoadJobs;
+    size_t row_num = jobs_info.size();
+    if (row_num == 0) {
+        return Status::OK();
+    }
+
+    for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
+        const auto& col_desc = _s_tbls_columns[col_idx];
+
+        std::vector<StringRef> str_refs(row_num);
+        std::vector<int32_t> int_vals(row_num);
+        std::vector<int8_t> bool_vals(row_num);
+        std::vector<void*> datas(row_num);
+        std::vector<std::string> column_values(row_num);
+
+        for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
+            const auto& job_info = jobs_info[row_idx];
+            std::string& column_value = column_values[row_idx];
+
+            if (col_desc.type == TYPE_STRING) {
+                switch (col_idx) {
+                case 0: // JOB_ID
+                    column_value = job_info.__isset.job_id ? job_info.job_id : "";
+                    break;
+                case 1: // JOB_NAME
+                    column_value = job_info.__isset.job_name ? job_info.job_name : "";
+                    break;
+                case 2: // CREATE_TIME
+                    column_value = job_info.__isset.create_time ? job_info.create_time : "";
+                    break;
+                case 3: // PAUSE_TIME
+                    column_value = job_info.__isset.pause_time ? job_info.pause_time : "";
+                    break;
+                case 4: // END_TIME
+                    column_value = job_info.__isset.end_time ? job_info.end_time : "";
+                    break;
+                case 5: // DB_NAME
+                    column_value = job_info.__isset.db_name ? job_info.db_name : "";
+                    break;
+                case 6: // TABLE_NAME
+                    column_value = job_info.__isset.table_name ? job_info.table_name : "";
+                    break;
+                case 7: // STATE
+                    column_value = job_info.__isset.state ? job_info.state : "";
+                    break;
+                case 8: // CURRENT_TASK_NUM
+                    column_value =
+                            job_info.__isset.current_task_num ? job_info.current_task_num : "";
+                    break;
+                case 9: // JOB_PROPERTIES
+                    column_value = job_info.__isset.job_properties ? job_info.job_properties : "";
+                    break;
+                case 10: // DATA_SOURCE_PROPERTIES
+                    column_value = job_info.__isset.data_source_properties
+                                           ? job_info.data_source_properties
+                                           : "";
+                    break;
+                case 11: // CUSTOM_PROPERTIES
+                    column_value =
+                            job_info.__isset.custom_properties ? job_info.custom_properties : "";
+                    break;
+                case 12: // STATISTIC
+                    column_value = job_info.__isset.statistic ? job_info.statistic : "";
+                    break;
+                case 13: // PROGRESS
+                    column_value = job_info.__isset.progress ? job_info.progress : "";
+                    break;
+                case 14: // LAG
+                    column_value = job_info.__isset.lag ? job_info.lag : "";
+                    break;
+                case 15: // REASON_OF_STATE_CHANGED
+                    column_value = job_info.__isset.reason_of_state_changed
+                                           ? job_info.reason_of_state_changed
+                                           : "";
+                    break;
+                case 16: // ERROR_LOG_URLS
+                    column_value = job_info.__isset.error_log_urls ? job_info.error_log_urls : "";
+                    break;
+                case 17: // USER_NAME
+                    column_value = job_info.__isset.user_name ? job_info.user_name : "";
+                    break;
+                }
+
+                str_refs[row_idx] =
+                        StringRef(column_values[row_idx].data(), column_values[row_idx].size());
+                datas[row_idx] = &str_refs[row_idx];
+            } else if (col_desc.type == TYPE_INT) {
+                int_vals[row_idx] = job_info.__isset.current_abort_task_num
+                                            ? job_info.current_abort_task_num
+                                            : 0;
+                datas[row_idx] = &int_vals[row_idx];
+            } else if (col_desc.type == TYPE_BOOLEAN) {
+                bool_vals[row_idx] =
+                        job_info.__isset.is_abnormal_pause ? job_info.is_abnormal_pause : false;
+                datas[row_idx] = &bool_vals[row_idx];
+            }
+        }
+
+        RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h
new file mode 100644
index 00000000000..1105328776a
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/FrontendService_types.h>
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaRoutineLoadJobScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaRoutineLoadJobScanner);
+
+public:
+    SchemaRoutineLoadJobScanner();
+    ~SchemaRoutineLoadJobScanner() override;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
+
+private:
+    Status _fill_block_impl(vectorized::Block* block);
+
+    TFetchRoutineLoadJobResult _result;
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 7ec9e5cab37..5c5bdeb832c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -90,7 +90,9 @@ public enum SchemaTableType {
     SCH_CATALOG_META_CACHE_STATISTICS("CATALOG_META_CACHE_STATISTICS", "CATALOG_META_CACHE_STATISTICS",
             TSchemaTableType.SCH_CATALOG_META_CACHE_STATISTICS),
     SCH_BACKEND_KERBEROS_TICKET_CACHE("BACKEND_KERBEROS_TICKET_CACHE", "BACKEND_KERBEROS_TICKET_CACHE",
-            TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE);
+            TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE),
+    SCH_ROUTINE_LOAD_JOB("ROUTINE_LOAD_JOB", "ROUTINE_LOAD_JOB",
+            TSchemaTableType.SCH_ROUTINE_LOAD_JOB);
 
     private static final String dbName = "INFORMATION_SCHEMA";
     private static SelectList fullSelectLists;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 75072390821..36992833639 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -597,6 +597,30 @@ public class SchemaTable extends Table {
                             .column("REFRESH_INTERVAL_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
                             .build())
             )
+            .put("routine_load_job",
+                    new SchemaTable(SystemIdGenerator.getNextId(), "routine_load_job", TableType.SCHEMA,
+                            builder().column("JOB_ID", ScalarType.createStringType())
+                                    .column("JOB_NAME", ScalarType.createStringType())
+                                    .column("CREATE_TIME", ScalarType.createStringType())
+                                    .column("PAUSE_TIME", ScalarType.createStringType())
+                                    .column("END_TIME", ScalarType.createStringType())
+                                    .column("DB_NAME", ScalarType.createStringType())
+                                    .column("TABLE_NAME", ScalarType.createStringType())
+                                    .column("STATE", ScalarType.createStringType())
+                                    .column("CURRENT_TASK_NUM", ScalarType.createStringType())
+                                    .column("JOB_PROPERTIES", ScalarType.createStringType())
+                                    .column("DATA_SOURCE_PROPERTIES", ScalarType.createStringType())
+                                    .column("CUSTOM_PROPERTIES", ScalarType.createStringType())
+                                    .column("STATISTIC", ScalarType.createStringType())
+                                    .column("PROGRESS", ScalarType.createStringType())
+                                    .column("LAG", ScalarType.createStringType())
+                                    .column("REASON_OF_STATE_CHANGED", ScalarType.createStringType())
+                                    .column("ERROR_LOG_URLS", ScalarType.createStringType())
+                                    .column("USER_NAME", ScalarType.createStringType())
+                                    .column("CURRENT_ABORT_TASK_NUM", ScalarType.createType(PrimitiveType.INT))
+                                    .column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN))
+                                    .build())
+            )
             .build();
 
     private boolean fetchAllFe = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 4904b59b5b6..bbf34b93258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -474,7 +474,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String getStatistic() {
+    public String getStatistic() {
         Map<String, Object> summary = this.jobStatistic.summary();
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(summary);
@@ -635,7 +635,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String dataSourcePropertiesJsonToString() {
+    public String dataSourcePropertiesJsonToString() {
         Map<String, String> dataSourceProperties = Maps.newHashMap();
         dataSourceProperties.put("brokerList", brokerList);
         dataSourceProperties.put("topic", topic);
@@ -647,13 +647,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String customPropertiesJsonToString() {
+    public String customPropertiesJsonToString() {
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(customProperties);
     }
 
     @Override
-    protected Map<String, String> getDataSourceProperties() {
+    public Map<String, String> getDataSourceProperties() {
         Map<String, String> dataSourceProperties = Maps.newHashMap();
         dataSourceProperties.put("kafka_broker_list", brokerList);
         dataSourceProperties.put("kafka_topic", topic);
@@ -661,7 +661,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected Map<String, String> getCustomProperties() {
+    public Map<String, String> getCustomProperties() {
         Map<String, String> ret = new HashMap<>();
         customProperties.forEach((k, v) -> ret.put("property." + k, v));
         return ret;
@@ -910,7 +910,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected String getLag() {
+    public String getLag() {
         Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) progress).getLag(cachedPartitionWithLatestOffsets);
         Gson gson = new Gson();
         return gson.toJson(partitionIdToOffsetLag);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5dbca308f6c..859d5df43e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -502,6 +502,18 @@ public abstract class RoutineLoadJob
         return dbId;
     }
 
+    public String getCreateTimestampString() {
+        return TimeUtils.longToTimeString(createTimestamp);
+    }
+
+    public String getPauseTimestampString() {
+        return TimeUtils.longToTimeString(pauseTimestamp);
+    }
+
+    public String getEndTimestampString() {
+        return TimeUtils.longToTimeString(endTimestamp);
+    }
+
     public void setOtherMsg(String otherMsg) {
         writeLock();
         try {
@@ -555,6 +567,10 @@ public abstract class RoutineLoadJob
         return endTimestamp;
     }
 
+    public RoutineLoadStatistic getJobStatistic() {
+        return jobStatistic;
+    }
+
     public PartitionNames getPartitions() {
         return partitions;
     }
@@ -790,6 +806,10 @@ public abstract class RoutineLoadJob
         }
     }
 
+    public Queue<String> getErrorLogUrls() {
+        return errorLogUrls;
+    }
+
     // RoutineLoadScheduler will run this method at fixed interval, and renew the timeout tasks
     public void processTimeoutTasks() {
         writeLock();
@@ -845,6 +865,11 @@ public abstract class RoutineLoadJob
         }
     }
 
+    public boolean isAbnormalPause() {
+        return this.state == JobState.PAUSED && this.pauseReason != null
+                && this.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR;
+    }
+
     // All of private method could not be call without lock
     private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UserException {
         switch (state) {
@@ -939,6 +964,7 @@ public abstract class RoutineLoadJob
             this.jobStatistic.currentErrorRows = 0;
             this.jobStatistic.currentTotalRows = 0;
             this.otherMsg = "";
+            this.jobStatistic.currentAbortedTaskNum = 0;
         } else if (this.jobStatistic.currentErrorRows > maxErrorNum
                 || (this.jobStatistic.currentTotalRows > 0
                     && ((double) this.jobStatistic.currentErrorRows
@@ -1246,6 +1272,7 @@ public abstract class RoutineLoadJob
                     .build());
         }
         ++this.jobStatistic.abortedTaskNum;
+        ++this.jobStatistic.currentAbortedTaskNum;
         TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
         if (txnStatusChangeReasonString != null) {
             txnStatusChangeReason =
@@ -1615,9 +1642,20 @@ public abstract class RoutineLoadJob
             TransactionState txnState,
             TransactionState.TxnStatusChangeReason txnStatusChangeReason);
 
-    protected abstract String getStatistic();
+    public abstract String getStatistic();
+
+    public abstract String getLag();
 
-    protected abstract String getLag();
+    public String getStateReason() {
+        switch (state) {
+            case PAUSED:
+                return pauseReason == null ? "" : pauseReason.toString();
+            case CANCELLED:
+                return cancelReason == null ? "" : cancelReason.toString();
+            default:
+                return "";
+        }
+    }
 
     public List<String> getShowInfo() {
         Optional<Database> database = Env.getCurrentInternalCatalog().getDb(dbId);
@@ -1647,16 +1685,7 @@ public abstract class RoutineLoadJob
         row.add(getStatistic());
         row.add(getProgress().toJsonString());
         row.add(getLag());
-        switch (state) {
-            case PAUSED:
-                row.add(pauseReason == null ? "" : pauseReason.toString());
-                break;
-            case CANCELLED:
-                row.add(cancelReason == null ? "" : cancelReason.toString());
-                break;
-            default:
-                row.add("");
-        }
+        row.add(getStateReason());
         row.add(Joiner.on(", ").join(errorLogUrls));
         row.add(otherMsg);
         row.add(userIdentity.getQualifiedUser());
@@ -1819,7 +1848,7 @@ public abstract class RoutineLoadJob
         }
     }
 
-    private String jobPropertiesToJsonString() {
+    public String jobPropertiesToJsonString() {
         Map<String, String> jobProperties = Maps.newHashMap();
         jobProperties.put("partitions",
                 partitions == null ? STAR_STRING : Joiner.on(",").join(partitions.getPartitionNames()));
@@ -1853,13 +1882,13 @@ public abstract class RoutineLoadJob
         return gson.toJson(jobProperties);
     }
 
-    abstract String dataSourcePropertiesJsonToString();
+    public abstract String dataSourcePropertiesJsonToString();
 
-    abstract String customPropertiesJsonToString();
+    public abstract String customPropertiesJsonToString();
 
-    abstract Map<String, String> getDataSourceProperties();
+    public abstract Map<String, String> getDataSourceProperties();
 
-    abstract Map<String, String> getCustomProperties();
+    public abstract Map<String, String> getCustomProperties();
 
     public boolean isExpired() {
         if (!isFinal()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index a984a672d34..bfe42ad7695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -110,6 +110,10 @@ public class RoutineLoadManager implements Writable {
     public RoutineLoadManager() {
     }
 
+    public List<RoutineLoadJob> getAllRoutineLoadJobs() {
+        return new ArrayList<>(idToRoutineLoadJob.values());
+    }
+
     public List<RoutineLoadJob> getActiveRoutineLoadJobs() {
         return idToRoutineLoadJob.values().stream()
                 .filter(job -> !job.state.isFinalState())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index bb3b3e88daa..cf16b45fa13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -39,7 +39,7 @@ public abstract class RoutineLoadProgress {
 
     abstract void update(RLTaskTxnCommitAttachment attachment);
 
-    abstract String toJsonString();
+    public abstract String toJsonString();
 
     public static RoutineLoadProgress read(DataInput in) throws IOException {
         RoutineLoadProgress progress = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index ad10367f982..e5b01c50e26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -61,6 +61,7 @@ public class RoutineLoadStatistic {
     public long committedTaskNum = 0;
     @SerializedName(value = "abortedTaskNum")
    public long abortedTaskNum = 0;
+    public int currentAbortedTaskNum = 0;
 
     // Save all transactions current running. Including PREPARE, COMMITTED.
     // No need to persist, only for tracing txn of routine load job.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2928d84e549..9c9b2eedc75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -85,6 +85,7 @@ import org.apache.doris.load.StreamLoadHandler;
 import org.apache.doris.load.routineload.ErrorReason;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
+import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -147,6 +148,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest;
 import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchRoutineLoadJobRequest;
+import org.apache.doris.thrift.TFetchRoutineLoadJobResult;
 import org.apache.doris.thrift.TFetchRunningQueriesRequest;
 import org.apache.doris.thrift.TFetchRunningQueriesResult;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
@@ -228,6 +231,7 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
 import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TRoutineLoadJob;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TSchemaTableName;
 import org.apache.doris.thrift.TShowProcessListRequest;
@@ -266,6 +270,7 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -4211,4 +4216,55 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         result.setRunningQueries(runningQueries);
         return result;
     }
+
+    @Override
+    public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobRequest request) {
+        TFetchRoutineLoadJobResult result = new TFetchRoutineLoadJobResult();
+
+        if (!Env.getCurrentEnv().isReady()) {
+            return result;
+        }
+
+        RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
+        List<TRoutineLoadJob> jobInfos = Lists.newArrayList();
+        List<RoutineLoadJob> routineLoadJobs = routineLoadManager.getAllRoutineLoadJobs();
+        for (RoutineLoadJob job : routineLoadJobs) {
+            TRoutineLoadJob jobInfo = new TRoutineLoadJob();
+            jobInfo.setJobId(String.valueOf(job.getId()));
+            jobInfo.setJobName(job.getName());
+            jobInfo.setCreateTime(job.getCreateTimestampString());
+            jobInfo.setPauseTime(job.getPauseTimestampString());
+            jobInfo.setEndTime(job.getEndTimestampString());
+            String dbName = "";
+            String tableName = "";
+            try {
+                dbName = job.getDbFullName();
+                tableName = job.getTableName();
+            } catch (MetaNotFoundException e) {
+                LOG.warn("Failed to get db or table name for routine load job: {}", job.getId(), e);
+            }
+            jobInfo.setDbName(dbName);
+            jobInfo.setTableName(tableName);
+            jobInfo.setState(job.getState().name());
+            jobInfo.setCurrentTaskNum(String.valueOf(job.getSizeOfRoutineLoadTaskInfoList()));
+            jobInfo.setJobProperties(job.jobPropertiesToJsonString());
+            jobInfo.setDataSourceProperties(job.dataSourcePropertiesJsonToString());
+            jobInfo.setCustomProperties(job.customPropertiesJsonToString());
+            jobInfo.setStatistic(job.getStatistic());
+            jobInfo.setProgress(job.getProgress().toJsonString());
+            jobInfo.setLag(job.getLag());
+            jobInfo.setReasonOfStateChanged(job.getStateReason());
+            jobInfo.setErrorLogUrls(Joiner.on(", ").join(job.getErrorLogUrls()));
+            jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
+            jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
+            jobInfo.setIsAbnormalPause(job.isAbnormalPause());
+            jobInfos.add(jobInfo);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("routine load job infos: {}", jobInfos);
+        }
+        result.setRoutineLoadJobs(jobInfos);
+
+        return result;
+    }
 }
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index f4070114617..502246052da 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -139,7 +139,8 @@ enum TSchemaTableType {
     SCH_TABLE_PROPERTIES = 50,
     SCH_FILE_CACHE_STATISTICS = 51,
     SCH_CATALOG_META_CACHE_STATISTICS = 52,
-    SCH_BACKEND_KERBEROS_TICKET_CACHE = 53;
+    SCH_BACKEND_KERBEROS_TICKET_CACHE = 53,
+    SCH_ROUTINE_LOAD_JOB = 54;
 }
 
 enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 7a13bd504cb..83774eda5de 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1557,6 +1557,36 @@ struct TFetchRunningQueriesResult {
 struct TFetchRunningQueriesRequest {
 }
 
+struct TFetchRoutineLoadJobRequest {
+}
+
+struct TRoutineLoadJob {
+    1: optional string job_id
+    2: optional string job_name
+    3: optional string create_time
+    4: optional string pause_time
+    5: optional string end_time
+    6: optional string db_name
+    7: optional string table_name
+    8: optional string state
+    9: optional string current_task_num
+    10: optional string job_properties
+    11: optional string data_source_properties
+    12: optional string custom_properties
+    13: optional string statistic
+    14: optional string progress
+    15: optional string lag
+    16: optional string reason_of_state_changed
+    17: optional string error_log_urls
+    18: optional string user_name
+    19: optional i32 current_abort_task_num
+    20: optional bool is_abnormal_pause
+}
+
+struct TFetchRoutineLoadJobResult {
+    1: optional list<TRoutineLoadJob> routineLoadJobs
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1655,4 +1685,6 @@ service FrontendService {
     Status.TStatus updatePartitionStatsCache(1: TUpdateFollowerPartitionStatsCacheRequest request)
 
     TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request)
+
+    TFetchRoutineLoadJobResult fetchRoutineLoadJob(1: TFetchRoutineLoadJobRequest request)
 }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
new file mode 100644
index 00000000000..3fa360bf281
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import groovy.json.JsonSlurper
+
+suite("test_routine_load_job_info_system_table","p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    def jobName = "test_job_info_system_table_invaild"
+    def tableName = "test_job_info_system_table"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName}
+                (
+                    k00 INT             NOT NULL,
+                    k01 DATE            NOT NULL,
+                    k02 BOOLEAN         NULL,
+                    k03 TINYINT         NULL,
+                    k04 SMALLINT        NULL,
+                    k05 INT             NULL,
+                    k06 BIGINT          NULL,
+                    k07 LARGEINT        NULL,
+                    k08 FLOAT           NULL,
+                    k09 DOUBLE          NULL,
+                    k10 DECIMAL(9,1)    NULL,
+                    k11 DECIMALV3(9,1)  NULL,
+                    k12 DATETIME        NULL,
+                    k13 DATEV2          NULL,
+                    k14 DATETIMEV2      NULL,
+                    k15 CHAR            NULL,
+                    k16 VARCHAR         NULL,
+                    k17 STRING          NULL,
+                    k18 JSON            NULL,
+                    kd01 BOOLEAN        NOT NULL DEFAULT "TRUE",
+                    kd02 TINYINT        NOT NULL DEFAULT "1",
+                    kd03 SMALLINT       NOT NULL DEFAULT "2",
+                    kd04 INT            NOT NULL DEFAULT "3",
+                    kd05 BIGINT         NOT NULL DEFAULT "4",
+                    kd06 LARGEINT       NOT NULL DEFAULT "5",
+                    kd07 FLOAT          NOT NULL DEFAULT "6.0",
+                    kd08 DOUBLE         NOT NULL DEFAULT "7.0",
+                    kd09 DECIMAL        NOT NULL DEFAULT "888888888",
+                    kd10 DECIMALV3      NOT NULL DEFAULT "999999999",
+                    kd11 DATE           NOT NULL DEFAULT "2023-08-24",
+                    kd12 DATETIME       NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd13 DATEV2         NOT NULL DEFAULT "2023-08-24",
+                    kd14 DATETIMEV2     NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd15 CHAR(255)      NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd16 VARCHAR(300)   NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd17 STRING         NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd18 JSON           NULL,
+
+                    INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                    INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                    INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                    INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                    INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                    INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                    INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+                )
+                DUPLICATE KEY(k00)
+                PARTITION BY RANGE(k01)
+                (
+                    PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                    PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                    PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+                )
+                DISTRIBUTED BY HASH(k00) BUCKETS 32
+                PROPERTIES (
+                    "bloom_filter_columns"="k05",
+                    "replication_num" = "1"
+                );
+            """
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "test_job_info_system_table_invaild",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (state[0][8] == "PAUSED") {
+                    break
+                }
+                if (count >= 30) {
+                    assertEquals(1, 2)
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            def res = sql "SELECT JOB_NAME FROM information_schema.routine_load_job WHERE CURRENT_ABORT_TASK_NUM > 0 OR IS_ABNORMAL_PAUSE = TRUE"
+            log.info("res: ${res}".toString())
+            assertTrue(res.toString().contains("${jobName}"))
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org