This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e66ffc1b6d4 [branch-2.1](arrow-flight-sql) Fix pipelineX Unknown result sink type (#37540) e66ffc1b6d4 is described below commit e66ffc1b6d44e3d2b7d35584a564fa84ae569844 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Jul 11 12:30:46 2024 +0800 [branch-2.1](arrow-flight-sql) Fix pipelineX Unknown result sink type (#37540) pick ##35804 --- be/src/pipeline/exec/result_sink_operator.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index ec54e1bdd5c..624b9ca192d 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -25,8 +25,10 @@ #include "runtime/buffer_control_block.h" #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" +#include "util/arrow/row_batch.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/sink/varrow_flight_result_writer.h" #include "vec/sink/vmysql_result_writer.h" #include "vec/sink/vresult_sink.h" @@ -80,7 +82,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } // create writer based on sink type switch (p._sink_type) { - case TResultSinkType::MYSQL_PROTOCAL: + case TResultSinkType::MYSQL_PROTOCAL: { if (state->mysql_row_binary_format()) { _writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>( _sender.get(), _output_vexpr_ctxs, _profile)); @@ -89,6 +91,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) { _sender.get(), _output_vexpr_ctxs, _profile)); } break; + } + case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { + std::shared_ptr<arrow::Schema> arrow_schema; + RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); + state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), + arrow_schema); + _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( + _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); + break; + } default: return Status::InternalError("Unknown result sink type"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org