This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 960e80cc1b1 [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 (#30949)
960e80cc1b1 is described below

commit 960e80cc1b1e0ed8259db2d286d1b144042980c4
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Wed Feb 7 18:04:48 2024 +0800

    [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 (#30949)
    
    * [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 pool
    
    * ENABLE_FACTORY_CREATOR DeltaWriterV2
---
 be/src/olap/delta_writer_v2.h                |  2 ++
 be/src/vec/sink/delta_writer_v2_pool.cpp     | 41 +++++++++++++---------------
 be/src/vec/sink/delta_writer_v2_pool.h       | 15 ++++------
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  4 +--
 4 files changed, 28 insertions(+), 34 deletions(-)

diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index c20c7f6745e..d9a63593fa8 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -62,6 +62,8 @@ class Block;
 // Writer for a particular (load, index, tablet).
 // This class is NOT thread-safe, external synchronization is required.
 class DeltaWriterV2 {
+    ENABLE_FACTORY_CREATOR(DeltaWriterV2);
+
 public:
     DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
                   RuntimeState* state);
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp
index df9c0fc1c8c..cfb2b5294c7 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -30,12 +30,15 @@ DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2P
 
 DeltaWriterV2Map::~DeltaWriterV2Map() = default;
 
-DeltaWriterV2* DeltaWriterV2Map::get_or_create(
+std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
         int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) {
-    _map.lazy_emplace(tablet_id, [&](const TabletToDeltaWriterV2Map::constructor& ctor) {
-        ctor(tablet_id, creator());
-    });
-    return _map.at(tablet_id).get();
+    std::lock_guard lock(_mutex);
+    if (_map.contains(tablet_id)) {
+        return _map.at(tablet_id);
+    }
+    std::shared_ptr<DeltaWriterV2> writer = creator();
+    _map[tablet_id] = writer;
+    return writer;
 }
 
 Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
@@ -48,22 +51,15 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
         _pool->erase(_load_id);
     }
     LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
-    Status status = Status::OK();
-    _map.for_each([&status](auto& entry) {
-        if (status.ok()) {
-            status = entry.second->close();
-        }
-    });
-    if (!status.ok()) {
-        return status;
+    std::lock_guard lock(_mutex);
+    for (auto& [_, writer] : _map) {
+        RETURN_IF_ERROR(writer->close());
     }
     LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
-    _map.for_each([&status, profile](auto& entry) {
-        if (status.ok()) {
-            status = entry.second->close_wait(profile);
-        }
-    });
-    return status;
+    for (auto& [_, writer] : _map) {
+        RETURN_IF_ERROR(writer->close_wait(profile));
+    }
+    return Status::OK();
 }
 
 void DeltaWriterV2Map::cancel(Status status) {
@@ -72,9 +68,10 @@ void DeltaWriterV2Map::cancel(Status status) {
     if (num_use == 0 && _pool != nullptr) {
         _pool->erase(_load_id);
     }
-    _map.for_each([&status](auto& entry) {
-        static_cast<void>(entry.second->cancel_with_status(status));
-    });
+    std::lock_guard lock(_mutex);
+    for (auto& [_, writer] : _map) {
+        static_cast<void>(writer->cancel_with_status(status));
+    }
 }
 
 DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h
index b2e267bcfd7..912b9216e9f 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -26,7 +26,6 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
-#include <parallel_hashmap/phmap.h>
 #include <stddef.h>
 #include <stdint.h>
 
@@ -67,8 +66,8 @@ public:
     ~DeltaWriterV2Map();
 
     // get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map
-    DeltaWriterV2* get_or_create(int64_t tablet_id,
-                                 std::function<std::unique_ptr<DeltaWriterV2>()> creator);
+    std::shared_ptr<DeltaWriterV2> get_or_create(
+            int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator);
 
     // close all delta writers in this DeltaWriterV2Map if there is no other users
     Status close(RuntimeProfile* profile = nullptr);
@@ -79,13 +78,9 @@ public:
     size_t size() const { return _map.size(); }
 
 private:
-    using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map<
-            int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>, std::equal_to<int64_t>,
-            std::allocator<phmap::Pair<const int64_t, std::unique_ptr<DeltaWriterV2>>>, 4,
-            std::mutex>;
-
     UniqueId _load_id;
-    TabletToDeltaWriterV2Map _map;
+    std::mutex _mutex;
+    std::unordered_map<int64_t, std::shared_ptr<DeltaWriterV2>> _map;
     std::atomic<int> _use_cnt;
     DeltaWriterV2Pool* _pool = nullptr;
 };
@@ -111,4 +106,4 @@ private:
 };
 
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index af278228eb0..4cbca9cd8af 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -420,7 +420,7 @@ Status VTabletWriterV2::write(Block& input_block) {
 
 Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
                                         const Rows& rows, const Streams& streams) {
-    DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
+    auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
         WriteRequest req {
                 .tablet_id = tablet_id,
                 .txn_id = _txn_id,
@@ -446,7 +446,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
                          << " not found in schema, load_id=" << print_id(_load_id);
             return std::unique_ptr<DeltaWriterV2>(nullptr);
         }
-        return std::make_unique<DeltaWriterV2>(&req, streams, _state);
+        return DeltaWriterV2::create_unique(&req, streams, _state);
     });
     if (delta_writer == nullptr) {
         LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to