This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b51ce415e7 [Feature](load) Add submitter and comments to load job (#16878) b51ce415e7 is described below commit b51ce415e7d5ad19ab5d4cc1aa0e3a331d8d8818 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Tue Feb 28 09:06:19 2023 +0800 [Feature](load) Add submitter and comments to load job (#16878) * [Feature](load) Add submitter and comments to load job --- be/src/http/action/stream_load.cpp | 4 +- be/src/http/http_common.h | 1 + be/src/runtime/stream_load/stream_load_context.cpp | 16 ++++++ be/src/runtime/stream_load/stream_load_context.h | 1 + .../Load/BROKER-LOAD.md | 8 ++- .../Load/CREATE-ROUTINE-LOAD.md | 6 +- .../Load/STREAM-LOAD.md | 1 + .../Show-Statements/SHOW-CREATE-LOAD.md | 60 +++++++++++++++++++ docs/sidebars.json | 1 + .../Load/BROKER-LOAD.md | 5 +- .../Load/CREATE-ROUTINE-LOAD.md | 5 +- .../Load/STREAM-LOAD.md | 1 + .../Show-Statements/SHOW-CREATE-LOAD.md | 59 +++++++++++++++++++ .../org/apache/doris/common/FeMetaVersion.java | 5 +- fe/fe-core/src/main/cup/sql_parser.cup | 24 ++++++-- .../doris/analysis/CreateRoutineLoadStmt.java | 12 +++- .../java/org/apache/doris/analysis/LoadStmt.java | 29 +++++++++- .../apache/doris/analysis/ShowCreateLoadStmt.java | 67 ++++++++++++++++++++++ .../apache/doris/analysis/ShowRoutineLoadStmt.java | 2 + .../apache/doris/analysis/ShowStreamLoadStmt.java | 4 +- .../org/apache/doris/common/proc/LoadProcDir.java | 2 +- .../org/apache/doris/httpv2/rest/MultiAction.java | 2 +- .../src/main/java/org/apache/doris/load/Load.java | 4 ++ .../main/java/org/apache/doris/load/LoadJob.java | 39 ++++++++++--- .../org/apache/doris/load/StreamLoadRecord.java | 9 ++- .../org/apache/doris/load/StreamLoadRecordMgr.java | 5 +- .../org/apache/doris/load/loadv2/BulkLoadJob.java | 15 ++--- .../apache/doris/load/loadv2/InsertLoadJob.java | 5 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 42 ++++++++++++++ .../org/apache/doris/load/loadv2/LoadManager.java | 43 +++++++++++++- .../doris/load/routineload/RoutineLoadJob.java | 15 ++++- .../doris/load/routineload/RoutineLoadManager.java | 1 + .../java/org/apache/doris/qe/MultiLoadMgr.java | 10 +++- .../java/org/apache/doris/qe/ShowExecutor.java | 25 +++++++- .../java/org/apache/doris/qe/StmtExecutor.java | 2 +- .../apache/doris/analysis/CancelLoadStmtTest.java | 7 ++- .../doris/analysis/CreateRoutineLoadStmtTest.java | 4 +- .../org/apache/doris/analysis/LoadStmtTest.java | 8 +-- .../doris/load/loadv2/InsertLoadJobTest.java | 4 +- .../apache/doris/load/loadv2/LoadManagerTest.java | 6 +- .../load/routineload/KafkaRoutineLoadJobTest.java | 2 +- .../doris/load/routineload/RoutineLoadJobTest.java | 13 ++++- .../load/routineload/RoutineLoadManagerTest.java | 4 +- gensrc/thrift/BackendService.thrift | 1 + 44 files changed, 511 insertions(+), 68 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index c0fc86b68b..7340e6c10c 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -331,7 +331,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea return Status::InvalidArgument("Invalid timeout format"); } } - + if (!http_req->header(HTTP_COMMENT).empty()) { + ctx->load_comment = http_req->header(HTTP_COMMENT); + } // begin transaction int64_t begin_txn_start_time = MonotonicNanos(); RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get())); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index f3d7caa06a..6dbf93609e 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -53,6 +53,7 @@ static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns"; static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; static const std::string HTTP_SKIP_LINES = "skip_lines"; +static const std::string HTTP_COMMENT = "comment"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 93544cfbe3..1f3ecd4372 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -35,6 +35,10 @@ std::string StreamLoadContext::to_json() const { writer.Key("Label"); writer.String(label.c_str()); + // comment + writer.Key("Comment"); + writer.String(load_comment.c_str()); + writer.Key("TwoPhaseCommit"); std::string need_two_phase_commit = two_phase_commit ? "true" : "false"; writer.String(need_two_phase_commit.c_str()); @@ -136,6 +140,12 @@ std::string StreamLoadContext::prepare_stream_load_record(const std::string& str document.AddMember("ClientIp", client_ip_value, allocator); } + rapidjson::Value comment_value(rapidjson::kStringType); + comment_value.SetString(load_comment.c_str(), load_comment.size()); + if (!comment_value.IsNull()) { + document.AddMember("Comment", comment_value, allocator); + } + document.AddMember("StartTime", start_millis, allocator); document.AddMember("FinishTime", start_millis + load_cost_millis, allocator); rapidjson::StringBuffer buffer; @@ -246,6 +256,12 @@ void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_ ss << ", FinishTime: " << finish_time.GetInt64(); } + if (document.HasMember("Comment")) { + const rapidjson::Value& comment_value = document["Comment"]; + stream_load_item.__set_comment(comment_value.GetString()); + ss << ", Comment: " << comment_value.GetString(); + } + VLOG(1) << "parse json from rocksdb. " << ss.str(); } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 8951630afb..d12148b527 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -132,6 +132,7 @@ public: int32_t timeout_second = -1; AuthInfo auth; bool two_phase_commit = false; + std::string load_comment; // the following members control the max progress of a consuming // process. if any of them reach, the consuming will finish. diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 6665b87c83..6745507aac 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -41,8 +41,9 @@ data_desc1[, data_desc2, ...] ) WITH BROKER broker_name [broker_properties] -[load_properties]; -```` +[load_properties] +[COMMENT "comment"]; +``` - `load_label` @@ -185,7 +186,8 @@ WITH BROKER broker_name - `load_to_single_tablet` Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. The number of tasks for the job depends on the overall concurrency. This parameter can only be set when loading data into the OLAP table with random partition. - +- <version since="1.2.3" type="inline"> comment </version> + Specify the comment for the import job. The comment can be viewed in the `show load` statement. ### Example 1. Import a batch of data from HDFS diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md index 7fc7401853..052a7ae9e8 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md @@ -44,7 +44,8 @@ CREATE ROUTINE LOAD [db.]job_name ON tbl_name [load_properties] [job_properties] FROM data_source [data_source_properties] -```` +[COMMENT "comment"] +``` - `[db.]job_name` @@ -325,7 +326,8 @@ FROM data_source [data_source_properties] ````text "property.kafka_default_offsets" = "OFFSET_BEGINNING" ```` - +- <version since="1.2.3" type="inline"> comment </version> + Comment for the routine load job. ### Example 1. Create a Kafka routine import task named test1 for example_tbl of example_db. Specify the column separator and group.id and client.id, and automatically consume all partitions by default, and start subscribing from the location where there is data (OFFSET_BEGINNING) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 7f3cede342..d61dd1047b 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -183,6 +183,7 @@ ERRORS: 25. trim_double_quotes: Boolean type, The default value is false. True means that the outermost double quotes of each field in the csv file are trimmed. 26. skip_lines: <version since="dev" type="inline"> Integer type, the default value is 0. It will skip some lines in the head of csv file. It will be disabled when format is `csv_with_names` or `csv_with_names_and_types`. </version> +27. comment: <version since="1.2.3" type="inline"> String type, the default value is "". </version> ### Example diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD.md new file mode 100644 index 0000000000..2484e8d64c --- /dev/null +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD.md @@ -0,0 +1,60 @@ +--- +{ + "title": "SHOW-CREATE-LOAD", + "language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## SHOW-CREATE-LOAD + +### Name + +SHOW CREATE LOAD + +### Description + +This statement is used to demonstrate the creation statement of a import job. + +grammar: + +```sql +SHOW CREATE LOAD for load_name; +```` + +illustrate: + +1`load_name`: import job name + +### Example + +1. Show the creation statement of the specified import job under the default db + + ```sql + SHOW CREATE LOAD for test_load + ```` + +### Keywords + + SHOW, CREATE, LOAD + +### Best Practice + diff --git a/docs/sidebars.json b/docs/sidebars.json index ed35625e5b..1c3f2db74f 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -887,6 +887,7 @@ "sql-manual/sql-reference/Show-Statements/SHOW-CREATE-CATALOG", "sql-manual/sql-reference/Show-Statements/SHOW-CREATE-DATABASE", "sql-manual/sql-reference/Show-Statements/SHOW-CREATE-MATERIALIZED-VIEW", + "sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD", "sql-manual/sql-reference/Show-Statements/SHOW-CREATE-ROUTINE-LOAD", "sql-manual/sql-reference/Show-Statements/SHOW-CREATE-FUNCTION", "sql-manual/sql-reference/Show-Statements/SHOW-COLUMNS", diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 03f73e5578..40836f572e 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -41,7 +41,8 @@ data_desc1[, data_desc2, ...] ) WITH BROKER broker_name [broker_properties] -[load_properties]; +[load_properties] +[COMMENT "comments"]; ``` - `load_label` @@ -185,6 +186,8 @@ WITH BROKER broker_name 布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,作业的任务数取决于整体并发度。该参数只允许在对带有random分区的olap表导数的时候设置。 +- <version since="1.2.3" type="inline"> comment </version> + - 指定导入任务的备注信息。可选参数。 ### Example 1. 从 HDFS 导入一批数据 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md index a0d86f0000..0bf6bff7e4 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md @@ -45,6 +45,8 @@ CREATE ROUTINE LOAD [db.]job_name ON tbl_name [load_properties] [job_properties] FROM data_source [data_source_properties] +[COMMENT "comment"] +``` ``` - `[db.]job_name` @@ -328,7 +330,8 @@ FROM data_source [data_source_properties] ```text "property.kafka_default_offsets" = "OFFSET_BEGINNING" ``` - +- <version since="1.2.3" type="inline"> comment </version> + - 例行导入任务的注释信息。 ### Example 1. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。指定列分隔符和 group.id 和 client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 8a87915c02..cc18014745 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -180,6 +180,7 @@ ERRORS: 25. trim_double_quotes: 布尔类型,默认值为 false,为 true 时表示裁剪掉 csv 文件每个字段最外层的双引号。 26. skip_lines: <version since="dev" type="inline"> 整数类型, 默认值为0, 含义为跳过csv文件的前几行. 当设置format设置为 `csv_with_names` 或、`csv_with_names_and_types` 时, 该参数会失效. </version> +27. comment: <version since="1.2.3" type="inline"> 字符串类型, 默认值为空. 给任务增加额外的信息. </version> ### Example 1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD.md new file mode 100644 index 0000000000..9104cb91b3 --- /dev/null +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-LOAD.md @@ -0,0 +1,59 @@ +--- +{ + "title": "SHOW-CREATE-LOAD", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## SHOW-CREATE-LOAD + +### Name + +SHOW CREATE LOAD + +### Description + +该语句用于展示导入作业的创建语句. + +语法: + +```sql +SHOW CREATE LOAD for load_name; +``` + +说明: + 1. `load_name`: 例行导入作业名称 + +### Example + +1. 展示默认db下指定导入作业的创建语句 + + ```sql + SHOW CREATE LOAD for test_load + ``` + +### Keywords + + SHOW, CREATE, LOAD + +### Best Practice + diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index c0a214b166..5ebc0d14e7 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -52,8 +52,11 @@ public final class FeMetaVersion { public static final int VERSION_115 = 115; // change Auto to rbac public static final int VERSION_116 = 116; + // add user and comment to load job + public static final int VERSION_117 = 117; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_116; + + public static final int VERSION_CURRENT = VERSION_117; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 823c7b89e0..e252a8dd70 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -642,7 +642,7 @@ terminal String COMMENTED_PLAN_HINTS; nonterminal List<StatementBase> stmts; nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, - show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, + show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt, describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, switch_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, @@ -1097,6 +1097,8 @@ stmt ::= {: RESULT = stmt; :} | show_create_routine_load_stmt : stmt {: RESULT = stmt; :} + | show_create_load_stmt : stmt + {: RESULT = stmt; :} | cancel_stmt : stmt {: RESULT = stmt; :} | delete_stmt : stmt @@ -2270,19 +2272,21 @@ load_stmt ::= opt_broker:broker opt_system:system opt_properties:properties + opt_comment:comment {: - RESULT = new LoadStmt(label, dataDescList, broker, system, properties); + RESULT = new LoadStmt(label, dataDescList, broker, system, properties, comment); :} | KW_LOAD KW_LABEL job_label:label LPAREN data_desc_list:dataDescList RPAREN resource_desc:resource opt_properties:properties + opt_comment:comment {: - RESULT = new LoadStmt(label, dataDescList, resource, properties); + RESULT = new LoadStmt(label, dataDescList, resource, properties, comment); :} - | KW_LOAD mysql_data_desc:desc opt_properties:properties + | KW_LOAD mysql_data_desc:desc opt_properties:properties opt_comment:comment {: - RESULT = new LoadStmt(desc, properties); + RESULT = new LoadStmt(desc, properties, comment); :} ; @@ -2594,9 +2598,10 @@ create_routine_load_stmt ::= opt_load_property_list:loadPropertyList opt_properties:properties KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN + opt_comment:comment {: RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList, - properties, type, customProperties, mergeType); + properties, type, customProperties, mergeType, comment); :} ; @@ -2755,6 +2760,13 @@ show_create_routine_load_stmt ::= :} ; +show_create_load_stmt ::= + KW_SHOW KW_CREATE KW_LOAD KW_FOR job_label:jobLabel + {: + RESULT = new ShowCreateLoadStmt(jobLabel); + :} + ; + // analyze statment analyze_stmt ::= KW_ANALYZE KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 3e6dec508e..31df659216 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -172,6 +172,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { private boolean numAsString = false; private boolean fuzzyParse = false; + private String comment = ""; + private LoadTask.MergeType mergeType; public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L; @@ -184,7 +186,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList, Map<String, String> jobProperties, String typeName, - Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType) { + Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType, + String comment) { this.labelName = labelName; this.tableName = tableName; this.loadPropertyList = loadPropertyList; @@ -192,6 +195,9 @@ public class CreateRoutineLoadStmt extends DdlStmt { this.typeName = typeName.toUpperCase(); this.dataSourceProperties = new RoutineLoadDataSourceProperties(this.typeName, dataSourceProperties, false); this.mergeType = mergeType; + if (comment != null) { + this.comment = comment; + } } public String getName() { @@ -302,6 +308,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { return this.dataSourceProperties.isOffsetsForTimes(); } + public String getComment() { + return comment; + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 600deefb6f..1e0ea08572 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -121,6 +121,8 @@ public class LoadStmt extends DdlStmt { public static final String KEY_SKIP_LINES = "skip_lines"; public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; + public static final String KEY_COMMENT = "comment"; + private final LabelName label; private final List<DataDescription> dataDescriptions; private final BrokerDesc brokerDesc; @@ -133,6 +135,8 @@ public class LoadStmt extends DdlStmt { private EtlJobType etlJobType = EtlJobType.UNKNOWN; + private String comment; + public static final ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>() .put(TIMEOUT_PROPERTY, new Function<String, Long>() { @Override @@ -208,7 +212,7 @@ public class LoadStmt extends DdlStmt { }) .build(); - public LoadStmt(DataDescription dataDescription, Map<String, String> properties) { + public LoadStmt(DataDescription dataDescription, Map<String, String> properties, String comment) { this.label = new LabelName(); this.dataDescriptions = Lists.newArrayList(dataDescription); this.brokerDesc = null; @@ -217,10 +221,15 @@ public class LoadStmt extends DdlStmt { this.properties = properties; this.user = null; this.isMysqlLoad = true; + if (comment != null) { + this.comment = comment; + } else { + this.comment = ""; + } } public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, - BrokerDesc brokerDesc, String cluster, Map<String, String> properties) { + BrokerDesc brokerDesc, String cluster, Map<String, String> properties, String comment) { this.label = label; this.dataDescriptions = dataDescriptions; this.brokerDesc = brokerDesc; @@ -228,10 +237,15 @@ public class LoadStmt extends DdlStmt { this.resourceDesc = null; this.properties = properties; this.user = null; + if (comment != null) { + this.comment = comment; + } else { + this.comment = ""; + } } public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, - ResourceDesc resourceDesc, Map<String, String> properties) { + ResourceDesc resourceDesc, Map<String, String> properties, String comment) { this.label = label; this.dataDescriptions = dataDescriptions; this.brokerDesc = null; @@ -239,6 +253,11 @@ public class LoadStmt extends DdlStmt { this.resourceDesc = resourceDesc; this.properties = properties; this.user = null; + if (comment != null) { + this.comment = comment; + } else { + this.comment = ""; + } } public LabelName getLabel() { @@ -450,6 +469,10 @@ public class LoadStmt extends DdlStmt { user = ConnectContext.get().getQualifiedUser(); } + public String getComment() { + return comment; + } + @Override public boolean needAuditEncryption() { if (brokerDesc != null || resourceDesc != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateLoadStmt.java new file mode 100644 index 0000000000..346fb107c9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateLoadStmt.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; + +// SHOW CREATE LOAD statement. +public class ShowCreateLoadStmt extends ShowStmt { + + private static final ShowResultSetMetaData META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("JobId", ScalarType.createVarchar(128))) + .addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535))) + .build(); + + private final LabelName labelName; + + public ShowCreateLoadStmt(LabelName labelName) { + this.labelName = labelName; + } + + public String getDb() { + return labelName.getDbName(); + } + + public String getLabel() { + return labelName.getLabelName(); + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN) + && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), + PrivPredicate.OPERATOR)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR"); + } + labelName.analyze(analyzer); + } + + @Override + public ShowResultSetMetaData getMetaData() { + return META_DATA; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 3d3afea86a..862858a684 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -86,6 +86,8 @@ public class ShowRoutineLoadStmt extends ShowStmt { .add("ReasonOfStateChanged") .add("ErrorLogUrls") .add("OtherMsg") + .add("User") + .add("Comment") .build(); private final LabelName labelName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java index 9ae208e936..560a11c68c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java @@ -62,10 +62,10 @@ public class ShowStreamLoadStmt extends ShowStmt { private ArrayList<OrderByPair> orderByPairs; private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("Label").add("Db").add("Table").add("User") + .add("Label").add("Db").add("Table") .add("ClientIp").add("Status").add("Message").add("Url").add("TotalRows") .add("LoadedRows").add("FilteredRows").add("UnselectedRows").add("LoadBytes") - .add("StartTime").add("FinishTime") + .add("StartTime").add("FinishTime").add("User").add("Comment") .build(); public ShowStreamLoadStmt(String db, Expr labelExpr, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index fa4ce9567b..c2cd21d815 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -35,7 +35,7 @@ public class LoadProcDir implements ProcDirInterface { .add("JobId").add("Label").add("State").add("Progress") .add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime") .add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime") - .add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets") + .add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets").add("User").add("Comment") .build(); // label and state column index of result diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java index 1b755f18bf..14d79ab15c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java @@ -130,7 +130,7 @@ public class MultiAction extends RestBaseController { } Map<String, String> properties = Maps.newHashMap(); - String[] keys = {LoadStmt.TIMEOUT_PROPERTY, LoadStmt.MAX_FILTER_RATIO_PROPERTY}; + String[] keys = {LoadStmt.TIMEOUT_PROPERTY, LoadStmt.MAX_FILTER_RATIO_PROPERTY, LoadStmt.KEY_COMMENT}; for (String key : keys) { String value = request.getParameter(key); if (!Strings.isNullOrEmpty(value)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 180ccd12a0..c6d08b3054 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1494,6 +1494,10 @@ public class Load { jobInfo.add(loadJob.getTransactionId()); // error tablets(not used for hadoop load, just return an empty string) jobInfo.add(""); + // user + jobInfo.add(loadJob.getUser()); + // comment + jobInfo.add(loadJob.getComment()); loadJobInfos.add(jobInfo); } // end for loadJobs diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index 26b376beb8..97c40c4a1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.FailMsg.CancelType; @@ -125,6 +126,10 @@ public class LoadJob implements Writable { private long execMemLimit; + private String user = ""; + + private String comment = ""; + // save table names for auth check private Set<String> tableNames = Sets.newHashSet(); @@ -286,6 +291,22 @@ public class LoadJob implements Writable { } } + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + public long getLoadStartTimeMs() { return loadStartTimeMs; } @@ -778,6 +799,8 @@ public class LoadJob implements Writable { for (String tableName : tableNames) { Text.writeString(out, tableName); } + Text.writeString(out, user); + Text.writeString(out, comment); } public void readFields(DataInput in) throws IOException { @@ -859,15 +882,6 @@ public class LoadJob implements Writable { resourceInfo = new TResourceInfo(user, group); } - if (version >= 3 && version < 7) { - // CHECKSTYLE OFF - // bos 3 parameters - String bosEndpoint = Text.readString(in); - String bosAccessKey = Text.readString(in); - String bosSecretAccessKey = Text.readString(in); - // CHECKSTYLE ON - } - this.priority = TPriority.valueOf(Text.readString(in)); // Broker description @@ -913,6 +927,13 @@ public class LoadJob implements Writable { for (int i = 0; i < size; i++) { tableNames.add(Text.readString(in)); } + if (version >= FeMetaVersion.VERSION_117) { + this.user = Text.readString(in); + this.comment = Text.readString(in); + } else { + this.user = ""; + this.comment = ""; + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java index 9ce323c914..ecbaa2f48e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java @@ -41,11 +41,12 @@ public class StreamLoadRecord { private String loadBytes; private String startTime; private String finishTime; + private String comment; - public StreamLoadRecord(String label, String db, String table, String user, String clientIp, String status, + public StreamLoadRecord(String label, String db, String table, String clientIp, String status, String message, String url, String totalRows, String loadedRows, String filteredRows, String unselectedRows, - String loadBytes, String startTime, String finishTime) { + String loadBytes, String startTime, String finishTime, String user, String comment) { this.label = label; this.db = db; this.table = table; @@ -61,6 +62,7 @@ public class StreamLoadRecord { this.loadBytes = loadBytes; this.startTime = startTime; this.finishTime = finishTime; + this.comment = comment; } public List<Comparable> getStreamLoadInfo() { @@ -68,7 +70,6 @@ public class StreamLoadRecord { streamLoadInfo.add(this.label); streamLoadInfo.add(this.db); streamLoadInfo.add(this.table); - streamLoadInfo.add(this.user); streamLoadInfo.add(this.clientIp); streamLoadInfo.add(this.status); streamLoadInfo.add(this.message); @@ -80,6 +81,8 @@ public class StreamLoadRecord { streamLoadInfo.add(this.loadBytes); streamLoadInfo.add(this.startTime); streamLoadInfo.add(this.finishTime); + streamLoadInfo.add(this.user); + streamLoadInfo.add(this.comment); return streamLoadInfo; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index d7815088d2..2884c8e609 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -287,13 +287,14 @@ public class StreamLoadRecordMgr extends MasterDaemon { } StreamLoadRecord streamLoadRecord = new StreamLoadRecord(streamLoadItem.getLabel(), streamLoadItem.getDb(), - streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), + streamLoadItem.getTbl(), streamLoadItem.getUserIp(), streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), String.valueOf(streamLoadItem.getTotalRows()), String.valueOf(streamLoadItem.getLoadedRows()), String.valueOf(streamLoadItem.getFilteredRows()), String.valueOf(streamLoadItem.getUnselectedRows()), - String.valueOf(streamLoadItem.getLoadBytes()), startTime, finishTime); + String.valueOf(streamLoadItem.getLoadBytes()), + startTime, finishTime, streamLoadItem.getUser(), streamLoadItem.getComment()); String cluster = streamLoadItem.getCluster(); if (Strings.isNullOrEmpty(cluster)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 602fd7a42d..d54a95e6b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugUtil; @@ -82,8 +83,6 @@ public abstract class BulkLoadJob extends LoadJob { // the expr of columns will be reanalyze when the log is replayed private OriginStatement originStmt; - private UserIdentity userInfo; - // include broker desc and data desc protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo(); protected List<TabletCommitInfo> commitInfos = Lists.newArrayList(); @@ -138,6 +137,7 @@ public abstract class BulkLoadJob extends LoadJob { default: throw new DdlException("Unknown load job type."); } + bulkLoadJob.setComment(stmt.getComment()); bulkLoadJob.setJobProperties(stmt.getProperties()); bulkLoadJob.checkAndSetDataSourceInfo((Database) db, stmt.getDataDescriptions()); return bulkLoadJob; @@ -297,7 +297,6 @@ public abstract class BulkLoadJob extends LoadJob { super.write(out); brokerDesc.write(out); originStmt.write(out); - userInfo.write(out); out.writeInt(sessionVariables.size()); for (Map.Entry<String, String> entry : sessionVariables.entrySet()) { @@ -315,12 +314,14 @@ public abstract class BulkLoadJob extends LoadJob { brokerDesc = BrokerDesc.read(in); originStmt = OriginStatement.read(in); // The origin stmt does not be analyzed in here. - // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName. + // The reason is that it will throw MetaNotFoundException when the tableId could not be found by tableName. // The origin stmt will be analyzed after the replay is completed. - userInfo = UserIdentity.read(in); - // must set is as analyzed, because when write the user info to meta image, it will be checked. - userInfo.setIsAnalyzed(); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_117) { + userInfo = UserIdentity.read(in); + // must set is as analyzed, because when write the user info to meta image, it will be checked. + userInfo.setIsAnalyzed(); + } int size = in.readInt(); for (int i = 0; i < size; i++) { String key = Text.readString(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index 91e14e0040..bb3a09f451 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -50,7 +51,8 @@ public class InsertLoadJob extends LoadJob { } public InsertLoadJob(String label, long transactionId, long dbId, long tableId, - long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { + long createTimestamp, String failMsg, String trackingUrl, + UserIdentity userInfo) throws MetaNotFoundException { super(EtlJobType.INSERT, dbId, label); this.tableId = tableId; this.transactionId = transactionId; @@ -67,6 +69,7 @@ public class InsertLoadJob extends LoadJob { } this.authorizationInfo = gatherAuthInfo(); this.loadingStatus.setTrackingUrl(trackingUrl); + this.userInfo = userInfo; } public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 58b3409152..2916f2e713 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -28,6 +29,7 @@ import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -130,6 +132,11 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected List<ErrorTabletInfo> errorTabletInfos = Lists.newArrayList(); + protected UserIdentity userInfo; + + protected String comment = ""; + + public static class LoadStatistic { // number of rows processed on BE, this number will be updated periodically by query report. // A load job may has several load tasks(queries), and each task has several fragments. @@ -379,6 +386,22 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } + public UserIdentity getUserInfo() { + return userInfo; + } + + public void setUserInfo(UserIdentity userInfo) { + this.userInfo = userInfo; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + private void initDefaultJobProperties() { long timeout = Config.broker_load_default_timeout_second; switch (jobType) { @@ -789,6 +812,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements jobInfo.add(transactionId); // error tablets jobInfo.add(errorTabletsToJson()); + // user + jobInfo.add(userInfo.getQualifiedUser()); + // comment + jobInfo.add(comment); return jobInfo; } finally { readUnlock(); @@ -1034,6 +1061,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements Text.writeString(out, entry.getKey()); Text.writeString(out, String.valueOf(entry.getValue())); } + if (userInfo == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + userInfo.write(out); + } + Text.writeString(out, comment); } public void readFields(DataInput in) throws IOException { @@ -1077,6 +1111,14 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // should not happen throw new IOException("failed to replay job property", e); } + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) { + if (in.readBoolean()) { + userInfo = UserIdentity.read(in); + // must set is as analyzed, because when write the user info to meta image, it will be checked. + userInfo.setIsAnalyzed(); + } + comment = Text.readString(in); + } } public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 75d65b6225..1eb5d24a75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CleanLabelStmt; import org.apache.doris.analysis.CompoundPredicate.Operator; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; @@ -31,6 +32,7 @@ import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.UserException; @@ -42,6 +44,7 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; import org.apache.doris.persist.CleanLabelOperationLog; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -58,6 +61,8 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -200,7 +205,8 @@ public class LoadManager implements Writable { * Record finished load job by editLog. **/ public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType, - long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { + long createTimestamp, String failMsg, String trackingUrl, + UserIdentity userInfo) throws MetaNotFoundException { // get db id Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbName); @@ -209,7 +215,7 @@ public class LoadManager implements Writable { switch (jobType) { case INSERT: loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, - trackingUrl); + trackingUrl, userInfo); break; default: return; @@ -423,6 +429,39 @@ public class LoadManager implements Writable { }); } + public List<Pair<Long, String>> getCreateLoadStmt(long dbId, String label) throws DdlException { + List<Pair<Long, String>> result = new ArrayList<>(); + readLock(); + try { + if (dbIdToLabelToLoadJobs.containsKey(dbId)) { + Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId); + if (labelToLoadJobs.containsKey(label)) { + List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label); + for (LoadJob job : labelLoadJobs) { + try { + Method getOriginStmt = job.getClass().getMethod("getOriginStmt"); + if (getOriginStmt != null) { + result.add( + Pair.of(job.getId(), ((OriginStatement) getOriginStmt.invoke(job)).originStmt)); + } else { + throw new DdlException("Not support load job type: " + job.getClass().getName()); + } + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new DdlException("Not support load job type: " + job.getClass().getName()); + } + } + } else { + throw new DdlException("Label does not exist: " + label); + } + } else { + throw new DdlException("Database does not exist"); + } + return result; + } finally { + readUnlock(); + } + } + /** * This method will return the jobs info which can meet the condition of input param. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 78a0a58015..4180c39eee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -232,6 +232,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // User who submit this job. Maybe null for the old version job(before v1.1) protected UserIdentity userIdentity; + protected String comment = ""; + protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete protected Expr deleteCondition; @@ -618,6 +620,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return !Strings.isNullOrEmpty(sequenceCol); } + public void setComment(String comment) { + this.comment = comment; + } + public int getSizeOfRoutineLoadTaskInfoList() { readLock(); try { @@ -1290,7 +1296,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl Optional<Database> database = Env.getCurrentInternalCatalog().getDb(dbId); Optional<Table> table = database.flatMap(db -> db.getTable(tableId)); - readLock(); try { List<String> row = Lists.newArrayList(); @@ -1322,6 +1327,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } row.add(Joiner.on(", ").join(errorLogUrls)); row.add(otherMsg); + row.add(userIdentity.getQualifiedUser()); + row.add(comment); return row; } finally { readUnlock(); @@ -1562,6 +1569,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl out.writeBoolean(true); userIdentity.write(out); } + Text.writeString(out, comment); } public void readFields(DataInput in) throws IOException { @@ -1646,6 +1654,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl userIdentity = null; } } + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) { + comment = Text.readString(in); + } else { + comment = null; + } } public abstract void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index b9b87c9d79..6070355fc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -154,6 +154,7 @@ public class RoutineLoadManager implements Writable { } routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt()); + routineLoadJob.setComment(createRoutineLoadStmt.getComment()); addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index de76e4b878..94dc77e5e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -346,7 +346,15 @@ public class MultiLoadMgr { Map<String, String> brokerProperties = Maps.newHashMap(); brokerProperties.put(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY, backendId.toString()); BrokerDesc brokerDesc = new BrokerDesc(BrokerDesc.MULTI_LOAD_BROKER, brokerProperties); - LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties); + + String comment = "multi load"; + if (properties.containsKey(LoadStmt.KEY_COMMENT)) { + comment = properties.get(LoadStmt.KEY_COMMENT); + properties.remove(LoadStmt.KEY_COMMENT); + } + + properties.remove(LoadStmt.KEY_COMMENT); + LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties, comment); loadStmt.setEtlJobType(EtlJobType.BROKER); loadStmt.setOrigStmt(new OriginStatement("", 0)); loadStmt.setUserInfo(ConnectContext.get().getCurrentUserIdentity()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index ffd3e0e532..5e4d0e5f2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -41,6 +41,7 @@ import org.apache.doris.analysis.ShowColumnStmt; import org.apache.doris.analysis.ShowCreateCatalogStmt; import org.apache.doris.analysis.ShowCreateDbStmt; import org.apache.doris.analysis.ShowCreateFunctionStmt; +import org.apache.doris.analysis.ShowCreateLoadStmt; import org.apache.doris.analysis.ShowCreateMaterializedViewStmt; import org.apache.doris.analysis.ShowCreateRoutineLoadStmt; import org.apache.doris.analysis.ShowCreateTableStmt; @@ -287,7 +288,9 @@ public class ShowExecutor { handleShowRoutineLoadTask(); } else if (stmt instanceof ShowCreateRoutineLoadStmt) { handleShowCreateRoutineLoad(); - } else if (stmt instanceof ShowDeleteStmt) { + } else if (stmt instanceof ShowCreateLoadStmt) { + handleShowCreateLoad(); + } else if (stmt instanceof ShowDeleteStmt) { handleShowDelete(); } else if (stmt instanceof ShowAlterStmt) { handleShowAlter(); @@ -2125,6 +2128,26 @@ public class ShowExecutor { resultSet = new ShowResultSet(showCreateRoutineLoadStmt.getMetaData(), rows); } + private void handleShowCreateLoad() throws AnalysisException { + ShowCreateLoadStmt showCreateLoadStmt = (ShowCreateLoadStmt) stmt; + List<List<String>> rows = Lists.newArrayList(); + String labelName = showCreateLoadStmt.getLabel(); + + Util.prohibitExternalCatalog(ctx.getDefaultCatalog(), stmt.getClass().getSimpleName()); + Env env = ctx.getEnv(); + DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(showCreateLoadStmt.getDb()); + long dbId = db.getId(); + try { + List<Pair<Long, String>> result = env.getLoadManager().getCreateLoadStmt(dbId, labelName); + rows.addAll(result.stream().map(pair -> Lists.newArrayList(String.valueOf(pair.first), pair.second)) + .collect(Collectors.toList())); + } catch (DdlException e) { + LOG.warn(e.getMessage(), e); + throw new AnalysisException(e.getMessage()); + } + resultSet = new ShowResultSet(showCreateLoadStmt.getMetaData(), rows); + } + private void handleShowDataSkew() throws AnalysisException { ShowDataSkewStmt showStmt = (ShowDataSkewStmt) stmt; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d0d7d8a57e..dbf7226344 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1664,7 +1664,7 @@ public class StmtExecutor implements ProfileWriter { context.getEnv().getLoadManager() .recordFinishedLoadJob(label, txnId, insertStmt.getDb(), insertStmt.getTargetTable().getId(), EtlJobType.INSERT, createTime, throwable == null ? "" : throwable.getMessage(), - coord.getTrackingUrl()); + coord.getTrackingUrl(), insertStmt.getUserInfo()); } catch (MetaNotFoundException e) { LOG.warn("Record info of insert load with error {}", e.getMessage(), e); errMsg = "Record info of insert load with error " + e.getMessage(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java index 5351e638e8..ee084247c0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java @@ -41,6 +41,7 @@ public class CancelLoadStmtTest extends TestWithFeService { private Analyzer analyzer; private String dbName = "testDb"; private String tblName = "table1"; + private UserIdentity userInfo = new UserIdentity("root", "localhost"); @Override protected void runBeforeAll() throws Exception { @@ -110,11 +111,11 @@ public class CancelLoadStmtTest extends TestWithFeService { long dbId = db.getId(); Table tbl = db.getTableNullable(tblName); long tblId = tbl.getId(); - InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", ""); + InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", "", userInfo); loadJobs.add(insertLoadJob1); - InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", ""); + InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", "", userInfo); loadJobs.add(insertLoadJob2); - InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", ""); + InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", "", userInfo); loadJobs.add(insertLoadJob3); // label stmt = new CancelLoadStmt(null, labelBinaryPredicate); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java index 24722915bb..e2b16f17df 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java @@ -123,7 +123,7 @@ public class CreateRoutineLoadStmtTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, - LoadTask.MergeType.APPEND); + LoadTask.MergeType.APPEND, ""); new MockUp<StatementBase>() { @Mock @@ -173,7 +173,7 @@ public class CreateRoutineLoadStmtTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, - LoadTask.MergeType.APPEND); + LoadTask.MergeType.APPEND, ""); new MockUp<StatementBase>() { @Mock public void analyze(Analyzer analyzer1) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index 2c54bc178e..a1a9c75bb2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -110,7 +110,7 @@ public class LoadStmtTest { } }; - LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null); + LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null, ""); stmt.analyze(analyzer); Assert.assertEquals("testCluster:testDb", stmt.getLabel().getDbName()); Assert.assertEquals(dataDescriptionList, stmt.getDataDescriptions()); @@ -121,7 +121,7 @@ public class LoadStmtTest { // test ResourceDesc stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, - new ResourceDesc(resourceName, null), null); + new ResourceDesc(resourceName, null), null, ""); stmt.analyze(analyzer); Assert.assertEquals(EtlJobType.SPARK, stmt.getResourceDesc().getEtlJobType()); Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n(XXX)\nWITH RESOURCE 'spark0'", @@ -137,7 +137,7 @@ public class LoadStmtTest { } }; - LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, null, null, null); + LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, null, null, null, ""); stmt.analyze(analyzer); Assert.fail("No exception throws."); @@ -220,7 +220,7 @@ public class LoadStmtTest { } }; - LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap()); + LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap(), ""); try { stmt.analyze(analyzer); } catch (AnalysisException ae) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java index 596a65892a..5c16c1554a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; @@ -38,7 +39,8 @@ public class InsertLoadJobTest { @Test public void testGetTableNames(@Mocked Env env, @Mocked InternalCatalog catalog, @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { - InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 1000, "", ""); + UserIdentity userInfo = new UserIdentity("root", "localhost"); + InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 1000, "", "", userInfo); String tableName = "table1"; new Expectations() { { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index c2a88ff670..a30d25a944 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; @@ -44,6 +45,7 @@ import java.util.Map; public class LoadManagerTest { private LoadManager loadManager; private final String fieldName = "idToLoadJob"; + private UserIdentity userInfo = UserIdentity.createAnalyzedUserIdentWithIp("root", "localhost"); @Before public void setUp() throws Exception { @@ -82,7 +84,7 @@ public class LoadManagerTest { }; loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", ""); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo); Deencapsulation.invoke(loadManager, "addLoadJob", job1); File file = serializeToFile(loadManager); @@ -118,7 +120,7 @@ public class LoadManagerTest { }; loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", ""); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo); Deencapsulation.invoke(loadManager, "addLoadJob", job1); //make job1 don't serialize diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 6b50e362bb..457e8a0c5b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -316,7 +316,7 @@ public class KafkaRoutineLoadJobTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, - LoadTask.MergeType.APPEND); + LoadTask.MergeType.APPEND, ""); Deencapsulation.setField(createRoutineLoadStmt, "name", jobName); return createRoutineLoadStmt; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 1fb8f1d3a4..55fcf585b3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -179,12 +179,21 @@ public class RoutineLoadJobTest { } @Test - public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) { + public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress, @Injectable UserIdentity userIdentity) { + new Expectations() { + { + userIdentity.getQualifiedUser(); + minTimes = 0; + result = "root"; + } + }; RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); - ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); + ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, + TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason); Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress); + Deencapsulation.setField(routineLoadJob, "userIdentity", userIdentity); List<String> showInfo = routineLoadJob.getShowInfo(); Assert.assertEquals(true, showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity)) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 907774437a..cffc723ddc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -97,7 +97,7 @@ public class RoutineLoadManagerTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, - LoadTask.MergeType.APPEND); + LoadTask.MergeType.APPEND, ""); createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, @@ -166,7 +166,7 @@ public class RoutineLoadManagerTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, - LoadTask.MergeType.APPEND); + LoadTask.MergeType.APPEND, ""); createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index a68923f398..e95ef41272 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -103,6 +103,7 @@ struct TStreamLoadRecord { 16: required i64 load_bytes 17: required i64 start_time 18: required i64 finish_time + 19: optional string comment } struct TStreamLoadRecordResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org