This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3b4d8b4ac8 [pipelineX](feature) Support schema scan operator (#24850) 3b4d8b4ac8 is described below commit 3b4d8b4ac81075d7ef7ba915b3fd8cd2d06608d9 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Sep 25 14:42:25 2023 +0800 [pipelineX](feature) Support schema scan operator (#24850) --- be/src/exec/schema_scanner.h | 29 +-- .../exec/schema_scanner/schema_columns_scanner.cpp | 64 +++--- .../exec/schema_scanner/schema_files_scanner.cpp | 26 +-- .../schema_metadata_name_ids_scanner.cpp | 47 ++--- .../schema_scanner/schema_partitions_scanner.cpp | 26 +-- .../schema_scanner/schema_profiling_scanner.cpp | 22 +- .../schema_schema_privileges_scanner.cpp | 21 +- .../schema_scanner/schema_schemata_scanner.cpp | 26 +-- .../schema_table_privileges_scanner.cpp | 21 +- .../exec/schema_scanner/schema_tables_scanner.cpp | 47 ++--- .../schema_user_privileges_scanner.cpp | 21 +- .../schema_scanner/schema_variables_scanner.cpp | 12 +- .../exec/schema_scanner/schema_views_scanner.cpp | 47 ++--- be/src/pipeline/exec/meta_scan_operator.cpp | 57 +++++ be/src/pipeline/exec/meta_scan_operator.h | 69 +++++++ be/src/pipeline/exec/scan_operator.cpp | 3 + be/src/pipeline/exec/schema_scan_operator.cpp | 229 +++++++++++++++++++++ be/src/pipeline/exec/schema_scan_operator.h | 52 +++++ be/src/pipeline/pipeline_x/operator.cpp | 4 + .../pipeline_x/pipeline_x_fragment_context.cpp | 12 ++ be/src/vec/exec/scan/vmeta_scanner.cpp | 9 + be/src/vec/exec/scan/vmeta_scanner.h | 4 + be/src/vec/exec/vschema_scan_node.cpp | 29 +-- 23 files changed, 667 insertions(+), 210 deletions(-) diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index e42cdde60f..2d4e468c59 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -42,8 +42,17 @@ namespace vectorized { class Block; } -// scanner parameter from frontend -struct SchemaScannerParam { +struct SchemaScannerCommonParam { + SchemaScannerCommonParam() + : db(nullptr), + table(nullptr), + wild(nullptr), + user(nullptr), + user_ip(nullptr), + current_user_ident(nullptr), + ip(nullptr), + port(0), + catalog(nullptr) {} const std::string* db; const std::string* table; const std::string* wild; @@ -54,18 +63,14 @@ struct SchemaScannerParam { int32_t port; // frontend thrift port int64_t thread_id; const std::string* catalog; +}; + +// scanner parameter from frontend +struct SchemaScannerParam { + std::shared_ptr<SchemaScannerCommonParam> common_param; std::unique_ptr<RuntimeProfile> profile; - SchemaScannerParam() - : db(nullptr), - table(nullptr), - wild(nullptr), - user(nullptr), - user_ip(nullptr), - current_user_ident(nullptr), - ip(nullptr), - port(0), - catalog(nullptr) {} + SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {} }; // virtual scanner for all schema table diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.cpp b/be/src/exec/schema_scanner/schema_columns_scanner.cpp index 91e5c77422..e2c2c66d3d 100644 --- a/be/src/exec/schema_scanner/schema_columns_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_columns_scanner.cpp @@ -78,26 +78,26 @@ Status SchemaColumnsScanner::start(RuntimeState* state) { } // get all database TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*_param->current_user_ident); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*_param->common_param->current_user_ident); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } @@ -256,19 +256,20 @@ Status SchemaColumnsScanner::_get_new_desc() { desc_params.tables_name.push_back(_table_result.tables[_table_index++]); } - if (nullptr != _param->current_user_ident) { - desc_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + desc_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - desc_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + desc_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - desc_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + desc_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::describe_tables(*(_param->ip), _param->port, desc_params, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::describe_tables(*(_param->common_param->ip), + _param->common_param->port, desc_params, &_desc_result)); } else { return Status::InternalError("IP or port doesn't exists"); @@ -285,22 +286,23 @@ Status SchemaColumnsScanner::_get_new_table() { table_params.__set_catalog(_db_result.catalogs[_db_index]); } _db_index++; - if (nullptr != _param->table) { - table_params.__set_pattern(*(_param->table)); + if (nullptr != _param->common_param->table) { + table_params.__set_pattern(*(_param->common_param->table)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->ip), _param->port, table_params, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->common_param->ip), + _param->common_param->port, table_params, &_table_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/exec/schema_scanner/schema_files_scanner.cpp b/be/src/exec/schema_scanner/schema_files_scanner.cpp index 596d21c3dd..55b7a338c3 100644 --- a/be/src/exec/schema_scanner/schema_files_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_files_scanner.cpp @@ -87,26 +87,26 @@ Status SchemaFilesScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp index f99d05dc27..5c3b686b15 100644 --- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp @@ -60,26 +60,26 @@ Status SchemaMetadataNameIdsScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } db_params.__set_get_null_catalog(true); - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } @@ -102,22 +102,23 @@ Status SchemaMetadataNameIdsScanner::_get_new_table() { table_params.__set_catalog(_db_result.catalogs[_db_index]); } _db_index++; - if (nullptr != _param->wild) { - table_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + table_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::list_table_metadata_name_ids(*(_param->ip), _param->port, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::list_table_metadata_name_ids(*(_param->common_param->ip), + _param->common_param->port, table_params, &_table_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp index 7c6b7827b9..f1ad1f594f 100644 --- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp @@ -75,26 +75,26 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } diff --git a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp index d2bd8b256f..2f71eb96f2 100644 --- a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp @@ -65,24 +65,24 @@ Status SchemaProfilingScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr == _param->ip || 0 == _param->port) { + if (nullptr == _param->common_param->ip || 0 == _param->common_param->port) { return Status::InternalError("IP or port doesn't exists"); } return Status::OK(); diff --git a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp index 4b210bc5eb..093d5e04ef 100644 --- a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp @@ -57,22 +57,23 @@ Status SchemaSchemaPrivilegesScanner::start(RuntimeState* state) { Status SchemaSchemaPrivilegesScanner::_get_new_table() { TGetTablesParams table_params; - if (nullptr != _param->wild) { - table_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + table_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::list_schema_privilege_status(*(_param->ip), _param->port, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::list_schema_privilege_status(*(_param->common_param->ip), + _param->common_param->port, table_params, &_priv_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp index b0d0aca2bb..fb57f48b4f 100644 --- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp @@ -53,26 +53,26 @@ Status SchemaSchemataScanner::start(RuntimeState* state) { return Status::InternalError("used before initial."); } TGetDbsParams db_params; - if (nullptr != _param->wild) { - db_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + db_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } diff --git a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp index 4a4982bad7..3bbaf528c0 100644 --- a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp @@ -59,22 +59,23 @@ Status SchemaTablePrivilegesScanner::start(RuntimeState* state) { Status SchemaTablePrivilegesScanner::_get_new_table() { SCOPED_TIMER(_get_table_timer); TGetTablesParams table_params; - if (nullptr != _param->wild) { - table_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + table_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::list_table_privilege_status(*(_param->ip), _param->port, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::list_table_privilege_status(*(_param->common_param->ip), + _param->common_param->port, table_params, &_priv_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp b/be/src/exec/schema_scanner/schema_tables_scanner.cpp index 41dcaa5439..14ad1a8948 100644 --- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp @@ -75,26 +75,26 @@ Status SchemaTablesScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } @@ -109,22 +109,23 @@ Status SchemaTablesScanner::_get_new_table() { table_params.__set_catalog(_db_result.catalogs[_db_index]); } _db_index++; - if (nullptr != _param->wild) { - table_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + table_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), _param->port, table_params, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->common_param->ip), + _param->common_param->port, table_params, &_table_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp index f74436d8d1..a9bf7c3409 100644 --- a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp @@ -57,22 +57,23 @@ Status SchemaUserPrivilegesScanner::start(RuntimeState* state) { Status SchemaUserPrivilegesScanner::_get_new_table() { SCOPED_TIMER(_get_table_timer); TGetTablesParams table_params; - if (nullptr != _param->wild) { - table_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + table_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::list_user_privilege_status(*(_param->ip), _param->port, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::list_user_privilege_status(*(_param->common_param->ip), + _param->common_param->port, table_params, &_priv_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.cpp b/be/src/exec/schema_scanner/schema_variables_scanner.cpp index 5efcee07b7..8fbca7ca7d 100644 --- a/be/src/exec/schema_scanner/schema_variables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_variables_scanner.cpp @@ -50,8 +50,8 @@ SchemaVariablesScanner::~SchemaVariablesScanner() {} Status SchemaVariablesScanner::start(RuntimeState* state) { TShowVariableRequest var_params; // Use db to save type - if (_param->db != nullptr) { - if (strcmp(_param->db->c_str(), "GLOBAL") == 0) { + if (_param->common_param->db != nullptr) { + if (strcmp(_param->common_param->db->c_str(), "GLOBAL") == 0) { var_params.__set_varType(TVarType::GLOBAL); } else { var_params.__set_varType(TVarType::SESSION); @@ -59,11 +59,11 @@ Status SchemaVariablesScanner::start(RuntimeState* state) { } else { var_params.__set_varType(_type); } - var_params.__set_threadId(_param->thread_id); + var_params.__set_threadId(_param->common_param->thread_id); - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::show_variables(*(_param->ip), _param->port, var_params, - &_var_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::show_variables( + *(_param->common_param->ip), _param->common_param->port, var_params, &_var_result)); } else { return Status::InternalError("IP or port doesn't exists"); } diff --git a/be/src/exec/schema_scanner/schema_views_scanner.cpp b/be/src/exec/schema_scanner/schema_views_scanner.cpp index 64d10e7274..8183cfaa9f 100644 --- a/be/src/exec/schema_scanner/schema_views_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_views_scanner.cpp @@ -58,26 +58,26 @@ Status SchemaViewsScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->db) { - db_params.__set_pattern(*(_param->db)); + if (nullptr != _param->common_param->db) { + db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->catalog) { - db_params.__set_catalog(*(_param->catalog)); + if (nullptr != _param->common_param->catalog) { + db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->current_user_ident) { - db_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - db_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + db_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - db_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + db_params.__set_user_ip(*(_param->common_param->user_ip)); } } - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR( - SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::get_db_names( + *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result)); } else { return Status::InternalError("IP or port doesn't exists"); } @@ -88,23 +88,24 @@ Status SchemaViewsScanner::_get_new_table() { SCOPED_TIMER(_get_table_timer); TGetTablesParams table_params; table_params.__set_db(_db_result.dbs[_db_index++]); - if (nullptr != _param->wild) { - table_params.__set_pattern(*(_param->wild)); + if (nullptr != _param->common_param->wild) { + table_params.__set_pattern(*(_param->common_param->wild)); } - if (nullptr != _param->current_user_ident) { - table_params.__set_current_user_ident(*(_param->current_user_ident)); + if (nullptr != _param->common_param->current_user_ident) { + table_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); } else { - if (nullptr != _param->user) { - table_params.__set_user(*(_param->user)); + if (nullptr != _param->common_param->user) { + table_params.__set_user(*(_param->common_param->user)); } - if (nullptr != _param->user_ip) { - table_params.__set_user_ip(*(_param->user_ip)); + if (nullptr != _param->common_param->user_ip) { + table_params.__set_user_ip(*(_param->common_param->user_ip)); } } table_params.__set_type("VIEW"); - if (nullptr != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), _param->port, table_params, + if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { + RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->common_param->ip), + _param->common_param->port, table_params, &_table_result)); } else { return Status::InternalError("IP or port doesn't exists"); diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp new file mode 100644 index 0000000000..bc96a23398 --- /dev/null +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/meta_scan_operator.h" + +#include "vec/exec/scan/vmeta_scanner.h" + +namespace doris::pipeline { + +Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { + if (Base::_eos_dependency->read_blocked_by() == nullptr) { + return Status::OK(); + } + + auto& p = _parent->cast<MetaScanOperatorX>(); + + for (auto& scan_range : _scan_ranges) { + std::shared_ptr<vectorized::VMetaScanner> scanner = vectorized::VMetaScanner::create_shared( + state(), this, p._tuple_id, scan_range, p._limit_per_scanner, profile(), + p._user_identity); + RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts)); + scanners->push_back(scanner); + } + + return Status::OK(); +} + +void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { + _scan_ranges = scan_ranges; +} + +MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ScanOperatorX<MetaScanLocalState>(pool, tnode, descs), + _tuple_id(tnode.meta_scan_node.tuple_id), + _scan_params(tnode.meta_scan_node) { + _output_tuple_id = _tuple_id; + if (_scan_params.__isset.current_user_ident) { + _user_identity = _scan_params.current_user_ident; + } +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h new file mode 100644 index 0000000000..6463958236 --- /dev/null +++ b/be/src/pipeline/exec/meta_scan_operator.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <string> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/scan_operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris { +class ExecNode; + +namespace vectorized { +class NewOlapScanner; +} +} // namespace doris + +namespace doris::pipeline { + +class MetaScanOperatorX; +class MetaScanLocalState final : public ScanLocalState<MetaScanLocalState> { +public: + using Parent = MetaScanOperatorX; + using Base = ScanLocalState<MetaScanLocalState>; + ENABLE_FACTORY_CREATOR(MetaScanLocalState); + MetaScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {} + +private: + friend class vectorized::NewOlapScanner; + + void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; + + std::vector<TScanRangeParams> _scan_ranges; +}; + +class MetaScanOperatorX final : public ScanOperatorX<MetaScanLocalState> { +public: + MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + +private: + friend class MetaScanLocalState; + + TupleId _tuple_id; + TUserIdentity _user_identity; + TMetaScanNode _scan_params; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d67228848f..d6876e6eb8 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -22,6 +22,7 @@ #include <memory> #include "pipeline/exec/es_scan_operator.h" +#include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/operator.h" #include "vec/exec/runtime_filter_consumer.h" @@ -1435,5 +1436,7 @@ template class ScanOperatorX<OlapScanLocalState>; template class ScanLocalState<OlapScanLocalState>; template class ScanOperatorX<EsScanLocalState>; template class ScanLocalState<EsScanLocalState>; +template class ScanLocalState<MetaScanLocalState>; +template class ScanOperatorX<MetaScanLocalState>; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 2eae24cd21..1daf8a53c6 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -17,10 +17,13 @@ #include "schema_scan_operator.h" +#include <gen_cpp/FrontendService_types.h> + #include <memory> #include "pipeline/exec/operator.h" #include "util/runtime_profile.h" +#include "vec/data_types/data_type_factory.hpp" #include "vec/exec/vschema_scan_node.h" namespace doris { @@ -41,4 +44,230 @@ Status SchemaScanOperator::close(RuntimeState* state) { return Status::OK(); } +Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); + + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); + auto& p = _parent->cast<SchemaScanOperatorX>(); + _scanner_param.common_param = p._common_scanner_param; + // init schema scanner profile + _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner")); + profile()->add_child(_scanner_param.profile.get(), true, nullptr); + + // get src tuple desc + const SchemaTableDescriptor* schema_table = + static_cast<const SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc()); + // new one scanner + _schema_scanner = SchemaScanner::create(schema_table->schema_table_type()); + + if (nullptr == _schema_scanner) { + return Status::InternalError("schema scanner get nullptr pointer."); + } + + RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, state->obj_pool())); + return _schema_scanner->start(state); +} + +SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(pool, tnode, descs), + _table_name(tnode.schema_scan_node.table_name), + _common_scanner_param(new SchemaScannerCommonParam()), + _tuple_id(tnode.schema_scan_node.tuple_id), + _dest_tuple_desc(nullptr), + _tuple_idx(0), + _slot_num(0) {} + +Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + + if (tnode.schema_scan_node.__isset.db) { + _common_scanner_param->db = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.db)); + } + + if (tnode.schema_scan_node.__isset.table) { + _common_scanner_param->table = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.table)); + } + + if (tnode.schema_scan_node.__isset.wild) { + _common_scanner_param->wild = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.wild)); + } + + if (tnode.schema_scan_node.__isset.current_user_ident) { + _common_scanner_param->current_user_ident = state->obj_pool()->add( + new TUserIdentity(tnode.schema_scan_node.current_user_ident)); + } else { + if (tnode.schema_scan_node.__isset.user) { + _common_scanner_param->user = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.user)); + } + if (tnode.schema_scan_node.__isset.user_ip) { + _common_scanner_param->user_ip = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.user_ip)); + } + } + + if (tnode.schema_scan_node.__isset.ip) { + _common_scanner_param->ip = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.ip)); + } + if (tnode.schema_scan_node.__isset.port) { + _common_scanner_param->port = tnode.schema_scan_node.port; + } + + if (tnode.schema_scan_node.__isset.thread_id) { + _common_scanner_param->thread_id = tnode.schema_scan_node.thread_id; + } + + if (tnode.schema_scan_node.__isset.catalog) { + _common_scanner_param->catalog = + state->obj_pool()->add(new std::string(tnode.schema_scan_node.catalog)); + } + return Status::OK(); +} + +Status SchemaScanOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(Base::open(state)); + + if (_common_scanner_param->user) { + TSetSessionParams param; + param.__set_user(*_common_scanner_param->user); + //TStatus t_status; + //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status)); + //RETURN_IF_ERROR(Status(t_status)); + } + + return Status::OK(); +} + +Status SchemaScanOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + + // get dest tuple desc + _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + + if (nullptr == _dest_tuple_desc) { + return Status::InternalError("Failed to get tuple descriptor."); + } + + _slot_num = _dest_tuple_desc->slots().size(); + // get src tuple desc + const SchemaTableDescriptor* schema_table = + static_cast<const SchemaTableDescriptor*>(_dest_tuple_desc->table_desc()); + + if (nullptr == schema_table) { + return Status::InternalError("Failed to get schema table descriptor."); + } + + // new one scanner + _schema_scanner = SchemaScanner::create(schema_table->schema_table_type()); + + if (nullptr == _schema_scanner) { + return Status::InternalError("schema scanner get nullptr pointer."); + } + + const std::vector<SchemaScanner::ColumnDesc>& columns_desc(_schema_scanner->get_column_desc()); + + // if src columns size is zero, it's the dummy slots. + if (0 == columns_desc.size()) { + _slot_num = 0; + } + + // check if type is ok. + for (int i = 0; i < _slot_num; ++i) { + int j = 0; + for (; j < columns_desc.size(); ++j) { + if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(), columns_desc[j].name)) { + break; + } + } + + if (j >= columns_desc.size()) { + LOG(WARNING) << "no match column for this column(" + << _dest_tuple_desc->slots()[i]->col_name() << ")"; + return Status::InternalError("no match column for this column."); + } + + if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type) { + LOG(WARNING) << "schema not match. input is " << columns_desc[j].name << "(" + << columns_desc[j].type << ") and output is " + << _dest_tuple_desc->slots()[i]->col_name() << "(" + << _dest_tuple_desc->slots()[i]->type() << ")"; + return Status::InternalError("schema not match."); + } + } + + _tuple_idx = 0; + + return Status::OK(); +} + +Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + RETURN_IF_CANCELLED(state); + bool schema_eos = false; + + const std::vector<SchemaScanner::ColumnDesc>& columns_desc( + local_state._schema_scanner->get_column_desc()); + + do { + block->clear(); + for (int i = 0; i < _slot_num; ++i) { + auto dest_slot_desc = _dest_tuple_desc->slots()[i]; + block->insert(vectorized::ColumnWithTypeAndName( + dest_slot_desc->get_empty_mutable_column(), dest_slot_desc->get_data_type_ptr(), + dest_slot_desc->col_name())); + } + + // src block columns desc is filled by schema_scanner->get_column_desc. + vectorized::Block src_block; + for (int i = 0; i < columns_desc.size(); ++i) { + TypeDescriptor descriptor(columns_desc[i].type); + auto data_type = + vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + src_block.insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), + data_type, columns_desc[i].name)); + } + while (true) { + RETURN_IF_CANCELLED(state); + + // get all slots from schema table. + RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, &schema_eos)); + + if (schema_eos) { + source_state = SourceState::FINISHED; + break; + } + + if (src_block.rows() >= state->batch_size()) { + break; + } + } + + if (src_block.rows()) { + // block->check_number_of_rows(); + for (int i = 0; i < _slot_num; ++i) { + auto dest_slot_desc = _dest_tuple_desc->slots()[i]; + vectorized::MutableColumnPtr column_ptr = + std::move(*block->get_by_position(i).column).mutate(); + column_ptr->insert_range_from( + *src_block.get_by_name(dest_slot_desc->col_name()).column, 0, + src_block.rows()); + } + RETURN_IF_ERROR(vectorized::VExprContext::filter_block( + local_state._conjuncts, block, _dest_tuple_desc->slots().size())); + src_block.clear(); + } + } while (block->rows() == 0 && source_state != SourceState::FINISHED); + + local_state.reached_limit(block, source_state); + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index df0eab8391..8feabd2e2f 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/exec/vschema_scan_node.h" namespace doris { @@ -48,4 +49,55 @@ public: Status close(RuntimeState* state) override; }; +class SchemaScanOperatorX; +class SchemaScanLocalState final : public PipelineXLocalState<> { +public: + ENABLE_FACTORY_CREATOR(SchemaScanLocalState); + + SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState<>(state, parent) {} + ~SchemaScanLocalState() override = default; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + +private: + friend class SchemaScanOperatorX; + + SchemaScannerParam _scanner_param; + std::unique_ptr<SchemaScanner> _schema_scanner = nullptr; +}; + +class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> { +public: + using Base = OperatorX<SchemaScanLocalState>; + SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~SchemaScanOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + [[nodiscard]] bool is_source() const override { return true; } + +private: + friend class SchemaScanLocalState; + + const std::string _table_name; + + std::shared_ptr<SchemaScannerCommonParam> _common_scanner_param; + // Tuple id resolved in prepare() to set _tuple_desc; + TupleId _tuple_id; + + // Descriptor of dest tuples + const TupleDescriptor* _dest_tuple_desc; + // Tuple index in tuple row. + int _tuple_idx; + // slot num need to fill in and return + int _slot_num; + + std::unique_ptr<SchemaScanner> _schema_scanner = nullptr; +}; + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 2937312976..aad4adc471 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -33,6 +33,7 @@ #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" @@ -42,6 +43,7 @@ #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" +#include "pipeline/exec/schema_scan_operator.h" #include "pipeline/exec/select_operator.h" #include "pipeline/exec/sort_sink_operator.h" #include "pipeline/exec/sort_source_operator.h" @@ -408,6 +410,8 @@ DECLARE_OPERATOR_X(UnionSourceLocalState) DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState) DECLARE_OPERATOR_X(PartitionSortSourceLocalState) DECLARE_OPERATOR_X(DataGenLocalState) +DECLARE_OPERATOR_X(SchemaScanLocalState) +DECLARE_OPERATOR_X(MetaScanLocalState) #undef DECLARE_OPERATOR_X diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index c2becd1683..fc18d11cfd 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -56,6 +56,7 @@ #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" @@ -66,6 +67,7 @@ #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" #include "pipeline/exec/scan_operator.h" +#include "pipeline/exec/schema_scan_operator.h" #include "pipeline/exec/select_operator.h" #include "pipeline/exec/sort_sink_operator.h" #include "pipeline/exec/sort_source_operator.h" @@ -758,6 +760,16 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } + case TPlanNodeType::SCHEMA_SCAN_NODE: { + op.reset(new SchemaScanOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + break; + } + case TPlanNodeType::META_SCAN_NODE: { + op.reset(new MetaScanOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + break; + } case TPlanNodeType::SELECT_NODE: { op.reset(new SelectOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index aea604e8f5..ad97460d0d 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -64,6 +64,15 @@ VMetaScanner::VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t t _user_identity(user_identity), _scan_range(scan_range.scan_range) {} +VMetaScanner::VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, + int64_t tuple_id, const TScanRangeParams& scan_range, int64_t limit, + RuntimeProfile* profile, TUserIdentity user_identity) + : VScanner(state, local_state, limit, profile), + _meta_eos(false), + _tuple_id(tuple_id), + _user_identity(user_identity), + _scan_range(scan_range.scan_range) {} + Status VMetaScanner::open(RuntimeState* state) { VLOG_CRITICAL << "VMetaScanner::open"; RETURN_IF_ERROR(VScanner::open(state)); diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 5eb2296d27..a3297f7e9c 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -55,6 +55,10 @@ public: const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile, TUserIdentity user_identity); + VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, int64_t tuple_id, + const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile, + TUserIdentity user_identity); + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts); diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index ecc0fdb888..cf5659bf48 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -64,42 +64,47 @@ VSchemaScanNode::~VSchemaScanNode() {} Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); if (tnode.schema_scan_node.__isset.db) { - _scanner_param.db = _pool->add(new std::string(tnode.schema_scan_node.db)); + _scanner_param.common_param->db = _pool->add(new std::string(tnode.schema_scan_node.db)); } if (tnode.schema_scan_node.__isset.table) { - _scanner_param.table = _pool->add(new std::string(tnode.schema_scan_node.table)); + _scanner_param.common_param->table = + _pool->add(new std::string(tnode.schema_scan_node.table)); } if (tnode.schema_scan_node.__isset.wild) { - _scanner_param.wild = _pool->add(new std::string(tnode.schema_scan_node.wild)); + _scanner_param.common_param->wild = + _pool->add(new std::string(tnode.schema_scan_node.wild)); } if (tnode.schema_scan_node.__isset.current_user_ident) { - _scanner_param.current_user_ident = + _scanner_param.common_param->current_user_ident = _pool->add(new TUserIdentity(tnode.schema_scan_node.current_user_ident)); } else { if (tnode.schema_scan_node.__isset.user) { - _scanner_param.user = _pool->add(new std::string(tnode.schema_scan_node.user)); + _scanner_param.common_param->user = + _pool->add(new std::string(tnode.schema_scan_node.user)); } if (tnode.schema_scan_node.__isset.user_ip) { - _scanner_param.user_ip = _pool->add(new std::string(tnode.schema_scan_node.user_ip)); + _scanner_param.common_param->user_ip = + _pool->add(new std::string(tnode.schema_scan_node.user_ip)); } } if (tnode.schema_scan_node.__isset.ip) { - _scanner_param.ip = _pool->add(new std::string(tnode.schema_scan_node.ip)); + _scanner_param.common_param->ip = _pool->add(new std::string(tnode.schema_scan_node.ip)); } if (tnode.schema_scan_node.__isset.port) { - _scanner_param.port = tnode.schema_scan_node.port; + _scanner_param.common_param->port = tnode.schema_scan_node.port; } if (tnode.schema_scan_node.__isset.thread_id) { - _scanner_param.thread_id = tnode.schema_scan_node.thread_id; + _scanner_param.common_param->thread_id = tnode.schema_scan_node.thread_id; } if (tnode.schema_scan_node.__isset.catalog) { - _scanner_param.catalog = _pool->add(new std::string(tnode.schema_scan_node.catalog)); + _scanner_param.common_param->catalog = + _pool->add(new std::string(tnode.schema_scan_node.catalog)); } return Status::OK(); } @@ -121,9 +126,9 @@ Status VSchemaScanNode::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - if (_scanner_param.user) { + if (_scanner_param.common_param->user) { TSetSessionParams param; - param.__set_user(*_scanner_param.user); + param.__set_user(*_scanner_param.common_param->user); //TStatus t_status; //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status)); //RETURN_IF_ERROR(Status(t_status)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org