This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ddd9f9b3eba [fix](arrow-flight-sql) Fix pipelineX Unknown result sink type (#35804) ddd9f9b3eba is described below commit ddd9f9b3ebad43115d4b9c3df1c2829f02b68736 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jun 5 11:35:58 2024 +0800 [fix](arrow-flight-sql) Fix pipelineX Unknown result sink type (#35804) Fix meet error status: [INTERNAL_ERROR]Unknown result sink type ``` W20240422 14:52:07.509462 40713 status.h:380] meet error status: [INTERNAL_ERROR]Unknown result sink type 0# doris::pipeline::ResultSinkLocalState::open(doris::RuntimeState*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:0 1# doris::pipeline::PipelineXTask::_open() at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449 2# doris::pipeline::PipelineXTask::execute(bool*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449 3# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:345 4# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0 5# doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562 6# start_thread 7# clone ``` --- be/src/pipeline/exec/result_sink_operator.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index d169aa0ee58..b8ae962ea28 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -25,8 +25,10 @@ #include "runtime/buffer_control_block.h" #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" +#include "util/arrow/row_batch.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/sink/varrow_flight_result_writer.h" #include "vec/sink/vmysql_result_writer.h" namespace doris::pipeline { @@ -61,7 +63,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } // create writer based on sink type switch (p._sink_type) { - case TResultSinkType::MYSQL_PROTOCAL: + case TResultSinkType::MYSQL_PROTOCAL: { if (state->mysql_row_binary_format()) { _writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>( _sender.get(), _output_vexpr_ctxs, _profile)); @@ -70,6 +72,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) { _sender.get(), _output_vexpr_ctxs, _profile)); } break; + } + case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { + std::shared_ptr<arrow::Schema> arrow_schema; + RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); + state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), + arrow_schema); + _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( + _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); + break; + } default: return Status::InternalError("Unknown result sink type"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org