This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8fa0710cb35 [branch-2.1](load) fix miss writer in concurrency incremental open (#38605) (#38793) 8fa0710cb35 is described below commit 8fa0710cb35d925d7b09c4bdee013d76d0b277f6 Author: zclllhhjj <zhaochan...@selectdb.com> AuthorDate: Mon Aug 5 08:56:23 2024 +0800 [branch-2.1](load) fix miss writer in concurrency incremental open (#38605) (#38793) pick https://github.com/apache/doris/pull/38605 --- be/src/runtime/tablets_channel.cpp | 8 ++++++++ be/src/runtime/tablets_channel.h | 9 ++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index adaced0b76e..266d4d45f18 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -215,6 +215,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id()) << " incremental open delta writer: "; + // every change will hold _lock. this find in under _lock too. so no need _tablet_writers_lock again. for (const auto& tablet : params.tablets()) { if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) { continue; @@ -238,6 +239,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para _profile, _load_id); ss << "[" << tablet.tablet_id() << "]"; { + // here we modify _tablet_writers. so need lock. std::lock_guard<SpinLock> l(_tablet_writers_lock); _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer)); } @@ -479,6 +481,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req #endif int tablet_cnt = 0; + // under _lock. no need _tablet_writers_lock again. for (const auto& tablet : request.tablets()) { if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) { continue; @@ -578,6 +581,11 @@ Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request std::function<Status(BaseDeltaWriter * writer)> write_func) { google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors(); + + // add_batch may concurrency with inc_open but not under _lock. + // so need to protect it with _tablet_writers_lock. + std::lock_guard<SpinLock> l(_tablet_writers_lock); + auto tablet_writer_it = _tablet_writers.find(tablet_id); if (tablet_writer_it == _tablet_writers.end()) { return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 54438be7690..8ed4c7ab1aa 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -136,11 +136,8 @@ protected: // id of this load channel TabletsChannelKey _key; - // make execute sequence + // protect _state change. open and close. when add_batch finished, lock to change _next_seqs also std::mutex _lock; - - SpinLock _tablet_writers_lock; - enum State { kInitialized, kOpened, @@ -166,8 +163,10 @@ protected: // currently it's OK. Status _close_status; - // tablet_id -> TabletChannel + // tablet_id -> TabletChannel. it will only be changed in open() or inc_open() std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers; + // protect _tablet_writers + SpinLock _tablet_writers_lock; // broken tablet ids. // If a tablet write fails, it's id will be added to this set. // So that following batch will not handle this tablet anymore. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org