This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 159fc3fb191 [fix](load) fix miss writer in concurrency incremental 
open (#38605)
159fc3fb191 is described below

commit 159fc3fb19180e09103bbb776ad33767c80e13d9
Author: zclllhhjj <zhaochan...@selectdb.com>
AuthorDate: Fri Aug 2 15:50:10 2024 +0800

    [fix](load) fix miss writer in concurrency incremental open (#38605)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    fix bugs like:
    ```log
    W20240730 23:06:19.640558 108249 status.h:421] meet error status: 
[INTERNAL_ERROR]unknown tablet to append data, tablet=17285252124319
    
            0#  
doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest
 const&, long, std::unordered_map<long, std::vector<unsigned int, 
std::allocator<unsigned int> >, std::hash<long>, std::equal_to<long>, 
std::allocator<std::pair<long const, std::vector<unsigned int, 
std::allocator<unsigned int> > > > >&, doris::PTabletWriterAddBlockResult*) at 
/root/doris/be/src/common/status.h:0
            1#  
doris::CloudTabletsChannel::add_batch(doris::PTabletWriterAddBlockRequest 
const&, doris::PTabletWriterAddBlockResult*) at 
/root/doris/be/src/cloud/cloud_tablets_channel.cpp:0
            2#  
doris::LoadChannel::add_batch(doris::PTabletWriterAddBlockRequest const&, 
doris::PTabletWriterAddBlockResult*) at 
/root/doris/be/src/runtime/load_channel.cpp:179
            3#  
doris::LoadChannelMgr::add_batch(doris::PTabletWriterAddBlockRequest const&, 
doris::PTabletWriterAddBlockResult*) at 
/root/doris/be/src/runtime/load_channel_mgr.cpp:0
            4#  std::_Function_handler<void (), 
doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*,
 doris::PTabletWriterAddBlockRequest const*, 
doris::PTabletWriterAddBlockResult*, 
google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) at 
/root/doris/be/src/common/status.h:488
            5#  doris::WorkThreadPool<false>::work_thread(int) at 
/root/doris/be/src/util/work_thread_pool.hpp:159
            6#  execute_native_thread_routine at 
/data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/unique_ptr.h:85
            7#  ?
            8#  clone
    ```
---
 be/src/cloud/cloud_tablets_channel.cpp | 23 ++++++++++++++---------
 be/src/runtime/tablets_channel.cpp     |  9 +++++++++
 be/src/runtime/tablets_channel.h       |  9 ++++-----
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/be/src/cloud/cloud_tablets_channel.cpp 
b/be/src/cloud/cloud_tablets_channel.cpp
index e063ab68116..85b8e3ea33a 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -59,15 +59,20 @@ Status CloudTabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& reques
     _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
 
     std::unordered_set<int64_t> partition_ids;
-    for (auto& [tablet_id, _] : tablet_to_rowidxs) {
-        auto tablet_writer_it = _tablet_writers.find(tablet_id);
-        if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError("unknown tablet to append data, 
tablet={}", tablet_id);
+    {
+        // add_batch may run concurrently with incremental_open without holding _lock,
+        // so access to _tablet_writers must be protected by _tablet_writers_lock.
+        std::lock_guard<SpinLock> l(_tablet_writers_lock);
+        for (auto& [tablet_id, _] : tablet_to_rowidxs) {
+            auto tablet_writer_it = _tablet_writers.find(tablet_id);
+            if (tablet_writer_it == _tablet_writers.end()) {
+                return Status::InternalError("unknown tablet to append data, 
tablet={}", tablet_id);
+            }
+            partition_ids.insert(tablet_writer_it->second->partition_id());
+        }
+        if (!partition_ids.empty()) {
+            RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
         }
-        partition_ids.insert(tablet_writer_it->second->partition_id());
-    }
-    if (!partition_ids.empty()) {
-        RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
     }
 
     return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
@@ -124,7 +129,7 @@ Status CloudTabletsChannel::close(LoadChannel* parent, 
const PTabletWriterAddBlo
     _state = kFinished;
 
     // All senders are closed
-    // 1. close all delta writers
+    // 1. close all delta writers. This runs under _lock.
     std::vector<CloudDeltaWriter*> writers_to_commit;
     writers_to_commit.reserve(_tablet_writers.size());
     bool success = true;
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 11ddf27cfcd..a58ff59b6a8 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -215,6 +215,7 @@ Status BaseTabletsChannel::incremental_open(const 
PTabletWriterOpenRequest& para
     ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << 
print_id(params.id())
        << " incremental open delta writer: ";
 
+    // Every modification holds _lock, and this find is under _lock too, so there is no 
need to take _tablet_writers_lock here.
     for (const auto& tablet : params.tablets()) {
         if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) 
{
             continue;
@@ -237,6 +238,7 @@ Status BaseTabletsChannel::incremental_open(const 
PTabletWriterOpenRequest& para
 
         auto delta_writer = create_delta_writer(wrequest);
         {
+            // _tablet_writers is modified here, so the lock is required.
             std::lock_guard<SpinLock> l(_tablet_writers_lock);
             _tablet_writers.emplace(tablet.tablet_id(), 
std::move(delta_writer));
         }
@@ -291,6 +293,7 @@ Status TabletsChannel::close(LoadChannel* parent, const 
PTabletWriterAddBlockReq
     // All senders are closed
     // 1. close all delta writers
     std::set<DeltaWriter*> need_wait_writers;
+    // Already under _lock, so _tablet_writers_lock is not needed here.
     for (auto&& [tablet_id, writer] : _tablet_writers) {
         if (_partition_ids.contains(writer->partition_id())) {
             auto st = writer->close();
@@ -492,6 +495,7 @@ Status BaseTabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& req
 #endif
 
     int tablet_cnt = 0;
+    // Already under _lock, so _tablet_writers_lock is not needed here.
     for (const auto& tablet : request.tablets()) {
         if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) 
{
             continue;
@@ -574,6 +578,11 @@ Status BaseTabletsChannel::_write_block_data(
                                  std::function<Status(BaseDeltaWriter * 
writer)> write_func) {
         google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
                 response->mutable_tablet_errors();
+
+        // add_batch may run concurrently with incremental_open without holding _lock,
+        // so access to _tablet_writers must be protected by _tablet_writers_lock.
+        std::lock_guard<SpinLock> l(_tablet_writers_lock);
+
         auto tablet_writer_it = _tablet_writers.find(tablet_id);
         if (tablet_writer_it == _tablet_writers.end()) {
             return Status::InternalError("unknown tablet to append data, 
tablet={}", tablet_id);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 48e98734158..87fbf9d06aa 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -143,11 +143,8 @@ protected:
     // id of this load channel
     TabletsChannelKey _key;
 
-    // make execute sequence
+    // Protects _state changes in open and close. It is also taken to update 
_next_seqs when add_batch finishes.
     std::mutex _lock;
-
-    SpinLock _tablet_writers_lock;
-
     enum State {
         kInitialized,
         kOpened,
@@ -173,8 +170,10 @@ protected:
     // currently it's OK.
     Status _close_status;
 
-    // tablet_id -> TabletChannel
+    // tablet_id -> delta writer. The map is only modified in open() or 
incremental_open().
     std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> 
_tablet_writers;
+    // protect _tablet_writers
+    SpinLock _tablet_writers_lock;
     // broken tablet ids.
     // If a tablet write fails, its id will be added to this set.
     // So that following batch will not handle this tablet anymore.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to