This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 6cc8762 [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728) 6cc8762 is described below commit 6cc8762ce76845bccdd2630122f0fdbcfcf5d207 Author: dataroaring <98214048+dataroar...@users.noreply.github.com> AuthorDate: Sun Apr 3 10:15:45 2022 +0800 [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728) The patch fixes two problems. 1. A memory-ordering problem when accessing _last_patch_processed_finished and in_flight; since _last_patch_processed_finished is actually redundant, the patch removes it. 2. A synchronization problem in join() on cid. Fix for #8725. --- be/src/exec/tablet_sink.cpp | 19 +++++++++++++++---- be/src/exec/tablet_sink.h | 31 +++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index bc616b4..2cfbbb6 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -457,14 +457,24 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state, if (!st.ok()) { return 0; } - bool is_finished = true; - if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 && - _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) { + + if (!_add_batch_closure->try_set_in_flight()) { + return _send_finished ? 0 : 1; + } + + // We are sure that try_send_batch is not running + if (_pending_batches_num > 0) { auto s = thread_pool_token->submit_func( std::bind(&NodeChannel::try_send_batch, this, state)); if (!s.ok()) { _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); + // clear in flight + _add_batch_closure->clear_in_flight(); } + // in_flight is cleared in closure::Run + } else { + // clear in flight + _add_batch_closure->clear_in_flight(); } return _send_finished ? 
0 : 1; } @@ -495,6 +505,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) { &compressed_bytes, _tuple_data_buffer_ptr); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); + _add_batch_closure->clear_in_flight(); return; } if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) { @@ -508,6 +519,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) { if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { if (remain_ms <= 0 && !request.eos()) { cancel(fmt::format("{}, err: timeout", channel_info())); + _add_batch_closure->clear_in_flight(); return; } else { remain_ms = config::min_load_rpc_timeout_ms; @@ -544,7 +556,6 @@ void NodeChannel::try_send_batch(RuntimeState* state) { _add_batch_closure); _next_packet_seq++; - _last_patch_processed_finished = true; } Status NodeChannel::none_of(std::initializer_list<bool> vars) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 6749602..cf5ad40 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -100,22 +100,35 @@ public: void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; } void join() { - if (cid != INVALID_BTHREAD_ID && _packet_in_flight) { - brpc::Join(cid); + // We rely on in_flight to assure one rpc is running, + // while cid is not reliable due to memory order. + // in_flight is written before getting callid, + // so we can not use memory fence to synchronize. + while (_packet_in_flight) { + // cid here is complicated + if (cid != INVALID_BTHREAD_ID) { + // actually cid may be the last rpc call id. 
+ brpc::Join(cid); + } + if (_packet_in_flight) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } } } // plz follow this order: reset() -> set_in_flight() -> send brpc batch void reset() { - join(); - DCHECK(_packet_in_flight == false); cntl.Reset(); cid = cntl.call_id(); } - void set_in_flight() { - DCHECK(_packet_in_flight == false); - _packet_in_flight = true; + bool try_set_in_flight() { + bool value = false; + return _packet_in_flight.compare_exchange_strong(value, true); + } + + void clear_in_flight() { + _packet_in_flight = false; } bool is_packet_in_flight() { return _packet_in_flight; } @@ -134,7 +147,7 @@ public: } else { success_handler(result, _is_last_rpc); } - _packet_in_flight = false; + clear_in_flight(); } brpc::Controller cntl; @@ -245,8 +258,6 @@ private: // add batches finished means the last rpc has be response, used to check whether this channel can be closed std::atomic<bool> _add_batches_finished {false}; - std::atomic<bool> _last_patch_processed_finished {true}; - bool _eos_is_produced {false}; // only for restricting producer behaviors std::unique_ptr<RowDescriptor> _row_desc; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org