This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 25f9bc98fb0 [Cloud](Variant) support DESC merge variants' schema for 
cloud mode … (#38143)
25f9bc98fb0 is described below

commit 25f9bc98fb0cd99e9ecd1b1ec0b5f29609f2ac1d
Author: lihangyu <15605149...@163.com>
AuthorDate: Fri Jul 19 16:49:22 2024 +0800

    [Cloud](Variant) support DESC merge variants' schema for cloud mode … 
(#38143)
    
    …(#37955)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/cloud/cloud_tablet.cpp                 | 16 ++++++++++++++++
 be/src/cloud/cloud_tablet.h                   |  3 +++
 be/src/olap/base_tablet.h                     |  4 ++++
 be/src/service/internal_service.cpp           | 16 ++++++++--------
 be/src/service/internal_service.h             | 10 +++++-----
 regression-test/suites/variant_p0/desc.groovy | 20 +++++++++++++++++---
 6 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 50c8765a18d..17ec1fe22b0 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -28,6 +28,7 @@
 #include <memory>
 #include <shared_mutex>
 #include <unordered_map>
+#include <vector>
 
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
@@ -43,8 +44,10 @@
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
 #include "olap/txn_manager.h"
 #include "util/debug_points.h"
+#include "vec/common/schema_util.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -132,6 +135,19 @@ Status CloudTablet::sync_rowsets(int64_t query_version, 
bool warmup_delta_data)
     return st;
 }
 
+TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
+    std::shared_lock rdlock(_meta_lock);
+    TabletSchemaSPtr target_schema;
+    std::vector<TabletSchemaSPtr> schemas;
+    for (const auto& [_, rowset] : _rs_version_map) {
+        schemas.push_back(rowset->tablet_schema());
+    }
+    // get the max version schema and merge all schema
+    static_cast<void>(
+            vectorized::schema_util::get_least_common_schema(schemas, nullptr, 
target_schema));
+    return target_schema;
+}
+
 // Sync tablet meta and all rowset meta if not running.
 // This could happen when BE didn't finish schema change job and another BE 
committed this schema change job.
 // It should be a quite rare situation.
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index ca05759cdbf..2e6938444d1 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -206,6 +206,9 @@ public:
     int64_t last_cumu_compaction_success_time_ms = 0;
     int64_t last_cumu_no_suitable_version_ms = 0;
 
+    // Return merged extended schema
+    TabletSchemaSPtr merged_tablet_schema() const override;
+
 private:
     // FIXME(plat1ko): No need to record base size if rowsets are ordered by 
version
     void update_base_size(const Rowset& rs);
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index f625ecf4a0a..cefb31ccd11 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -28,6 +28,7 @@
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
 #include "olap/version_graph.h"
 #include "util/metrics.h"
 
@@ -252,6 +253,9 @@ public:
                                         const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
                                         int limit);
 
+    // Return the merged schema of all rowsets
+    virtual TabletSchemaSPtr merged_tablet_schema() const { return 
_max_version_schema; }
+
 protected:
     // Find the missed versions until the spec_version.
     //
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 04abf2b09ce..9611e1a93cb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1044,11 +1044,11 @@ struct AsyncRPCContext {
     brpc::CallId cid;
 };
 
-void 
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController*
 controller,
-                                                      const 
PFetchRemoteSchemaRequest* request,
-                                                      
PFetchRemoteSchemaResponse* response,
-                                                      
google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+void 
PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* 
controller,
+                                                  const 
PFetchRemoteSchemaRequest* request,
+                                                  PFetchRemoteSchemaResponse* 
response,
+                                                  google::protobuf::Closure* 
done) {
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         Status st = Status::OK();
         if (request->is_coordinator()) {
@@ -1120,13 +1120,13 @@ void 
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr
             if (!target_tablets.empty()) {
                 std::vector<TabletSchemaSPtr> tablet_schemas;
                 for (int64_t tablet_id : target_tablets) {
-                    TabletSharedPtr tablet = 
_engine.tablet_manager()->get_tablet(tablet_id, false);
-                    if (tablet == nullptr) {
+                    auto res = ExecEnv::get_tablet(tablet_id);
+                    if (!res.has_value()) {
                         // just ignore
                         LOG(WARNING) << "tablet does not exist, tablet id is " 
<< tablet_id;
                         continue;
                     }
-                    tablet_schemas.push_back(tablet->tablet_schema());
+                    
tablet_schemas.push_back(res.value()->merged_tablet_schema());
                 }
                 if (!tablet_schemas.empty()) {
                     // merge all
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 9cad429107a..7f3a2ca6f30 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -225,6 +225,11 @@ public:
                               PJdbcTestConnectionResult* result,
                               google::protobuf::Closure* done) override;
 
+    void fetch_remote_tablet_schema(google::protobuf::RpcController* 
controller,
+                                    const PFetchRemoteSchemaRequest* request,
+                                    PFetchRemoteSchemaResponse* response,
+                                    google::protobuf::Closure* done) override;
+
 private:
     void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* 
controller,
                                         const PExecPlanFragmentRequest* 
request,
@@ -287,11 +292,6 @@ public:
                                     PGetTabletVersionsResponse* response,
                                     google::protobuf::Closure* done) override;
 
-    void fetch_remote_tablet_schema(google::protobuf::RpcController* 
controller,
-                                    const PFetchRemoteSchemaRequest* request,
-                                    PFetchRemoteSchemaResponse* response,
-                                    google::protobuf::Closure* done) override;
-
 private:
     void _response_pull_slave_rowset(const std::string& remote_host, int64_t 
brpc_port,
                                      int64_t txn_id, int64_t tablet_id, 
int64_t node_id,
diff --git a/regression-test/suites/variant_p0/desc.groovy 
b/regression-test/suites/variant_p0/desc.groovy
index dfb5b40794e..5efcda3a043 100644
--- a/regression-test/suites/variant_p0/desc.groovy
+++ b/regression-test/suites/variant_p0/desc.groovy
@@ -16,9 +16,9 @@
 // under the License.
 
 suite("regression_test_variant_desc", "nonConcurrent"){
-    if (isCloudMode()) {
-        return
-    }
+    // if (isCloudMode()) {
+    //     return
+    // }
 
     def load_json_data = {table_name, file_name ->
         // load the json data
@@ -101,10 +101,13 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         sql """set describe_extend_variant_column = true"""
         sql """insert into  sparse_columns select 0, '{"a": 11245, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}'  as json_str
             union  all select 0, '{"a": 1123}' as json_str union all select 0, 
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") 
limit 4096 ;"""
+        // select for sync rowsets
+        sql "select * from sparse_columns limit 1"
         qt_sql_1 """desc ${table_name}"""
         sql "truncate table sparse_columns"
         sql """insert into  sparse_columns select 0, '{"a": 1123, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : 
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}'  as json_str
             union  all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : 
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = 
"4096") limit 4096 ;"""
+        sql "select * from sparse_columns limit 1"
         qt_sql_2 """desc ${table_name}"""
         sql "truncate table sparse_columns"
 
@@ -115,6 +118,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1.0")
         sql """insert into  ${table_name} select 0, '{"a": 11245, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}'  as json_str
             union  all select 0, '{"a": 1123}' as json_str union all select 0, 
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") 
limit 4096 ;"""
+        sql "select * from no_sparse_columns limit 1"
         qt_sql_3 """desc ${table_name}"""
         sql "truncate table ${table_name}"
 
@@ -128,6 +132,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         sql """insert into  ${table_name} select 45000, '{"a": 11245, "b" : 
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}'  as json_str
             union  all select 45000, '{"a": 1123}' as json_str union all 
select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from 
numbers("number" = "4096") limit 4096 ;"""
         sql """insert into  ${table_name} values(95000, '{"a": 11245, "b" : 
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        sql "select * from partition_data limit 1"
         qt_sql_6_1 """desc ${table_name} partition p1"""
         qt_sql_6_2 """desc ${table_name} partition p2"""
         qt_sql_6_3 """desc ${table_name} partition p3"""
@@ -145,6 +150,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
          sql """insert into  ${table_name} values(95000, '{"a": 11245, "b" : 
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
         // drop p1
         sql """alter table ${table_name} drop partition p1"""
+        sql "select * from drop_partition limit 1"
         qt_sql_7 """desc ${table_name}"""
         qt_sql_7_1 """desc ${table_name} partition p2"""
         qt_sql_7_2 """desc ${table_name} partition p3"""
@@ -165,6 +171,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
             properties("replication_num" = "1", "disable_auto_compaction" = 
"false");
         """
         sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : 
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}', 
'{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 
7.111}}')"""
+         sql "select * from ${table_name} limit 1"
         qt_sql_8 """desc ${table_name}"""
         sql "truncate table ${table_name}"
 
@@ -181,6 +188,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
             properties("replication_num" = "1", "disable_auto_compaction" = 
"false");
         """
         sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : 
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
+         sql "select * from ${table_name} limit 1"
         qt_sql_9 """desc ${table_name}"""
         sql """set describe_extend_variant_column = true"""
         qt_sql_9_1 """desc ${table_name}"""
@@ -191,12 +199,14 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         create_table.call(table_name, "5")
         // add, drop columns
         sql """INSERT INTO ${table_name} values(0, '{"k1":1, "k2": "hello 
world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}')"""
+        sql "select * from ${table_name} limit 1"
         sql """set describe_extend_variant_column = true"""
         qt_sql_10 """desc ${table_name}"""
         // add column
         sql "alter table ${table_name} add column v2 variant default null"
         sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : 
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
                  '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : 
null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 
123}}}')"""
+        sql "select * from ${table_name} limit 1"
         qt_sql_10_1 """desc ${table_name}"""
         // drop cloumn
         sql "alter table ${table_name} drop column v2"
@@ -205,6 +215,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         sql "alter table ${table_name} add column v3 variant default null"
         sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : 
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
                      '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, 
"d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : 
{"xxx" : 123}}}')"""
+        sql "select * from ${table_name} limit 1"
         qt_sql_10_3 """desc ${table_name}"""
         //sql "truncate table ${table_name}"
 
@@ -221,6 +232,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         """
         sql """ insert into ${table_name} values (0, '{"名字" : "jack", 
"!@#^&*()": "11111", "金额" : 200, "画像" : {"地址" : "北京", "\\\u4E2C\\\u6587": 
"unicode"}}')"""
         sql """set describe_extend_variant_column = true"""
+        sql "select * from ${table_name} limit 1"
         qt_sql_11 """desc ${table_name}"""
 
         // varaint subcolumn: empty
@@ -237,6 +249,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         sql """ insert into ${table_name} values (0, '{}')"""
         sql """ insert into ${table_name} values (0, '100')"""
         sql """set describe_extend_variant_column = true"""
+        sql "select * from ${table_name} limit 1"
         qt_sql_12 """desc ${table_name}"""
 
 
@@ -247,6 +260,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
         sql """insert into large_tablets values (3001, '{"b" : 10}')"""
         sql """insert into large_tablets values (50001, '{"c" : 10}')"""
         sql """insert into large_tablets values (99999, '{"d" : 10}')"""
+        sql "select * from ${table_name} limit 1"
         sql """set max_fetch_remote_schema_tablet_count = 2"""
         sql "desc large_tablets"
         sql """set max_fetch_remote_schema_tablet_count = 128"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to