This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c27558e235288b721b249318dbbe3ec72d835a0e Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Fri Jul 7 14:03:54 2023 +0800 [fix](sink) fix OlapTableSink early close causes load failure #21545 --- be/src/vec/sink/vtablet_sink.cpp | 73 ++++++++++++++++++++++++++++------------ be/src/vec/sink/vtablet_sink.h | 5 ++- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 0d1d260608..be7183428a 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -107,7 +107,7 @@ public: void Run() override { SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - if (cntl.Failed()) { + auto open_partition_failed = [this]() { std::stringstream ss; ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode()) << ", error_text=" << this->cntl.ErrorText(); @@ -117,6 +117,14 @@ public: fmt::format("{}, open failed, err: {}", vnode_channel->channel_info(), ss.str()), -1); + }; + if (cntl.Failed()) { + open_partition_failed(); + } else { + Status status(result.status()); + if (!status.ok()) { + open_partition_failed(); + } } done = true; } @@ -903,14 +911,12 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { request.release_id(); } -bool VNodeChannel::is_rpc_done() const { +bool VNodeChannel::is_send_data_rpc_done() const { if (_add_block_closure != nullptr) { - return (_add_batches_finished || - (_cancelled && !_add_block_closure->is_packet_in_flight())) && - open_partition_finished(); + return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight()); } else { // such as, canceled before open_wait new closure. - return (_add_batches_finished || _cancelled) && open_partition_finished(); + return _add_batches_finished || _cancelled; } } @@ -1122,6 +1128,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) { } // Prepare the exprs to run. 
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc)); + _prepare = true; return Status::OK(); } @@ -1505,6 +1512,22 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { if (_try_close) { return; } + + if (config::enable_lazy_open_partition && !_open_partition_done) { + // open_partition_finished must be before mark_close + bool open_partition_done = true; + for (const auto& index_channel : _channels) { + index_channel->for_each_node_channel( + [&open_partition_done](const std::shared_ptr<VNodeChannel>& ch) { + open_partition_done &= ch->open_partition_finished(); + }); + } + if (!open_partition_done) { + return; + } + _open_partition_done = true; + } + SCOPED_TIMER(_close_timer); Status status = exec_status; if (status.ok()) { @@ -1540,11 +1563,14 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { } bool VOlapTableSink::is_close_done() { + if (config::enable_lazy_open_partition && !_open_partition_done) { + return false; + } bool close_done = true; for (const auto& index_channel : _channels) { index_channel->for_each_node_channel( [&close_done](const std::shared_ptr<VNodeChannel>& ch) { - close_done &= ch->is_rpc_done(); + close_done &= ch->is_send_data_rpc_done(); }); } return close_done; @@ -1554,15 +1580,29 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return _close_status; } - try_close(state, exec_status); + if (!_prepare) { + DCHECK(!exec_status.ok()); + _cancel_all_channel(exec_status); + DataSink::close(state, exec_status); + _close_status = exec_status; + return _close_status; + } + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(_profile->total_time_counter()); + + if (config::enable_lazy_open_partition) { + for (const auto& index_channel : _channels) { + index_channel->for_each_node_channel( + [](const std::shared_ptr<VNodeChannel>& ch) { ch->open_partition_wait(); }); + } + _open_partition_done = true; + } + + try_close(state, 
exec_status); // If _close_status is not ok, all nodes have been canceled in try_close. if (_close_status.ok()) { - DCHECK(exec_status.ok()); auto status = Status::OK(); - // only if status is ok can we call this _profile->total_time_counter(). - // if status is not ok, this sink may not be prepared, so that _profile is null - SCOPED_TIMER(_profile->total_time_counter()); // BE id -> add_batch method counter std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0, @@ -1571,15 +1611,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { num_node_channels = 0; VNodeChannelStat channel_stat; { - if (config::enable_lazy_open_partition) { - for (const auto& index_channel : _channels) { - index_channel->for_each_node_channel( - [](const std::shared_ptr<VNodeChannel>& ch) { - ch->open_partition_wait(); - }); - } - } - for (const auto& index_channel : _channels) { if (!status.ok()) { break; diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index a3a6a91a0c..265ff47dcc 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -250,7 +250,7 @@ public: // 2. just cancel() void mark_close(); - bool is_rpc_done() const; + bool is_send_data_rpc_done() const; bool is_closed() const { return _is_closed; } bool is_cancelled() const { return _cancelled; } @@ -647,6 +647,9 @@ private: // Save the status of try_close() and close() method Status _close_status; bool _try_close = false; + bool _prepare = false; + + std::atomic<bool> _open_partition_done {false}; // User can change this config at runtime, avoid it being modified during query or loading process. bool _transfer_large_data_by_brpc = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org