This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f7a340a2dfe8250f4cced44eac9faa25adc5de84 Author: Kaijie Chen <c...@apache.org> AuthorDate: Tue Jan 16 20:08:26 2024 +0800 [improve](move-memtable) add cancel method to load stream stub (#29994) --- be/src/vec/sink/load_stream_stub.cpp | 25 +++++++++++++++++++++++-- be/src/vec/sink/load_stream_stub.h | 15 +++++++++++++++ be/src/vec/sink/load_stream_stub_pool.cpp | 2 +- be/src/vec/sink/load_stream_stub_pool.h | 4 ++-- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 7 +++++-- be/test/vec/exec/load_stream_stub_pool_test.cpp | 7 +++---- 6 files changed, 49 insertions(+), 11 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 40ce75d24e6..347acb1b6f6 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -294,6 +294,7 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i watch.start(); while (!_tablet_schema_for_index->contains(index_id) && watch.elapsed_time() / 1000 / 1000 < timeout_ms) { + RETURN_IF_ERROR(_check_cancel()); static_cast<void>(wait_for_new_schema(100)); } @@ -308,8 +309,12 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { while (true) { }; }); - if (!_is_init.load() || _is_closed.load()) { - return Status::OK(); + if (!_is_init.load()) { + return Status::InternalError("stream {} is not opened, load_id={}", _stream_id, + print_id(_load_id)); + } + if (_is_closed.load()) { + return _check_cancel(); } if (timeout_ms <= 0) { timeout_ms = config::close_load_stream_timeout_ms; @@ -324,6 +329,7 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { print_id(_load_id), _dst_id, _stream_id); } } + RETURN_IF_ERROR(_check_cancel()); if (!_is_eos.load()) { return Status::InternalError( "stream closed without eos, load_id={}, dst_id={}, stream_id={}", @@ -332,6 +338,20 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { return Status::OK(); } +void LoadStreamStub::cancel(Status reason) { + LOG(WARNING) << *this << " is cancelled because of " << reason; + { + std::lock_guard<bthread::Mutex> lock(_cancel_mutex); + _cancel_reason = reason; + _is_cancelled.store(true); + } + { + std::lock_guard<bthread::Mutex> lock(_close_mutex); + _is_closed.store(true); + _close_cv.notify_all(); + } +} + Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) { butil::IOBuf buf; size_t header_len = header.ByteSizeLong(); @@ -365,6 +385,7 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { for (;;) { + RETURN_IF_ERROR(_check_cancel()); int ret; { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 6aae778dc93..c91f1016d35 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -153,6 +153,9 @@ public: // if timeout_ms <= 0, will fallback to config::close_load_stream_timeout_ms Status close_wait(int64_t timeout_ms = 0); + // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled + void cancel(Status reason); + Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t timeout_ms = 60000); @@ -197,9 +200,19 @@ private: Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); Status _send_with_retry(butil::IOBuf& buf); + Status _check_cancel() { + if (!_is_cancelled.load()) { + return Status::OK(); + } + std::lock_guard<bthread::Mutex> lock(_cancel_mutex); + return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), + _cancel_reason.to_string_no_stack()); + } + protected: std::atomic<bool> _is_init; std::atomic<bool> _is_closed; + std::atomic<bool> _is_cancelled; std::atomic<bool> _is_eos; std::atomic<int> _use_cnt; @@ -207,9 +220,11 @@ protected: brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id + Status _cancel_reason; bthread::Mutex _open_mutex; bthread::Mutex _close_mutex; + bthread::Mutex _cancel_mutex; bthread::ConditionVariable _close_cv; std::mutex _tablets_to_commit_mutex; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index 1baa903f2ee..d76402b57d5 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -26,7 +26,7 @@ class TExpr; LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool) : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {} -void LoadStreams::release(Status status) { +void LoadStreams::release() { int num_use = --_use_cnt; DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; }); if (num_use == 0) { diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index b34383b25f9..662fc5bc1a1 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -76,7 +76,7 @@ class LoadStreams { public: LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool); - void release(Status status); + void release(); Streams& streams() { return _streams; } @@ -116,4 +116,4 @@ private: std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 02b40549253..e23fe761ecf 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -457,7 +457,10 @@ Status VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet.reset(); } for (const auto& [_, streams] : _streams_for_node) { - streams->release(status); + for (const auto& stream : streams->streams()) { + stream->cancel(status); + } + streams->release(); } return Status::OK(); } @@ -514,7 +517,7 @@ Status VTabletWriterV2::close(Status exec_status) { // defer stream release to prevent memory leak Defer defer([&] { for (const auto& [_, streams] : _streams_for_node) { - streams->release(status); + streams->release(); } _streams_for_node.clear(); }); diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index bea5443b4ff..24da3bb6999 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -32,7 +32,6 @@ TEST_F(LoadStreamStubPoolTest, test) { LoadStreamStubPool pool; int64_t src_id = 100; PUniqueId load_id; - Status st = Status::OK(); load_id.set_hi(1); load_id.set_hi(2); auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); @@ -42,9 +41,9 @@ TEST_F(LoadStreamStubPoolTest, test) { EXPECT_EQ(1, pool.templates_size()); EXPECT_EQ(streams1, streams3); EXPECT_NE(streams1, streams2); - streams1->release(st); - streams2->release(st); - streams3->release(st); + streams1->release(); + streams2->release(); + streams3->release(); EXPECT_EQ(0, pool.size()); EXPECT_EQ(0, pool.templates_size()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org