This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d611e7124ab branch-3.0: [fix](meta-service) Avoid rowset meta exceeds 2G result in protobuf fatal #44780 (#45205)
d611e7124ab is described below

commit d611e7124ab46674cc561d66ae9f532cdd9391d2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 9 22:12:03 2024 +0800

    branch-3.0: [fix](meta-service) Avoid rowset meta exceeds 2G result in protobuf fatal #44780 (#45205)
    
    Cherry-picked from #44780
    
    Co-authored-by: Siyang Tang <tangsiyang2...@foxmail.com>
---
 cloud/src/meta-service/meta_service.cpp | 18 +++++--
 cloud/test/txn_lazy_commit_test.cpp     | 90 ++++++++++++++++++++++++++++++++-
 2 files changed, 104 insertions(+), 4 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index c6b1c592b20..18a1eefce8f 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1328,15 +1328,27 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
 
         while (it->has_next()) {
             auto [k, v] = it->next();
-            auto rs = response->add_rowset_meta();
+            auto* rs = response->add_rowset_meta();
+            auto byte_size = rs->ByteSizeLong();
+            TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size);
+            if (byte_size + v.size() > std::numeric_limits<int32_t>::max()) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                msg = fmt::format(
+                        "rowset meta exceeded 2G, unable to serialize, key={}. byte_size={}",
+                        hex(k), byte_size);
+                LOG(WARNING) << msg;
+                return;
+            }
             if (!rs->ParseFromArray(v.data(), v.size())) {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                msg = "malformed rowset meta, unable to deserialize";
+                msg = "malformed rowset meta, unable to serialize";
                 LOG(WARNING) << msg << " key=" << hex(k);
                 return;
             }
             ++num_rowsets;
-            if (!it->has_next()) key0 = k;
+            if (!it->has_next()) {
+                key0 = k;
+            }
         }
         key0.push_back('\x00'); // Update to next smallest key for iteration
     } while (it->more());
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index 9a7679f3dd9..0f284508a3f 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -25,7 +25,9 @@
 
 #include <atomic>
 #include <condition_variable>
+#include <cstddef>
 #include <cstdint>
+#include <limits>
 #include <memory>
 #include <random>
 #include <string>
@@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) {
     ASSERT_TRUE(abort_timeout_txn_hit);
     ASSERT_EQ(txn_id, txn_info_pb.txn_id());
 }
-} // namespace doris::cloud
\ No newline at end of file
+
+TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 5252025;
+    int64_t table_id = 35201043384;
+    int64_t index_id = 256439;
+    int64_t partition_id = 732536259;
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    int64_t tablet_id = 25910248;
+
+    {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id);
+    }
+    {
+        int tmp_txn_id = 0;
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label("test_label_32ae213dasg3");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            tmp_txn_id = res.txn_id();
+            ASSERT_GT(res.txn_id(), 0);
+        }
+        {
+            auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id);
+            CreateRowsetResponse res;
+            commit_rowset(meta_service.get(), tmp_rowset, res);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(tmp_txn_id);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    }
+
+    auto* sp = SyncPoint::get_instance();
+    sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) {
+        auto* byte_size = try_any_cast<size_t*>(args[0]);
+        *byte_size = std::numeric_limits<int32_t>::max();
+        ++(*byte_size);
+    });
+
+    sp->enable_processing();
+    {
+        brpc::Controller cntl;
+        GetRowsetRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        auto* tablet_idx = req.mutable_idx();
+        tablet_idx->set_table_id(table_id);
+        tablet_idx->set_index_id(index_id);
+        tablet_idx->set_partition_id(partition_id);
+        tablet_idx->set_tablet_id(tablet_id);
+        req.set_start_version(0);
+        req.set_end_version(-1);
+        req.set_cumulative_compaction_cnt(0);
+        req.set_base_compaction_cnt(0);
+        req.set_cumulative_point(2);
+
+        GetRowsetResponse res;
+        meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR);
+    }
+}
+
+} // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to