This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 960e80cc1b1 [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 (#30949) 960e80cc1b1 is described below commit 960e80cc1b1e0ed8259db2d286d1b144042980c4 Author: Kaijie Chen <c...@apache.org> AuthorDate: Wed Feb 7 18:04:48 2024 +0800 [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 (#30949) * [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 pool * ENABLE_FACTORY_CREATOR DeltaWriterV2 --- be/src/olap/delta_writer_v2.h | 2 ++ be/src/vec/sink/delta_writer_v2_pool.cpp | 41 +++++++++++++--------------- be/src/vec/sink/delta_writer_v2_pool.h | 15 ++++------ be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +-- 4 files changed, 28 insertions(+), 34 deletions(-) diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index c20c7f6745e..d9a63593fa8 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -62,6 +62,8 @@ class Block; // Writer for a particular (load, index, tablet). // This class is NOT thread-safe, external synchronization is required. class DeltaWriterV2 { + ENABLE_FACTORY_CREATOR(DeltaWriterV2); + public: DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, RuntimeState* state); diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index df9c0fc1c8c..cfb2b5294c7 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -30,12 +30,15 @@ DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2P DeltaWriterV2Map::~DeltaWriterV2Map() = default; -DeltaWriterV2* DeltaWriterV2Map::get_or_create( +std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create( int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) { - _map.lazy_emplace(tablet_id, [&](const TabletToDeltaWriterV2Map::constructor& ctor) { - ctor(tablet_id, creator()); - }); - return _map.at(tablet_id).get(); + std::lock_guard lock(_mutex); + if (_map.contains(tablet_id)) { + return _map.at(tablet_id); + } + std::shared_ptr<DeltaWriterV2> writer = creator(); + _map[tablet_id] = writer; + return writer; } Status DeltaWriterV2Map::close(RuntimeProfile* profile) { @@ -48,22 +51,15 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { _pool->erase(_load_id); } LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id; - Status status = Status::OK(); - _map.for_each([&status](auto& entry) { - if (status.ok()) { - status = entry.second->close(); - } - }); - if (!status.ok()) { - return status; + std::lock_guard lock(_mutex); + for (auto& [_, writer] : _map) { + RETURN_IF_ERROR(writer->close()); } LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id; - _map.for_each([&status, profile](auto& entry) { - if (status.ok()) { - status = entry.second->close_wait(profile); - } - }); - return status; + for (auto& [_, writer] : _map) { + RETURN_IF_ERROR(writer->close_wait(profile)); + } + return Status::OK(); } void DeltaWriterV2Map::cancel(Status status) { @@ -72,9 +68,10 @@ void DeltaWriterV2Map::cancel(Status status) { if (num_use == 0 && _pool != nullptr) { _pool->erase(_load_id); } - _map.for_each([&status](auto& entry) { - static_cast<void>(entry.second->cancel_with_status(status)); - }); + std::lock_guard lock(_mutex); + for (auto& [_, writer] : _map) { + static_cast<void>(writer->cancel_with_status(status)); + } } DeltaWriterV2Pool::DeltaWriterV2Pool() = default; diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h index b2e267bcfd7..912b9216e9f 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.h +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -26,7 +26,6 @@ #include <gen_cpp/types.pb.h> #include <glog/logging.h> #include <google/protobuf/stubs/callback.h> -#include <parallel_hashmap/phmap.h> #include <stddef.h> #include <stdint.h> @@ -67,8 +66,8 @@ public: ~DeltaWriterV2Map(); // get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map - DeltaWriterV2* get_or_create(int64_t tablet_id, - std::function<std::unique_ptr<DeltaWriterV2>()> creator); + std::shared_ptr<DeltaWriterV2> get_or_create( + int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator); // close all delta writers in this DeltaWriterV2Map if there is no other users Status close(RuntimeProfile* profile = nullptr); @@ -79,13 +78,9 @@ public: size_t size() const { return _map.size(); } private: - using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map< - int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>, std::equal_to<int64_t>, - std::allocator<phmap::Pair<const int64_t, std::unique_ptr<DeltaWriterV2>>>, 4, - std::mutex>; - UniqueId _load_id; - TabletToDeltaWriterV2Map _map; + std::mutex _mutex; + std::unordered_map<int64_t, std::shared_ptr<DeltaWriterV2>> _map; std::atomic<int> _use_cnt; DeltaWriterV2Pool* _pool = nullptr; }; @@ -111,4 +106,4 @@ private: }; } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index af278228eb0..4cbca9cd8af 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -420,7 +420,7 @@ Status VTabletWriterV2::write(Block& input_block) { Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, const Rows& rows, const Streams& streams) { - DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { + auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { WriteRequest req { .tablet_id = tablet_id, .txn_id = _txn_id, @@ -446,7 +446,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block << " not found in schema, load_id=" << print_id(_load_id); return std::unique_ptr<DeltaWriterV2>(nullptr); } - return std::make_unique<DeltaWriterV2>(&req, streams, _state); + return DeltaWriterV2::create_unique(&req, streams, _state); }); if (delta_writer == nullptr) { LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org