This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ddd9f9b3eba [fix](arrow-flight-sql) Fix pipelineX Unknown result sink type (#35804)
ddd9f9b3eba is described below

commit ddd9f9b3ebad43115d4b9c3df1c2829f02b68736
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Jun 5 11:35:58 2024 +0800

    [fix](arrow-flight-sql) Fix pipelineX Unknown result sink type (#35804)
    
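    Fix the error "[INTERNAL_ERROR]Unknown result sink type" returned by
    ResultSinkLocalState::open() under pipelineX: the switch on the sink type
    had no case for ARROW_FLIGHT_PROTOCAL, so it fell through to the default
    branch. The failure looked like this: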
    
    ```
    W20240422 14:52:07.509462 40713 status.h:380] meet error status: [INTERNAL_ERROR]Unknown result sink type
    
            0#  doris::pipeline::ResultSinkLocalState::open(doris::RuntimeState*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:0
            1#  doris::pipeline::PipelineXTask::_open() at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449
            2#  doris::pipeline::PipelineXTask::execute(bool*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449
            3#  doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:345
            4#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
            5#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
            6#  start_thread
            7#  clone
    ```
---
 be/src/pipeline/exec/result_sink_operator.cpp | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index d169aa0ee58..b8ae962ea28 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -25,8 +25,10 @@
 #include "runtime/buffer_control_block.h"
 #include "runtime/exec_env.h"
 #include "runtime/result_buffer_mgr.h"
+#include "util/arrow/row_batch.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/sink/varrow_flight_result_writer.h"
 #include "vec/sink/vmysql_result_writer.h"
 
 namespace doris::pipeline {
@@ -61,7 +63,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
     }
     // create writer based on sink type
     switch (p._sink_type) {
-    case TResultSinkType::MYSQL_PROTOCAL:
+    case TResultSinkType::MYSQL_PROTOCAL: {
         if (state->mysql_row_binary_format()) {
             _writer.reset(new (std::nothrow) 
vectorized::VMysqlResultWriter<true>(
                     _sender.get(), _output_vexpr_ctxs, _profile));
@@ -70,6 +72,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
                     _sender.get(), _output_vexpr_ctxs, _profile));
         }
         break;
+    }
+    case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
+        std::shared_ptr<arrow::Schema> arrow_schema;
+        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, 
&arrow_schema));
+        
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
+                                                               arrow_schema);
+        _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
+                _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
+        break;
+    }
     default:
         return Status::InternalError("Unknown result sink type");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to