This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 159fc3fb191 [fix](load) fix miss writer in concurrency incremental open (#38605) 159fc3fb191 is described below commit 159fc3fb19180e09103bbb776ad33767c80e13d9 Author: zclllhhjj <zhaochan...@selectdb.com> AuthorDate: Fri Aug 2 15:50:10 2024 +0800 [fix](load) fix miss writer in concurrency incremental open (#38605) ## Proposed changes Issue Number: close #xxx fix bugs like: ```log W20240730 23:06:19.640558 108249 status.h:421] meet error status: [INTERNAL_ERROR]unknown tablet to append data, tablet=17285252124319 0# doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, std::allocator<unsigned int> >, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, std::allocator<unsigned int> > > > >&, doris::PTabletWriterAddBlockResult*) at /root/doris/be/src/common/status.h:0 1# doris::CloudTabletsChannel::add_batch(doris::PTabletWriterAddBlockRequest const&, doris::PTabletWriterAddBlockResult*) at /root/doris/be/src/cloud/cloud_tablets_channel.cpp:0 2# doris::LoadChannel::add_batch(doris::PTabletWriterAddBlockRequest const&, doris::PTabletWriterAddBlockResult*) at /root/doris/be/src/runtime/load_channel.cpp:179 3# doris::LoadChannelMgr::add_batch(doris::PTabletWriterAddBlockRequest const&, doris::PTabletWriterAddBlockResult*) at /root/doris/be/src/runtime/load_channel_mgr.cpp:0 4# std::_Function_handler<void (), doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*, doris::PTabletWriterAddBlockRequest const*, doris::PTabletWriterAddBlockResult*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) at /root/doris/be/src/common/status.h:488 5# doris::WorkThreadPool<false>::work_thread(int) at /root/doris/be/src/util/work_thread_pool.hpp:159 6# execute_native_thread_routine at /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/unique_ptr.h:85 7# ? 8# clone ``` --- be/src/cloud/cloud_tablets_channel.cpp | 23 ++++++++++++++--------- be/src/runtime/tablets_channel.cpp | 9 +++++++++ be/src/runtime/tablets_channel.h | 9 ++++----- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index e063ab68116..85b8e3ea33a 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -59,15 +59,20 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); std::unordered_set<int64_t> partition_ids; - for (auto& [tablet_id, _] : tablet_to_rowidxs) { - auto tablet_writer_it = _tablet_writers.find(tablet_id); - if (tablet_writer_it == _tablet_writers.end()) { - return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); + { + // add_batch may concurrency with inc_open but not under _lock. + // so need to protect it with _tablet_writers_lock. + std::lock_guard<SpinLock> l(_tablet_writers_lock); + for (auto& [tablet_id, _] : tablet_to_rowidxs) { + auto tablet_writer_it = _tablet_writers.find(tablet_id); + if (tablet_writer_it == _tablet_writers.end()) { + return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); + } + partition_ids.insert(tablet_writer_it->second->partition_id()); + } + if (!partition_ids.empty()) { + RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids)); } - partition_ids.insert(tablet_writer_it->second->partition_id()); - } - if (!partition_ids.empty()) { - RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids)); } return _write_block_data(request, cur_seq, tablet_to_rowidxs, response); @@ -124,7 +129,7 @@ Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlo _state = kFinished; // All senders are closed - // 1. close all delta writers + // 1. close all delta writers. under _lock. std::vector<CloudDeltaWriter*> writers_to_commit; writers_to_commit.reserve(_tablet_writers.size()); bool success = true; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 11ddf27cfcd..a58ff59b6a8 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -215,6 +215,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id()) << " incremental open delta writer: "; + // every change will hold _lock. this find in under _lock too. so no need _tablet_writers_lock again. for (const auto& tablet : params.tablets()) { if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) { continue; @@ -237,6 +238,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para auto delta_writer = create_delta_writer(wrequest); { + // here we modify _tablet_writers. so need lock. std::lock_guard<SpinLock> l(_tablet_writers_lock); _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer)); } @@ -291,6 +293,7 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq // All senders are closed // 1. close all delta writers std::set<DeltaWriter*> need_wait_writers; + // under _lock. no need _tablet_writers_lock again. for (auto&& [tablet_id, writer] : _tablet_writers) { if (_partition_ids.contains(writer->partition_id())) { auto st = writer->close(); @@ -492,6 +495,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req #endif int tablet_cnt = 0; + // under _lock. no need _tablet_writers_lock again. for (const auto& tablet : request.tablets()) { if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) { continue; @@ -574,6 +578,11 @@ Status BaseTabletsChannel::_write_block_data( std::function<Status(BaseDeltaWriter * writer)> write_func) { google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors(); + + // add_batch may concurrency with inc_open but not under _lock. + // so need to protect it with _tablet_writers_lock. + std::lock_guard<SpinLock> l(_tablet_writers_lock); + auto tablet_writer_it = _tablet_writers.find(tablet_id); if (tablet_writer_it == _tablet_writers.end()) { return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 48e98734158..87fbf9d06aa 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -143,11 +143,8 @@ protected: // id of this load channel TabletsChannelKey _key; - // make execute sequence + // protect _state change. open and close. when add_batch finished, lock to change _next_seqs also std::mutex _lock; - - SpinLock _tablet_writers_lock; - enum State { kInitialized, kOpened, @@ -173,8 +170,10 @@ protected: // currently it's OK. Status _close_status; - // tablet_id -> TabletChannel + // tablet_id -> TabletChannel. it will only be changed in open() or inc_open() std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers; + // protect _tablet_writers + SpinLock _tablet_writers_lock; // broken tablet ids. // If a tablet write fails, it's id will be added to this set. // So that following batch will not handle this tablet anymore. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org