This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a9d83b83dff [refactor](sink) refactor vtablet writer v2 sequential
close to parallel close (#52639)
a9d83b83dff is described below
commit a9d83b83dff9f7cc5b7fedc9fb524d7774d2cf45
Author: hui lai <[email protected]>
AuthorDate: Sat Jul 5 23:13:55 2025 +0800
[refactor](sink) refactor vtablet writer v2 sequential close to parallel
close (#52639)
### What problem does this PR solve?
1. re-pick: https://github.com/apache/doris/pull/51989
2. Fix the failure to shut down gracefully.
---
be/src/vec/sink/load_stream_map_pool.h | 9 ++
be/src/vec/sink/load_stream_stub.cpp | 72 +++++++-------
be/src/vec/sink/load_stream_stub.h | 10 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 107 ++++++++++++++++-----
be/src/vec/sink/writer/vtablet_writer_v2.h | 12 ++-
.../test_writer_v2_fault_injection.groovy | 17 +++-
6 files changed, 158 insertions(+), 69 deletions(-)
diff --git a/be/src/vec/sink/load_stream_map_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
index bdf98ca8f61..ab8a40d0c91 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -98,6 +98,15 @@ public:
// only call this method after release() returns true.
void close_load(bool incremental);
+ std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>
get_streams_for_node() {
+ decltype(_streams_for_node) snapshot;
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ snapshot = _streams_for_node;
+ }
+ return snapshot;
+ }
+
private:
const UniqueId _load_id;
const int64_t _src_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 2479fc6b25c..53cc11c97f5 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -114,9 +114,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
LOG(WARNING) << "stub is not exist when on_closed, " << *this;
return;
}
- std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
stub->_is_closed.store(true);
- stub->_close_cv.notify_all();
}
inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler& handler) {
@@ -330,37 +328,30 @@ Status LoadStreamStub::wait_for_schema(int64_t
partition_id, int64_t index_id, i
return Status::OK();
}
-Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
+Status LoadStreamStub::close_finish_check(RuntimeState* state, bool*
is_closed) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
+ DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
+ { return Status::InternalError("close failed"); });
+ *is_closed = true;
if (!_is_open.load()) {
// we don't need to close wait on non-open streams
return Status::OK();
}
+ if (state->get_query_ctx()->is_cancelled()) {
+ return state->get_query_ctx()->exec_status();
+ }
if (!_is_closing.load()) {
+ *is_closed = false;
return _status;
}
if (_is_closed.load()) {
- return _check_cancel();
- }
- DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
- std::unique_lock<bthread::Mutex> lock(_close_mutex);
- auto timeout_sec = timeout_ms / 1000;
- while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
- //the query maybe cancel, so need check after wait 1s
- timeout_sec = timeout_sec - 1;
- LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" <<
timeout_sec
- << ", is_closed=" << _is_closed.load()
- << ", is_cancelled=" <<
state->get_query_ctx()->is_cancelled();
- int ret = _close_cv.wait_for(lock, 1000000);
- if (ret != 0 && timeout_sec <= 0) {
- return Status::InternalError("stream close_wait timeout, error={},
timeout_ms={}, {}",
- ret, timeout_ms, to_string());
+ RETURN_IF_ERROR(_check_cancel());
+ if (!_is_eos.load()) {
+ return Status::InternalError("Stream closed without EOS, {}",
to_string());
}
+ return Status::OK();
}
- RETURN_IF_ERROR(_check_cancel());
- if (!_is_eos.load()) {
- return Status::InternalError("stream closed without eos, {}",
to_string());
- }
+ *is_closed = false;
return Status::OK();
}
@@ -374,11 +365,7 @@ void LoadStreamStub::cancel(Status reason) {
_cancel_st = reason;
_is_cancelled.store(true);
}
- {
- std::lock_guard<bthread::Mutex> lock(_close_mutex);
- _is_closed.store(true);
- _close_cv.notify_all();
- }
+ _is_closed.store(true);
}
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const
Slice> data) {
@@ -437,12 +424,34 @@ void LoadStreamStub::_handle_failure(butil::IOBuf& buf,
Status st) {
switch (hdr.opcode()) {
case PStreamHeader::ADD_SEGMENT:
case PStreamHeader::APPEND_DATA: {
+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
+ add_failed_tablet(hdr.tablet_id(), st);
+ return;
+ });
+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
+ add_failed_tablet(hdr.tablet_id(), st);
+ return;
+ });
add_failed_tablet(hdr.tablet_id(), st);
} break;
case PStreamHeader::CLOSE_LOAD: {
+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
+ brpc::StreamClose(_stream_id);
+ return;
+ });
brpc::StreamClose(_stream_id);
} break;
case PStreamHeader::GET_SCHEMA: {
+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
+ // Just log and let wait_for_schema timeout
+ std::ostringstream oss;
+ for (const auto& tablet : hdr.tablets()) {
+ oss << " " << tablet.tablet_id();
+ }
+ LOG(WARNING) << "failed to send GET_SCHEMA request,
tablet_id:" << oss.str() << ", "
+ << *this;
+ return;
+ });
// Just log and let wait_for_schema timeout
std::ostringstream oss;
for (const auto& tablet : hdr.tablets()) {
@@ -556,13 +565,4 @@ Status LoadStreamStubs::close_load(const
std::vector<PTabletID>& tablets_to_comm
return status;
}
-Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
- MonotonicStopWatch watch;
- watch.start();
- for (auto& stream : _streams) {
- RETURN_IF_ERROR(stream->close_wait(state, timeout_ms -
watch.elapsed_time() / 1000 / 1000));
- }
- return Status::OK();
-}
-
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 5c3ca02272d..a030de75720 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -155,7 +155,7 @@ public:
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
- Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+ Status close_finish_check(RuntimeState* state, bool* is_closed);
// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
void cancel(Status reason);
@@ -223,6 +223,8 @@ private:
void _handle_failure(butil::IOBuf& buf, Status st);
Status _check_cancel() {
+ DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
+ { return Status::InternalError("stream cancelled"); });
if (!_is_cancelled.load()) {
return Status::OK();
}
@@ -247,9 +249,7 @@ protected:
Status _cancel_st;
bthread::Mutex _open_mutex;
- bthread::Mutex _close_mutex;
bthread::Mutex _cancel_mutex;
- bthread::ConditionVariable _close_cv;
std::mutex _buffer_mutex;
std::mutex _send_mutex;
@@ -310,8 +310,6 @@ public:
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
- Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
-
std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
for (auto& stream : _streams) {
@@ -330,6 +328,8 @@ public:
return m;
}
+ std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
+
private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
std::atomic<bool> _open_success = false;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 28059e10b3c..5104a9e65d2 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -666,7 +666,7 @@ Status VTabletWriterV2::close(Status exec_status) {
// close_wait on all non-incremental streams, even if this is not the
last sink.
// because some per-instance data structures are now shared among all
sinks
// due to sharing delta writers and load stream stubs.
- RETURN_IF_ERROR(_close_wait(false));
+ RETURN_IF_ERROR(_close_wait(_non_incremental_streams()));
// send CLOSE_LOAD on all incremental streams if this is the last sink.
// this must happen after all non-incremental streams are closed,
@@ -676,7 +676,7 @@ Status VTabletWriterV2::close(Status exec_status) {
}
// close_wait on all incremental streams, even if this is not the last
sink.
- RETURN_IF_ERROR(_close_wait(true));
+ RETURN_IF_ERROR(_close_wait(_incremental_streams()));
// calculate and submit commit info
if (is_last_sink) {
@@ -721,32 +721,87 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
-Status VTabletWriterV2::_close_wait(bool incremental) {
+std::unordered_set<std::shared_ptr<LoadStreamStub>>
VTabletWriterV2::_incremental_streams() {
+ std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& stream : streams->streams()) {
+ if (stream->is_incremental()) {
+ incremental_streams.insert(stream);
+ }
+ }
+ }
+ return incremental_streams;
+}
+
+std::unordered_set<std::shared_ptr<LoadStreamStub>>
VTabletWriterV2::_non_incremental_streams() {
+ std::unordered_set<std::shared_ptr<LoadStreamStub>>
non_incremental_streams;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& stream : streams->streams()) {
+ if (!stream->is_incremental()) {
+ non_incremental_streams.insert(stream);
+ }
+ }
+ }
+ return non_incremental_streams;
+}
+
+Status VTabletWriterV2::_close_wait(
+ std::unordered_set<std::shared_ptr<LoadStreamStub>>
unfinished_streams) {
SCOPED_TIMER(_close_load_timer);
- auto st = _load_stream_map->for_each_st(
- [this, incremental](int64_t dst_id, LoadStreamStubs& streams) ->
Status {
- if (streams.is_incremental() != incremental) {
- return Status::OK();
- }
- int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 /
1000;
- DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", {
remain_ms = 0; });
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close waiting,
load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before close
waiting");
- }
- auto st = streams.close_wait(_state, remain_ms);
- if (!st.ok()) {
- LOG(WARNING) << "close_wait timeout on streams to dst_id="
<< dst_id
- << ", load_id=" << print_id(_load_id) << ": "
<< st;
- }
- return st;
- });
- if (!st.ok()) {
- LOG(WARNING) << "close_wait failed: " << st << ", load_id=" <<
print_id(_load_id);
+ Status status;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ while (true) {
+ RETURN_IF_ERROR(_check_timeout());
+ RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status,
streams_for_node));
+ if (!status.ok() || unfinished_streams.empty()) {
+ LOG(INFO) << "is all unfinished: " << unfinished_streams.empty()
+ << ", status: " << status << ", txn_id: " << _txn_id
+ << ", load_id: " << print_id(_load_id);
+ break;
+ }
+ bthread_usleep(1000 * 10);
}
- return st;
+ if (!status.ok()) {
+ LOG(WARNING) << "close_wait failed: " << status << ", load_id=" <<
print_id(_load_id);
+ }
+ return status;
+}
+
+Status VTabletWriterV2::_check_timeout() {
+ int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) *
1000 -
+ _timeout_watch.elapsed_time() / 1000 / 1000;
+ DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms =
0; });
+ if (remain_ms <= 0) {
+ LOG(WARNING) << "load timed out before close waiting, load_id=" <<
print_id(_load_id);
+ return Status::TimedOut("load timed out before close waiting");
+ }
+ return Status::OK();
+}
+
+Status VTabletWriterV2::_check_streams_finish(
+ std::unordered_set<std::shared_ptr<LoadStreamStub>>&
unfinished_streams, Status& status,
+ const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>&
streams_for_node) {
+ for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& stream : streams->streams()) {
+ if (!unfinished_streams.contains(stream)) {
+ continue;
+ }
+ bool is_closed = false;
+ auto stream_st = stream->close_finish_check(_state, &is_closed);
+ if (!stream_st.ok()) {
+ status = stream_st;
+ unfinished_streams.erase(stream);
+ LOG(WARNING) << "close_wait failed: " << stream_st
+ << ", load_id=" << print_id(_load_id);
+ }
+ if (is_closed) {
+ unfinished_streams.erase(stream);
+ }
+ }
+ }
+ return status;
}
void VTabletWriterV2::_calc_tablets_to_commit() {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 51f647b07e5..10e3c6378cb 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -148,7 +148,17 @@ private:
void _calc_tablets_to_commit();
- Status _close_wait(bool incremental);
+ std::unordered_set<std::shared_ptr<LoadStreamStub>> _incremental_streams();
+
+ std::unordered_set<std::shared_ptr<LoadStreamStub>>
_non_incremental_streams();
+
+ Status _close_wait(std::unordered_set<std::shared_ptr<LoadStreamStub>>
unfinished_streams);
+
+ Status _check_timeout();
+
+ Status _check_streams_finish(
+ std::unordered_set<std::shared_ptr<LoadStreamStub>>&
unfinished_streams, Status& status,
+ const std::unordered_map<int64_t,
std::shared_ptr<LoadStreamStubs>>& streams_for_node);
void _cancel(Status status);
diff --git
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index 30854cfb50b..1a473d90e52 100644
---
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -68,7 +68,7 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
file "baseall.txt"
}
- def load_with_injection = { injection, error_msg, success=false->
+ def load_with_injection = { injection, error_msg="", success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
@@ -104,6 +104,21 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
// DeltaWriterV2 stream_size is 0
load_with_injection("DeltaWriterV2.init.stream_size", "failed to find
tablet schema")
+ // injection cases for VTabletWriterV2 close logic
+ // Test LoadStreamStub close_finish_check close failed
+ load_with_injection("LoadStreamStub::close_finish_check.close_failed")
+ // Test LoadStreamStub _check_cancel when cancelled
+ load_with_injection("LoadStreamStub._check_cancel.cancelled")
+ // Test LoadStreamStub _send_with_retry stream write failed
+
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed")
+ // Test LoadStreamStub _handle_failure for different opcodes
+
load_with_injection("LoadStreamStub._handle_failure.append_data_failed")
+
load_with_injection("LoadStreamStub._handle_failure.add_segment_failed")
+ load_with_injection("LoadStreamStub._handle_failure.close_load_failed")
+ load_with_injection("LoadStreamStub._handle_failure.get_schema_failed")
+ // Test LoadStreamStub skip send segment
+ load_with_injection("LoadStreamStub.skip_send_segment")
+
sql """ set enable_memtable_on_sink_node=false """
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]