This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new ea61206233e [pick](branch-2.1) pick #43281 (#44020) ea61206233e is described below commit ea61206233eda8a26c6931543e6914cefa852fb3 Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Sat Nov 16 21:53:21 2024 +0800 [pick](branch-2.1) pick #43281 (#44020) pick #43281 --- be/src/common/config.cpp | 7 + be/src/common/config.h | 6 + be/src/pipeline/exec/result_file_sink_operator.cpp | 2 +- be/src/pipeline/exec/result_sink_operator.cpp | 12 +- be/src/runtime/buffer_control_block.cpp | 236 ++++++++++++++--- be/src/runtime/buffer_control_block.h | 79 +++++- be/src/runtime/result_buffer_mgr.cpp | 78 +++--- be/src/runtime/result_buffer_mgr.h | 29 +- .../arrow_flight/arrow_flight_batch_reader.cpp | 291 +++++++++++++++++++-- .../arrow_flight/arrow_flight_batch_reader.h | 69 ++++- be/src/service/arrow_flight/flight_sql_service.cpp | 57 ++-- be/src/service/internal_service.cpp | 60 ++++- be/src/service/internal_service.h | 5 + be/src/util/arrow/row_batch.cpp | 12 +- be/src/util/arrow/row_batch.h | 12 +- be/src/util/arrow/utils.cpp | 3 +- be/src/util/doris_metrics.h | 5 + be/src/vec/sink/varrow_flight_result_writer.cpp | 64 ++--- be/src/vec/sink/varrow_flight_result_writer.h | 18 +- be/src/vec/sink/vmemory_scratch_sink.cpp | 2 +- be/src/vec/sink/vresult_file_sink.cpp | 2 +- be/src/vec/sink/vresult_sink.cpp | 11 +- be/test/runtime/result_buffer_mgr_test.cpp | 13 +- .../arrowflight/DorisFlightSqlProducer.java | 26 +- .../arrowflight/FlightSqlConnectProcessor.java | 16 +- gensrc/proto/internal_service.proto | 16 ++ 26 files changed, 881 insertions(+), 250 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c239dc3e72d..d5b67c2c128 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -64,6 +64,7 @@ DEFINE_Int32(brpc_port, "8060"); DEFINE_Int32(arrow_flight_sql_port, "-1"); DEFINE_mString(public_access_ip, ""); +DEFINE_Int32(public_access_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores @@ -524,6 +525,8 @@ DEFINE_Int32(brpc_heavy_work_pool_threads, "-1"); DEFINE_Int32(brpc_light_work_pool_threads, "-1"); DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1"); DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1"); +DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1"); +DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1"); //Enable brpc builtin services, see: //https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely @@ -646,7 +649,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); +// arrow flight result sink buffer rows size, default 4096 * 8 DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); +// The timeout for ADBC Client to wait for data using arrow flight reader. +// If the query is very complex and no result is generated after this time, consider increasing this timeout. +DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000"); // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 289c56464f3..aca5b6b829a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port); // For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result. // If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip. DECLARE_mString(public_access_ip); +DECLARE_Int32(public_access_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores @@ -580,6 +581,8 @@ DECLARE_Int32(brpc_heavy_work_pool_threads); DECLARE_Int32(brpc_light_work_pool_threads); DECLARE_Int32(brpc_heavy_work_pool_max_queue_size); DECLARE_Int32(brpc_light_work_pool_max_queue_size); +DECLARE_Int32(brpc_arrow_flight_work_pool_threads); +DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size); // The maximum amount of data that can be processed by a stream load DECLARE_mInt64(streaming_load_max_mb); @@ -701,6 +704,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time); // arrow flight result sink buffer rows size, default 4096 * 8 DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); +// The timeout for ADBC Client to wait for data using arrow flight reader. +// If the query is very complex and no result is generated after this time, consider increasing this timeout. +DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms); // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 6fe0b7f9e25..60a46544300 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -115,7 +115,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), p._buf_size, &_sender, state->enable_pipeline_exec(), - state->execution_timeout())); + state)); // create writer _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( p._file_opts.get(), p._storage_type, state->fragment_instance_id(), diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 8f33aa9bed2..d2dfa89cdd6 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -69,8 +69,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) // create sender auto& p = _parent->cast<ResultSinkOperatorX>(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true, - state->execution_timeout())); + state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true, state)); ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this()); return Status::OK(); } @@ -98,12 +97,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr<arrow::Schema> arrow_schema; - RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema, - state->timezone())); - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( - _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); + _sender.get(), _output_vexpr_ctxs, _profile)); break; } default: diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index c61c98a324b..f2ac2780c9d 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -33,9 +33,11 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" #include "pipeline/exec/result_sink_operator.h" -#include "runtime/exec_env.h" #include "runtime/thread_context.h" +#include "util/runtime_profile.h" +#include "util/string_util.h" #include "util/thrift_util.h" +#include "vec/core/block.h" namespace doris { @@ -93,14 +95,80 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul delete this; } -BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) +void GetArrowResultBatchCtx::on_failure(const Status& status) { + DCHECK(!status.ok()) << "status is ok, errmsg=" << status; + status.to_protobuf(result->mutable_status()); + delete this; +} + +void GetArrowResultBatchCtx::on_close(int64_t packet_seq) { + Status status; + status.to_protobuf(result->mutable_status()); + result->set_packet_seq(packet_seq); + result->set_eos(true); + delete this; +} + +void GetArrowResultBatchCtx::on_data( + const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version, + segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone, + RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* uncompressed_bytes_counter, + RuntimeProfile::Counter* compressed_bytes_counter) { + Status st = Status::OK(); + if (result != nullptr) { + size_t uncompressed_bytes = 0, compressed_bytes = 0; + SCOPED_TIMER(serialize_batch_ns_timer); + st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes, + &compressed_bytes, fragement_transmission_compression_type, false); + COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes); + COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes); + if (st.ok()) { + result->set_packet_seq(packet_seq); + result->set_eos(false); + if (packet_seq == 0) { + result->set_timezone(timezone); + } + } else { + result->clear_block(); + result->set_packet_seq(packet_seq); + LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st; + } + } else { + result->set_empty_batch(true); + result->set_packet_seq(packet_seq); + result->set_eos(false); + } + + /// The size limit of proto buffer message is 2G + if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) { + st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); + result->clear_block(); + } + st.to_protobuf(result->mutable_status()); + delete this; +} + +BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state) : _fragment_id(id), _is_close(false), _is_cancelled(false), _buffer_rows(0), _buffer_limit(buffer_size), - _packet_num(0) { + _packet_num(0), + _timezone(state->timezone()), + _timezone_obj(state->timezone_obj()), + _be_exec_version(state->be_exec_version()), + _fragement_transmission_compression_type( + state->fragement_transmission_compression_type()), + _profile("BufferControlBlock " + print_id(_fragment_id)) { _query_statistics = std::make_unique<QueryStatistics>(); + _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime"); + _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES); + _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES); + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("BufferControlBlock#FragmentInstanceId={}", print_id(_fragment_id))); } BufferControlBlock::~BufferControlBlock() { @@ -157,28 +225,29 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result, return Status::OK(); } -Status BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) { +Status BufferControlBlock::add_arrow_batch(std::shared_ptr<vectorized::Block>& result) { std::unique_lock<std::mutex> l(_lock); if (_is_cancelled) { return Status::Cancelled("Cancelled"); } - int num_rows = result->num_rows(); - - while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > _buffer_limit) && !_is_cancelled) { - _data_removal.wait_for(l, std::chrono::seconds(1)); - } - - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + if (_waiting_arrow_result_batch_rpc.empty()) { + // TODO: Merge result into block to reduce rpc times + int num_rows = result->rows(); + _arrow_flight_result_batch_queue.push_back(std::move(result)); + _buffer_rows += num_rows; + _arrow_data_arrival + .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,) + } else { + auto* ctx = _waiting_arrow_result_batch_rpc.front(); + _waiting_arrow_result_batch_rpc.pop_front(); + ctx->on_data(result, _packet_num, _be_exec_version, + _fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer, + _uncompressed_bytes_counter, _compressed_bytes_counter); + _packet_num++; } - // TODO: merge RocordBatch, ToStructArray -> Make again - - _arrow_flight_batch_queue.push_back(std::move(result)); - _buffer_rows += num_rows; - _data_arrival.notify_one(); return Status::OK(); } @@ -211,37 +280,113 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { _waiting_rpc.push_back(ctx); } -Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) { +Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj) { std::unique_lock<std::mutex> l(_lock); if (!_status.ok()) { return _status; } if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); } - while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { - _data_arrival.wait_for(l, std::chrono::seconds(1)); + while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20)); } if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); } - if (!_arrow_flight_batch_queue.empty()) { - *result = std::move(_arrow_flight_batch_queue.front()); - _arrow_flight_batch_queue.pop_front(); - _buffer_rows -= (*result)->num_rows(); - _data_removal.notify_one(); + if (!_arrow_flight_result_batch_queue.empty()) { + *result = std::move(_arrow_flight_result_batch_queue.front()); + _arrow_flight_result_batch_queue.pop_front(); + timezone_obj = _timezone_obj; + _buffer_rows -= (*result)->rows(); _packet_num++; return Status::OK(); } // normal path end if (_is_close) { + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return Status::OK(); + } + return Status::InternalError( + fmt::format("Get Arrow Batch Abnormal Ending ()", print_id(_fragment_id))); +} + +void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { + std::unique_lock<std::mutex> l(_lock); + SCOPED_ATTACH_TASK(_mem_tracker); + if (!_status.ok()) { + ctx->on_failure(_status); + return; + } + if (_is_cancelled) { + ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)))); + return; + } + + if (!_arrow_flight_result_batch_queue.empty()) { + auto block = _arrow_flight_result_batch_queue.front(); + _arrow_flight_result_batch_queue.pop_front(); + + ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type, + _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter, + _compressed_bytes_counter); + _buffer_rows -= block->rows(); + _packet_num++; + return; + } + + // normal path end + if (_is_close) { + ctx->on_close(_packet_num); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return; + } + // no ready data, push ctx to waiting list + _waiting_arrow_result_batch_rpc.push_back(ctx); +} + +void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) { + std::lock_guard<std::mutex> l(_lock); + _arrow_schema = arrow_schema; +} + +Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) { + std::unique_lock<std::mutex> l(_lock); + if (!_status.ok()) { + return _status; + } + if (_is_cancelled) { + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); + } + + // normal path end + if (_arrow_schema != nullptr) { + *arrow_schema = _arrow_schema; return Status::OK(); } - return Status::InternalError("Get Arrow Batch Abnormal Ending"); + + if (_is_close) { + return Status::RuntimeError(fmt::format("Closed ()", print_id(_fragment_id))); + } + return Status::InternalError( + fmt::format("Get Arrow Schema Abnormal Ending ()", print_id(_fragment_id))); } Status BufferControlBlock::close(Status exec_status) { @@ -251,6 +396,7 @@ Status BufferControlBlock::close(Status exec_status) { // notify blocked get thread _data_arrival.notify_all(); + _arrow_data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { @@ -263,18 +409,38 @@ Status BufferControlBlock::close(Status exec_status) { } _waiting_rpc.clear(); } + + if (!_waiting_arrow_result_batch_rpc.empty()) { + if (_status.ok()) { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_close(_packet_num); + } + } else { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(_status); + } + } + _waiting_arrow_result_batch_rpc.clear(); + } return Status::OK(); } void BufferControlBlock::cancel(const Status& reason) { std::unique_lock<std::mutex> l(_lock); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _is_cancelled = true; _data_removal.notify_all(); _data_arrival.notify_all(); + _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(reason); } _waiting_rpc.clear(); + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(Status::Cancelled("Cancelled")); + } + _waiting_arrow_result_batch_rpc.clear(); + _arrow_flight_result_batch_queue.clear(); } Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result, @@ -284,7 +450,7 @@ Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& resul return Status::OK(); } -Status PipBufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) { +Status PipBufferControlBlock::add_arrow_batch(std::shared_ptr<vectorized::Block>& result) { RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result)); _update_dependency(); return Status::OK(); @@ -295,12 +461,18 @@ void PipBufferControlBlock::get_batch(GetResultBatchCtx* ctx) { _update_dependency(); } -Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) { - RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result)); +Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj) { + RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result, timezone_obj)); _update_dependency(); return Status::OK(); } +void PipBufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { + BufferControlBlock::get_arrow_batch(ctx); + _update_dependency(); +} + void PipBufferControlBlock::cancel(const Status& reason) { BufferControlBlock::cancel(reason); _update_dependency(); @@ -322,7 +494,7 @@ void PipBufferControlBlock::_update_dependency() { } void PipBufferControlBlock::_update_batch_queue_empty() { - _batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty(); + _batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_result_batch_queue.empty(); _update_dependency(); } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 9e991613f2e..724d86a2dc5 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -17,8 +17,11 @@ #pragma once +#include <arrow/type.h> +#include <cctz/time_zone.h> #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Types_types.h> +#include <gen_cpp/segment_v2.pb.h> #include <stdint.h> #include <atomic> @@ -29,7 +32,9 @@ #include <mutex> #include "common/status.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/query_statistics.h" +#include "util/runtime_profile.h" namespace google { namespace protobuf { @@ -51,7 +56,13 @@ namespace pipeline { class Dependency; } // namespace pipeline +namespace vectorized { +class Block; +} // namespace vectorized + class PFetchDataResult; +class PFetchArrowDataResult; +class RuntimeState; struct GetResultBatchCtx { brpc::Controller* cntl = nullptr; @@ -68,20 +79,46 @@ struct GetResultBatchCtx { bool eos = false); }; +struct GetArrowResultBatchCtx { + brpc::Controller* cntl = nullptr; + PFetchArrowDataResult* result = nullptr; + google::protobuf::Closure* done = nullptr; + + GetArrowResultBatchCtx(brpc::Controller* cntl_, PFetchArrowDataResult* result_, + google::protobuf::Closure* done_) + : cntl(cntl_), result(result_), done(done_) {} + + void on_failure(const Status& status); + void on_close(int64_t packet_seq); + void on_data(const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, + int be_exec_version, + segment_v2::CompressionTypePB fragement_transmission_compression_type, + std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* uncompressed_bytes_counter, + RuntimeProfile::Counter* compressed_bytes_counter); +}; + // buffer used for result customer and producer class BufferControlBlock { public: - BufferControlBlock(const TUniqueId& id, int buffer_size); + BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state); virtual ~BufferControlBlock(); Status init(); // Only one fragment is written, so can_sink returns true, then the sink must be executed virtual bool can_sink(); virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline = false); - virtual Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result); + virtual Status add_arrow_batch(std::shared_ptr<vectorized::Block>& result); virtual void get_batch(GetResultBatchCtx* ctx); - virtual Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result); + // for ArrowFlightBatchLocalReader + virtual Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj); + // for ArrowFlightBatchRemoteReader + virtual void get_arrow_batch(GetArrowResultBatchCtx* ctx); + + virtual void register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema); + virtual Status find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. @@ -90,6 +127,7 @@ public: virtual void cancel(const Status& reason); [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } + [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return _mem_tracker; } void update_return_rows(int64_t num_rows) { // _query_statistics may be null when the result sink init failed @@ -102,12 +140,12 @@ public: protected: virtual bool _get_batch_queue_empty() { - return _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty(); + return _fe_result_batch_queue.empty() && _arrow_flight_result_batch_queue.empty(); } virtual void _update_batch_queue_empty() {} using FeResultQueue = std::list<std::unique_ptr<TFetchDataResult>>; - using ArrowFlightResultQueue = std::list<std::shared_ptr<arrow::RecordBatch>>; + using ArrowFlightResultQueue = std::list<std::shared_ptr<vectorized::Block>>; // result's query id TUniqueId _fragment_id; @@ -120,7 +158,9 @@ protected: // blocking queue for batch FeResultQueue _fe_result_batch_queue; - ArrowFlightResultQueue _arrow_flight_batch_queue; + ArrowFlightResultQueue _arrow_flight_result_batch_queue; + // for arrow flight + std::shared_ptr<arrow::Schema> _arrow_schema; // protects all subsequent data in this block std::mutex _lock; @@ -128,17 +168,33 @@ protected: std::condition_variable _data_arrival; // signal removal of data by stream consumer std::condition_variable _data_removal; + // get arrow flight result is a sync method, need wait for data ready and return result. + // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data. + std::condition_variable _arrow_data_arrival; std::deque<GetResultBatchCtx*> _waiting_rpc; + std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc; // only used for FE using return rows to check limit std::unique_ptr<QueryStatistics> _query_statistics; + + std::string _timezone; + cctz::time_zone _timezone_obj; + int _be_exec_version; + segment_v2::CompressionTypePB _fragement_transmission_compression_type; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; + + // only used for ArrowFlightBatchRemoteReader + RuntimeProfile _profile; + RuntimeProfile::Counter* _serialize_batch_ns_timer = nullptr; + RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr; + RuntimeProfile::Counter* _compressed_bytes_counter = nullptr; }; class PipBufferControlBlock : public BufferControlBlock { public: - PipBufferControlBlock(const TUniqueId& id, int buffer_size) - : BufferControlBlock(id, buffer_size) {} + PipBufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state) + : BufferControlBlock(id, buffer_size, state) {} bool can_sink() override { return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || _is_cancelled; @@ -146,11 +202,14 @@ public: Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline = true) override; - Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) override; + Status add_arrow_batch(std::shared_ptr<vectorized::Block>& result) override; void get_batch(GetResultBatchCtx* ctx) override; - Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) override; + Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj) override; + + void get_arrow_batch(GetArrowResultBatchCtx* ctx) override; void cancel(const Status& reason) override; diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index f81c9b1094f..a15b3115d18 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -34,6 +34,7 @@ #include "arrow/type_fwd.h" #include "common/status.h" #include "runtime/buffer_control_block.h" +#include "runtime/runtime_state.h" #include "util/doris_metrics.h" #include "util/metrics.h" #include "util/thread.h" @@ -69,7 +70,7 @@ Status ResultBufferMgr::init() { Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, std::shared_ptr<BufferControlBlock>* sender, - bool enable_pipeline, int exec_timout) { + bool enable_pipeline, RuntimeState* state) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -79,9 +80,9 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size std::shared_ptr<BufferControlBlock> control_block = nullptr; if (enable_pipeline) { - control_block = std::make_shared<PipBufferControlBlock>(query_id, buffer_size); + control_block = std::make_shared<PipBufferControlBlock>(query_id, buffer_size, state); } else { - control_block = std::make_shared<BufferControlBlock>(query_id, buffer_size); + control_block = std::make_shared<BufferControlBlock>(query_id, buffer_size, state); } { @@ -92,7 +93,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size // otherwise in some case may block all fragment handle threads // details see issue https://github.com/apache/doris/issues/16203 // add extra 5s for avoid corner case - int64_t max_timeout = time(nullptr) + exec_timout + 5; + int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5; cancel_at_time(max_timeout, query_id); } *sender = control_block; @@ -110,27 +111,19 @@ std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TU return std::shared_ptr<BufferControlBlock>(); } -void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id, - const std::shared_ptr<arrow::Schema>& arrow_schema) { - std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock); - _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema)); -} - -std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const TUniqueId& query_id) { - std::shared_lock<std::shared_mutex> rlock(_arrow_schema_map_lock); - auto iter = _arrow_schema_map.find(query_id); - - if (_arrow_schema_map.end() != iter) { - return iter->second; +Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id, + std::shared_ptr<arrow::Schema>* schema) { + std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id); + if (cb == nullptr) { + return Status::InternalError( + "no arrow schema for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); } - - return nullptr; + return cb->find_arrow_schema(schema); } void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) { - TUniqueId tid; - tid.__set_hi(finst_id.hi()); - tid.__set_lo(finst_id.lo()); + TUniqueId tid = UniqueId(finst_id).to_thrift(); std::shared_ptr<BufferControlBlock> cb = find_control_block(tid); if (cb == nullptr) { LOG(WARNING) << "no result for this query, id=" << print_id(tid); @@ -140,17 +133,43 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c cb->get_batch(ctx); } +Status ResultBufferMgr::find_mem_tracker(const TUniqueId& finst_id, + std::shared_ptr<MemTrackerLimiter>* mem_tracker) { + std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id); + if (cb == nullptr) { + return Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); + } + *mem_tracker = cb->mem_tracker(); + return Status::OK(); +} + Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, - std::shared_ptr<arrow::RecordBatch>* result) { + std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj) { std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id); if (cb == nullptr) { - LOG(WARNING) << "no result for this query, id=" << print_id(finst_id); - return Status::InternalError("no result for this query"); + return Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); } - RETURN_IF_ERROR(cb->get_arrow_batch(result)); + RETURN_IF_ERROR(cb->get_arrow_batch(result, timezone_obj)); return Status::OK(); } +void ResultBufferMgr::fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx) { + TUniqueId tid = UniqueId(finst_id).to_thrift(); + std::shared_ptr<BufferControlBlock> cb = find_control_block(tid); + if (cb == nullptr) { + ctx->on_failure(Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(tid))); + return; + } + cb->get_arrow_batch(ctx); +} + void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) { { std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock); @@ -161,15 +180,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) { _buffer_map.erase(iter); } } - - { - std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock); - auto arrow_schema_iter = _arrow_schema_map.find(query_id); - - if (_arrow_schema_map.end() != arrow_schema_iter) { - _arrow_schema_map.erase(arrow_schema_iter); - } - } } void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 18846684233..971974a3b63 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -17,7 +17,9 @@ #pragma once +#include <cctz/time_zone.h> #include <gen_cpp/Types_types.h> +#include <gen_cpp/segment_v2.pb.h> #include <ctime> #include <map> @@ -41,8 +43,14 @@ namespace doris { class BufferControlBlock; struct GetResultBatchCtx; +struct GetArrowResultBatchCtx; class PUniqueId; +class RuntimeState; +class MemTrackerLimiter; class Thread; +namespace vectorized { +class Block; +} // namespace vectorized // manage all result buffer control block in one backend class ResultBufferMgr { @@ -59,16 +67,18 @@ public: // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline, - int exec_timeout); + RuntimeState* state); // fetch data result to FE void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx); - // fetch data result to Arrow Flight Server - Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr<arrow::RecordBatch>* result); - - void register_arrow_schema(const TUniqueId& query_id, - const std::shared_ptr<arrow::Schema>& arrow_schema); - std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id); + // fetch data result to Arrow Flight Client + Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj); + // fetch data result to Other BE forwards to Client + void fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx); + Status find_mem_tracker(const TUniqueId& finst_id, + std::shared_ptr<MemTrackerLimiter>* mem_tracker); + Status find_arrow_schema(const TUniqueId& query_id, std::shared_ptr<arrow::Schema>* schema); // cancel void cancel(const TUniqueId& query_id, const Status& reason); @@ -79,7 +89,6 @@ public: private: using BufferMap = std::unordered_map<TUniqueId, std::shared_ptr<BufferControlBlock>>; using TimeoutMap = std::map<time_t, std::vector<TUniqueId>>; - using ArrowSchemaMap = std::unordered_map<TUniqueId, std::shared_ptr<arrow::Schema>>; std::shared_ptr<BufferControlBlock> find_control_block(const TUniqueId& query_id); @@ -91,10 +100,6 @@ private: std::shared_mutex _buffer_map_lock; // buffer block map BufferMap _buffer_map; - // lock for arrow schema map - std::shared_mutex _arrow_schema_map_lock; - // for arrow flight - ArrowSchemaMap _arrow_schema_map; // lock for timeout map std::mutex _timeout_lock; diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index a07e479d759..e935aff996d 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -17,53 +17,294 @@ #include "service/arrow_flight/arrow_flight_batch_reader.h" +#include <arrow/io/memory.h> +#include <arrow/ipc/reader.h> #include <arrow/status.h> +#include <arrow/type.h> +#include <gen_cpp/internal_service.pb.h> -#include "arrow/builder.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/result_buffer_mgr.h" +#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/arrow/block_convertor.h" #include "util/arrow/row_batch.h" #include "util/arrow/utils.h" +#include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" -namespace doris { -namespace flight { +namespace doris::flight { -std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const { - return schema_; +ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase( + const std::shared_ptr<QueryStatement>& statement) + : _statement(statement) {} + +std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const { + return _schema; +} + +arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::string& msg) { + std::string status_msg = + fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg, + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id)); + LOG(WARNING) << status_msg; + return arrow::Status::Invalid(status_msg); } -ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement, - std::shared_ptr<arrow::Schema> schema) - : statement_(std::move(statement)), schema_(std::move(schema)) {} +ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() { + VLOG_NOTICE << fmt::format( + "ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, " + "convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}", + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id), _convert_arrow_batch_timer, _deserialize_block_timer, + _mem_tracker->peak_consumption()); +} -arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::Create( - const std::shared_ptr<QueryStatement>& statement_) { +ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader( + const std::shared_ptr<QueryStatement>& statement, + const std::shared_ptr<arrow::Schema>& schema, + const std::shared_ptr<MemTrackerLimiter>& mem_tracker) + : ArrowFlightBatchReaderBase(statement) { + _schema = schema; + _mem_tracker = mem_tracker; +} + +arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> ArrowFlightBatchLocalReader::Create( + const std::shared_ptr<QueryStatement>& statement) { + DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost()); // Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket // to the ADBC client, so that the schema and control block can be found. - auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id); - if (schema == nullptr) { - ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format( - "Client not found arrow flight schema, maybe query has been canceled, queryid: {}", - print_id(statement_->query_id)))); + std::shared_ptr<arrow::Schema> schema; + RETURN_ARROW_STATUS_IF_ERROR( + ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, &schema)); + std::shared_ptr<MemTrackerLimiter> mem_tracker; + RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker( + statement->query_id, &mem_tracker)); + + std::shared_ptr<ArrowFlightBatchLocalReader> result( + new ArrowFlightBatchLocalReader(statement, schema, mem_tracker)); + return result; +} + +arrow::Status ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) { + // parameter *out not nullptr + *out = nullptr; + SCOPED_ATTACH_TASK(_mem_tracker); + std::shared_ptr<vectorized::Block> result; + auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, &result, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + if (result == nullptr) { + // eof, normal path end + return arrow::Status::OK(); + } + + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + st = convert_to_arrow_batch(*result, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + } + + _packet_seq++; + if (*out != nullptr) { + VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; } - std::shared_ptr<ArrowFlightBatchReader> result(new ArrowFlightBatchReader(statement_, schema)); + return arrow::Status::OK(); +} + +ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader( + const std::shared_ptr<QueryStatement>& statement, + const std::shared_ptr<PBackendService_Stub>& stub) + : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), _block(nullptr) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", print_id(_statement->query_id))); +} + +arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> ArrowFlightBatchRemoteReader::Create( + const std::shared_ptr<QueryStatement>& statement) { + std::shared_ptr<PBackendService_Stub> stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + statement->result_addr); + if (!stub) { + std::string msg = fmt::format( + "ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}", + statement->result_addr.hostname, statement->result_addr.port, + print_id(statement->query_id)); + LOG(WARNING) << msg; + return arrow::Status::Invalid(msg); + } + + std::shared_ptr<ArrowFlightBatchRemoteReader> result( + new ArrowFlightBatchRemoteReader(statement, stub)); + ARROW_RETURN_NOT_OK(result->init_schema()); return result; } -arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) { - // *out not nullptr +arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() { + Status st; + auto request = std::make_shared<PFetchArrowFlightSchemaRequest>(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback<PFetchArrowFlightSchemaResult>::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowFlightSchemaRequest, + DummyBrpcCallback<PFetchArrowFlightSchemaResult>>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch schema error: {}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + if (callback->response_->has_schema() && !callback->response_->schema().empty()) { + auto input = + arrow::io::BufferReader::FromString(std::string(callback->response_->schema())); + ARROW_ASSIGN_OR_RAISE(auto reader, + arrow::ipc::RecordBatchStreamReader::Open( + input.get(), arrow::ipc::IpcReadOptions::Defaults())); + _schema = reader->schema(); + } else { + return _return_invalid_status(fmt::format("fetch schema error: not find schema")); + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() { + DCHECK(_block == nullptr); + while (true) { + // if `continue` occurs, data is invalid, continue fetch, block is nullptr. + // if `break` occurs, fetch data successfully (block is not nullptr) or fetch eos. + Status st; + auto request = std::make_shared<PFetchArrowDataRequest>(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback<PFetchArrowDataResult>::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowDataRequest, + DummyBrpcCallback<PFetchArrowDataResult>>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_data(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, + _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch data error={}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + DCHECK(callback->response_->has_packet_seq()); + if (_packet_seq != callback->response_->packet_seq()) { + return _return_invalid_status( + fmt::format("fetch data receive packet failed, expect: {}, receive: {}", + _packet_seq, callback->response_->packet_seq())); + } + _packet_seq++; + + if (callback->response_->has_eos() && callback->response_->eos()) { + break; + } + + if (callback->response_->has_empty_batch() && callback->response_->empty_batch()) { + continue; + } + + DCHECK(callback->response_->has_block()); + if (callback->response_->block().ByteSizeLong() == 0) { + continue; + } + + std::call_once(_timezone_once_flag, [this, callback] { + DCHECK(callback->response_->has_timezone()); + TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), _timezone_obj); + }); + + { + SCOPED_ATOMIC_TIMER(&_deserialize_block_timer); + _block = vectorized::Block::create_shared(); + st = _block->deserialize(callback->response_->block()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + break; + } + + const auto rows = _block->rows(); + if (rows == 0) { + _block = nullptr; + continue; + } + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::init_schema() { + ARROW_RETURN_NOT_OK(_fetch_schema()); + DCHECK(_schema != nullptr); + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) { + // parameter *out not nullptr *out = nullptr; - auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out); - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string(); + SCOPED_ATTACH_TASK(_mem_tracker); + ARROW_RETURN_NOT_OK(_fetch_data()); + if (_block == nullptr) { + // eof, normal path end, last _fetch_data return block is nullptr + return arrow::Status::OK(); + } + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + auto st = convert_to_arrow_batch(*_block, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchRemoteReader convert block to arrow batch failed"); ARROW_RETURN_NOT_OK(to_arrow_status(st)); } + _block = nullptr; + if (*out != nullptr) { - VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", " - << (*out)->num_columns(); + VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; } return arrow::Status::OK(); } -} // namespace flight -} // namespace doris +} // namespace doris::flight diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h b/be/src/service/arrow_flight/arrow_flight_batch_reader.h index e0279cbb70d..612ebc8063c 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h @@ -17,40 +17,91 @@ #pragma once +#include <cctz/time_zone.h> #include <gen_cpp/Types_types.h> #include <memory> +#include <utility> #include "arrow/record_batch.h" +#include "runtime/exec_env.h" namespace doris { + +namespace vectorized { +class Block; +} // namespace vectorized + namespace flight { struct QueryStatement { public: TUniqueId query_id; + TNetworkAddress result_addr; // BE brpc ip & port std::string sql; - QueryStatement(const TUniqueId& query_id_, const std::string& sql_) - : query_id(query_id_), sql(sql_) {} + QueryStatement(TUniqueId query_id_, TNetworkAddress result_addr_, std::string sql_) + : query_id(std::move(query_id_)), + result_addr(std::move(result_addr_)), + sql(std::move(sql_)) {} +}; + +class ArrowFlightBatchReaderBase : public arrow::RecordBatchReader { +public: + // RecordBatchReader force override + [[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override; + +protected: + ArrowFlightBatchReaderBase(const std::shared_ptr<QueryStatement>& statement); + ~ArrowFlightBatchReaderBase() override; + arrow::Status _return_invalid_status(const std::string& msg); + + std::shared_ptr<QueryStatement> _statement; + std::shared_ptr<arrow::Schema> _schema; + cctz::time_zone _timezone_obj; + std::atomic<int64_t> _packet_seq = 0; + + std::atomic<int64_t> _convert_arrow_batch_timer = 0; + std::atomic<int64_t> _deserialize_block_timer = 0; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; -class ArrowFlightBatchReader : public arrow::RecordBatchReader { +class ArrowFlightBatchLocalReader : public ArrowFlightBatchReaderBase { public: - static arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> Create( + static arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> Create( const std::shared_ptr<QueryStatement>& statement); - [[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override; + arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override; +private: + ArrowFlightBatchLocalReader(const std::shared_ptr<QueryStatement>& statement, + const std::shared_ptr<arrow::Schema>& schema, + const std::shared_ptr<MemTrackerLimiter>& mem_tracker); +}; + +class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase { +public: + static arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> Create( + const std::shared_ptr<QueryStatement>& statement); + + // create arrow RecordBatchReader must initialize the schema. + // so when creating arrow RecordBatchReader, fetch result data once, + // which will return Block and some necessary information, and extract arrow schema from Block. + arrow::Status init_schema(); arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override; private: - std::shared_ptr<QueryStatement> statement_; - std::shared_ptr<arrow::Schema> schema_; + ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>& statement, + const std::shared_ptr<PBackendService_Stub>& stub); - ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement, - std::shared_ptr<arrow::Schema> schema); + arrow::Status _fetch_schema(); + arrow::Status _fetch_data(); + + std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; + std::once_flag _timezone_once_flag; + std::shared_ptr<vectorized::Block> _block; }; } // namespace flight + } // namespace doris diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp b/be/src/service/arrow_flight/flight_sql_service.cpp index 60b665c62fc..90ee3edfbea 100644 --- a/be/src/service/arrow_flight/flight_sql_service.cpp +++ b/be/src/service/arrow_flight/flight_sql_service.cpp @@ -19,15 +19,17 @@ #include <arrow/status.h> +#include <memory> + #include "arrow/flight/sql/server.h" +#include "gutil/strings/split.h" #include "service/arrow_flight/arrow_flight_batch_reader.h" #include "service/arrow_flight/flight_sql_info.h" #include "service/backend_options.h" #include "util/arrow/utils.h" #include "util/uid_util.h" -namespace doris { -namespace flight { +namespace doris::flight { class FlightSqlServer::Impl { private: @@ -41,14 +43,21 @@ private: return arrow::flight::Ticket {std::move(ticket)}; } - arrow::Result<std::pair<std::string, std::string>> decode_ticket(const std::string& ticket) { - auto divider = ticket.find(':'); - if (divider == std::string::npos) { - return arrow::Status::Invalid("Malformed ticket"); + arrow::Result<std::shared_ptr<QueryStatement>> decode_ticket(const std::string& ticket) { + std::vector<string> fields = strings::Split(ticket, "&"); + if (fields.size() != 4) { + return arrow::Status::Invalid(fmt::format("Malformed ticket, size: {}", fields.size())); } - std::string query_id = ticket.substr(0, divider); - std::string sql = ticket.substr(divider + 1); - return std::make_pair(std::move(sql), std::move(query_id)); + + TUniqueId queryid; + parse_id(fields[0], &queryid); + TNetworkAddress result_addr; + result_addr.hostname = fields[1]; + result_addr.port = std::stoi(fields[2]); + std::string sql = fields[3]; + std::shared_ptr<QueryStatement> statement = + std::make_shared<QueryStatement>(queryid, result_addr, sql); + return statement; } public: @@ -59,18 +68,21 @@ public: arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> DoGetStatement( const arrow::flight::ServerCallContext& context, const arrow::flight::sql::StatementQueryTicket& command) { - ARROW_ASSIGN_OR_RAISE(auto pair, decode_ticket(command.statement_handle)); - const std::string& sql = pair.first; - const std::string query_id = pair.second; - TUniqueId queryid; - parse_id(query_id, &queryid); - - auto statement = std::make_shared<QueryStatement>(queryid, sql); - - std::shared_ptr<ArrowFlightBatchReader> reader; - ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchReader::Create(statement)); - - return std::make_unique<arrow::flight::RecordBatchStream>(reader); + ARROW_ASSIGN_OR_RAISE(auto statement, decode_ticket(command.statement_handle)); + // if IP:BrpcPort in the Ticket is not current BE node, + // pulls the query result Block from the BE node specified by IP:BrpcPort, + // converts it to Arrow Batch and returns it to ADBC client. + // use brpc to transmit blocks between BEs. + if (statement->result_addr.hostname == BackendOptions::get_localhost() && + statement->result_addr.port == config::brpc_port) { + std::shared_ptr<ArrowFlightBatchLocalReader> reader; + ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchLocalReader::Create(statement)); + return std::make_unique<arrow::flight::RecordBatchStream>(reader); + } else { + std::shared_ptr<ArrowFlightBatchRemoteReader> reader; + ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchRemoteReader::Create(statement)); + return std::make_unique<arrow::flight::RecordBatchStream>(reader); + } } }; @@ -135,5 +147,4 @@ Status FlightSqlServer::join() { return Status::OK(); } -} // namespace flight -} // namespace doris +} // namespace doris::flight diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index bcadbfd90e7..a82ab9988b1 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -155,6 +155,11 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::N DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); + bthread_key_t btls_key; static void thread_context_deleter(void* d) { @@ -219,7 +224,14 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) config::brpc_light_work_pool_max_queue_size != -1 ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), - "brpc_light") { + "brpc_light"), + _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1 + ? config::brpc_arrow_flight_work_pool_threads + : std::max(512, CpuInfo::num_cores() * 16), + config::brpc_arrow_flight_work_pool_max_queue_size != -1 + ? config::brpc_arrow_flight_work_pool_max_queue_size + : std::max(20480, CpuInfo::num_cores() * 640), + "brpc_arrow_flight") { REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, @@ -238,6 +250,15 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) REGISTER_HOOK_METRIC(light_work_max_threads, []() { return config::brpc_light_work_pool_threads; }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, + [this]() { return _arrow_flight_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, + [this]() { return _arrow_flight_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, + []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, + []() { return config::brpc_arrow_flight_work_pool_threads; }); + _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool); @@ -256,6 +277,11 @@ PInternalServiceImpl::~PInternalServiceImpl() { DEREGISTER_HOOK_METRIC(heavy_work_max_threads); DEREGISTER_HOOK_METRIC(light_work_max_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); } @@ -672,6 +698,22 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle } } +void PInternalServiceImpl::fetch_arrow_data(google::protobuf::RpcController* controller, + const PFetchArrowDataRequest* request, + PFetchArrowDataResult* result, + google::protobuf::Closure* done) { + bool ret = _arrow_flight_work_pool.try_offer([this, controller, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + auto* cntl = static_cast<brpc::Controller*>(controller); + auto* ctx = new GetArrowResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx); + }); + if (!ret) { + offer_failed(result, done, _arrow_flight_work_pool); + return; + } +} + void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -877,23 +919,21 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([request, result, done]() { brpc::ClosureGuard closure_guard(done); - std::shared_ptr<arrow::Schema> schema = - ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( - UniqueId(request->finst_id()).to_thrift()); - if (schema == nullptr) { - LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled"; - auto st = Status::NotFound( - "FE not found arrow flight schema, maybe query has been canceled"); + std::shared_ptr<arrow::Schema> schema; + auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( + UniqueId(request->finst_id()).to_thrift(), &schema); + if (!st.ok()) { st.to_protobuf(result->mutable_status()); return; } std::string schema_str; - auto st = serialize_arrow_schema(&schema, &schema_str); + st = serialize_arrow_schema(&schema, &schema_str); if (st.ok()) { result->set_schema(std::move(schema_str)); - if (config::public_access_ip != "") { + if (!config::public_access_ip.empty() && config::public_access_port != -1) { result->set_be_arrow_flight_ip(config::public_access_ip); + result->set_be_arrow_flight_port(config::public_access_port); } } st.to_protobuf(result->mutable_status()); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 4bf09255ffb..c264feab46e 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -77,6 +77,10 @@ public: void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) override; + void fetch_arrow_data(google::protobuf::RpcController* controller, + const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, + google::protobuf::Closure* done) override; + void outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -282,6 +286,7 @@ private: // otherwise as light interface FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; + FifoThreadPool _arrow_flight_work_pool; }; } // namespace doris diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index ba6f4adf6c6..ecdd733e76b 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -165,9 +165,9 @@ Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field return Status::OK(); } -Status convert_block_arrow_schema(const vectorized::Block& block, - std::shared_ptr<arrow::Schema>* result, - const std::string& timezone) { +Status get_arrow_schema_from_block(const vectorized::Block& block, + std::shared_ptr<arrow::Schema>* result, + const std::string& timezone) { std::vector<std::shared_ptr<arrow::Field>> fields; for (const auto& type_and_name : block) { std::shared_ptr<arrow::DataType> arrow_type; @@ -195,9 +195,9 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc, return Status::OK(); } -Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr<arrow::Schema>* result, - const std::string& timezone) { +Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr<arrow::Schema>* result, + const std::string& timezone) { std::vector<std::shared_ptr<arrow::Field>> fields; for (int i = 0; i < output_vexpr_ctxs.size(); i++) { std::shared_ptr<arrow::DataType> arrow_type; diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 9a33719a1cf..b509275d04a 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -48,13 +48,13 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow:: Status convert_to_arrow_schema(const RowDescriptor& row_desc, std::shared_ptr<arrow::Schema>* result, const std::string& timezone); -Status convert_block_arrow_schema(const vectorized::Block& block, - std::shared_ptr<arrow::Schema>* result, - const std::string& timezone); +Status get_arrow_schema_from_block(const vectorized::Block& block, + std::shared_ptr<arrow::Schema>* result, + const std::string& timezone); -Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr<arrow::Schema>* result, - const std::string& timezone); +Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr<arrow::Schema>* result, + const std::string& timezone); Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); diff --git a/be/src/util/arrow/utils.cpp b/be/src/util/arrow/utils.cpp index 5ccff849034..742f5bd0fc3 100644 --- a/be/src/util/arrow/utils.cpp +++ b/be/src/util/arrow/utils.cpp @@ -33,9 +33,10 @@ Status to_doris_status(const arrow::Status& status) { } arrow::Status to_arrow_status(const Status& status) { - if (status.ok()) { + if (LIKELY(status.ok())) { return arrow::Status::OK(); } else { + LOG(WARNING) << status.to_string(); // The length of exception msg returned to the ADBC Client cannot larger than 8192, // otherwise ADBC Client will receive: // `INTERNAL: http2 exception Header size exceeded max allowed size (8192)`. diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 21e92bb82df..6e27dc73441 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -216,6 +216,11 @@ public: UIntGauge* heavy_work_max_threads = nullptr; UIntGauge* light_work_max_threads = nullptr; + UIntGauge* arrow_flight_work_pool_queue_size = nullptr; + UIntGauge* arrow_flight_work_active_threads = nullptr; + UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr; + UIntGauge* arrow_flight_work_max_threads = nullptr; + UIntGauge* flush_thread_pool_queue_size = nullptr; UIntGauge* flush_thread_pool_thread_num = nullptr; diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index ca8e1cf3c3b..df38a06b2e3 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -19,21 +19,16 @@ #include "runtime/buffer_control_block.h" #include "runtime/runtime_state.h" -#include "util/arrow/block_convertor.h" -#include "util/arrow/row_batch.h" +#include "runtime/thread_context.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { -VArrowFlightResultWriter::VArrowFlightResultWriter( - BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, const std::shared_ptr<arrow::Schema>& arrow_schema) - : _sinker(sinker), - _output_vexpr_ctxs(output_vexpr_ctxs), - _parent_profile(parent_profile), - _arrow_schema(arrow_schema) {} +VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker, + const VExprContextSPtrs& output_vexpr_ctxs, + RuntimeProfile* parent_profile) + : _sinker(sinker), _output_vexpr_ctxs(output_vexpr_ctxs), _parent_profile(parent_profile) {} Status VArrowFlightResultWriter::init(RuntimeState* state) { _init_profile(); @@ -41,13 +36,11 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) { return Status::InternalError("sinker is NULL pointer."); } _is_dry_run = state->query_options().dry_run_query; - _timezone_obj = state->timezone_obj(); return Status::OK(); } void VArrowFlightResultWriter::_init_profile() { _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); - _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); @@ -66,29 +59,31 @@ Status VArrowFlightResultWriter::write(Block& input_block) { RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - // convert one batch - std::shared_ptr<arrow::RecordBatch> result; - auto num_rows = block.rows(); - // arrow::RecordBatch without `nbytes()` in C++ - uint64_t bytes_sent = block.bytes(); { - SCOPED_TIMER(_convert_tuple_timer); - RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, _timezone_obj)); - } - { - SCOPED_TIMER(_result_send_timer); - // If this is a dry run task, no need to send data block - if (!_is_dry_run) { - status = _sinker->add_arrow_batch(result); - } - if (status.ok()) { - _written_rows += num_rows; + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_sinker->mem_tracker()); + std::unique_ptr<vectorized::MutableBlock> mutable_block = + vectorized::MutableBlock::create_unique(block.clone_empty()); + RETURN_IF_ERROR(mutable_block->merge_ignore_overflow(std::move(block))); + std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared(); + output_block->swap(mutable_block->to_block()); + + auto num_rows = output_block->rows(); + // arrow::RecordBatch without `nbytes()` in C++ + uint64_t bytes_sent = output_block->bytes(); + { + SCOPED_TIMER(_result_send_timer); + // If this is a dry run task, no need to send data block if (!_is_dry_run) { - _bytes_sent += bytes_sent; + status = _sinker->add_arrow_batch(output_block); + } + if (status.ok()) { + _written_rows += num_rows; + if (!_is_dry_run) { + _bytes_sent += bytes_sent; + } + } else { + LOG(WARNING) << "append result batch to sink failed."; } - } else { - LOG(WARNING) << "append result batch to sink failed."; } } return status; @@ -104,5 +99,4 @@ Status VArrowFlightResultWriter::close(Status st) { return Status::OK(); } -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h index c95d616b617..a968d5e675c 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.h +++ b/be/src/vec/sink/varrow_flight_result_writer.h @@ -17,13 +17,6 @@ #pragma once -#include <arrow/type.h> -#include <cctz/time_zone.h> -#include <stddef.h> - -#include <memory> -#include <vector> - #include "common/status.h" #include "runtime/result_writer.h" #include "util/runtime_profile.h" @@ -39,12 +32,11 @@ class Block; class VArrowFlightResultWriter final : public ResultWriter { public: VArrowFlightResultWriter(BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, - const std::shared_ptr<arrow::Schema>& arrow_schema); + RuntimeProfile* parent_profile); Status init(RuntimeState* state) override; - Status write(Block& block) override; + Status write(Block& input_block) override; bool can_sink() override; @@ -60,8 +52,6 @@ private: RuntimeProfile* _parent_profile = nullptr; // parent profile from result sink. not owned // total time cost on append batch operation RuntimeProfile::Counter* _append_row_batch_timer = nullptr; - // tuple convert timer, child timer of _append_row_batch_timer - RuntimeProfile::Counter* _convert_tuple_timer = nullptr; // file write timer, child timer of _append_row_batch_timer RuntimeProfile::Counter* _result_send_timer = nullptr; // number of sent rows @@ -72,10 +62,6 @@ private: bool _is_dry_run = false; uint64_t _bytes_sent = 0; - - std::shared_ptr<arrow::Schema> _arrow_schema; - - cctz::time_zone _timezone_obj; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp index 904af9a9f1b..1d270f7beaf 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.cpp +++ b/be/src/vec/sink/vmemory_scratch_sink.cpp @@ -88,7 +88,7 @@ Status MemoryScratchSink::send(RuntimeState* state, Block* input_block, bool eos *input_block, &block)); std::shared_ptr<arrow::Schema> block_arrow_schema; // After expr executed, use recaculated schema as final schema - RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone())); + RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema, state->timezone())); RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(), &result, _timezone_obj)); _queue->blocking_put(result); diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 035a76a2f7f..6ecaaff18b0 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -79,7 +79,7 @@ Status VResultFileSink::prepare(RuntimeState* state) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(), - state->execution_timeout())); + state)); // create writer _writer.reset(new (std::nothrow) VFileResultWriter( _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index dc63fdb4be6..af91e1f9d50 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -90,7 +90,7 @@ Status VResultSink::prepare(RuntimeState* state) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(), - state->execution_timeout())); + state)); // create writer based on sink type switch (_sink_type) { @@ -106,12 +106,11 @@ Status VResultSink::prepare(RuntimeState* state) { } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr<arrow::Schema> arrow_schema; - RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema, - state->timezone())); - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs, - _profile, arrow_schema)); + _profile)); break; } default: diff --git a/be/test/runtime/result_buffer_mgr_test.cpp b/be/test/runtime/result_buffer_mgr_test.cpp index 152c155ef0a..4ab9186c5fa 100644 --- a/be/test/runtime/result_buffer_mgr_test.cpp +++ b/be/test/runtime/result_buffer_mgr_test.cpp @@ -34,6 +34,7 @@ protected: virtual void SetUp() {} private: + RuntimeState _state; }; TEST_F(ResultBufferMgrTest, create_normal) { @@ -43,7 +44,7 @@ TEST_F(ResultBufferMgrTest, create_normal) { query_id.hi = 100; std::shared_ptr<BufferControlBlock> control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); } TEST_F(ResultBufferMgrTest, create_same_buffer) { @@ -53,9 +54,9 @@ TEST_F(ResultBufferMgrTest, create_same_buffer) { query_id.hi = 100; std::shared_ptr<BufferControlBlock> control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); std::shared_ptr<BufferControlBlock> control_block2; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, &_state).ok()); EXPECT_EQ(control_block1.get(), control_block1.get()); } @@ -67,7 +68,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_normal) { query_id.hi = 100; std::shared_ptr<BufferControlBlock> control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); TFetchDataResult* result = new TFetchDataResult(); result->result_batch.rows.push_back("hello test"); @@ -85,7 +86,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_no_block) { query_id.hi = 100; std::shared_ptr<BufferControlBlock> control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); TFetchDataResult* result = new TFetchDataResult(); query_id.lo = 11; @@ -101,7 +102,7 @@ TEST_F(ResultBufferMgrTest, normal_cancel) { query_id.hi = 100; std::shared_ptr<BufferControlBlock> control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); EXPECT_TRUE(buffer_mgr.cancel(query_id).ok()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index b0367e8c578..6f45f3faac7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; import com.google.protobuf.Any; @@ -224,19 +225,36 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable } } else { // Now only query stmt will pull results from BE. - final ByteString handle = ByteString.copyFromUtf8( - DebugUtil.printId(connectContext.getFinstId()) + ":" + query); Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); if (schema == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") .toRuntimeException(); } + + TUniqueId queryId = connectContext.getFinstId(); + // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. + final ByteString handle = ByteString.copyFromUtf8( + DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname + + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query); TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) .build(); Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); // TODO Support multiple endpoints. - Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); + Location location; + if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) { + // In a production environment, it is often inconvenient to expose Doris BE nodes + // to the external network. + // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, + // and the external client will be randomly routed to a Doris BE node when connecting to nginx. + // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. + // If it is different from the Doris BE node randomly routed by nginx, + // data forwarding needs to be done inside the Doris BE node. + location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname, + flightSQLConnectProcessor.getPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, + connectContext.getResultFlightServerAddr().port); + } List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. return new FlightInfo(schema, descriptor, endpoints, -1, -1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index a816cf184ca..6724065f99a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -53,11 +53,12 @@ import java.util.concurrent.TimeoutException; /** * Process one flgiht sql connection. - * + * <p> * Must use try-with-resources. */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); + private TNetworkAddress publicAccessAddr = new TNetworkAddress(); public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -66,6 +67,10 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC context.setReturnResultFromLocal(true); } + public TNetworkAddress getPublicAccessAddr() { + return publicAccessAddr; + } + public void prepare(MysqlCommand command) { // set status of query to OK. ctx.getState().reset(); @@ -123,11 +128,12 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC } Status resultStatus = new Status(pResult.getStatus()); if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", - DebugUtil.printId(tid), resultStatus.toString())); + throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", + DebugUtil.printId(tid), resultStatus)); } - if (pResult.hasBeArrowFlightIp()) { - ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) { + publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + publicAccessAddr.port = pResult.getBeArrowFlightPort(); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index c2ec3205541..9c993bb9a93 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -262,6 +262,20 @@ message PFetchDataResult { optional bool empty_batch = 6; }; +message PFetchArrowDataRequest { + optional PUniqueId finst_id = 1; +}; + +message PFetchArrowDataResult { + optional PStatus status = 1; + // valid when status is ok + optional int64 packet_seq = 2; + optional bool eos = 3; + optional PBlock block = 4; + optional bool empty_batch = 5; + optional string timezone = 6; +}; + message PFetchArrowFlightSchemaRequest { optional PUniqueId finst_id = 1; }; @@ -271,6 +285,7 @@ message PFetchArrowFlightSchemaResult { // valid when status is ok optional bytes schema = 2; optional bytes be_arrow_flight_ip = 3; + optional int32 be_arrow_flight_port = 4; }; message KeyTuple { @@ -922,6 +937,7 @@ service PBackendService { rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult); rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); + rpc fetch_arrow_data(PFetchArrowDataRequest) returns (PFetchArrowDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); rpc open_load_stream(POpenLoadStreamRequest) returns (POpenLoadStreamResponse); rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org