This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b4d8b4ac8 [pipelineX](feature) Support schema scan operator (#24850)
3b4d8b4ac8 is described below

commit 3b4d8b4ac81075d7ef7ba915b3fd8cd2d06608d9
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Sep 25 14:42:25 2023 +0800

    [pipelineX](feature) Support schema scan operator (#24850)
---
 be/src/exec/schema_scanner.h                       |  29 +--
 .../exec/schema_scanner/schema_columns_scanner.cpp |  64 +++---
 .../exec/schema_scanner/schema_files_scanner.cpp   |  26 +--
 .../schema_metadata_name_ids_scanner.cpp           |  47 ++---
 .../schema_scanner/schema_partitions_scanner.cpp   |  26 +--
 .../schema_scanner/schema_profiling_scanner.cpp    |  22 +-
 .../schema_schema_privileges_scanner.cpp           |  21 +-
 .../schema_scanner/schema_schemata_scanner.cpp     |  26 +--
 .../schema_table_privileges_scanner.cpp            |  21 +-
 .../exec/schema_scanner/schema_tables_scanner.cpp  |  47 ++---
 .../schema_user_privileges_scanner.cpp             |  21 +-
 .../schema_scanner/schema_variables_scanner.cpp    |  12 +-
 .../exec/schema_scanner/schema_views_scanner.cpp   |  47 ++---
 be/src/pipeline/exec/meta_scan_operator.cpp        |  57 +++++
 be/src/pipeline/exec/meta_scan_operator.h          |  69 +++++++
 be/src/pipeline/exec/scan_operator.cpp             |   3 +
 be/src/pipeline/exec/schema_scan_operator.cpp      | 229 +++++++++++++++++++++
 be/src/pipeline/exec/schema_scan_operator.h        |  52 +++++
 be/src/pipeline/pipeline_x/operator.cpp            |   4 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  12 ++
 be/src/vec/exec/scan/vmeta_scanner.cpp             |   9 +
 be/src/vec/exec/scan/vmeta_scanner.h               |   4 +
 be/src/vec/exec/vschema_scan_node.cpp              |  29 +--
 23 files changed, 667 insertions(+), 210 deletions(-)

diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index e42cdde60f..2d4e468c59 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -42,8 +42,17 @@ namespace vectorized {
 class Block;
 }
 
-// scanner parameter from frontend
-struct SchemaScannerParam {
+struct SchemaScannerCommonParam {
+    SchemaScannerCommonParam()
+            : db(nullptr),
+              table(nullptr),
+              wild(nullptr),
+              user(nullptr),
+              user_ip(nullptr),
+              current_user_ident(nullptr),
+              ip(nullptr),
+              port(0),
+              catalog(nullptr) {}
     const std::string* db;
     const std::string* table;
     const std::string* wild;
@@ -54,18 +63,14 @@ struct SchemaScannerParam {
     int32_t port;                            // frontend thrift port
     int64_t thread_id;
     const std::string* catalog;
+};
+
+// scanner parameter from frontend
+struct SchemaScannerParam {
+    std::shared_ptr<SchemaScannerCommonParam> common_param;
     std::unique_ptr<RuntimeProfile> profile;
 
-    SchemaScannerParam()
-            : db(nullptr),
-              table(nullptr),
-              wild(nullptr),
-              user(nullptr),
-              user_ip(nullptr),
-              current_user_ident(nullptr),
-              ip(nullptr),
-              port(0),
-              catalog(nullptr) {}
+    SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {}
 };
 
 // virtual scanner for all schema table
diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.cpp b/be/src/exec/schema_scanner/schema_columns_scanner.cpp
index 91e5c77422..e2c2c66d3d 100644
--- a/be/src/exec/schema_scanner/schema_columns_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_columns_scanner.cpp
@@ -78,26 +78,26 @@ Status SchemaColumnsScanner::start(RuntimeState* state) {
     }
     // get all database
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*_param->current_user_ident);
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*_param->common_param->current_user_ident);
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
@@ -256,19 +256,20 @@ Status SchemaColumnsScanner::_get_new_desc() {
         desc_params.tables_name.push_back(_table_result.tables[_table_index++]);
     }
 
-    if (nullptr != _param->current_user_ident) {
-        desc_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        desc_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            desc_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            desc_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            desc_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            desc_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::describe_tables(*(_param->ip), _param->port, desc_params,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::describe_tables(*(_param->common_param->ip),
+                                                      _param->common_param->port, desc_params,
                                                       &_desc_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
@@ -285,22 +286,23 @@ Status SchemaColumnsScanner::_get_new_table() {
         table_params.__set_catalog(_db_result.catalogs[_db_index]);
     }
     _db_index++;
-    if (nullptr != _param->table) {
-        table_params.__set_pattern(*(_param->table));
+    if (nullptr != _param->common_param->table) {
+        table_params.__set_pattern(*(_param->common_param->table));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->ip), _param->port, table_params,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->common_param->ip),
+                                                      _param->common_param->port, table_params,
                                                       &_table_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/exec/schema_scanner/schema_files_scanner.cpp b/be/src/exec/schema_scanner/schema_files_scanner.cpp
index 596d21c3dd..55b7a338c3 100644
--- a/be/src/exec/schema_scanner/schema_files_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_files_scanner.cpp
@@ -87,26 +87,26 @@ Status SchemaFilesScanner::start(RuntimeState* state) {
     }
     SCOPED_TIMER(_get_db_timer);
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
index f99d05dc27..5c3b686b15 100644
--- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
@@ -60,26 +60,26 @@ Status SchemaMetadataNameIdsScanner::start(RuntimeState* state) {
     }
     SCOPED_TIMER(_get_db_timer);
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
     db_params.__set_get_null_catalog(true);
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
@@ -102,22 +102,23 @@ Status SchemaMetadataNameIdsScanner::_get_new_table() {
         table_params.__set_catalog(_db_result.catalogs[_db_index]);
     }
     _db_index++;
-    if (nullptr != _param->wild) {
-        table_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        table_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::list_table_metadata_name_ids(*(_param->ip), _param->port,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::list_table_metadata_name_ids(*(_param->common_param->ip),
+                                                                   _param->common_param->port,
                                                                    table_params, &_table_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
index 7c6b7827b9..f1ad1f594f 100644
--- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
@@ -75,26 +75,26 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
     }
     SCOPED_TIMER(_get_db_timer);
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
diff --git a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp
index d2bd8b256f..2f71eb96f2 100644
--- a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp
@@ -65,24 +65,24 @@ Status SchemaProfilingScanner::start(RuntimeState* state) {
     }
     SCOPED_TIMER(_get_db_timer);
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr == _param->ip || 0 == _param->port) {
+    if (nullptr == _param->common_param->ip || 0 == _param->common_param->port) {
         return Status::InternalError("IP or port doesn't exists");
     }
     return Status::OK();
diff --git a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp
index 4b210bc5eb..093d5e04ef 100644
--- a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp
@@ -57,22 +57,23 @@ Status SchemaSchemaPrivilegesScanner::start(RuntimeState* state) {
 
 Status SchemaSchemaPrivilegesScanner::_get_new_table() {
     TGetTablesParams table_params;
-    if (nullptr != _param->wild) {
-        table_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        table_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::list_schema_privilege_status(*(_param->ip), _param->port,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::list_schema_privilege_status(*(_param->common_param->ip),
+                                                                   _param->common_param->port,
                                                                    table_params, &_priv_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
index b0d0aca2bb..fb57f48b4f 100644
--- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
@@ -53,26 +53,26 @@ Status SchemaSchemataScanner::start(RuntimeState* state) {
         return Status::InternalError("used before initial.");
     }
     TGetDbsParams db_params;
-    if (nullptr != _param->wild) {
-        db_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        db_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
diff --git a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp
index 4a4982bad7..3bbaf528c0 100644
--- a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp
@@ -59,22 +59,23 @@ Status SchemaTablePrivilegesScanner::start(RuntimeState* state) {
 Status SchemaTablePrivilegesScanner::_get_new_table() {
     SCOPED_TIMER(_get_table_timer);
     TGetTablesParams table_params;
-    if (nullptr != _param->wild) {
-        table_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        table_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::list_table_privilege_status(*(_param->ip), _param->port,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::list_table_privilege_status(*(_param->common_param->ip),
+                                                                  _param->common_param->port,
                                                                   table_params, &_priv_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp b/be/src/exec/schema_scanner/schema_tables_scanner.cpp
index 41dcaa5439..14ad1a8948 100644
--- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp
@@ -75,26 +75,26 @@ Status SchemaTablesScanner::start(RuntimeState* state) {
     }
     SCOPED_TIMER(_get_db_timer);
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
@@ -109,22 +109,23 @@ Status SchemaTablesScanner::_get_new_table() {
         table_params.__set_catalog(_db_result.catalogs[_db_index]);
     }
     _db_index++;
-    if (nullptr != _param->wild) {
-        table_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        table_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), _param->port, table_params,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->common_param->ip),
+                                                        _param->common_param->port, table_params,
                                                         &_table_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp
index f74436d8d1..a9bf7c3409 100644
--- a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp
@@ -57,22 +57,23 @@ Status SchemaUserPrivilegesScanner::start(RuntimeState* state) {
 Status SchemaUserPrivilegesScanner::_get_new_table() {
     SCOPED_TIMER(_get_table_timer);
     TGetTablesParams table_params;
-    if (nullptr != _param->wild) {
-        table_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        table_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::list_user_privilege_status(*(_param->ip), _param->port,
+    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::list_user_privilege_status(*(_param->common_param->ip),
+                                                                 _param->common_param->port,
                                                                  table_params, &_priv_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.cpp b/be/src/exec/schema_scanner/schema_variables_scanner.cpp
index 5efcee07b7..8fbca7ca7d 100644
--- a/be/src/exec/schema_scanner/schema_variables_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_variables_scanner.cpp
@@ -50,8 +50,8 @@ SchemaVariablesScanner::~SchemaVariablesScanner() {}
 Status SchemaVariablesScanner::start(RuntimeState* state) {
     TShowVariableRequest var_params;
     // Use db to save type
-    if (_param->db != nullptr) {
-        if (strcmp(_param->db->c_str(), "GLOBAL") == 0) {
+    if (_param->common_param->db != nullptr) {
+        if (strcmp(_param->common_param->db->c_str(), "GLOBAL") == 0) {
             var_params.__set_varType(TVarType::GLOBAL);
         } else {
             var_params.__set_varType(TVarType::SESSION);
@@ -59,11 +59,11 @@ Status SchemaVariablesScanner::start(RuntimeState* state) {
     } else {
         var_params.__set_varType(_type);
     }
-    var_params.__set_threadId(_param->thread_id);
+    var_params.__set_threadId(_param->common_param->thread_id);
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::show_variables(*(_param->ip), 
_param->port, var_params,
-                                                     &_var_result));
+    if (nullptr != _param->common_param->ip && 0 != 
_param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::show_variables(
+                *(_param->common_param->ip), _param->common_param->port, 
var_params, &_var_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
diff --git a/be/src/exec/schema_scanner/schema_views_scanner.cpp 
b/be/src/exec/schema_scanner/schema_views_scanner.cpp
index 64d10e7274..8183cfaa9f 100644
--- a/be/src/exec/schema_scanner/schema_views_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_views_scanner.cpp
@@ -58,26 +58,26 @@ Status SchemaViewsScanner::start(RuntimeState* state) {
     }
     SCOPED_TIMER(_get_db_timer);
     TGetDbsParams db_params;
-    if (nullptr != _param->db) {
-        db_params.__set_pattern(*(_param->db));
+    if (nullptr != _param->common_param->db) {
+        db_params.__set_pattern(*(_param->common_param->db));
     }
-    if (nullptr != _param->catalog) {
-        db_params.__set_catalog(*(_param->catalog));
+    if (nullptr != _param->common_param->catalog) {
+        db_params.__set_catalog(*(_param->common_param->catalog));
     }
-    if (nullptr != _param->current_user_ident) {
-        db_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            db_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            db_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            db_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            db_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(
-                SchemaHelper::get_db_names(*(_param->ip), _param->port, 
db_params, &_db_result));
+    if (nullptr != _param->common_param->ip && 0 != 
_param->common_param->port) {
+        RETURN_IF_ERROR(SchemaHelper::get_db_names(
+                *(_param->common_param->ip), _param->common_param->port, 
db_params, &_db_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
     }
@@ -88,23 +88,24 @@ Status SchemaViewsScanner::_get_new_table() {
     SCOPED_TIMER(_get_table_timer);
     TGetTablesParams table_params;
     table_params.__set_db(_db_result.dbs[_db_index++]);
-    if (nullptr != _param->wild) {
-        table_params.__set_pattern(*(_param->wild));
+    if (nullptr != _param->common_param->wild) {
+        table_params.__set_pattern(*(_param->common_param->wild));
     }
-    if (nullptr != _param->current_user_ident) {
-        table_params.__set_current_user_ident(*(_param->current_user_ident));
+    if (nullptr != _param->common_param->current_user_ident) {
+        
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
     } else {
-        if (nullptr != _param->user) {
-            table_params.__set_user(*(_param->user));
+        if (nullptr != _param->common_param->user) {
+            table_params.__set_user(*(_param->common_param->user));
         }
-        if (nullptr != _param->user_ip) {
-            table_params.__set_user_ip(*(_param->user_ip));
+        if (nullptr != _param->common_param->user_ip) {
+            table_params.__set_user_ip(*(_param->common_param->user_ip));
         }
     }
     table_params.__set_type("VIEW");
 
-    if (nullptr != _param->ip && 0 != _param->port) {
-        RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), 
_param->port, table_params,
+    if (nullptr != _param->common_param->ip && 0 != 
_param->common_param->port) {
+        
RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->common_param->ip),
+                                                        
_param->common_param->port, table_params,
                                                         &_table_result));
     } else {
         return Status::InternalError("IP or port doesn't exists");
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp 
b/be/src/pipeline/exec/meta_scan_operator.cpp
new file mode 100644
index 0000000000..bc96a23398
--- /dev/null
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/meta_scan_operator.h"
+
+#include "vec/exec/scan/vmeta_scanner.h"
+
+namespace doris::pipeline {
+
+// Creates one VMetaScanner per entry of _scan_ranges, prepares it with this local
+// state's conjuncts and appends it to `scanners`.
+// Returns the first non-OK status produced by VMetaScanner::prepare(), otherwise OK.
+Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
+    // Scanners are only built while the EOS dependency still reports a read blocker.
+    // NOTE(review): presumably "no blocker" means the scan already reached EOS -- confirm.
+    const bool has_read_blocker = Base::_eos_dependency->read_blocked_by() != nullptr;
+    if (has_read_blocker) {
+        auto& meta_op = _parent->cast<MetaScanOperatorX>();
+        for (auto& range : _scan_ranges) {
+            std::shared_ptr<vectorized::VMetaScanner> meta_scanner =
+                    vectorized::VMetaScanner::create_shared(state(), this, meta_op._tuple_id, range,
+                                                            meta_op._limit_per_scanner, profile(),
+                                                            meta_op._user_identity);
+            RETURN_IF_ERROR(meta_scanner->prepare(state(), _conjuncts));
+            scanners->push_back(meta_scanner);
+        }
+    }
+    return Status::OK();
+}
+
+// Stores a copy of `scan_ranges` in _scan_ranges; _init_scanners() later creates one
+// scanner per stored range.
+void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+    _scan_ranges = scan_ranges;
+}
+
+// Builds the operator from the thrift plan node: keeps a copy of tnode.meta_scan_node,
+// uses its tuple id as the output tuple id, and records the current user identity when
+// the plan node carries one (otherwise _user_identity stays default-constructed).
+MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                     const DescriptorTbl& descs)
+        : ScanOperatorX<MetaScanLocalState>(pool, tnode, descs),
+          _tuple_id(tnode.meta_scan_node.tuple_id),
+          _scan_params(tnode.meta_scan_node) {
+    const bool has_user_ident = _scan_params.__isset.current_user_ident;
+    if (has_user_ident) {
+        _user_identity = _scan_params.current_user_ident;
+    }
+    _output_tuple_id = _tuple_id;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/meta_scan_operator.h 
b/be/src/pipeline/exec/meta_scan_operator.h
new file mode 100644
index 0000000000..6463958236
--- /dev/null
+++ b/be/src/pipeline/exec/meta_scan_operator.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <string>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/scan_operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris {
+class ExecNode;
+
+namespace vectorized {
+class NewOlapScanner;
+}
+} // namespace doris
+
+namespace doris::pipeline {
+
+class MetaScanOperatorX;
+// Local state of the pipelineX meta scan operator. It stores the scan ranges handed to
+// it and turns each of them into a scanner in _init_scanners().
+class MetaScanLocalState final : public ScanLocalState<MetaScanLocalState> {
+public:
+    using Parent = MetaScanOperatorX;
+    using Base = ScanLocalState<MetaScanLocalState>;
+    ENABLE_FACTORY_CREATOR(MetaScanLocalState);
+    MetaScanLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {}
+
+private:
+    // NOTE(review): this befriends the OLAP scanner rather than the meta scanner; it
+    // looks like a leftover from the OLAP scan operator header -- confirm it is needed.
+    friend class vectorized::NewOlapScanner;
+
+    // Stores a copy of the scan ranges passed in.
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    // Creates one scanner per stored scan range and appends it to `scanners`.
+    Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
+
+    // Scan ranges recorded by set_scan_ranges().
+    std::vector<TScanRangeParams> _scan_ranges;
+};
+
+// pipelineX scan operator whose local state type is MetaScanLocalState.
+class MetaScanOperatorX final : public ScanOperatorX<MetaScanLocalState> {
+public:
+    // Reads the tuple id, the scan parameters and (when set) the current user identity
+    // from tnode.meta_scan_node.
+    MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
+
+private:
+    // The local state reads the private members below when it creates scanners.
+    friend class MetaScanLocalState;
+
+    // Tuple id taken from tnode.meta_scan_node.tuple_id.
+    TupleId _tuple_id;
+    // Copied from the plan node's current_user_ident when that field is set.
+    TUserIdentity _user_identity;
+    // Copy of tnode.meta_scan_node.
+    TMetaScanNode _scan_params;
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index d67228848f..d6876e6eb8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -22,6 +22,7 @@
 #include <memory>
 
 #include "pipeline/exec/es_scan_operator.h"
+#include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/operator.h"
 #include "vec/exec/runtime_filter_consumer.h"
@@ -1435,5 +1436,7 @@ template class ScanOperatorX<OlapScanLocalState>;
 template class ScanLocalState<OlapScanLocalState>;
 template class ScanOperatorX<EsScanLocalState>;
 template class ScanLocalState<EsScanLocalState>;
+template class ScanLocalState<MetaScanLocalState>;
+template class ScanOperatorX<MetaScanLocalState>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 2eae24cd21..1daf8a53c6 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -17,10 +17,13 @@
 
 #include "schema_scan_operator.h"
 
+#include <gen_cpp/FrontendService_types.h>
+
 #include <memory>
 
 #include "pipeline/exec/operator.h"
 #include "util/runtime_profile.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/exec/vschema_scan_node.h"
 
 namespace doris {
@@ -41,4 +44,230 @@ Status SchemaScanOperator::close(RuntimeState* state) {
     return Status::OK();
 }
 
+// Initializes the per-instance state of the schema scan operator.
+// Shares the operator's common scanner parameters, attaches a "SchemaScanner" child
+// profile, creates the SchemaScanner matching the destination table's schema table type,
+// then initializes and starts it.
+// Returns InternalError when no scanner can be created, or the first error reported by
+// the base class init, SchemaScanner::init() or SchemaScanner::start().
+Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+
+    auto& schema_op = _parent->cast<SchemaScanOperatorX>();
+
+    // The common parameters are owned by the operator and shared with this instance.
+    _scanner_param.common_param = schema_op._common_scanner_param;
+
+    // Give the scanner its own profile and hang it under this local state's profile.
+    _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
+    profile()->add_child(_scanner_param.profile.get(), true, nullptr);
+
+    // The destination tuple's table descriptor tells which schema table is scanned.
+    const auto* table_desc =
+            static_cast<const SchemaTableDescriptor*>(schema_op._dest_tuple_desc->table_desc());
+    _schema_scanner = SchemaScanner::create(table_desc->schema_table_type());
+    if (!_schema_scanner) {
+        return Status::InternalError("schema scanner get nullptr pointer.");
+    }
+
+    RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, state->obj_pool()));
+    return _schema_scanner->start(state);
+}
+
+// Builds the operator from the thrift plan node: remembers the schema table name and
+// tuple id from tnode.schema_scan_node and allocates an empty common-parameter object
+// that init() fills in. The tuple descriptor is resolved later, in prepare().
+SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                         const DescriptorTbl& descs)
+        : Base(pool, tnode, descs),
+          _table_name(tnode.schema_scan_node.table_name),
+          _common_scanner_param(std::make_shared<SchemaScannerCommonParam>()),
+          _tuple_id(tnode.schema_scan_node.tuple_id),
+          _dest_tuple_desc(nullptr),
+          _tuple_idx(0),
+          _slot_num(0) {}
+
+// Copies the optional fields of tnode.schema_scan_node into the common scanner
+// parameters. String and identity values are copied into objects owned by the runtime
+// state's object pool; port and thread id are stored by value. Fields that are not set
+// in the plan node are left untouched.
+Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+
+    const auto& scan_node = tnode.schema_scan_node;
+    const auto& is_set = scan_node.__isset;
+
+    // Copies `value` into a new string owned by the state's object pool.
+    auto pooled_string = [state](const std::string& value) {
+        return state->obj_pool()->add(new std::string(value));
+    };
+
+    if (is_set.db) {
+        _common_scanner_param->db = pooled_string(scan_node.db);
+    }
+    if (is_set.table) {
+        _common_scanner_param->table = pooled_string(scan_node.table);
+    }
+    if (is_set.wild) {
+        _common_scanner_param->wild = pooled_string(scan_node.wild);
+    }
+
+    // A full user identity takes precedence; user / user_ip are only read without it.
+    if (is_set.current_user_ident) {
+        _common_scanner_param->current_user_ident =
+                state->obj_pool()->add(new TUserIdentity(scan_node.current_user_ident));
+    } else {
+        if (is_set.user) {
+            _common_scanner_param->user = pooled_string(scan_node.user);
+        }
+        if (is_set.user_ip) {
+            _common_scanner_param->user_ip = pooled_string(scan_node.user_ip);
+        }
+    }
+
+    if (is_set.ip) {
+        _common_scanner_param->ip = pooled_string(scan_node.ip);
+    }
+    if (is_set.port) {
+        _common_scanner_param->port = scan_node.port;
+    }
+    if (is_set.thread_id) {
+        _common_scanner_param->thread_id = scan_node.thread_id;
+    }
+    if (is_set.catalog) {
+        _common_scanner_param->catalog = pooled_string(scan_node.catalog);
+    }
+
+    return Status::OK();
+}
+
+// Opens the base operator. When a user name was supplied, a TSetSessionParams is filled
+// in, but it is not sent anywhere: the set_session call below is commented out, so the
+// block currently has no effect beyond building `param`.
+Status SchemaScanOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
+
+    if (_common_scanner_param->user) {
+        TSetSessionParams param;
+        param.__set_user(*_common_scanner_param->user);
+        // Disabled call that would have pushed `param` to the frontend session:
+        //TStatus t_status;
+        //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
+        //RETURN_IF_ERROR(Status(t_status));
+    }
+
+    return Status::OK();
+}
+
+// Validates the requested output schema against the schema scanner for this table.
+// Resolves _dest_tuple_desc from _tuple_id, creates a SchemaScanner for the table's
+// schema table type, and checks that every destination slot has a source column with
+// the same name (compared case-insensitively) and the same type.
+// Returns InternalError when a descriptor or the scanner is missing, or when a slot has
+// no matching column or a mismatching type.
+Status SchemaScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+
+    // Destination tuple descriptor, looked up by the id taken from the plan node.
+    _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_dest_tuple_desc == nullptr) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    const auto& dest_slots = _dest_tuple_desc->slots();
+    _slot_num = dest_slots.size();
+
+    // The table descriptor identifies which schema table is being scanned.
+    const auto* table_desc =
+            static_cast<const SchemaTableDescriptor*>(_dest_tuple_desc->table_desc());
+    if (table_desc == nullptr) {
+        return Status::InternalError("Failed to get schema table descriptor.");
+    }
+
+    // This scanner instance is only used here to obtain the source column descriptions.
+    _schema_scanner = SchemaScanner::create(table_desc->schema_table_type());
+    if (!_schema_scanner) {
+        return Status::InternalError("schema scanner get nullptr pointer.");
+    }
+
+    const std::vector<SchemaScanner::ColumnDesc>& src_columns = _schema_scanner->get_column_desc();
+
+    // A scanner without source columns means the destination slots are dummies.
+    if (src_columns.empty()) {
+        _slot_num = 0;
+    }
+
+    // Every destination slot must map onto a source column of the same type.
+    for (int slot_idx = 0; slot_idx < _slot_num; ++slot_idx) {
+        auto* dest_slot = dest_slots[slot_idx];
+
+        size_t col_idx = 0;
+        while (col_idx < src_columns.size() &&
+               !boost::iequals(dest_slot->col_name(), src_columns[col_idx].name)) {
+            ++col_idx;
+        }
+
+        if (col_idx >= src_columns.size()) {
+            LOG(WARNING) << "no match column for this column(" << dest_slot->col_name() << ")";
+            return Status::InternalError("no match column for this column.");
+        }
+
+        const auto& src_col = src_columns[col_idx];
+        if (src_col.type != dest_slot->type().type) {
+            LOG(WARNING) << "schema not match. input is " << src_col.name << "(" << src_col.type
+                         << ") and output is " << dest_slot->col_name() << "("
+                         << dest_slot->type() << ")";
+            return Status::InternalError("schema not match.");
+        }
+    }
+
+    _tuple_idx = 0;
+
+    return Status::OK();
+}
+
+// Produces the next output block of the schema scan.
+// Rows are pulled from the local state's schema scanner into a temporary source block,
+// the columns named by the destination slots are copied into `block`, and the result is
+// filtered with the local state's conjuncts. This repeats until `block` holds rows or the
+// scanner reports end of stream, in which case `source_state` is set to FINISHED.
+// Returns the first error from cancellation checks, the scanner or the filter.
+Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
+                                      SourceState& source_state) {
+    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    SCOPED_TIMER(local_state.profile()->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    bool schema_eos = false;
+
+    const std::vector<SchemaScanner::ColumnDesc>& columns_desc(
+            local_state._schema_scanner->get_column_desc());
+
+    do {
+        // Start every attempt from an output block with one empty column per dest slot.
+        block->clear();
+        for (int i = 0; i < _slot_num; ++i) {
+            auto dest_slot_desc = _dest_tuple_desc->slots()[i];
+            block->insert(vectorized::ColumnWithTypeAndName(
+                    dest_slot_desc->get_empty_mutable_column(), 
dest_slot_desc->get_data_type_ptr(),
+                    dest_slot_desc->col_name()));
+        }
+
+        // src block columns desc is filled by schema_scanner->get_column_desc.
+        vectorized::Block src_block;
+        for (int i = 0; i < columns_desc.size(); ++i) {
+            TypeDescriptor descriptor(columns_desc[i].type);
+            // NOTE(review): the `true` argument presumably requests a nullable type -- confirm.
+            auto data_type =
+                    
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+            
src_block.insert(vectorized::ColumnWithTypeAndName(data_type->create_column(),
+                                                               data_type, 
columns_desc[i].name));
+        }
+        // Keep pulling from the scanner until it reports eos or the source block has
+        // reached the batch size.
+        while (true) {
+            RETURN_IF_CANCELLED(state);
+
+            // get all slots from schema table.
+            
RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, 
&schema_eos));
+
+            if (schema_eos) {
+                source_state = SourceState::FINISHED;
+                break;
+            }
+
+            if (src_block.rows() >= state->batch_size()) {
+                break;
+            }
+        }
+
+        if (src_block.rows()) {
+            // block->check_number_of_rows();
+            // Copy each destination slot's column out of the source block, matched by name.
+            for (int i = 0; i < _slot_num; ++i) {
+                auto dest_slot_desc = _dest_tuple_desc->slots()[i];
+                vectorized::MutableColumnPtr column_ptr =
+                        std::move(*block->get_by_position(i).column).mutate();
+                column_ptr->insert_range_from(
+                        
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
+                        src_block.rows());
+            }
+            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
+                    local_state._conjuncts, block, 
_dest_tuple_desc->slots().size()));
+            src_block.clear();
+        }
+        // Retry when filtering left no rows and the scanner still has data.
+    } while (block->rows() == 0 && source_state != SourceState::FINISHED);
+
+    // NOTE(review): presumably applies the operator's row limit to block/source_state -- confirm.
+    local_state.reached_limit(block, source_state);
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/schema_scan_operator.h 
b/be/src/pipeline/exec/schema_scan_operator.h
index df0eab8391..8feabd2e2f 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/exec/vschema_scan_node.h"
 
 namespace doris {
@@ -48,4 +49,55 @@ public:
     Status close(RuntimeState* state) override;
 };
 
+class SchemaScanOperatorX;
+// Local state of the pipelineX schema scan operator. It owns the SchemaScanner that
+// SchemaScanOperatorX::get_block() reads from, plus the parameters handed to it.
+class SchemaScanLocalState final : public PipelineXLocalState<> {
+public:
+    ENABLE_FACTORY_CREATOR(SchemaScanLocalState);
+
+    SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
+            : PipelineXLocalState<>(state, parent) {}
+    ~SchemaScanLocalState() override = default;
+
+    // Creates, initializes and starts the schema scanner for this instance.
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+private:
+    // The operator reads _schema_scanner directly when producing blocks.
+    friend class SchemaScanOperatorX;
+
+    // Parameters passed to the scanner's init(); filled in by init() above.
+    SchemaScannerParam _scanner_param;
+    // Scanner created in init(); null until then.
+    std::unique_ptr<SchemaScanner> _schema_scanner = nullptr;
+};
+
+// pipelineX source operator for schema scan plan nodes. The operator holds the
+// parameters and descriptors read from the plan; the scanner that produces rows lives in
+// SchemaScanLocalState.
+class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> {
+public:
+    using Base = OperatorX<SchemaScanLocalState>;
+    SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
+    ~SchemaScanOperatorX() override = default;
+
+    // Copies the optional plan-node fields into _common_scanner_param.
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    // Resolves _dest_tuple_desc and checks the destination slots against the scanner's columns.
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    // Fills `block` from the local state's scanner; sets source_state to FINISHED at eos.
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+    [[nodiscard]] bool is_source() const override { return true; }
+
+private:
+    // The local state reads _common_scanner_param and _dest_tuple_desc in its init().
+    friend class SchemaScanLocalState;
+
+    // Taken from tnode.schema_scan_node.table_name.
+    const std::string _table_name;
+
+    // Parameters filled in init() and shared with every local state's scanner.
+    std::shared_ptr<SchemaScannerCommonParam> _common_scanner_param;
+    // Tuple id from the plan node; prepare() resolves it into _dest_tuple_desc.
+    TupleId _tuple_id;
+
+    // Descriptor of dest tuples
+    const TupleDescriptor* _dest_tuple_desc;
+    // Tuple index in tuple row.
+    int _tuple_idx;
+    // slot num need to fill in and return
+    int _slot_num;
+
+    // Scanner created in prepare(), where it supplies the column descriptions used for
+    // schema validation.
+    std::unique_ptr<SchemaScanner> _schema_scanner = nullptr;
+};
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 2937312976..aad4adc471 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -33,6 +33,7 @@
 #include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
@@ -42,6 +43,7 @@
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
+#include "pipeline/exec/schema_scan_operator.h"
 #include "pipeline/exec/select_operator.h"
 #include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/exec/sort_source_operator.h"
@@ -408,6 +410,8 @@ DECLARE_OPERATOR_X(UnionSourceLocalState)
 DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
 DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
 DECLARE_OPERATOR_X(DataGenLocalState)
+DECLARE_OPERATOR_X(SchemaScanLocalState)
+DECLARE_OPERATOR_X(MetaScanLocalState)
 
 #undef DECLARE_OPERATOR_X
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index c2becd1683..fc18d11cfd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -56,6 +56,7 @@
 #include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
@@ -66,6 +67,7 @@
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
 #include "pipeline/exec/scan_operator.h"
+#include "pipeline/exec/schema_scan_operator.h"
 #include "pipeline/exec/select_operator.h"
 #include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/exec/sort_source_operator.h"
@@ -758,6 +760,16 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
         break;
     }
+    case TPlanNodeType::SCHEMA_SCAN_NODE: {
+        op.reset(new SchemaScanOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        break;
+    }
+    case TPlanNodeType::META_SCAN_NODE: {
+        op.reset(new MetaScanOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        break;
+    }
     case TPlanNodeType::SELECT_NODE: {
         op.reset(new SelectOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index aea604e8f5..ad97460d0d 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -64,6 +64,15 @@ VMetaScanner::VMetaScanner(RuntimeState* state, 
VMetaScanNode* parent, int64_t t
           _user_identity(user_identity),
           _scan_range(scan_range.scan_range) {}
 
+// Constructor overload taking a pipeline::ScanLocalStateBase instead of a VMetaScanNode.
+// It initializes the same members as the VMetaScanNode overload above; only the
+// parent handed to VScanner differs.
+VMetaScanner::VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* 
local_state,
+                           int64_t tuple_id, const TScanRangeParams& 
scan_range, int64_t limit,
+                           RuntimeProfile* profile, TUserIdentity 
user_identity)
+        : VScanner(state, local_state, limit, profile),
+          _meta_eos(false),
+          _tuple_id(tuple_id),
+          _user_identity(user_identity),
+          _scan_range(scan_range.scan_range) {}
+
 Status VMetaScanner::open(RuntimeState* state) {
     VLOG_CRITICAL << "VMetaScanner::open";
     RETURN_IF_ERROR(VScanner::open(state));
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h 
b/be/src/vec/exec/scan/vmeta_scanner.h
index 5eb2296d27..a3297f7e9c 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -55,6 +55,10 @@ public:
                  const TScanRangeParams& scan_range, int64_t limit, 
RuntimeProfile* profile,
                  TUserIdentity user_identity);
 
+    VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* 
local_state, int64_t tuple_id,
+                 const TScanRangeParams& scan_range, int64_t limit, 
RuntimeProfile* profile,
+                 TUserIdentity user_identity);
+
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
diff --git a/be/src/vec/exec/vschema_scan_node.cpp 
b/be/src/vec/exec/vschema_scan_node.cpp
index ecc0fdb888..cf5659bf48 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -64,42 +64,47 @@ VSchemaScanNode::~VSchemaScanNode() {}
 Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
     if (tnode.schema_scan_node.__isset.db) {
-        _scanner_param.db = _pool->add(new 
std::string(tnode.schema_scan_node.db));
+        _scanner_param.common_param->db = _pool->add(new 
std::string(tnode.schema_scan_node.db));
     }
 
     if (tnode.schema_scan_node.__isset.table) {
-        _scanner_param.table = _pool->add(new 
std::string(tnode.schema_scan_node.table));
+        _scanner_param.common_param->table =
+                _pool->add(new std::string(tnode.schema_scan_node.table));
     }
 
     if (tnode.schema_scan_node.__isset.wild) {
-        _scanner_param.wild = _pool->add(new 
std::string(tnode.schema_scan_node.wild));
+        _scanner_param.common_param->wild =
+                _pool->add(new std::string(tnode.schema_scan_node.wild));
     }
 
     if (tnode.schema_scan_node.__isset.current_user_ident) {
-        _scanner_param.current_user_ident =
+        _scanner_param.common_param->current_user_ident =
                 _pool->add(new 
TUserIdentity(tnode.schema_scan_node.current_user_ident));
     } else {
         if (tnode.schema_scan_node.__isset.user) {
-            _scanner_param.user = _pool->add(new 
std::string(tnode.schema_scan_node.user));
+            _scanner_param.common_param->user =
+                    _pool->add(new std::string(tnode.schema_scan_node.user));
         }
         if (tnode.schema_scan_node.__isset.user_ip) {
-            _scanner_param.user_ip = _pool->add(new 
std::string(tnode.schema_scan_node.user_ip));
+            _scanner_param.common_param->user_ip =
+                    _pool->add(new 
std::string(tnode.schema_scan_node.user_ip));
         }
     }
 
     if (tnode.schema_scan_node.__isset.ip) {
-        _scanner_param.ip = _pool->add(new 
std::string(tnode.schema_scan_node.ip));
+        _scanner_param.common_param->ip = _pool->add(new 
std::string(tnode.schema_scan_node.ip));
     }
     if (tnode.schema_scan_node.__isset.port) {
-        _scanner_param.port = tnode.schema_scan_node.port;
+        _scanner_param.common_param->port = tnode.schema_scan_node.port;
     }
 
     if (tnode.schema_scan_node.__isset.thread_id) {
-        _scanner_param.thread_id = tnode.schema_scan_node.thread_id;
+        _scanner_param.common_param->thread_id = 
tnode.schema_scan_node.thread_id;
     }
 
     if (tnode.schema_scan_node.__isset.catalog) {
-        _scanner_param.catalog = _pool->add(new 
std::string(tnode.schema_scan_node.catalog));
+        _scanner_param.common_param->catalog =
+                _pool->add(new std::string(tnode.schema_scan_node.catalog));
     }
     return Status::OK();
 }
@@ -121,9 +126,9 @@ Status VSchemaScanNode::open(RuntimeState* state) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
 
-    if (_scanner_param.user) {
+    if (_scanner_param.common_param->user) {
         TSetSessionParams param;
-        param.__set_user(*_scanner_param.user);
+        param.__set_user(*_scanner_param.common_param->user);
         //TStatus t_status;
         //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
         //RETURN_IF_ERROR(Status(t_status));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to