This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8f320944a83 [fix](move-memtable) fix DeltaWriterV2 profile use-after-free (#26110) 8f320944a83 is described below commit 8f320944a8301619a772458c0c7761dc7c307a4a Author: Kaijie Chen <c...@apache.org> AuthorDate: Tue Oct 31 13:52:18 2023 +0800 [fix](move-memtable) fix DeltaWriterV2 profile use-after-free (#26110) The sink that creates the delta writer may be closed while other sinks are still using this delta writer. The parent profile is then destructed, and when the last sink tries to update the profile, it hits a use-after-free. To address this issue, we record the profile numbers (raw timer values) in the delta writer, and the last sink that closes the delta writer will create and update the profile. --- be/src/olap/delta_writer_v2.cpp | 34 ++++++++++++++------------ be/src/olap/delta_writer_v2.h | 13 +++++----- be/src/vec/sink/delta_writer_v2_pool.cpp | 7 +++--- be/src/vec/sink/delta_writer_v2_pool.h | 3 ++- be/src/vec/sink/vtablet_sink_v2.cpp | 4 +-- be/test/vec/exec/delta_writer_v2_pool_test.cpp | 9 +++---- 6 files changed, 35 insertions(+), 35 deletions(-) diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index ef3ff23f9d8..8e13f8f3050 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -66,26 +66,25 @@ using namespace ErrorCode; Status DeltaWriterV2::open(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, - DeltaWriterV2** writer, RuntimeProfile* profile) { - *writer = new DeltaWriterV2(req, streams, StorageEngine::instance(), profile); + DeltaWriterV2** writer) { + *writer = new DeltaWriterV2(req, streams, StorageEngine::instance()); return Status::OK(); } DeltaWriterV2::DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, - StorageEngine* storage_engine, RuntimeProfile* profile) + StorageEngine* storage_engine) : _req(*req), _tablet_schema(new TabletSchema), - 
_profile(profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, - true)), _memtable_writer(new MemTableWriter(*req)), - _streams(streams) { - _init_profile(profile); -} - -void DeltaWriterV2::_init_profile(RuntimeProfile* profile) { - _write_memtable_timer = ADD_TIMER(_profile, "WriteMemTableTime"); - _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); + _streams(streams) {} + +void DeltaWriterV2::_update_profile(RuntimeProfile* profile) { + auto child = profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true); + auto write_memtable_timer = ADD_TIMER(child, "WriteMemTableTime"); + auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime"); + COUNTER_SET(write_memtable_timer, _write_memtable_time); + COUNTER_SET(close_wait_timer, _close_wait_time); } DeltaWriterV2::~DeltaWriterV2() { @@ -149,7 +148,7 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<in if (!_is_init && !_is_cancelled) { RETURN_IF_ERROR(init()); } - SCOPED_TIMER(_write_memtable_timer); + SCOPED_RAW_TIMER(&_write_memtable_time); return _memtable_writer->write(block, row_idxs, is_append); } @@ -168,13 +167,16 @@ Status DeltaWriterV2::close() { return _memtable_writer->close(); } -Status DeltaWriterV2::close_wait() { - SCOPED_TIMER(_close_wait_timer); +Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { + SCOPED_RAW_TIMER(&_close_wait_time); std::lock_guard<std::mutex> l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; - RETURN_IF_ERROR(_memtable_writer->close_wait(_profile)); + if (profile != nullptr) { + _update_profile(profile); + } + RETURN_IF_ERROR(_memtable_writer->close_wait(profile)); _delta_written_success = true; return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index b2b1f5f1c19..61ad63d0a9b 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -65,7 +65,7 @@ class 
DeltaWriterV2 { public: static Status open(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, - DeltaWriterV2** writer, RuntimeProfile* profile); + DeltaWriterV2** writer); ~DeltaWriterV2(); @@ -80,7 +80,7 @@ public: Status close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. - Status close_wait(); + Status close_wait(RuntimeProfile* profile = nullptr); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. @@ -99,13 +99,13 @@ public: private: DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, - StorageEngine* storage_engine, RuntimeProfile* profile); + StorageEngine* storage_engine); void _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, const TabletSchema& ori_tablet_schema); - void _init_profile(RuntimeProfile* profile); + void _update_profile(RuntimeProfile* profile); bool _is_init = false; bool _is_cancelled = false; @@ -119,9 +119,8 @@ private: // total rows num written by DeltaWriterV2 int64_t _total_received_rows = 0; - RuntimeProfile* _profile = nullptr; - RuntimeProfile::Counter* _write_memtable_timer = nullptr; - RuntimeProfile::Counter* _close_wait_timer = nullptr; + int64_t _write_memtable_time = 0; + int64_t _close_wait_time = 0; std::shared_ptr<MemTableWriter> _memtable_writer; MonotonicStopWatch _lock_watch; diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index c2b10e65932..e714e6b1bbd 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -18,6 +18,7 @@ #include "vec/sink/delta_writer_v2_pool.h" #include "olap/delta_writer_v2.h" +#include "util/runtime_profile.h" namespace doris { class TExpr; @@ -36,7 +37,7 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id, return 
_map.at(tablet_id).get(); } -Status DeltaWriterV2Map::close() { +Status DeltaWriterV2Map::close(RuntimeProfile* profile) { if (--_use_cnt > 0) { return Status::OK(); } @@ -49,9 +50,9 @@ Status DeltaWriterV2Map::close() { if (!status.ok()) { return status; } - _map.for_each([&status](auto& entry) { + _map.for_each([&status, profile](auto& entry) { if (status.ok()) { - status = entry.second->close_wait(); + status = entry.second->close_wait(profile); } }); return status; diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h index 7654c2ca7ed..414d34da671 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.h +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -54,6 +54,7 @@ namespace doris { class DeltaWriterV2; +class RuntimeProfile; namespace vectorized { @@ -69,7 +70,7 @@ public: DeltaWriterV2* get_or_create(int64_t tablet_id, std::function<DeltaWriterV2*()> creator); // close all delta writers in this DeltaWriterV2Map if there is no other users - Status close(); + Status close(RuntimeProfile* profile); // cancel all delta writers in this DeltaWriterV2Map void cancel(Status status); diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 7b9bd29176f..86c25722b58 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -337,7 +337,7 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc } } DeltaWriterV2* delta_writer = nullptr; - static_cast<void>(DeltaWriterV2::open(&req, streams, &delta_writer, _profile)); + static_cast<void>(DeltaWriterV2::open(&req, streams, &delta_writer)); return delta_writer; }); { @@ -381,7 +381,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { { SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user - static_cast<void>(_delta_writer_for_tablet->close()); + static_cast<void>(_delta_writer_for_tablet->close(_profile)); 
_delta_writer_for_tablet.reset(); } diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp index 2cce1dd72f8..e68555ed68e 100644 --- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -57,21 +57,18 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { EXPECT_EQ(1, pool.size()); WriteRequest req; auto writer = map->get_or_create(100, [&req]() { - RuntimeProfile profile("test"); DeltaWriterV2* writer; - static_cast<void>(DeltaWriterV2::open(&req, {}, &writer, &profile)); + static_cast<void>(DeltaWriterV2::open(&req, {}, &writer)); return writer; }); auto writer2 = map->get_or_create(101, [&req]() { - RuntimeProfile profile("test"); DeltaWriterV2* writer; - static_cast<void>(DeltaWriterV2::open(&req, {}, &writer, &profile)); + static_cast<void>(DeltaWriterV2::open(&req, {}, &writer)); return writer; }); auto writer3 = map->get_or_create(100, [&req]() { - RuntimeProfile profile("test"); DeltaWriterV2* writer; - static_cast<void>(DeltaWriterV2::open(&req, {}, &writer, &profile)); + static_cast<void>(DeltaWriterV2::open(&req, {}, &writer)); return writer; }); EXPECT_EQ(2, map->size()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org