This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4c2fb951f6b [refactor](close) refactor ispendingfinish logic and close logic to do close more quickly (#30021) 4c2fb951f6b is described below commit 4c2fb951f6bb4af49fe611f9ac87a019b75c3f5c Author: yiguolei <676222...@qq.com> AuthorDate: Fri Jan 19 15:23:27 2024 +0800 [refactor](close) refactor ispendingfinish logic and close logic to do close more quickly (#30021) --- be/src/exec/data_sink.h | 2 +- be/src/pipeline/exec/jdbc_table_sink_operator.h | 1 + be/src/pipeline/exec/olap_table_sink_operator.h | 1 + be/src/pipeline/exec/olap_table_sink_v2_operator.h | 1 + be/src/pipeline/exec/operator.h | 2 +- .../pipeline/exec/partition_sort_sink_operator.cpp | 2 -- be/src/pipeline/exec/result_file_sink_operator.cpp | 6 ++--- be/src/pipeline/exec/result_file_sink_operator.h | 1 + be/src/pipeline/exec/sort_sink_operator.cpp | 1 - be/src/pipeline/exec/table_function_operator.cpp | 1 - be/src/pipeline/pipeline_x/operator.cpp | 14 +++++------ be/src/pipeline/pipeline_x/operator.h | 5 ++++ be/src/runtime/result_writer.h | 2 ++ be/src/runtime/runtime_state.cpp | 18 -------------- be/src/runtime/runtime_state.h | 5 ---- be/src/vec/exec/vpartition_sort_node.cpp | 3 --- be/src/vec/exec/vsort_node.cpp | 2 -- be/src/vec/exec/vtable_function_node.cpp | 1 - be/src/vec/sink/async_writer_sink.h | 23 +++++++++++------- be/src/vec/sink/vresult_file_sink.cpp | 19 ++++++++++----- be/src/vec/sink/writer/async_result_writer.cpp | 23 +++++++++++------- be/src/vec/sink/writer/async_result_writer.h | 7 ------ be/src/vec/sink/writer/vjdbc_table_writer.h | 6 ++--- be/src/vec/sink/writer/vodbc_table_writer.h | 6 ++--- be/src/vec/sink/writer/vtablet_writer.cpp | 28 ++++------------------ be/src/vec/sink/writer/vtablet_writer.h | 8 ++----- 26 files changed, 75 insertions(+), 113 deletions(-) diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 3bf72ae5450..be6cfe236b7 100644 --- a/be/src/exec/data_sink.h +++ 
b/be/src/exec/data_sink.h @@ -74,7 +74,7 @@ public: return Status::OK(); } - virtual bool is_close_done() { return true; } + [[nodiscard]] virtual bool is_pending_finish() const { return false; } // Releases all resources that were allocated in prepare()/send(). // Further send() calls are illegal after calling close(). diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h index 41348ccaccf..33018f69da5 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.h +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h @@ -58,6 +58,7 @@ public: private: friend class JdbcTableSinkLocalState; template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) friend class AsyncWriterSink; const RowDescriptor& _row_desc; diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index 6aa80776a3a..6707ddd86e6 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -102,6 +102,7 @@ public: private: friend class OlapTableSinkLocalState; template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) friend class AsyncWriterSink; const RowDescriptor& _row_desc; vectorized::VExprContextSPtrs _output_vexpr_ctxs; diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index 1e68f9e98d3..f1b5d6e9061 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -104,6 +104,7 @@ public: private: friend class OlapTableSinkV2LocalState; template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) friend class AsyncWriterSink; const RowDescriptor& _row_desc; vectorized::VExprContextSPtrs _output_vexpr_ctxs; diff --git a/be/src/pipeline/exec/operator.h 
b/be/src/pipeline/exec/operator.h index 80184374b77..cd5fba5fee3 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -297,7 +297,7 @@ public: return _sink->try_close(state, state->query_status()); } - [[nodiscard]] bool is_pending_finish() const override { return !_sink->is_close_done(); } + [[nodiscard]] bool is_pending_finish() const override { return _sink->is_pending_finish(); } Status close(RuntimeState* state) override { if (is_closed()) { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 14d917d62e3..c09a6a90b95 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -120,8 +120,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR( _split_block_by_partition(input_block, state->batch_size(), local_state)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR( - state->check_query_state("VPartitionSortNode, while split input block.")); input_block->clear_column_data(); } } diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 2b095748b15..71a2d539c95 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -173,9 +173,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) } Status final_status = exec_status; - // close the writer - if (_writer && _writer->need_normal_close()) { - Status st = _writer->close(); + // For pipelinex engine, the writer is closed in async thread process_block + if (_writer) { + Status st = _writer->get_writer_status(); if (!st.ok() && exec_status.ok()) { // close file writer failed, should return this error to client final_status = st; diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h 
index 57e1e8c9147..e196401991a 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -106,6 +106,7 @@ public: private: friend class ResultFileSinkLocalState; template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) friend class AsyncWriterSink; const RowDescriptor& _row_desc; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index e2c851f758f..56a81422484 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -150,7 +150,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in if (in_block->rows() > 0) { RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); // update runtime predicate if (_use_topn_opt) { diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index cf14b3ec5e3..960fccff2b4 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -159,7 +159,6 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, while (columns[p._child_slots.size()]->size() < state->batch_size()) { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch.")); if (_child_block->rows() == 0) { break; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index e38e7b39d95..bbb7473f868 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -497,6 +497,7 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori } template <typename Writer, typename Parent> + 
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size()); @@ -516,6 +517,7 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkState } template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); _writer->start_writer(state, _profile); @@ -523,12 +525,14 @@ Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) { } template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) { return _writer->sink(block, source_state == SourceState::FINISHED); } template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); @@ -537,19 +541,13 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); // if the init failed, the _writer may be nullptr. 
so here need check if (_writer) { - if (_writer->need_normal_close()) { - if (exec_status.ok() && !state->is_cancelled()) { - RETURN_IF_ERROR(_writer->commit_trans()); - } - RETURN_IF_ERROR(_writer->close(exec_status)); - } else { - RETURN_IF_ERROR(_writer->get_writer_status()); - } + RETURN_IF_ERROR(_writer->get_writer_status()); } return Base::close(state, exec_status); } template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status exec_status) { if (state->is_cancelled() || !exec_status.ok()) { _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled")); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index d46dc859b0c..ca1b224c5c5 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -22,6 +22,9 @@ #include "pipeline/pipeline_x/dependency.h" #include "pipeline/pipeline_x/local_exchange/local_exchanger.h" +namespace doris::vectorized { +class AsyncResultWriter; +} namespace doris::pipeline { struct LocalExchangeSinkDependency; @@ -569,6 +572,7 @@ public: protected: template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) friend class AsyncWriterSink; // _operator_id : the current Operator's ID, which is not visible to the user. // _node_id : the plan node ID corresponding to the Operator, which is visible on the profile. 
@@ -680,6 +684,7 @@ public: }; template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> { public: using Base = PipelineXSinkLocalState<FakeDependency>; diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index f65f06399b8..5c49954ef2f 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -38,6 +38,8 @@ public: virtual Status init(RuntimeState* state) = 0; + virtual Status finish(RuntimeState* state) { return Status::OK(); } + virtual Status close(Status s = Status::OK()) = 0; [[nodiscard]] virtual int64_t get_written_rows() const { return _written_rows; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 1a6f8a2661f..d84b86f2290 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -365,24 +365,6 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) { return _process_status; } -Status RuntimeState::check_query_state(const std::string& msg) { - // TODO: it would be nice if this also checked for cancellation, but doing so breaks - // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached. - // - // If the thread MemTrackerLimiter exceeds the limit, an error status is returned. - // Usually used after SCOPED_ATTACH_TASK, during query execution. 
- if (is_thread_context_init() && thread_context()->thread_mem_tracker()->limit_exceeded() && - !config::enable_query_memory_overcommit) { - auto failed_msg = - fmt::format("{}, {}", msg, - thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str()); - thread_context()->thread_mem_tracker()->print_log_usage(failed_msg); - log_error(failed_msg); - return Status::MemoryLimitExceeded(failed_msg); - } - return query_status(); -} - const int64_t MAX_ERROR_NUM = 50; Status RuntimeState::create_error_log_file() { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ede451cd92a..91443ef9492 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -236,11 +236,6 @@ public: // generic "Memory limit exceeded" error. Status set_mem_limit_exceeded(const std::string& msg = "Memory limit exceeded"); - // Returns a non-OK status if query execution should stop (e.g., the query was cancelled - // or a mem limit was exceeded). Exec nodes should check this periodically so execution - // doesn't continue if the query terminates abnormally. 
- Status check_query_state(const std::string& msg); - std::vector<std::string>& output_files() { return _output_files; } void set_import_label(const std::string& import_label) { _import_label = import_label; } diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index 6a7ebcffd1e..95c0abd72a8 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -168,8 +168,6 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl } else { RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size())); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR( - state->check_query_state("VPartitionSortNode, while split input block.")); input_block->clear_column_data(); } } @@ -237,7 +235,6 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) { RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state)); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("VPartitionSortNode, while open.")); return Status::OK(); } diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 189c628ef0b..e313e3f74ac 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -130,7 +130,6 @@ Status VSortNode::alloc_resource(doris::RuntimeState* state) { RETURN_IF_ERROR(ExecNode::alloc_resource(state)); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("vsort, while open.")); return Status::OK(); } @@ -140,7 +139,6 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool if (input_block->rows() > 0) { RETURN_IF_ERROR(_sorter->append_block(input_block)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); // update runtime predicate if (_use_topn_opt) { diff --git a/be/src/vec/exec/vtable_function_node.cpp 
b/be/src/vec/exec/vtable_function_node.cpp index 8affd4cbe7e..0c35fae806e 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -170,7 +170,6 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu while (columns[_child_slots.size()]->size() < state->batch_size()) { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch.")); if (_child_block->rows() == 0) { break; diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h index 600eb609281..1260382d6fa 100644 --- a/be/src/vec/sink/async_writer_sink.h +++ b/be/src/vec/sink/async_writer_sink.h @@ -42,6 +42,7 @@ namespace vectorized { class Block; template <typename Writer, const char* Name> + requires(std::is_base_of_v<AsyncResultWriter, Writer>) class AsyncWriterSink : public DataSink { public: AsyncWriterSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_exprs) @@ -78,13 +79,21 @@ public: return Status::OK(); } + // Non-pipeline engine will call this api to send data to sink destination Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override { SCOPED_TIMER(_exec_timer); COUNTER_UPDATE(_blocks_sent_counter, 1); COUNTER_UPDATE(_output_rows_counter, block->rows()); - return _writer->write(*block); + Status st = _writer->write(*block); + // Should also check !state->is_cancelled()???, do not know which scenario? + if (st.ok() && eos) { + // If this is the last block, then call finish to flush the buffer or commit transactions. + st = _writer->finish(state); + } + return st; } + // Pipeline engine will call this api to send data to destination. This is an async API. 
Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override { return _writer->sink(block, eos); } @@ -94,13 +103,11 @@ public: Status close(RuntimeState* state, Status exec_status) override { // if the init failed, the _writer may be nullptr. so here need check if (_writer) { - if (_writer->need_normal_close()) { - if (exec_status.ok() && !state->is_cancelled()) { - RETURN_IF_ERROR(_writer->commit_trans()); - } - RETURN_IF_ERROR(_writer->close(exec_status)); - } else { + // For pipeline engine, the writer is always closed in async thread process_block + if (state->enable_pipeline_exec()) { RETURN_IF_ERROR(_writer->get_writer_status()); + } else { + RETURN_IF_ERROR(_writer->close(exec_status)); } } return DataSink::close(state, exec_status); @@ -113,7 +120,7 @@ public: return Status::OK(); } - bool is_close_done() override { return !_writer->is_pending_finish(); } + [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); } protected: const std::vector<TExpr>& _t_output_expr; diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 02d77fa6d42..08dd881bf4b 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -113,14 +113,21 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { } Status final_status = exec_status; - // close the writer - if (_writer && _writer->need_normal_close()) { - Status st = _writer->close(); - if (!st.ok() && exec_status.ok()) { - // close file writer failed, should return this error to client - final_status = st; + Status writer_st = Status::OK(); + if (_writer) { + // For pipeline engine, the writer is always closed in async thread process_block + if (state->enable_pipeline_exec()) { + writer_st = _writer->get_writer_status(); + } else { + writer_st = _writer->close(exec_status); } } + + if (!writer_st.ok() && exec_status.ok()) { + // close file writer failed, should return 
this error to client + final_status = writer_st; + } + if (_is_top_sink) { // close sender, this is normal path end if (_sender) { diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 0ce3196e3a3..35d94fe3c47 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -125,15 +125,22 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } } - // if not in transaction or status is in error or force close we can do close in - // async IO thread - if (!_writer_status.ok() || !in_transaction()) { - std::lock_guard l(_m); - // Using lock to make sure the writer status is not modified - // There is a unique ptr err_msg in Status, if it is modified, the unique ptr - // maybe released. And it will core because use after free. + // If the last block is sent successfully, then call finish to clear the buffer or commit + // transactions. + // Using lock to make sure the writer status is not modified + // There is a unique ptr err_msg in Status, if it is modified, the unique ptr + // maybe released. And it will core because use after free. + std::lock_guard l(_m); + if (_writer_status.ok() && _eos) { + _writer_status = finish(state); + } + + if (_writer_status.ok()) { _writer_status = close(_writer_status); - _need_normal_close = false; + } else { + // If it is already failed before, then not update the write status so that we could get + // the real reason. 
+ static_cast<void>(close(_writer_status)); } _writer_thread_closed = true; if (_finish_dependency) { diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index ced123267c5..5fbcca98af3 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -61,12 +61,6 @@ public: void force_close(Status s); - virtual bool in_transaction() { return false; } - - virtual Status commit_trans() { return Status::OK(); } - - bool need_normal_close() const { return _need_normal_close; } - Status init(RuntimeState* state) override { return Status::OK(); } virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0; @@ -110,7 +104,6 @@ private: std::deque<std::unique_ptr<Block>> _data_queue; Status _writer_status = Status::OK(); bool _eos = false; - bool _need_normal_close = true; bool _writer_thread_closed = false; // Used by pipelineX diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h index 735c023fce5..a683259c992 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.h +++ b/be/src/vec/sink/writer/vjdbc_table_writer.h @@ -46,11 +46,9 @@ public: Status write(vectorized::Block& block) override; - Status close(Status s) override { return JdbcConnector::close(s); } - - bool in_transaction() override { return TableConnector::_is_in_transaction; } + Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); } - Status commit_trans() override { return JdbcConnector::finish_trans(); } + Status close(Status s) override { return JdbcConnector::close(s); } private: JdbcConnectorParam _param; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h index a28947355e7..687b5106a8b 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.h +++ b/be/src/vec/sink/writer/vodbc_table_writer.h @@ -46,11 +46,9 @@ public: Status write(vectorized::Block& block) override; - 
Status close(Status s) override { return ODBCConnector::close(s); } - - bool in_transaction() override { return TableConnector::_is_in_transaction; } + Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); } - Status commit_trans() override { return ODBCConnector::finish_trans(); } + Status close(Status s) override { return ODBCConnector::close(s); } }; } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 7c51b80a015..7af7b115c43 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -865,10 +865,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { static_cast<void>(request->release_id()); } -bool VNodeChannel::is_send_data_rpc_done() const { - return _add_batches_finished || _cancelled; -} - Status VNodeChannel::close_wait(RuntimeState* state) { DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemInfo::process_full_gc(); }); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); @@ -891,7 +887,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } // waiting for finished, it may take a long time, so we couldn't set a timeout - // In pipeline, is_close_done() is false at this time, will not block. + // For pipeline engine, the close is called in async writer's process block method, + // so that it will not block pipeline thread. 
while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { bthread_usleep(1000); } @@ -1358,7 +1355,7 @@ Status VTabletWriter::_send_new_partition_batch() { return Status::OK(); } -Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { +void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status) { SCOPED_TIMER(_close_timer); Status status = exec_status; @@ -1400,23 +1397,6 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { _close_status = status; _close_wait = true; } - - return Status::OK(); -} - -bool VTabletWriter::is_close_done() { - // Only after try_close, need to wait rpc end. - if (!_close_wait) { - return true; - } - bool close_done = true; - for (const auto& index_channel : _channels) { - index_channel->for_each_node_channel( - [&close_done](const std::shared_ptr<VNodeChannel>& ch) { - close_done &= ch->is_send_data_rpc_done(); - }); - } - return close_done; } Status VTabletWriter::close(Status exec_status) { @@ -1431,7 +1411,7 @@ Status VTabletWriter::close(Status exec_status) { SCOPED_TIMER(_profile->total_time_counter()); // will make the last batch of request-> close_wait will wait this finished. - static_cast<void>(try_close(_state, exec_status)); + _do_try_close(_state, exec_status); // If _close_status is not ok, all nodes have been canceled in try_close. if (_close_status.ok()) { diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 58ddbfd8869..cd2eafb1f24 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -259,8 +259,6 @@ public: // 2. 
just cancel() void mark_close(); - bool is_send_data_rpc_done() const; - bool is_closed() const { return _is_closed; } bool is_cancelled() const { return _cancelled; } std::string get_cancel_msg() { @@ -527,15 +525,11 @@ public: Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status try_close(RuntimeState* state, Status exec_status); - // the consumer func of sending pending batches in every NodeChannel. // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending. // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer void _send_batch_process(); - bool is_close_done(); - Status on_partitions_created(TCreatePartitionResult* result); Status _send_new_partition_batch(); @@ -562,6 +556,8 @@ private: Status _incremental_open_node_channel(const std::vector<TOlapTablePartition>& partitions); + void _do_try_close(RuntimeState* state, const Status& exec_status); + TDataSink _t_sink; std::shared_ptr<MemTracker> _mem_tracker; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org