This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1544110c1b9 [feature-wip](arrow-flight)(step4) Support other DML and DDL statements, besides `Select` (#25919)
1544110c1b9 is described below

commit 1544110c1b9484226ac4a853a506983c2e26f9f0
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Nov 8 10:50:42 2023 +0800

    [feature-wip](arrow-flight)(step4) Support other DML and DDL statements, besides `Select` (#25919)
    
    Design Documentation Linked to #25514
---
 be/src/runtime/result_buffer_mgr.cpp               |  26 +-
 be/src/runtime/result_buffer_mgr.h                 |  15 +-
 .../arrow_flight/arrow_flight_batch_reader.cpp     |  16 +-
 be/src/service/internal_service.cpp                |  20 +-
 be/src/util/arrow/row_batch.cpp                    |  32 ++-
 be/src/util/arrow/row_batch.h                      |   7 +-
 be/src/vec/sink/varrow_flight_result_writer.cpp    |  13 +-
 be/src/vec/sink/varrow_flight_result_writer.h      |   5 +-
 be/src/vec/sink/vmemory_scratch_sink.cpp           |   2 -
 be/src/vec/sink/vmemory_scratch_sink.h             |   2 -
 be/src/vec/sink/vresult_sink.cpp                   |  12 +-
 .../java/org/apache/doris/common/ErrorCode.java    |   5 +-
 .../org/apache/doris/mysql/AcceptListener.java     |   3 +-
 .../java/org/apache/doris/mysql/MysqlCommand.java  |   1 +
 .../java/org/apache/doris/qe/ConnectContext.java   | 119 +++++---
 .../java/org/apache/doris/qe/ConnectProcessor.java | 313 +++++----------------
 .../java/org/apache/doris/qe/ConnectScheduler.java |   4 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  35 +--
 .../org/apache/doris/qe/MysqlConnectProcessor.java | 272 ++++++++++++++++++
 .../main/java/org/apache/doris/qe/QueryState.java  |   1 +
 .../java/org/apache/doris/qe/StmtExecutor.java     | 196 +++++++------
 .../apache/doris/service/FrontendServiceImpl.java  |  14 +-
 .../arrowflight/DorisFlightSqlProducer.java        | 140 ++++++---
 ...xecutor.java => FlightSqlConnectProcessor.java} | 155 +++++-----
 .../arrowflight/results/FlightSqlChannel.java      | 147 ++++++++++
 .../results/FlightSqlResultCacheEntry.java         |  64 +++++
 .../sessions/FlightSessionsManager.java            |   4 +-
 .../sessions/FlightSessionsWithTokenManager.java   |   1 +
 .../sessions/FlightSqlConnectContext.java          | 104 +++++++
 .../java/org/apache/doris/qe/StmtExecutorTest.java |   9 +
 30 files changed, 1137 insertions(+), 600 deletions(-)

diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index 9dbe228bcd9..a2009c5ec3c 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -109,21 +109,21 @@ std::shared_ptr<BufferControlBlock> 
ResultBufferMgr::find_control_block(const TU
     return std::shared_ptr<BufferControlBlock>();
 }
 
-void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id,
-                                              const RowDescriptor& row_desc) {
-    std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
-    _row_descriptor_map.insert(std::make_pair(query_id, row_desc));
+void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
+                                            const 
std::shared_ptr<arrow::Schema>& arrow_schema) {
+    std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
+    _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema));
 }
 
-RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) {
-    std::shared_lock<std::shared_mutex> rlock(_row_descriptor_map_lock);
-    RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id);
+std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const 
TUniqueId& query_id) {
+    std::shared_lock<std::shared_mutex> rlock(_arrow_schema_map_lock);
+    auto iter = _arrow_schema_map.find(query_id);
 
-    if (_row_descriptor_map.end() != iter) {
+    if (_arrow_schema_map.end() != iter) {
         return iter->second;
     }
 
-    return RowDescriptor();
+    return nullptr;
 }
 
 void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* 
ctx) {
@@ -162,11 +162,11 @@ Status ResultBufferMgr::cancel(const TUniqueId& query_id) 
{
     }
 
     {
-        std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
-        RowDescriptorMap::iterator row_desc_iter = 
_row_descriptor_map.find(query_id);
+        std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
+        auto arrow_schema_iter = _arrow_schema_map.find(query_id);
 
-        if (_row_descriptor_map.end() != row_desc_iter) {
-            _row_descriptor_map.erase(row_desc_iter);
+        if (_arrow_schema_map.end() != arrow_schema_iter) {
+            _arrow_schema_map.erase(arrow_schema_iter);
         }
     }
 
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index 4e5cd38a726..7995496cbf9 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -29,12 +29,12 @@
 
 #include "common/status.h"
 #include "gutil/ref_counted.h"
-#include "runtime/descriptors.h"
 #include "util/countdown_latch.h"
 #include "util/hash_util.hpp"
 
 namespace arrow {
 class RecordBatch;
+class Schema;
 } // namespace arrow
 
 namespace doris {
@@ -66,8 +66,9 @@ public:
     // fetch data result to Arrow Flight Server
     Status fetch_arrow_data(const TUniqueId& finst_id, 
std::shared_ptr<arrow::RecordBatch>* result);
 
-    void register_row_descriptor(const TUniqueId& query_id, const 
RowDescriptor& row_desc);
-    RowDescriptor find_row_descriptor(const TUniqueId& query_id);
+    void register_arrow_schema(const TUniqueId& query_id,
+                               const std::shared_ptr<arrow::Schema>& 
arrow_schema);
+    std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& 
query_id);
 
     // cancel
     Status cancel(const TUniqueId& fragment_id);
@@ -78,7 +79,7 @@ public:
 private:
     using BufferMap = std::unordered_map<TUniqueId, 
std::shared_ptr<BufferControlBlock>>;
     using TimeoutMap = std::map<time_t, std::vector<TUniqueId>>;
-    using RowDescriptorMap = std::unordered_map<TUniqueId, RowDescriptor>;
+    using ArrowSchemaMap = std::unordered_map<TUniqueId, 
std::shared_ptr<arrow::Schema>>;
 
     std::shared_ptr<BufferControlBlock> find_control_block(const TUniqueId& 
query_id);
 
@@ -90,10 +91,10 @@ private:
     std::shared_mutex _buffer_map_lock;
     // buffer block map
     BufferMap _buffer_map;
-    // lock for descriptor map
-    std::shared_mutex _row_descriptor_map_lock;
+    // lock for arrow schema map
+    std::shared_mutex _arrow_schema_map_lock;
     // for arrow flight
-    RowDescriptorMap _row_descriptor_map;
+    ArrowSchemaMap _arrow_schema_map;
 
     // lock for timeout map
     std::mutex _timeout_lock;
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp 
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index 8a0f1e67859..d2b45a0b77c 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -39,18 +39,12 @@ 
ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> s
 arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> 
ArrowFlightBatchReader::Create(
         const std::shared_ptr<QueryStatement>& statement_) {
     // Make sure that FE send the fragment to BE and creates the 
BufferControlBlock before returning ticket
-    // to the ADBC client, so that the row_descriptor and control block can be 
found.
-    RowDescriptor row_desc =
-            
ExecEnv::GetInstance()->result_mgr()->find_row_descriptor(statement_->query_id);
-    if (row_desc.equals(RowDescriptor())) {
+    // to the ADBC client, so that the schema and control block can be found.
+    auto schema = 
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
+    if (schema == nullptr) {
         ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
-                "Schema RowDescriptor Not Found, queryid: {}", 
print_id(statement_->query_id))));
-    }
-    std::shared_ptr<arrow::Schema> schema;
-    auto st = convert_to_arrow_schema(row_desc, &schema);
-    if (UNLIKELY(!st.ok())) {
-        LOG(WARNING) << st.to_string();
-        ARROW_RETURN_NOT_OK(to_arrow_status(st));
+                "not found arrow flight schema, maybe query has been canceled, 
queryid: {}",
+                print_id(statement_->query_id))));
     }
     std::shared_ptr<ArrowFlightBatchReader> result(new 
ArrowFlightBatchReader(statement_, schema));
     return result;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index dba01f1b668..52b448a5f6d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -718,23 +718,19 @@ void 
PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
                                                      
google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([request, result, done]() {
         brpc::ClosureGuard closure_guard(done);
-        RowDescriptor row_desc = 
ExecEnv::GetInstance()->result_mgr()->find_row_descriptor(
-                UniqueId(request->finst_id()).to_thrift());
-        if (row_desc.equals(RowDescriptor())) {
-            auto st = Status::NotFound("not found row descriptor");
-            st.to_protobuf(result->mutable_status());
-            return;
-        }
-
-        std::shared_ptr<arrow::Schema> schema;
-        auto st = convert_to_arrow_schema(row_desc, &schema);
-        if (UNLIKELY(!st.ok())) {
+        std::shared_ptr<arrow::Schema> schema =
+                ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
+                        UniqueId(request->finst_id()).to_thrift());
+        if (schema == nullptr) {
+            LOG(INFO) << "not found arrow flight schema, maybe query has been 
canceled";
+            auto st = Status::NotFound(
+                    "not found arrow flight schema, maybe query has been 
canceled");
             st.to_protobuf(result->mutable_status());
             return;
         }
 
         std::string schema_str;
-        st = serialize_arrow_schema(row_desc, &schema, &schema_str);
+        auto st = serialize_arrow_schema(&schema, &schema_str);
         if (st.ok()) {
             result->set_schema(std::move(schema_str));
         }
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index 6a44da2ec6b..6662f2e0ba7 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -39,6 +39,8 @@
 #include "runtime/types.h"
 #include "util/arrow/block_convertor.h"
 #include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 
@@ -163,6 +165,22 @@ Status convert_to_arrow_schema(const RowDescriptor& 
row_desc,
     return Status::OK();
 }
 
+Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& 
output_vexpr_ctxs,
+                                      std::shared_ptr<arrow::Schema>* result) {
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    for (auto expr_ctx : output_vexpr_ctxs) {
+        std::shared_ptr<arrow::DataType> arrow_type;
+        auto root_expr = expr_ctx->root();
+        RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
+        auto field_name = root_expr->is_slot_ref() ? root_expr->expr_name()
+                                                   : 
root_expr->data_type()->get_name();
+        fields.push_back(
+                std::make_shared<arrow::Field>(field_name, arrow_type, 
root_expr->is_nullable()));
+    }
+    *result = arrow::schema(std::move(fields));
+    return Status::OK();
+}
+
 Status serialize_record_batch(const arrow::RecordBatch& record_batch, 
std::string* result) {
     // create sink memory buffer outputstream with the computed capacity
     int64_t capacity;
@@ -206,15 +224,13 @@ Status serialize_record_batch(const arrow::RecordBatch& 
record_batch, std::strin
     return Status::OK();
 }
 
-Status serialize_arrow_schema(RowDescriptor row_desc, 
std::shared_ptr<arrow::Schema>* schema,
-                              std::string* result) {
-    std::vector<SlotDescriptor*> slots;
-    for (auto tuple_desc : row_desc.tuple_descriptors()) {
-        slots.insert(slots.end(), tuple_desc->slots().begin(), 
tuple_desc->slots().end());
+Status serialize_arrow_schema(std::shared_ptr<arrow::Schema>* schema, 
std::string* result) {
+    auto make_empty_result = arrow::RecordBatch::MakeEmpty(*schema);
+    if (!make_empty_result.ok()) {
+        return Status::InternalError("serialize_arrow_schema failed, reason: 
{}",
+                                     make_empty_result.status().ToString());
     }
-    auto block = vectorized::Block(slots, 0);
-    std::shared_ptr<arrow::RecordBatch> batch;
-    RETURN_IF_ERROR(convert_to_arrow_batch(block, *schema, 
arrow::default_memory_pool(), &batch));
+    auto batch = make_empty_result.ValueOrDie();
     return serialize_record_batch(*batch, result);
 }
 
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index 1bd408754f1..ddffc3324d3 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "runtime/types.h"
 #include "vec/core/block.h"
+#include "vec/exprs/vexpr_fwd.h"
 
 // This file will convert Doris RowBatch to/from Arrow's RecordBatch
 // RowBatch is used by Doris query engine to exchange data between
@@ -49,9 +50,11 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
 Status convert_block_arrow_schema(const vectorized::Block& block,
                                   std::shared_ptr<arrow::Schema>* result);
 
+Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& 
output_vexpr_ctxs,
+                                      std::shared_ptr<arrow::Schema>* result);
+
 Status serialize_record_batch(const arrow::RecordBatch& record_batch, 
std::string* result);
 
-Status serialize_arrow_schema(RowDescriptor row_desc, 
std::shared_ptr<arrow::Schema>* schema,
-                              std::string* result);
+Status serialize_arrow_schema(std::shared_ptr<arrow::Schema>* schema, 
std::string* result);
 
 } // namespace doris
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp 
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index 4a71e10df42..771040bfb8b 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -27,14 +27,13 @@
 namespace doris {
 namespace vectorized {
 
-VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker,
-                                                   const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                                   RuntimeProfile* 
parent_profile,
-                                                   const RowDescriptor& 
row_desc)
+VArrowFlightResultWriter::VArrowFlightResultWriter(
+        BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs,
+        RuntimeProfile* parent_profile, const std::shared_ptr<arrow::Schema>& 
arrow_schema)
         : _sinker(sinker),
           _output_vexpr_ctxs(output_vexpr_ctxs),
           _parent_profile(parent_profile),
-          _row_desc(row_desc) {}
+          _arrow_schema(arrow_schema) {}
 
 Status VArrowFlightResultWriter::init(RuntimeState* state) {
     _init_profile();
@@ -42,8 +41,6 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) {
         return Status::InternalError("sinker is NULL pointer.");
     }
     _is_dry_run = state->query_options().dry_run_query;
-    // generate the arrow schema
-    RETURN_IF_ERROR(convert_to_arrow_schema(_row_desc, &_arrow_schema));
     return Status::OK();
 }
 
@@ -100,7 +97,7 @@ bool VArrowFlightResultWriter::can_sink() {
     return _sinker->can_sink();
 }
 
-Status VArrowFlightResultWriter::close(Status) {
+Status VArrowFlightResultWriter::close(Status st) {
     COUNTER_SET(_sent_rows_counter, _written_rows);
     COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent);
     return Status::OK();
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h 
b/be/src/vec/sink/varrow_flight_result_writer.h
index 7aa8ec6824a..02faebfddb3 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -31,7 +31,6 @@
 namespace doris {
 class BufferControlBlock;
 class RuntimeState;
-class RowDescriptor;
 
 namespace vectorized {
 class Block;
@@ -39,7 +38,8 @@ class Block;
 class VArrowFlightResultWriter final : public ResultWriter {
 public:
     VArrowFlightResultWriter(BufferControlBlock* sinker, const 
VExprContextSPtrs& output_vexpr_ctxs,
-                             RuntimeProfile* parent_profile, const 
RowDescriptor& row_desc);
+                             RuntimeProfile* parent_profile,
+                             const std::shared_ptr<arrow::Schema>& 
arrow_schema);
 
     Status init(RuntimeState* state) override;
 
@@ -72,7 +72,6 @@ private:
 
     uint64_t _bytes_sent = 0;
 
-    const RowDescriptor& _row_desc;
     std::shared_ptr<arrow::Schema> _arrow_schema;
 };
 } // namespace vectorized
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp 
b/be/src/vec/sink/vmemory_scratch_sink.cpp
index f9192d5c79f..d4f0d4521c0 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -56,8 +56,6 @@ Status MemoryScratchSink::_prepare_vexpr(RuntimeState* state) 
{
     RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
     // Prepare the exprs to run.
     RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
-    // generate the arrow schema
-    RETURN_IF_ERROR(convert_to_arrow_schema(_row_desc, &_arrow_schema));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vmemory_scratch_sink.h 
b/be/src/vec/sink/vmemory_scratch_sink.h
index e91d130547a..c9a6922336c 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.h
+++ b/be/src/vec/sink/vmemory_scratch_sink.h
@@ -65,8 +65,6 @@ public:
 private:
     Status _prepare_vexpr(RuntimeState* state);
 
-    std::shared_ptr<arrow::Schema> _arrow_schema;
-
     BlockQueueSharedPtr _queue;
 
     // Owned by the RuntimeState.
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index b3a2d3bae7f..d5ca67b76c7 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -33,6 +33,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/runtime_state.h"
+#include "util/arrow/row_batch.h"
 #include "util/runtime_profile.h"
 #include "util/telemetry/telemetry.h"
 #include "vec/exprs/vexpr.h"
@@ -98,12 +99,15 @@ Status VResultSink::prepare(RuntimeState* state) {
         _writer.reset(new (std::nothrow)
                               VMysqlResultWriter(_sender.get(), 
_output_vexpr_ctxs, _profile));
         break;
-    case TResultSinkType::ARROW_FLIGHT_PROTOCAL:
-        
state->exec_env()->result_mgr()->register_row_descriptor(state->fragment_instance_id(),
-                                                                 _row_desc);
+    case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
+        std::shared_ptr<arrow::Schema> arrow_schema;
+        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, 
&arrow_schema));
+        
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
+                                                               arrow_schema);
         _writer.reset(new (std::nothrow) 
VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs,
-                                                                  _profile, 
_row_desc));
+                                                                  _profile, 
arrow_schema));
         break;
+    }
     default:
         return Status::InternalError("Unknown result sink type");
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index 5a4806b12ec..27e61ff87e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1204,7 +1204,10 @@ public enum ErrorCode {
             "the auto increment must be BIGINT type."),
 
     ERR_AUTO_INCREMENT_COLUMN_IN_AGGREGATE_TABLE(5096, new byte[]{'4', '2', 
'0', '0', '0'},
-            "the auto increment is only supported in duplicate table and 
unique table.");
+            "the auto increment is only supported in duplicate table and 
unique table."),
+
+    ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT(5097, new byte[]{'4', '2', '0', 
'0', '0'},
+            "There can only be one stmt that returns the result and it is at 
the end.");
 
     // This is error code
     private final int code;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 1bde95c1650..67a84bfb8a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.ConnectScheduler;
+import org.apache.doris.qe.MysqlConnectProcessor;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -81,7 +82,7 @@ public class AcceptListener implements 
ChannelListener<AcceptingChannel<StreamCo
                             
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
                     context.setUserInsertTimeout(
                             
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
-                    ConnectProcessor processor = new ConnectProcessor(context);
+                    ConnectProcessor processor = new 
MysqlConnectProcessor(context);
                     context.startAcceptQuery(processor);
                 } catch (AfterConnectedException e) {
                     // do not need to print log for this kind of exception.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java
index f8a03029d5a..f1f1a443131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
 import java.util.Map;
 
 // MySQL protocol text command
+// Reused by arrow flight protocol
 public enum MysqlCommand {
     COM_SLEEP("Sleep", 0),
     COM_QUIT("Quit", 1),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 15359faacaf..183edb765cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -39,10 +40,12 @@ import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.system.Backend;
 import org.apache.doris.task.LoadTaskInfo;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultSinkType;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
@@ -59,6 +62,7 @@ import org.json.JSONObject;
 import org.xnio.StreamConnection;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,7 +81,7 @@ public class ConnectContext {
 
     public enum ConnectType {
         MYSQL,
-        ARROW_FLIGHT
+        ARROW_FLIGHT_SQL
     }
 
     protected volatile ConnectType connectType;
@@ -96,8 +100,14 @@ public class ConnectContext {
     protected volatile int connectionId;
     // Timestamp when the connection is make
     protected volatile long loginTime;
-    // arrow flight token
+    // for arrow flight
     protected volatile String peerIdentity;
+    private String runningQuery;
+    private TNetworkAddress resultFlightServerAddr;
+    private TNetworkAddress resultInternalServiceAddr;
+    private ArrayList<Expr> resultOutputExprs;
+    private TUniqueId finstId;
+    private boolean returnResultFromLocal = true;
     // mysql net
     protected volatile MysqlChannel mysqlChannel;
     // state
@@ -190,7 +200,7 @@ public class ConnectContext {
 
     private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
 
-    //internal call like `insert overwrite` need skipAuth
+    // internal call like `insert overwrite` need skipAuth
     // For example, `insert overwrite` only requires load permission,
     // but the internal implementation will call the logic of `AlterTable`.
     // In this case, `skipAuth` needs to be set to `true` to skip the 
permission check of `AlterTable`
@@ -286,41 +296,30 @@ public class ConnectContext {
         return connectType;
     }
 
-    public ConnectContext() {
-        this((StreamConnection) null);
-    }
-
-    public ConnectContext(String peerIdentity) {
-        this.connectType = ConnectType.ARROW_FLIGHT;
-        this.peerIdentity = peerIdentity;
+    public void init() {
         state = new QueryState();
         returnRows = 0;
         isKilled = false;
         sessionVariable = VariableMgr.newSessionVariable();
-        mysqlChannel = new DummyMysqlChannel();
         command = MysqlCommand.COM_SLEEP;
         if (Config.use_fuzzy_session_variable) {
             sessionVariable.initFuzzyModeVariables();
         }
-        setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL);
+    }
+
+    public ConnectContext() {
+        this((StreamConnection) null);
     }
 
     public ConnectContext(StreamConnection connection) {
         connectType = ConnectType.MYSQL;
-        state = new QueryState();
-        returnRows = 0;
         serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
-        isKilled = false;
         if (connection != null) {
             mysqlChannel = new MysqlChannel(connection);
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
-        sessionVariable = VariableMgr.newSessionVariable();
-        command = MysqlCommand.COM_SLEEP;
-        if (Config.use_fuzzy_session_variable) {
-            sessionVariable.initFuzzyModeVariables();
-        }
+        init();
     }
 
     public boolean isTxnModel() {
@@ -541,14 +540,70 @@ public class ConnectContext {
         this.loginTime = System.currentTimeMillis();
     }
 
+    public void setRunningQuery(String runningQuery) {
+        this.runningQuery = runningQuery;
+    }
+
+    public String getRunningQuery() {
+        return runningQuery;
+    }
+
+    public void setResultFlightServerAddr(TNetworkAddress 
resultFlightServerAddr) {
+        this.resultFlightServerAddr = resultFlightServerAddr;
+    }
+
+    public TNetworkAddress getResultFlightServerAddr() {
+        return resultFlightServerAddr;
+    }
+
+    public void setResultInternalServiceAddr(TNetworkAddress 
resultInternalServiceAddr) {
+        this.resultInternalServiceAddr = resultInternalServiceAddr;
+    }
+
+    public TNetworkAddress getResultInternalServiceAddr() {
+        return resultInternalServiceAddr;
+    }
+
+    public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
+        this.resultOutputExprs = resultOutputExprs;
+    }
+
+    public ArrayList<Expr> getResultOutputExprs() {
+        return resultOutputExprs;
+    }
+
+    public void setFinstId(TUniqueId finstId) {
+        this.finstId = finstId;
+    }
+
+    public TUniqueId getFinstId() {
+        return finstId;
+    }
+
+    public void setReturnResultFromLocal(boolean returnResultFromLocal) {
+        this.returnResultFromLocal = returnResultFromLocal;
+    }
+
+    public boolean isReturnResultFromLocal() {
+        return returnResultFromLocal;
+    }
+
     public String getPeerIdentity() {
         return peerIdentity;
     }
 
+    public FlightSqlChannel getFlightSqlChannel() {
+        throw new RuntimeException("getFlightSqlChannel not in flight sql 
connection");
+    }
+
     public MysqlChannel getMysqlChannel() {
         return mysqlChannel;
     }
 
+    public String getClientIP() {
+        return getMysqlChannel().getRemoteHostPortString();
+    }
+
     public QueryState getState() {
         return state;
     }
@@ -620,10 +675,14 @@ public class ConnectContext {
         return executor;
     }
 
-    public void cleanup() {
+    protected void closeChannel() {
         if (mysqlChannel != null) {
             mysqlChannel.close();
         }
+    }
+
+    public void cleanup() {
+        closeChannel();
         threadLocalInfo.remove();
         returnRows = 0;
     }
@@ -701,27 +760,17 @@ public class ConnectContext {
     }
 
     public String getRemoteHostPortString() {
-        if (connectType.equals(ConnectType.MYSQL)) {
-            return getMysqlChannel().getRemoteHostPortString();
-        } else if (connectType.equals(ConnectType.ARROW_FLIGHT)) {
-            // TODO Get flight client IP:Port
-            return peerIdentity;
-        }
-        return "";
+        return getMysqlChannel().getRemoteHostPortString();
     }
 
     // kill operation with no protect.
     public void kill(boolean killConnection) {
-        LOG.warn("kill query from {}, kill connection: {}", 
getRemoteHostPortString(), killConnection);
+        LOG.warn("kill query from {}, kill mysql connection: {}", 
getRemoteHostPortString(), killConnection);
 
         if (killConnection) {
             isKilled = true;
-            if (connectType.equals(ConnectType.MYSQL)) {
-                // Close channel to break connection with client
-                getMysqlChannel().close();
-            } else if (connectType.equals(ConnectType.ARROW_FLIGHT)) {
-                connectScheduler.unregisterConnection(this);
-            }
+            // Close channel to break connection with client
+            closeChannel();
         }
         // Now, cancel running query.
         cancelQuery();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 5640a8c034c..3885af944b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -17,11 +17,8 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.analysis.ExecuteStmt;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.KillStmt;
-import org.apache.doris.analysis.LiteralExpr;
-import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
@@ -35,7 +32,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.common.util.DebugUtil;
@@ -47,7 +44,6 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlPacket;
-import org.apache.doris.mysql.MysqlProto;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.mysql.MysqlServerStatusFlag;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
@@ -61,6 +57,7 @@ import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
@@ -74,9 +71,6 @@ import org.apache.logging.log4j.Logger;
 import java.io.IOException;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -84,11 +78,16 @@ import java.util.Map;
 import java.util.UUID;
 
 /**
- * Process one mysql connection, receive one packet, process, send one packet.
+ * Process one connection, the life cycle is the same as connection
  */
-public class ConnectProcessor {
+public abstract class ConnectProcessor {
+    public enum ConnectType {
+        MYSQL,
+        ARROW_FLIGHT_SQL
+    }
+
     private static final Logger LOG = 
LogManager.getLogger(ConnectProcessor.class);
-    private static final TextMapGetter<Map<String, String>> getter =
+    protected static final TextMapGetter<Map<String, String>> getter =
             new TextMapGetter<Map<String, String>>() {
                 @Override
                 public Iterable<String> keys(Map<String, String> carrier) {
@@ -103,17 +102,17 @@ public class ConnectProcessor {
                     return "";
                 }
             };
-    private final ConnectContext ctx;
-    private ByteBuffer packetBuf;
-    private StmtExecutor executor = null;
+    protected final ConnectContext ctx;
+    protected StmtExecutor executor = null;
+    protected ConnectType connectType;
+    protected ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new 
ArrayList<>();
 
     public ConnectProcessor(ConnectContext context) {
         this.ctx = context;
     }
 
-    // COM_INIT_DB: change current database of this session.
-    private void handleInitDb() {
-        String fullDbName = new String(packetBuf.array(), 1, packetBuf.limit() 
- 1);
+    // change current database of this session.
+    protected void handleInitDb(String fullDbName) {
         if (Strings.isNullOrEmpty(ctx.getClusterName())) {
             ctx.getState().setError(ErrorCode.ERR_CLUSTER_NAME_NULL, "Please 
enter cluster");
             return;
@@ -160,24 +159,22 @@ public class ConnectProcessor {
         ctx.getState().setOk();
     }
 
-    // COM_QUIT: set killed flag and then return OK packet.
-    private void handleQuit() {
+    // set killed flag
+    protected void handleQuit() {
         ctx.setKilled();
         ctx.getState().setOk();
     }
 
-    // process COM_PING statement, do nothing, just return one OK packet.
-    private void handlePing() {
+    // do nothing
+    protected void handlePing() {
         ctx.getState().setOk();
     }
 
-    private void handleStmtReset() {
+    protected void handleStmtReset() {
         ctx.getState().setOk();
     }
 
-    private void handleStmtClose() {
-        packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
-        int stmtId = packetBuf.getInt();
+    protected void handleStmtClose(int stmtId) {
         LOG.debug("close stmt id: {}", stmtId);
         ConnectContext.get().removePrepareStmt(String.valueOf(stmtId));
         // No response packet is sent back to the client, see
@@ -185,119 +182,27 @@ public class ConnectProcessor {
         ctx.getState().setNoop();
     }
 
-    private void debugPacket() {
-        byte[] bytes = packetBuf.array();
-        StringBuilder printB = new StringBuilder();
-        for (byte b : bytes) {
-            if (Character.isLetterOrDigit((char) b & 0xFF)) {
-                char x = (char) b;
-                printB.append(x);
-            } else {
-                printB.append("0x" + Integer.toHexString(b & 0xFF));
-            }
-            printB.append(" ");
-        }
-        LOG.debug("debug packet {}", printB.toString().substring(0, 200));
-    }
-
-    private static boolean isNull(byte[] bitmap, int position) {
+    protected static boolean isNull(byte[] bitmap, int position) {
         return (bitmap[position / 8] & (1 << (position & 7))) != 0;
     }
 
-    // process COM_EXECUTE, parse binary row data
-    // 
https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html
-    private void handleExecute() {
-        // debugPacket();
-        packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
-        // parse stmt_id, flags, params
-        int stmtId = packetBuf.getInt();
-        // flag
-        packetBuf.get();
-        // iteration_count always 1,
-        packetBuf.getInt();
-        LOG.debug("execute prepared statement {}", stmtId);
-        PrepareStmtContext prepareCtx = 
ctx.getPreparedStmt(String.valueOf(stmtId));
-        if (prepareCtx == null) {
-            LOG.debug("No such statement in context, stmtId:{}", stmtId);
-            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR,
-                    "msg: Not supported such prepared statement");
-            return;
-        }
-        ctx.setStartTime();
-        if (prepareCtx.stmt.getInnerStmt() instanceof QueryStmt) {
-            ctx.getState().setIsQuery(true);
-        }
-        prepareCtx.stmt.setIsPrepared();
-        int paramCount = prepareCtx.stmt.getParmCount();
-        // null bitmap
-        byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
-        packetBuf.get(nullbitmapData);
-        String stmtStr = "";
-        try {
-            // new_params_bind_flag
-            if ((int) packetBuf.get() != 0) {
-                // parse params's types
-                for (int i = 0; i < paramCount; ++i) {
-                    int typeCode = packetBuf.getChar();
-                    LOG.debug("code {}", typeCode);
-                    
prepareCtx.stmt.placeholders().get(i).setTypeCode(typeCode);
-                }
-            }
-            List<LiteralExpr> realValueExprs = new ArrayList<>();
-            // parse param data
-            for (int i = 0; i < paramCount; ++i) {
-                if (isNull(nullbitmapData, i)) {
-                    realValueExprs.add(new NullLiteral());
-                    continue;
-                }
-                LiteralExpr l = 
prepareCtx.stmt.placeholders().get(i).createLiteralFromType();
-                l.setupParamFromBinary(packetBuf);
-                realValueExprs.add(l);
-            }
-            ExecuteStmt executeStmt = new ExecuteStmt(String.valueOf(stmtId), 
realValueExprs);
-            // TODO set real origin statement
-            executeStmt.setOrigStmt(new OriginStatement("null", 0));
-            executeStmt.setUserInfo(ctx.getCurrentUserIdentity());
-            LOG.debug("executeStmt {}", executeStmt);
-            executor = new StmtExecutor(ctx, executeStmt);
-            ctx.setExecutor(executor);
-            executor.execute();
-            stmtStr = executeStmt.toSql();
-        } catch (Throwable e)  {
-            // Catch all throwable.
-            // If reach here, maybe palo bug.
-            LOG.warn("Process one query failed because unknown reason: ", e);
-            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
-                    e.getClass().getSimpleName() + ", msg: " + e.getMessage());
-        }
-        auditAfterExec(stmtStr, prepareCtx.stmt.getInnerStmt(), null, false);
-    }
-
-    private void auditAfterExec(String origStmt, StatementBase parsedStmt,
-                    Data.PQueryStatistics statistics, boolean 
printFuzzyVariables) {
+    protected void auditAfterExec(String origStmt, StatementBase parsedStmt,
+            Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
         AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, 
printFuzzyVariables);
     }
 
-    // Process COM_QUERY statement,
     // only throw an exception when there is a problem interacting with the 
requesting client
-    private void handleQuery(MysqlCommand mysqlCommand) {
+    protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
         if (MetricRepo.isInit) {
             MetricRepo.COUNTER_REQUEST_ALL.increase(1L);
         }
-        // convert statement to Java string
-        byte[] bytes = packetBuf.array();
-        int ending = packetBuf.limit() - 1;
-        while (ending >= 1 && bytes[ending] == '\0') {
-            ending--;
-        }
-        String originStmt = new String(bytes, 1, ending, 
StandardCharsets.UTF_8);
 
         String sqlHash = DigestUtils.md5Hex(originStmt);
         ctx.setSqlHash(sqlHash);
         ctx.getAuditEventBuilder().reset();
         ctx.getAuditEventBuilder()
                 .setTimestamp(System.currentTimeMillis())
-                .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
+                .setClientIp(ctx.getClientIP())
                 
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
                 .setSqlHash(ctx.getSqlHash());
 
@@ -356,10 +261,25 @@ public class ConnectProcessor {
 
             try {
                 executor.execute();
-                if (i != stmts.size() - 1) {
-                    ctx.getState().serverStatus |= 
MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
-                    if (ctx.getState().getStateType() != MysqlStateType.ERR) {
-                        finalizeCommand();
+                if (connectType.equals(ConnectType.MYSQL)) {
+                    if (i != stmts.size() - 1) {
+                        ctx.getState().serverStatus |= 
MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
+                        if (ctx.getState().getStateType() != 
MysqlStateType.ERR) {
+                            finalizeCommand();
+                        }
+                    }
+                } else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
+                    if (!ctx.isReturnResultFromLocal()) {
+                        returnResultFromRemoteExecutor.add(executor);
+                    }
+                    
Preconditions.checkState(ctx.getFlightSqlChannel().resultNum() <= 1);
+                    if (ctx.getFlightSqlChannel().resultNum() == 1 && i != 
stmts.size() - 1) {
+                        String errMsg = "Only be one stmt that returns the 
result and it is at the end. stmts.size(): "
+                                + stmts.size();
+                        LOG.warn(errMsg);
+                        
ctx.getState().setError(ErrorCode.ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT, 
errMsg);
+                        
ctx.getState().setErrType(QueryState.ErrType.OTHER_ERR);
+                        break;
                     }
                 }
                 auditAfterExec(auditStmt, executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(), true);
@@ -381,8 +301,8 @@ public class ConnectProcessor {
     }
 
     // Use a handler for exception to avoid big try catch block which is a 
little hard to understand
-    private void handleQueryException(Throwable throwable, String origStmt,
-                                      StatementBase parsedStmt, 
Data.PQueryStatistics statistics) {
+    protected void handleQueryException(Throwable throwable, String origStmt,
+            StatementBase parsedStmt, Data.PQueryStatistics statistics) {
         if (ctx.getMinidump() != null) {
             MinidumpUtils.saveMinidumpString(ctx.getMinidump(), 
DebugUtil.printId(ctx.queryId()));
         }
@@ -415,7 +335,7 @@ public class ConnectProcessor {
     }
 
     // analyze the origin stmt and return multi-statements
-    private List<StatementBase> parse(String originStmt) throws 
AnalysisException, DdlException {
+    protected List<StatementBase> parse(String originStmt) throws 
AnalysisException, DdlException {
         LOG.debug("the originStmts are: {}", originStmt);
         // Parse statement with parser generated by CUP&FLEX
         SqlScanner input = new SqlScanner(new StringReader(originStmt), 
ctx.getSessionVariable().getSqlMode());
@@ -443,9 +363,8 @@ public class ConnectProcessor {
 
     // Get the column definitions of a table
     @SuppressWarnings("rawtypes")
-    private void handleFieldList() throws IOException {
+    protected void handleFieldList(String tableName) {
         // Already get command code.
-        String tableName = new 
String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
         if (Strings.isNullOrEmpty(tableName)) {
             ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty 
tableName");
             return;
@@ -463,18 +382,21 @@ public class ConnectProcessor {
 
         table.readLock();
         try {
-            MysqlChannel channel = ctx.getMysqlChannel();
-            MysqlSerializer serializer = channel.getSerializer();
-
-            // Send fields
-            // NOTE: Field list doesn't send number of fields
-            List<Column> baseSchema = table.getBaseSchema();
-            for (Column column : baseSchema) {
-                serializer.reset();
-                serializer.writeField(db.getFullName(), table.getName(), 
column, true);
-                channel.sendOnePacket(serializer.toByteBuffer());
+            if (connectType.equals(ConnectType.MYSQL)) {
+                MysqlChannel channel = ctx.getMysqlChannel();
+                MysqlSerializer serializer = channel.getSerializer();
+
+                // Send fields
+                // NOTE: Field list doesn't send number of fields
+                List<Column> baseSchema = table.getBaseSchema();
+                for (Column column : baseSchema) {
+                    serializer.reset();
+                    serializer.writeField(db.getFullName(), table.getName(), 
column, true);
+                    channel.sendOnePacket(serializer.toByteBuffer());
+                }
+            } else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
+                // TODO
             }
-
         } catch (Throwable throwable) {
             handleQueryException(throwable, "", null, null);
         } finally {
@@ -483,62 +405,9 @@ public class ConnectProcessor {
         ctx.getState().setEof();
     }
 
-    private void dispatch() throws IOException {
-        int code = packetBuf.get();
-        MysqlCommand command = MysqlCommand.fromCode(code);
-        if (command == null) {
-            ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
-            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown 
command(" + code + ")");
-            LOG.warn("Unknown command(" + code + ")");
-            return;
-        }
-        LOG.debug("handle command {}", command);
-        ctx.setCommand(command);
-        ctx.setStartTime();
-
-        switch (command) {
-            case COM_INIT_DB:
-                handleInitDb();
-                break;
-            case COM_QUIT:
-                handleQuit();
-                break;
-            case COM_QUERY:
-            case COM_STMT_PREPARE:
-                ctx.initTracer("trace");
-                Span rootSpan = 
ctx.getTracer().spanBuilder("handleQuery").setNoParent().startSpan();
-                try (Scope scope = rootSpan.makeCurrent()) {
-                    handleQuery(command);
-                } catch (Exception e) {
-                    rootSpan.recordException(e);
-                    throw e;
-                } finally {
-                    rootSpan.end();
-                }
-                break;
-            case COM_STMT_EXECUTE:
-                handleExecute();
-                break;
-            case COM_FIELD_LIST:
-                handleFieldList();
-                break;
-            case COM_PING:
-                handlePing();
-                break;
-            case COM_STMT_RESET:
-                handleStmtReset();
-                break;
-            case COM_STMT_CLOSE:
-                handleStmtClose();
-                break;
-            default:
-                ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, 
"Unsupported command(" + command + ")");
-                LOG.warn("Unsupported command(" + command + ")");
-                break;
-        }
-    }
-
-    private ByteBuffer getResultPacket() {
+    // only Mysql protocol
+    protected ByteBuffer getResultPacket() {
+        Preconditions.checkState(connectType.equals(ConnectType.MYSQL));
         MysqlPacket packet = ctx.getState().toResponsePacket();
         if (packet == null) {
             // possible two cases:
@@ -555,7 +424,9 @@ public class ConnectProcessor {
 
     // When any request is completed, it will generally need to send a 
response packet to the client
     // This method is used to send a response packet to the client
-    private void finalizeCommand() throws IOException {
+    // only Mysql protocol
+    public void finalizeCommand() throws IOException {
+        Preconditions.checkState(connectType.equals(ConnectType.MYSQL));
         ByteBuffer packet;
         if (executor != null && executor.isForwardToMaster()
                 && ctx.getState().getStateType() != 
QueryState.MysqlStateType.ERR) {
@@ -736,47 +607,9 @@ public class ConnectProcessor {
         return result;
     }
 
-    // Process a MySQL request
-    public void processOnce() throws IOException {
-        // set status of query to OK.
-        ctx.getState().reset();
-        executor = null;
-
-        // reset sequence id of MySQL protocol
-        final MysqlChannel channel = ctx.getMysqlChannel();
-        channel.setSequenceId(0);
-        // read packet from channel
-        try {
-            packetBuf = channel.fetchOnePacket();
-            if (packetBuf == null) {
-                LOG.warn("Null packet received from network. remote: {}", 
channel.getRemoteHostPortString());
-                throw new IOException("Error happened when receiving packet.");
-            }
-        } catch (AsynchronousCloseException e) {
-            // when this happened, timeout checker close this channel
-            // killed flag in ctx has been already set, just return
-            return;
-        }
-
-        // dispatch
-        dispatch();
-        // finalize
-        finalizeCommand();
-
-        ctx.setCommand(MysqlCommand.COM_SLEEP);
-    }
-
-    public void loop() {
-        while (!ctx.isKilled()) {
-            try {
-                processOnce();
-            } catch (Exception e) {
-                // TODO(zhaochun): something wrong
-                LOG.warn("Exception happened in one session(" + ctx + ").", e);
-                ctx.setKilled();
-                break;
-            }
-        }
+    // only Mysql protocol
+    public void processOnce() throws IOException, NotImplementedException {
+        throw new NotImplementedException("Not Impl processOnce");
     }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 70bfd7e2d8c..5be4c330e0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -102,7 +102,7 @@ public class ConnectScheduler {
             return false;
         }
         connectionMap.put(ctx.getConnectionId(), ctx);
-        if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT)) {
+        if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
             flightToken2ConnectionId.put(ctx.getPeerIdentity(), 
ctx.getConnectionId());
         }
         return true;
@@ -116,7 +116,7 @@ public class ConnectScheduler {
                 conns.decrementAndGet();
             }
             numberConnection.decrementAndGet();
-            if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT)) {
+            if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
                 flightToken2ConnectionId.remove(ctx.getPeerIdentity());
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 752dedf8015..9aaca2f8e2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
@@ -68,6 +67,7 @@ import 
org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.Types;
 import org.apache.doris.proto.Types.PUniqueId;
+import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -210,11 +210,6 @@ public class Coordinator implements CoordInterface {
     private final List<BackendExecState> needCheckBackendExecStates = 
Lists.newArrayList();
     private final List<PipelineExecContext> needCheckPipelineExecContexts = 
Lists.newArrayList();
     private ResultReceiver receiver;
-    private TNetworkAddress resultFlightServerAddr;
-    private TNetworkAddress resultInternalServiceAddr;
-    private ArrayList<Expr> resultOutputExprs;
-
-    private TUniqueId finstId;
     private final List<ScanNode> scanNodes;
     private int scanRangeNum = 0;
     // number of instances of this query, equals to
@@ -283,22 +278,6 @@ public class Coordinator implements CoordInterface {
         return executionProfile;
     }
 
-    public TNetworkAddress getResultFlightServerAddr() {
-        return resultFlightServerAddr;
-    }
-
-    public TNetworkAddress getResultInternalServiceAddr() {
-        return resultInternalServiceAddr;
-    }
-
-    public ArrayList<Expr> getResultOutputExprs() {
-        return resultOutputExprs;
-    }
-
-    public TUniqueId getFinstId() {
-        return finstId;
-    }
-
     // True if all scan node are ExternalScanNode.
     private boolean isAllExternalScan = true;
 
@@ -631,10 +610,14 @@ public class Coordinator implements CoordInterface {
             TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
             receiver = new ResultReceiver(queryId, 
topParams.instanceExecParams.get(0).instanceId,
                     addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline);
-            finstId = topParams.instanceExecParams.get(0).instanceId;
-            resultFlightServerAddr = toArrowFlightHost(execBeAddr);
-            resultInternalServiceAddr = toBrpcHost(execBeAddr);
-            resultOutputExprs = fragments.get(0).getOutputExprs();
+
+            if (!context.isReturnResultFromLocal()) {
+                
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
+                
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
+                
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
+                context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
+                
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
+            }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("dispatch result sink of query {} to {}", 
DebugUtil.printId(queryId),
                         topParams.instanceExecParams.get(0).host);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
new file mode 100644
index 00000000000..63781d2adda
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.ExecuteStmt;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.mysql.MysqlChannel;
+import org.apache.doris.mysql.MysqlCommand;
+import org.apache.doris.mysql.MysqlProto;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Process one mysql connection, receive one packet, process, send one packet.
+ */
+public class MysqlConnectProcessor extends ConnectProcessor {
+    private static final Logger LOG = 
LogManager.getLogger(MysqlConnectProcessor.class);
+
+    private ByteBuffer packetBuf;
+
+    /**
+     * Creates a processor bound to {@code context} and marks it as serving the MySQL protocol,
+     * which the shared ConnectProcessor code checks through {@code connectType}.
+     */
+    public MysqlConnectProcessor(ConnectContext context) {
+        super(context);
+        this.connectType = ConnectType.MYSQL;
+    }
+
+    // COM_INIT_DB: change current database of this session.
+    // Byte 0 of the packet is skipped (dispatch() has already consumed the command code);
+    // the remaining bytes up to the buffer limit are the database name. The name is decoded
+    // as UTF-8 explicitly, matching handleQuery()/handleFieldList(), so the result does not
+    // depend on the JVM default charset.
+    private void handleInitDb() {
+        String fullDbName = new String(packetBuf.array(), 1, packetBuf.limit() - 1, StandardCharsets.UTF_8);
+        handleInitDb(fullDbName);
+    }
+
+    // COM_STMT_CLOSE: read the statement id (little-endian int at the buffer's current
+    // position) and delegate to the protocol-independent handleStmtClose(int).
+    private void handleStmtClose() {
+        packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
+        handleStmtClose(packetBuf.getInt());
+    }
+
+    // Debug helper: logs a dump of the packet's backing array. Letters and digits are printed
+    // verbatim, every other byte as 0x<hex>; entries are separated by a space and the dump is
+    // truncated to at most 200 characters.
+    private void debugPacket() {
+        if (!LOG.isDebugEnabled()) {
+            // Skip building the dump when it would be discarded anyway.
+            return;
+        }
+        StringBuilder dump = new StringBuilder();
+        for (byte b : packetBuf.array()) {
+            // Mask first so bytes >= 0x80 are not sign-extended into unrelated chars.
+            int unsigned = b & 0xFF;
+            if (Character.isLetterOrDigit(unsigned)) {
+                dump.append((char) unsigned);
+            } else {
+                dump.append("0x").append(Integer.toHexString(unsigned));
+            }
+            dump.append(' ');
+        }
+        // A plain substring(0, 200) throws StringIndexOutOfBoundsException for short packets.
+        LOG.debug("debug packet {}", dump.substring(0, Math.min(dump.length(), 200)));
+    }
+
+    // process COM_EXECUTE, parse binary row data
+    // 
https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html
+    private void handleExecute() {
+        // debugPacket();
+        packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
+        // parse stmt_id, flags, params
+        int stmtId = packetBuf.getInt();
+        // flag
+        packetBuf.get();
+        // iteration_count always 1,
+        packetBuf.getInt();
+        LOG.debug("execute prepared statement {}", stmtId);
+        PrepareStmtContext prepareCtx = 
ctx.getPreparedStmt(String.valueOf(stmtId));
+        if (prepareCtx == null) {
+            LOG.debug("No such statement in context, stmtId:{}", stmtId);
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR,
+                    "msg: Not supported such prepared statement");
+            return;
+        }
+        ctx.setStartTime();
+        if (prepareCtx.stmt.getInnerStmt() instanceof QueryStmt) {
+            ctx.getState().setIsQuery(true);
+        }
+        prepareCtx.stmt.setIsPrepared();
+        int paramCount = prepareCtx.stmt.getParmCount();
+        // null bitmap
+        byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
+        packetBuf.get(nullbitmapData);
+        String stmtStr = "";
+        try {
+            // new_params_bind_flag
+            if ((int) packetBuf.get() != 0) {
+                // parse params's types
+                for (int i = 0; i < paramCount; ++i) {
+                    int typeCode = packetBuf.getChar();
+                    LOG.debug("code {}", typeCode);
+                    
prepareCtx.stmt.placeholders().get(i).setTypeCode(typeCode);
+                }
+            }
+            List<LiteralExpr> realValueExprs = new ArrayList<>();
+            // parse param data
+            for (int i = 0; i < paramCount; ++i) {
+                if (isNull(nullbitmapData, i)) {
+                    realValueExprs.add(new NullLiteral());
+                    continue;
+                }
+                LiteralExpr l = 
prepareCtx.stmt.placeholders().get(i).createLiteralFromType();
+                l.setupParamFromBinary(packetBuf);
+                realValueExprs.add(l);
+            }
+            ExecuteStmt executeStmt = new ExecuteStmt(String.valueOf(stmtId), 
realValueExprs);
+            // TODO set real origin statement
+            executeStmt.setOrigStmt(new OriginStatement("null", 0));
+            executeStmt.setUserInfo(ctx.getCurrentUserIdentity());
+            LOG.debug("executeStmt {}", executeStmt);
+            executor = new StmtExecutor(ctx, executeStmt);
+            ctx.setExecutor(executor);
+            executor.execute();
+            stmtStr = executeStmt.toSql();
+        } catch (Throwable e) {
+            // Catch all throwable.
+            // If reach here, maybe palo bug.
+            LOG.warn("Process one query failed because unknown reason: ", e);
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
+                    e.getClass().getSimpleName() + ", msg: " + e.getMessage());
+        }
+        auditAfterExec(stmtStr, prepareCtx.stmt.getInnerStmt(), null, false);
+    }
+
+    // Process COM_QUERY statement,
+    private void handleQuery(MysqlCommand mysqlCommand) {
+        // convert statement to Java string
+        byte[] bytes = packetBuf.array();
+        int ending = packetBuf.limit() - 1;
+        while (ending >= 1 && bytes[ending] == '\0') {
+            ending--;
+        }
+        String originStmt = new String(bytes, 1, ending, 
StandardCharsets.UTF_8);
+
+        handleQuery(mysqlCommand, originStmt);
+    }
+
+    private void dispatch() throws IOException {
+        int code = packetBuf.get();
+        MysqlCommand command = MysqlCommand.fromCode(code);
+        if (command == null) {
+            ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown 
command(" + code + ")");
+            LOG.warn("Unknown command(" + code + ")");
+            return;
+        }
+        LOG.debug("handle command {}", command);
+        ctx.setCommand(command);
+        ctx.setStartTime();
+
+        switch (command) {
+            case COM_INIT_DB:
+                handleInitDb();
+                break;
+            case COM_QUIT:
+                // COM_QUIT: set killed flag and then return OK packet.
+                handleQuit();
+                break;
+            case COM_QUERY:
+            case COM_STMT_PREPARE:
+                // Process COM_QUERY statement,
+                ctx.initTracer("trace");
+                Span rootSpan = 
ctx.getTracer().spanBuilder("handleQuery").setNoParent().startSpan();
+                try (Scope scope = rootSpan.makeCurrent()) {
+                    handleQuery(command);
+                } catch (Exception e) {
+                    rootSpan.recordException(e);
+                    throw e;
+                } finally {
+                    rootSpan.end();
+                }
+                break;
+            case COM_STMT_EXECUTE:
+                handleExecute();
+                break;
+            case COM_FIELD_LIST:
+                handleFieldList();
+                break;
+            case COM_PING:
+                // process COM_PING statement, do nothing, just return one OK 
packet.
+                handlePing();
+                break;
+            case COM_STMT_RESET:
+                handleStmtReset();
+                break;
+            case COM_STMT_CLOSE:
+                handleStmtClose();
+                break;
+            default:
+                ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, 
"Unsupported command(" + command + ")");
+                LOG.warn("Unsupported command(" + command + ")");
+                break;
+        }
+    }
+
+    private void handleFieldList() {
+        String tableName = new 
String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
+        handleFieldList(tableName);
+    }
+
+    // Process a MySQL request: reset per-request state, read one packet from the channel,
+    // dispatch it to its handler and send the response.
+    //
+    // Throws IOException when no packet could be read (null packet) or when reading/writing
+    // fails. Returns silently when the channel was closed asynchronously while waiting for
+    // a packet.
+    //
+    // Declared with @Override so the compiler verifies it keeps matching
+    // ConnectProcessor.processOnce(), whose default implementation only throws.
+    @Override
+    public void processOnce() throws IOException {
+        // set status of query to OK.
+        ctx.getState().reset();
+        executor = null;
+
+        // reset sequence id of MySQL protocol
+        final MysqlChannel channel = ctx.getMysqlChannel();
+        channel.setSequenceId(0);
+        // read packet from channel
+        try {
+            packetBuf = channel.fetchOnePacket();
+            if (packetBuf == null) {
+                LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
+                throw new IOException("Error happened when receiving packet.");
+            }
+        } catch (AsynchronousCloseException e) {
+            // when this happened, timeout checker close this channel
+            // killed flag in ctx has been already set, just return
+            return;
+        }
+
+        // dispatch
+        dispatch();
+        // finalize
+        finalizeCommand();
+
+        // request finished: mark the connection as idle until the next packet arrives
+        ctx.setCommand(MysqlCommand.COM_SLEEP);
+    }
+
+    // Serves requests one at a time until the context is marked killed. The first exception
+    // escaping processOnce() is logged, marks the context killed and ends the loop.
+    public void loop() {
+        for (;;) {
+            if (ctx.isKilled()) {
+                return;
+            }
+            try {
+                processOnce();
+            } catch (Exception sessionError) {
+                // TODO(zhaochun): something wrong
+                LOG.warn("Exception happened in one session(" + ctx + ").", sessionError);
+                ctx.setKilled();
+                return;
+            }
+        }
+    }
+}
+
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
index 3619a15876b..a5f52f26288 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
@@ -25,6 +25,7 @@ import org.apache.doris.mysql.MysqlPacket;
 
 // query state used to record state of query, maybe query status is better
 public class QueryState {
+    // Reused by arrow flight protocol
     public enum MysqlStateType {
         NOOP,   // send nothing to remote
         OK,     // send OK packet to remote
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ec3a5a79586..8eaff7fdf10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -136,6 +136,7 @@ import 
org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
+import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
@@ -147,7 +148,6 @@ import 
org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.service.arrowflight.FlightStatementExecutor;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.util.InternalQueryBuffer;
 import org.apache.doris.system.Backend;
@@ -242,6 +242,7 @@ public class StmtExecutor {
 
     // this constructor is mainly for proxy
     public StmtExecutor(ConnectContext context, OriginStatement originStmt, 
boolean isProxy) {
+        
Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
         this.context = context;
         this.originStmt = originStmt;
         this.serializer = context.getMysqlChannel().getSerializer();
@@ -262,7 +263,11 @@ public class StmtExecutor {
         this.context = ctx;
         this.parsedStmt = parsedStmt;
         this.originStmt = parsedStmt.getOrigStmt();
-        this.serializer = context.getMysqlChannel().getSerializer();
+        if (context.getConnectType() == ConnectType.MYSQL) {
+            this.serializer = context.getMysqlChannel().getSerializer();
+        } else {
+            this.serializer = null;
+        }
         this.isProxy = false;
         if (parsedStmt instanceof LogicalPlanAdapter) {
             this.statementContext = ((LogicalPlanAdapter) 
parsedStmt).getStatementContext();
@@ -428,7 +433,7 @@ public class StmtExecutor {
      *      isValuesOrConstantSelect: when this interface return true, 
original string is truncated at 1024
      *
      * @return parsed and analyzed statement for Stale planner.
-     * an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
+     *         an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for 
Nereids.
      */
     public StatementBase getParsedStmt() {
         return parsedStmt;
@@ -444,13 +449,16 @@ public class StmtExecutor {
     public void execute(TUniqueId queryId) throws Exception {
         SessionVariable sessionVariable = context.getSessionVariable();
         Span executeSpan = 
context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan();
+        if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
+            context.setReturnResultFromLocal(true);
+        }
         try (Scope scope = executeSpan.makeCurrent()) {
             if (parsedStmt instanceof LogicalPlanAdapter
                     || (parsedStmt == null && 
sessionVariable.isEnableNereidsPlanner())) {
                 try {
                     executeByNereids(queryId);
                 } catch (NereidsException | ParseException e) {
-                    if (context.getMinidump() != null) {
+                    if (context.getMinidump() != null && 
context.getMinidump().toString(4) != null) {
                         
MinidumpUtils.saveMinidumpString(context.getMinidump(), 
DebugUtil.printId(context.queryId()));
                     }
                     // try to fall back to legacy planner
@@ -600,12 +608,23 @@ public class StmtExecutor {
         }
         if (statements.size() <= originStmt.idx) {
             throw new ParseException("Nereids parse failed. Parser get " + 
statements.size() + " statements,"
-                            + " but we need at least " + originStmt.idx + " 
statements.");
+                    + " but we need at least " + originStmt.idx + " 
statements.");
         }
         parsedStmt = statements.get(originStmt.idx);
     }
 
+    public void finalizeQuery() {
+        // The final profile report occurs after the BE returns the query data, 
and the profile cannot be
+        // received after unregisterQuery(), causing the instance profile to 
be lost, so we should wait
+        // for the profile before unregisterQuery().
+        updateProfile(true);
+        QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
+    }
+
     private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
+        if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
+            context.setReturnResultFromLocal(false);
+        }
         // queue query here
         syncJournalIfNeeded();
         QueueOfferToken offerRet = null;
@@ -631,7 +650,7 @@ public class StmtExecutor {
         try {
             for (int i = 0; i < retryTime; i++) {
                 try {
-                    //reset query id for each retry
+                    // reset query id for each retry
                     if (i > 0) {
                         UUID uuid = UUID.randomUUID();
                         TUniqueId newQueryId = new 
TUniqueId(uuid.getMostSignificantBits(),
@@ -646,17 +665,15 @@ public class StmtExecutor {
                     if (i == retryTime - 1) {
                         throw e;
                     }
-                    if (!context.getMysqlChannel().isSend()) {
+                    if (context.getConnectType().equals(ConnectType.MYSQL) && 
!context.getMysqlChannel().isSend()) {
                         LOG.warn("retry {} times. stmt: {}", (i + 1), 
parsedStmt.getOrigStmt().originStmt);
                     } else {
                         throw e;
                     }
                 } finally {
-                    // The final profile report occurs after be returns the 
query data, and the profile cannot be
-                    // received after unregisterQuery(), causing the instance 
profile to be lost, so we should wait
-                    // for the profile before unregisterQuery().
-                    updateProfile(true);
-                    
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
+                    if (context.isReturnResultFromLocal()) {
+                        finalizeQuery();
+                    }
                 }
             }
         } finally {
@@ -1361,9 +1378,11 @@ public class StmtExecutor {
     // Process a select statement.
     private void handleQueryStmt() throws Exception {
         LOG.info("Handling query {} with query id {}",
-                          originStmt.originStmt, 
DebugUtil.printId(context.queryId));
-        // Every time set no send flag and clean all data in buffer
-        context.getMysqlChannel().reset();
+                originStmt.originStmt, DebugUtil.printId(context.queryId));
+        if (context.getConnectType() == ConnectType.MYSQL) {
+            // Every time set no send flag and clean all data in buffer
+            context.getMysqlChannel().reset();
+        }
         Queriable queryStmt = (Queriable) parsedStmt;
 
         QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
@@ -1390,12 +1409,16 @@ public class StmtExecutor {
             return;
         }
 
-        MysqlChannel channel = context.getMysqlChannel();
+        MysqlChannel channel = null;
+        if (context.getConnectType().equals(ConnectType.MYSQL)) {
+            channel = context.getMysqlChannel();
+        }
         boolean isOutfileQuery = queryStmt.hasOutFileClause();
 
         // Sql and PartitionCache
         CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, 
planner);
-        if (cacheAnalyzer.enableCache() && !isOutfileQuery
+        // TODO support arrow flight sql
+        if (channel != null && cacheAnalyzer.enableCache() && !isOutfileQuery
                 && context.getSessionVariable().getSqlSelectLimit() < 0
                 && context.getSessionVariable().getDefaultOrderByLimit() < 0) {
             if (queryStmt instanceof QueryStmt || queryStmt instanceof 
LogicalPlanAdapter) {
@@ -1406,7 +1429,8 @@ public class StmtExecutor {
         }
 
         // handle select .. from xx  limit 0
-        if (parsedStmt instanceof SelectStmt) {
+        // TODO support arrow flight sql
+        if (channel != null && parsedStmt instanceof SelectStmt) {
             SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
             if (parsedSelectStmt.getLimit() == 0) {
                 LOG.info("ignore handle limit 0 ,sql:{}", 
parsedSelectStmt.toSql());
@@ -1469,6 +1493,22 @@ public class StmtExecutor {
             }
         }
 
+        if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
+            Preconditions.checkState(!context.isReturnResultFromLocal());
+            profile.getSummaryProfile().setTempStartTime();
+            if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
+                try {
+                    LOG.debug("Finish to execute fragment. user: {}, db: {}, 
sql: {}, fragment instance num: {}",
+                            context.getQualifiedUser(), context.getDatabase(),
+                            parsedStmt.getOrigStmt().originStmt.replace("\n", 
" "),
+                            coordBase.getInstanceTotalNum());
+                } catch (Exception e) {
+                    LOG.warn("Fail to print fragment concurrency for Query.", 
e);
+                }
+            }
+            return;
+        }
+
         Span fetchResultSpan = context.getTracer().spanBuilder("fetch 
result").setParent(Context.current()).startSpan();
         try (Scope scope = fetchResultSpan.makeCurrent()) {
             while (true) {
@@ -1573,8 +1613,10 @@ public class StmtExecutor {
     }
 
     private void handleTransactionStmt() throws Exception {
-        // Every time set no send flag and clean all data in buffer
-        context.getMysqlChannel().reset();
+        if (context.getConnectType() == ConnectType.MYSQL) {
+            // Every time set no send flag and clean all data in buffer
+            context.getMysqlChannel().reset();
+        }
         context.getState().setOk(0, 0, "");
         // create plan
         if (context.getTxnEntry() != null && 
context.getTxnEntry().getRowsInTransaction() == 0
@@ -1774,8 +1816,8 @@ public class StmtExecutor {
 
     // Process an insert statement.
     private void handleInsertStmt() throws Exception {
-        // Every time set no send flag and clean all data in buffer
-        if (context.getMysqlChannel() != null) {
+        if (context.getConnectType() == ConnectType.MYSQL) {
+            // Every time set no send flag and clean all data in buffer
             context.getMysqlChannel().reset();
         }
         InsertStmt insertStmt = (InsertStmt) parsedStmt;
@@ -1989,8 +2031,7 @@ public class StmtExecutor {
                  */
                 throwable = t;
             } finally {
-                updateProfile(true);
-                QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
+                finalizeQuery();
             }
 
             // Go here, which means:
@@ -2059,7 +2100,9 @@ public class StmtExecutor {
     }
 
     private void handleUnsupportedStmt() {
-        context.getMysqlChannel().reset();
+        if (context.getConnectType() == ConnectType.MYSQL) {
+            context.getMysqlChannel().reset();
+        }
         // do nothing
         context.getState().setOk();
     }
@@ -2084,10 +2127,10 @@ public class StmtExecutor {
     private void handlePrepareStmt() throws Exception {
         // register prepareStmt
         LOG.debug("add prepared statement {}, isBinaryProtocol {}",
-                        prepareStmt.getName(), prepareStmt.isBinaryProtocol());
+                prepareStmt.getName(), prepareStmt.isBinaryProtocol());
         context.addPreparedStmt(prepareStmt.getName(),
                 new PrepareStmtContext(prepareStmt,
-                            context, planner, analyzer, 
prepareStmt.getName()));
+                        context, planner, analyzer, prepareStmt.getName()));
         if (prepareStmt.isBinaryProtocol()) {
             sendStmtPrepareOK();
         }
@@ -2114,6 +2157,7 @@ public class StmtExecutor {
     }
 
     private void sendMetaData(ResultSetMetaData metaData) throws IOException {
+        Preconditions.checkState(context.getConnectType() == 
ConnectType.MYSQL);
         // sends how many columns
         serializer.reset();
         serializer.writeVInt(metaData.getColumnCount());
@@ -2137,6 +2181,7 @@ public class StmtExecutor {
     }
 
     private void sendStmtPrepareOK() throws IOException {
+        Preconditions.checkState(context.getConnectType() == 
ConnectType.MYSQL);
         // 
https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_prepare.html#sect_protocol_com_stmt_prepare_response
         serializer.reset();
         // 0x00 OK
@@ -2174,6 +2219,7 @@ public class StmtExecutor {
     }
 
     private void sendFields(List<String> colNames, List<Type> types) throws 
IOException {
+        Preconditions.checkState(context.getConnectType() == 
ConnectType.MYSQL);
         // sends how many columns
         serializer.reset();
         serializer.writeVInt(colNames.size());
@@ -2205,24 +2251,33 @@ public class StmtExecutor {
     }
 
     public void sendResultSet(ResultSet resultSet) throws IOException {
-        context.updateReturnRows(resultSet.getResultRows().size());
-        // Send meta data.
-        sendMetaData(resultSet.getMetaData());
+        if (context.getConnectType().equals(ConnectType.MYSQL)) {
+            context.updateReturnRows(resultSet.getResultRows().size());
+            // Send meta data.
+            sendMetaData(resultSet.getMetaData());
 
-        // Send result set.
-        for (List<String> row : resultSet.getResultRows()) {
-            serializer.reset();
-            for (String item : row) {
-                if (item == null || item.equals(FeConstants.null_string)) {
-                    serializer.writeNull();
-                } else {
-                    serializer.writeLenEncodedString(item);
+            // Send result set.
+            for (List<String> row : resultSet.getResultRows()) {
+                serializer.reset();
+                for (String item : row) {
+                    if (item == null || item.equals(FeConstants.null_string)) {
+                        serializer.writeNull();
+                    } else {
+                        serializer.writeLenEncodedString(item);
+                    }
                 }
+                
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
             }
-            context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
-        }
 
-        context.getState().setEof();
+            context.getState().setEof();
+        } else if 
(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
+            context.updateReturnRows(resultSet.getResultRows().size());
+            context.getFlightSqlChannel()
+                    .addResult(DebugUtil.printId(context.queryId()), 
context.getRunningQuery(), resultSet);
+            context.getState().setEof();
+        } else {
+            LOG.error("sendResultSet error connect type");
+        }
     }
 
     // Process show statement
@@ -2248,6 +2303,7 @@ public class StmtExecutor {
     }
 
     public void handleExplainStmt(String result, boolean isNereids) throws 
IOException {
+        // TODO support arrow flight sql
         ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
                 .addColumn(new Column("Explain String" + (isNereids ? 
"(Nereids Planner)" : "(Old Planner)"),
                         ScalarType.createVarchar(20)))
@@ -2686,64 +2742,6 @@ public class StmtExecutor {
         }
     }
 
-    public void executeArrowFlightQuery(FlightStatementExecutor 
flightStatementExecutor) {
-        LOG.debug("ARROW FLIGHT QUERY: " + originStmt.toString());
-        try {
-            try {
-                if (ConnectContext.get() != null
-                        && 
ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
-                    try {
-                        parseByNereids();
-                        Preconditions.checkState(parsedStmt instanceof 
LogicalPlanAdapter,
-                                "Nereids only process LogicalPlanAdapter,"
-                                        + " but parsedStmt is " + 
parsedStmt.getClass().getName());
-                        context.getState().setNereids(true);
-                        context.getState().setIsQuery(true);
-                        planner = new NereidsPlanner(statementContext);
-                        planner.plan(parsedStmt, 
context.getSessionVariable().toThrift());
-                    } catch (Exception e) {
-                        LOG.warn("fall back to legacy planner, because: {}", 
e.getMessage(), e);
-                        parsedStmt = null;
-                        context.getState().setNereids(false);
-                        analyzer = new Analyzer(context.getEnv(), context);
-                        analyze(context.getSessionVariable().toThrift());
-                    }
-                } else {
-                    analyzer = new Analyzer(context.getEnv(), context);
-                    analyze(context.getSessionVariable().toThrift());
-                }
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to execute Arrow Flight 
SQL. " + Util.getRootCauseMessage(e), e);
-            }
-            coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
-            profile.addExecutionProfile(coord.getExecutionProfile());
-            try {
-                QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
-                        new QeProcessorImpl.QueryInfo(context, 
originStmt.originStmt, coord));
-            } catch (UserException e) {
-                throw new RuntimeException("Failed to execute Arrow Flight 
SQL. " + Util.getRootCauseMessage(e), e);
-            }
-
-            Span queryScheduleSpan = context.getTracer()
-                    .spanBuilder("Arrow Flight SQL 
schedule").setParent(Context.current()).startSpan();
-            try (Scope scope = queryScheduleSpan.makeCurrent()) {
-                coord.exec();
-            } catch (Exception e) {
-                queryScheduleSpan.recordException(e);
-                LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", 
e.getMessage(), e);
-                throw new RuntimeException(e.getMessage() + 
Util.getRootCauseMessage(e), e);
-            } finally {
-                queryScheduleSpan.end();
-            }
-        } finally {
-            QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); // 
TODO for query profile
-        }
-        flightStatementExecutor.setFinstId(coord.getFinstId());
-        
flightStatementExecutor.setResultFlightServerAddr(coord.getResultFlightServerAddr());
-        
flightStatementExecutor.setResultInternalServiceAddr(coord.getResultInternalServiceAddr());
-        
flightStatementExecutor.setResultOutputExprs(coord.getResultOutputExprs());
-    }
-
     private List<ResultRow> convertResultBatchToResultRows(TResultBatch batch) 
{
         List<String> columns = parsedStmt.getColLabels();
         List<ResultRow> resultRows = new ArrayList<>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 514b9df525d..6ef0956808b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -83,15 +83,18 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.qe.MasterCatalogExecutor;
+import org.apache.doris.qe.MysqlConnectProcessor;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.StatisticsCacheKey;
@@ -1104,7 +1107,16 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         ConnectContext context = new ConnectContext();
         // Set current connected FE to the client address, so that we can know 
where this request come from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
-        ConnectProcessor processor = new ConnectProcessor(context);
+
+        ConnectProcessor processor = null;
+        if (context.getConnectType().equals(ConnectType.MYSQL)) {
+            processor = new MysqlConnectProcessor(context);
+        } else if 
(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
+            processor = new FlightSqlConnectProcessor(context);
+        } else {
+            throw new TException("unknown ConnectType: " + 
context.getConnectType());
+        }
+
         TMasterOpResult result = processor.proxyExecute(params);
         if 
(QueryState.MysqlStateType.ERR.name().equalsIgnoreCase(result.getStatus())) {
             context.getState().setError(result.getStatus());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 0e73fbb2ad6..d2f8b46b893 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -24,8 +24,11 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
 import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -63,12 +66,15 @@ import 
org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
 
 public class DorisFlightSqlProducer implements FlightSqlProducer, 
AutoCloseable {
     private static final Logger LOG = 
LogManager.getLogger(DorisFlightSqlProducer.class);
@@ -111,33 +117,72 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         ConnectContext connectContext = null;
         try {
             connectContext = 
flightSessionsManager.getConnectContext(context.peerIdentity());
-            // Only for ConnectContext check timeout.
-            connectContext.setCommand(MysqlCommand.COM_QUERY);
+            // After the previous query was executed, there was no 
getStreamStatement to take away the result.
+            connectContext.getFlightSqlChannel().reset();
             final String query = request.getQuery();
-            final FlightStatementExecutor flightStatementExecutor = new 
FlightStatementExecutor(query, connectContext);
-
-            flightStatementExecutor.executeQuery();
-
-            TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder()
-                    .setStatementHandle(ByteString.copyFromUtf8(
-                            
DebugUtil.printId(flightStatementExecutor.getFinstId()) + ":" + query)).build();
-            final Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
-            // TODO Support multiple endpoints.
-            Location location = 
Location.forGrpcInsecure(flightStatementExecutor.getResultFlightServerAddr().hostname,
-                    flightStatementExecutor.getResultFlightServerAddr().port);
-            List<FlightEndpoint> endpoints = Collections.singletonList(new 
FlightEndpoint(ticket, location));
-
-            Schema schema;
-            schema = flightStatementExecutor.fetchArrowFlightSchema(5000);
-            if (schema == null) {
-                throw CallStatus.INTERNAL.withDescription("fetch arrow flight 
schema is null").toRuntimeException();
+            final FlightSqlConnectProcessor flightSQLConnectProcessor = new 
FlightSqlConnectProcessor(connectContext);
+
+            flightSQLConnectProcessor.handleQuery(query);
+            if (connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
+                throw new RuntimeException("after handleQuery");
+            }
+
+            if (connectContext.isReturnResultFromLocal()) {
+                // set/use etc. stmt returns an OK result by default.
+                if (connectContext.getFlightSqlChannel().resultNum() == 0) {
+                    // a random query id and add empty results
+                    String queryId = UUID.randomUUID().toString();
+                    
connectContext.getFlightSqlChannel().addEmptyResult(queryId, query);
+
+                    final ByteString handle = 
ByteString.copyFromUtf8(context.peerIdentity() + ":" + queryId);
+                    TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
+                            .build();
+                    return getFlightInfoForSchema(ticketStatement, descriptor,
+                            
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
+                }
+
+                // A Flight Sql request can only contain one statement that 
returns result,
+                // otherwise expected thrown exception during execution.
+                
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
+
+                // The tokens used for authentication between 
getStreamStatement and getFlightInfoStatement
+                // are different. So put the peerIdentity into the ticket and 
then getStreamStatement is used to find
+                // the correct ConnectContext.
+                // queryId is used to find query results.
+                final ByteString handle = ByteString.copyFromUtf8(
+                        context.peerIdentity() + ":" + 
DebugUtil.printId(connectContext.queryId()));
+                TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
+                        .build();
+                return getFlightInfoForSchema(ticketStatement, descriptor,
+                        
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
+                                .getVectorSchemaRoot().getSchema());
+            } else {
+                // Now only query stmt will pull results from BE.
+                final ByteString handle = ByteString.copyFromUtf8(
+                        DebugUtil.printId(connectContext.getFinstId()) + ":" + 
query);
+                Schema schema = 
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+                if (schema == null) {
+                    throw CallStatus.INTERNAL.withDescription("fetch arrow 
flight schema is null").toRuntimeException();
+                }
+                TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
+                        .build();
+                Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
+                // TODO Support multiple endpoints.
+                Location location = 
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+                        connectContext.getResultFlightServerAddr().port);
+                List<FlightEndpoint> endpoints = Collections.singletonList(new 
FlightEndpoint(ticket, location));
+                // TODO Set in BE callback after query end, Client will not 
callback.
+                connectContext.setCommand(MysqlCommand.COM_SLEEP);
+                return new FlightInfo(schema, descriptor, endpoints, -1, -1);
             }
-            // TODO Set in BE callback after query end, Client client will not 
callback by default.
-            connectContext.setCommand(MysqlCommand.COM_SLEEP);
-            return new FlightInfo(schema, descriptor, endpoints, -1, -1);
         } catch (Exception e) {
             if (null != connectContext) {
                 connectContext.setCommand(MysqlCommand.COM_SLEEP);
+                String errMsg = "get flight info statement failed, " + 
e.getMessage() + ", " + Util.getRootCauseMessage(
+                        e) + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
+                        + connectContext.getState().getErrorMessage();
+                LOG.warn(errMsg, e);
+                throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
             }
             LOG.warn("get flight info statement failed, " + e.getMessage(), e);
             throw 
CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
@@ -146,8 +191,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
 
     @Override
     public FlightInfo getFlightInfoPreparedStatement(final 
CommandPreparedStatementQuery command,
-            final CallContext context,
-            final FlightDescriptor descriptor) {
+            final CallContext context, final FlightDescriptor descriptor) {
         throw 
CallStatus.UNIMPLEMENTED.withDescription("getFlightInfoPreparedStatement 
unimplemented")
                 .toRuntimeException();
     }
@@ -158,6 +202,42 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         throw CallStatus.UNIMPLEMENTED.withDescription("getSchemaStatement 
unimplemented").toRuntimeException();
     }
 
+    /**
+     * Streams the cached result of a previously executed statement back to the client.
+     *
+     * <p>The statement handle carried by the ticket is split on ':'. The first part is used as the peer
+     * identity to look up the session's ConnectContext, the second part as the query id under which the
+     * result is looked up in that session's FlightSqlChannel. Once a session was found, the cached entry
+     * is invalidated when this call finishes, whether or not streaming succeeded.
+     *
+     * @param ticketStatementQuery ticket whose statement handle has the form "peerIdentity:queryId"
+     * @param context per-call context (not used by this method)
+     * @param listener stream on which the result batch, or the failure, is reported
+     */
+    @Override
+    public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context,
+            final ServerStreamListener listener) {
+        final String handle = ticketStatementQuery.getStatementHandle().toStringUtf8();
+        final String[] handleParts = handle.split(":");
+        if (handleParts.length < 2) {
+            // Reject a malformed handle explicitly instead of failing with ArrayIndexOutOfBoundsException.
+            final String errMsg = "get stream statement failed, invalid statement handle: " + handle;
+            LOG.warn(errMsg);
+            throw CallStatus.INVALID_ARGUMENT.withDescription(errMsg).toRuntimeException();
+        }
+        final String executedPeerIdentity = handleParts[0];
+        final String queryId = handleParts[1];
+        ConnectContext connectContext = null;
+        try {
+            // The tokens used for authentication between getStreamStatement and getFlightInfoStatement
+            // are different.
+            connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
+            final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
+                    connectContext.getFlightSqlChannel().getResult(queryId),
+                    "no cached result for query id " + queryId);
+            final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot();
+            listener.start(vectorSchemaRoot);
+            listener.putNext();
+            // Complete the stream only on success: completing it from a finally block would also run
+            // after listener.error(), and an exception raised there would replace the one thrown below.
+            listener.completed();
+        } catch (Exception e) {
+            listener.error(e);
+            if (null != connectContext) {
+                String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+                        + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+                        + connectContext.getState().getErrorMessage();
+                LOG.warn(errMsg, e);
+                throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
+            }
+            LOG.warn("get stream statement failed, " + e.getMessage(), e);
+            throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
+        } finally {
+            if (null != connectContext) {
+                // The result has been sent (or the attempt failed), delete it.
+                connectContext.getFlightSqlChannel().invalidate(queryId);
+            }
+        }
+    }
+
+
     @Override
     public void close() throws Exception {
         AutoCloseables.close(rootAllocator);
@@ -180,8 +260,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     }
 
     @Override
-    public Runnable acceptPutStatement(CommandStatementUpdate command,
-            CallContext context, FlightStream flightStream,
+    public Runnable acceptPutStatement(CommandStatementUpdate command, 
CallContext context, FlightStream flightStream,
             StreamListener<PutResult> ackStream) {
         throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutStatement 
unimplemented").toRuntimeException();
     }
@@ -219,8 +298,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     }
 
     @Override
-    public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext 
context,
-            ServerStreamListener listener) {
+    public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext 
context, ServerStreamListener listener) {
         throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTypeInfo 
unimplemented").toRuntimeException();
     }
 
@@ -323,12 +401,6 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         throw 
CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference 
unimplemented").toRuntimeException();
     }
 
-    @Override
-    public void getStreamStatement(final TicketStatementQuery 
ticketStatementQuery, final CallContext context,
-            final ServerStreamListener listener) {
-        throw CallStatus.UNIMPLEMENTED.withDescription("getStreamStatement 
unimplemented").toRuntimeException();
-    }
-
     private <T extends Message> FlightInfo getFlightInfoForSchema(final T 
request, final FlightDescriptor descriptor,
             final Schema schema) {
         final Ticket ticket = new Ticket(Any.pack(request).toByteArray());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
similarity index 65%
rename from 
fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 8c9cdf124f3..ef5b53c2d1f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -14,18 +14,19 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-// This file is copied from
-// 
https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java
-// and modified by Doris
 
 package org.apache.doris.service.arrowflight;
 
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -33,112 +34,84 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public final class FlightStatementExecutor implements AutoCloseable {
-    private ConnectContext connectContext;
-    private final String query;
-    private TUniqueId queryId;
-    private TUniqueId finstId;
-    private TNetworkAddress resultFlightServerAddr;
-    private TNetworkAddress resultInternalServiceAddr;
-    private ArrayList<Expr> resultOutputExprs;
-
-    public FlightStatementExecutor(final String query, ConnectContext 
connectContext) {
-        this.query = query;
-        this.connectContext = connectContext;
-        connectContext.setThreadLocalInfo();
-    }
-
-    public void setQueryId(TUniqueId queryId) {
-        this.queryId = queryId;
-    }
-
-    public void setFinstId(TUniqueId finstId) {
-        this.finstId = finstId;
-    }
-
-    public void setResultFlightServerAddr(TNetworkAddress 
resultFlightServerAddr) {
-        this.resultFlightServerAddr = resultFlightServerAddr;
-    }
-
-    public void setResultInternalServiceAddr(TNetworkAddress 
resultInternalServiceAddr) {
-        this.resultInternalServiceAddr = resultInternalServiceAddr;
-    }
-
-    public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
-        this.resultOutputExprs = resultOutputExprs;
-    }
-
-    public String getQuery() {
-        return query;
-    }
-
-    public TUniqueId getQueryId() {
-        return queryId;
-    }
-
-    public TUniqueId getFinstId() {
-        return finstId;
-    }
-
-    public TNetworkAddress getResultFlightServerAddr() {
-        return resultFlightServerAddr;
-    }
-
-    public TNetworkAddress getResultInternalServiceAddr() {
-        return resultInternalServiceAddr;
-    }
-
-    public ArrayList<Expr> getResultOutputExprs() {
-        return resultOutputExprs;
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        if (!(other instanceof FlightStatementExecutor)) {
-            return false;
+/**
+ * Process one flight sql connection.
+ */
+public class FlightSqlConnectProcessor extends ConnectProcessor implements 
AutoCloseable {
+    private static final Logger LOG = 
LogManager.getLogger(FlightSqlConnectProcessor.class);
+
+    public FlightSqlConnectProcessor(ConnectContext context) {
+        super(context);
+        connectType = ConnectType.ARROW_FLIGHT_SQL;
+        context.setThreadLocalInfo();
+        context.setReturnResultFromLocal(true);
+    }
+
+    /**
+     * Resets the per-statement state of the connection before a command is handled.
+     *
+     * <p>The query state is reset and the current executor is dropped. For a non-null command the
+     * command and the start time are then recorded on the context; for a null command an
+     * ERR_UNKNOWN_COM_ERROR is set on the query state and nothing else is changed.
+     *
+     * @param command the command about to be processed; may be null
+     */
+    public void prepare(MysqlCommand command) {
+        // set status of query to OK.
+        ctx.getState().reset();
+        executor = null;
+
+        if (command == null) {
+            ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
+            // command is null on this path, so it must not be dereferenced (no command.toString());
+            // string concatenation renders it as "null".
+            final String errMsg = "Unknown command(" + command + ")";
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, errMsg);
+            LOG.warn(errMsg);
+            return;
         }
-        return this == other;
+        LOG.debug("arrow flight sql handle command {}", command);
+        ctx.setCommand(command);
+        ctx.setStartTime();
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(this);
-    }
+    public void handleQuery(String query) {
+        MysqlCommand command = MysqlCommand.COM_QUERY;
+        prepare(command);
 
-    public void executeQuery() {
-        try {
-            UUID uuid = UUID.randomUUID();
-            TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
-            setQueryId(queryId);
-            connectContext.setQueryId(queryId);
-            StmtExecutor stmtExecutor = new StmtExecutor(connectContext, 
getQuery());
-            connectContext.setExecutor(stmtExecutor);
-            stmtExecutor.executeArrowFlightQuery(this);
+        ctx.setRunningQuery(query);
+        ctx.initTracer("trace");
+        Span rootSpan = 
ctx.getTracer().spanBuilder("handleQuery").setNoParent().startSpan();
+        try (Scope scope = rootSpan.makeCurrent()) {
+            handleQuery(command, query);
         } catch (Exception e) {
-            throw new RuntimeException("Failed to coord exec", e);
+            rootSpan.recordException(e);
+            throw e;
+        } finally {
+            rootSpan.end();
         }
     }
 
+    // TODO
+    // private void handleInitDb() {
+    //     handleInitDb(fullDbName);
+    // }
+
+    // TODO
+    // private void handleFieldList() {
+    //     handleFieldList(tableName);
+    // }
+
     public Schema fetchArrowFlightSchema(int timeoutMs) {
-        TNetworkAddress address = getResultInternalServiceAddr();
-        TUniqueId tid = getFinstId();
-        ArrayList<Expr> resultOutputExprs = getResultOutputExprs();
+        TNetworkAddress address = ctx.getResultInternalServiceAddr();
+        TUniqueId tid = ctx.getFinstId();
+        ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
         Types.PUniqueId finstId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         try {
             InternalService.PFetchArrowFlightSchemaRequest request =
@@ -156,7 +129,7 @@ public final class FlightStatementExecutor implements 
AutoCloseable {
             }
             TStatusCode code = 
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
-                Status status = null;
+                Status status = new Status();
                 status.setPstatus(pResult.getStatus());
                 throw new RuntimeException(String.format("fetch arrow flight 
schema failed, finstId: %s, errmsg: %s",
                         DebugUtil.printId(tid), status));
@@ -204,6 +177,14 @@ public final class FlightStatementExecutor implements 
AutoCloseable {
 
+    /**
+     * Ends processing on this connection: marks the context as idle (COM_SLEEP), finalizes every
+     * executor collected in returnResultFromRemoteExecutor, empties that collection and finally calls
+     * ConnectContext.remove().
+     */
     @Override
     public void close() throws Exception {
+        ctx.setCommand(MysqlCommand.COM_SLEEP);
+        // TODO support query profile
+        // Finalize the executors that were left pending, then forget them.
+        for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
+            asynExecutor.finalizeQuery();
+        }
+        returnResultFromRemoteExecutor.clear();
+        // NOTE(review): presumably undoes the setThreadLocalInfo() done in the constructor - confirm.
         ConnectContext.remove();
     }
 }
+
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlChannel.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlChannel.java
new file mode 100644
index 00000000000..174e733c2db
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlChannel.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.results;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ResultSet;
+import org.apache.doris.qe.ResultSetMetaData;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class FlightSqlChannel {
+    private final Cache<String, FlightSqlResultCacheEntry> resultCache;
+    private final BufferAllocator allocator;
+
+    public FlightSqlChannel() {
+        // The Stmt result is not picked up by the Client within 10 minutes 
and will be deleted.
+        resultCache =
+                CacheBuilder.newBuilder()
+                        .maximumSize(100)
+                        .expireAfterWrite(10, TimeUnit.MINUTES)
+                        .removalListener(new ResultRemovalListener())
+                        .build();
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    // TODO
+    public String getRemoteIp() {
+        return "0.0.0.0";
+    }
+
+    // TODO
+    public String getRemoteHostPortString() {
+        return "0.0.0.0:0";
+    }
+
+    public void addResult(String queryId, String runningQuery, ResultSet 
resultSet) {
+        List<Field> schemaFields = new ArrayList<>();
+        List<FieldVector> dataFields = new ArrayList<>();
+        List<List<String>> resultData = resultSet.getResultRows();
+        ResultSetMetaData metaData = resultSet.getMetaData();
+
+        // TODO: only support varchar type
+        for (Column col : metaData.getColumns()) {
+            schemaFields.add(new Field(col.getName(), FieldType.nullable(new 
Utf8()), null));
+            VarCharVector varCharVector = new VarCharVector(col.getName(), 
allocator);
+            varCharVector.allocateNew();
+            varCharVector.setValueCount(resultData.size());
+            dataFields.add(varCharVector);
+        }
+
+        for (int i = 0; i < resultData.size(); i++) {
+            List<String> row = resultData.get(i);
+            for (int j = 0; j < row.size(); j++) {
+                String item = row.get(j);
+                if (item == null || item.equals(FeConstants.null_string)) {
+                    dataFields.get(j).setNull(i);
+                } else {
+                    ((VarCharVector) dataFields.get(j)).setSafe(i, 
item.getBytes());
+                }
+            }
+        }
+        VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, 
dataFields);
+        final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new 
FlightSqlResultCacheEntry(vectorSchemaRoot,
+                runningQuery);
+        resultCache.put(queryId, flightSqlResultCacheEntry);
+    }
+
+    public void addEmptyResult(String queryId, String query) {
+        List<Field> schemaFields = new ArrayList<>();
+        List<FieldVector> dataFields = new ArrayList<>();
+        schemaFields.add(new Field("StatusResult", FieldType.nullable(new 
Utf8()), null));
+        VarCharVector varCharVector = new VarCharVector("StatusResult", 
allocator);
+        varCharVector.allocateNew();
+        varCharVector.setValueCount(1);
+        varCharVector.setSafe(0, "OK".getBytes());
+        dataFields.add(varCharVector);
+
+        VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, 
dataFields);
+        final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new 
FlightSqlResultCacheEntry(vectorSchemaRoot,
+                query);
+        resultCache.put(queryId, flightSqlResultCacheEntry);
+    }
+
+    public FlightSqlResultCacheEntry getResult(String queryId) {
+        return resultCache.getIfPresent(queryId);
+    }
+
+    public void invalidate(String handle) {
+        resultCache.invalidate(handle);
+    }
+
+    public long resultNum() {
+        return resultCache.size();
+    }
+
+    public void reset() {
+        resultCache.invalidateAll();
+    }
+
+    public void close() {
+        reset();
+    }
+
+    private static class ResultRemovalListener implements 
RemovalListener<String, FlightSqlResultCacheEntry> {
+        @Override
+        public void onRemoval(@NotNull final RemovalNotification<String, 
FlightSqlResultCacheEntry> notification) {
+            try {
+                AutoCloseables.close(notification.getValue());
+            } catch (final Exception e) {
+                // swallow
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlResultCacheEntry.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlResultCacheEntry.java
new file mode 100644
index 00000000000..12ce04ca8ed
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlResultCacheEntry.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.results;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import java.util.Objects;
+
+
+/**
+ * A cached statement result: the Arrow data plus the text of the statement that produced it.
+ *
+ * <p>Closing the entry clears the underlying VectorSchemaRoot. Equality and hash code are based on the
+ * VectorSchemaRoot only; the query text does not take part.
+ */
+public final class FlightSqlResultCacheEntry implements AutoCloseable {
+
+    private final VectorSchemaRoot vectorSchemaRoot;
+    private final String query;
+
+    /**
+     * @param vectorSchemaRoot the result data; must not be null
+     * @param query text of the statement that produced the result
+     * @throws NullPointerException if vectorSchemaRoot is null
+     */
+    public FlightSqlResultCacheEntry(final VectorSchemaRoot vectorSchemaRoot, final String query) {
+        this.vectorSchemaRoot = Objects.requireNonNull(vectorSchemaRoot, "result cannot be null.");
+        this.query = query;
+    }
+
+    public VectorSchemaRoot getVectorSchemaRoot() {
+        return vectorSchemaRoot;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    @Override
+    public void close() throws Exception {
+        vectorSchemaRoot.clear();
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (this == other) {
+            return true;
+        }
+        // Compare against another cache entry, not against a bare VectorSchemaRoot: the latter made
+        // equals() asymmetric and never true for two distinct entries.
+        if (!(other instanceof FlightSqlResultCacheEntry)) {
+            return false;
+        }
+        final FlightSqlResultCacheEntry that = (FlightSqlResultCacheEntry) other;
+        return vectorSchemaRoot.equals(that.vectorSchemaRoot);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(vectorSchemaRoot);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
index ed01098c675..f850384d4ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
@@ -49,8 +49,8 @@ public interface FlightSessionsManager {
      */
     ConnectContext createConnectContext(String peerIdentity);
 
-    public static ConnectContext buildConnectContext(String peerIdentity, 
UserIdentity userIdentity, String remoteIP) {
-        ConnectContext connectContext = new ConnectContext(peerIdentity);
+    static ConnectContext buildConnectContext(String peerIdentity, 
UserIdentity userIdentity, String remoteIP) {
+        ConnectContext connectContext = new 
FlightSqlConnectContext(peerIdentity);
         connectContext.setEnv(Env.getCurrentEnv());
         connectContext.setStartTime();
         connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
index ce12f610ea2..e1866b094b2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
@@ -58,6 +58,7 @@ public class FlightSessionsWithTokenManager implements 
FlightSessionsManager {
             if (flightTokenDetails.getCreatedSession()) {
                 return null;
             }
+            flightTokenDetails.setCreatedSession(true);
             return FlightSessionsManager.buildConnectContext(peerIdentity, 
flightTokenDetails.getUserIdentity(),
                     flightTokenDetails.getRemoteIp());
         } catch (IllegalArgumentException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
new file mode 100644
index 00000000000..9f703dff92b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.sessions;
+
+import org.apache.doris.mysql.MysqlChannel;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ConnectProcessor;
+import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
+import org.apache.doris.thrift.TResultSinkType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * ConnectContext of a session that was opened through Arrow Flight SQL.
+ *
+ * <p>Results travel through a {@link FlightSqlChannel}; the MySQL channel is left unset, asking for it
+ * raises a RuntimeException, and so do the accept-query life-cycle hooks.
+ */
+public class FlightSqlConnectContext extends ConnectContext {
+    private static final Logger LOG = LogManager.getLogger(FlightSqlConnectContext.class);
+    protected volatile FlightSqlChannel flightSqlChannel;
+
+    /**
+     * @param peerIdentity identity of the Flight peer that owns this session
+     */
+    public FlightSqlConnectContext(String peerIdentity) {
+        this.connectType = ConnectType.ARROW_FLIGHT_SQL;
+        this.peerIdentity = peerIdentity;
+        // Use of MysqlChannel is not expected
+        this.mysqlChannel = null;
+        this.flightSqlChannel = new FlightSqlChannel();
+        setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL);
+        init();
+    }
+
+    @Override
+    public FlightSqlChannel getFlightSqlChannel() {
+        return flightSqlChannel;
+    }
+
+    /** Always fails: a Flight SQL session has no MySQL channel. */
+    @Override
+    public MysqlChannel getMysqlChannel() {
+        throw new RuntimeException("getMysqlChannel not in mysql connection");
+    }
+
+    /** Returns the remote "host:port" string reported by the Flight SQL channel. */
+    @Override
+    public String getClientIP() {
+        return flightSqlChannel.getRemoteHostPortString();
+    }
+
+    /** Closes the Flight SQL channel when one is present. */
+    @Override
+    protected void closeChannel() {
+        if (flightSqlChannel == null) {
+            return;
+        }
+        flightSqlChannel.close();
+    }
+
+    /**
+     * Cancels the running query and, when requested, tears the connection down first.
+     * The kill operation is performed with no protection.
+     *
+     * @param killConnection whether the connection itself is killed in addition to the query
+     */
+    @Override
+    public void kill(boolean killConnection) {
+        LOG.warn("kill query from {}, kill flight sql connection: {}", getRemoteHostPortString(), killConnection);
+
+        if (killConnection) {
+            isKilled = true;
+            closeChannel();
+            connectScheduler.unregisterConnection(this);
+        }
+        // In either case the running query is cancelled.
+        cancelQuery();
+    }
+
+    @Override
+    public String getRemoteHostPortString() {
+        return getFlightSqlChannel().getRemoteHostPortString();
+    }
+
+    /** Not implemented for Flight SQL; always throws. */
+    @Override
+    public void startAcceptQuery(ConnectProcessor connectProcessor) {
+        throw new RuntimeException("Flight Sql Not impl startAcceptQuery");
+    }
+
+    /** Not implemented for Flight SQL; always throws. */
+    @Override
+    public void suspendAcceptQuery() {
+        throw new RuntimeException("Flight Sql Not impl suspendAcceptQuery");
+    }
+
+    /** Not implemented for Flight SQL; always throws. */
+    @Override
+    public void resumeAcceptQuery() {
+        throw new RuntimeException("Flight Sql Not impl resumeAcceptQuery");
+    }
+
+    /** Not implemented for Flight SQL; always throws. */
+    @Override
+    public void stopAcceptQuery() throws IOException {
+        throw new RuntimeException("Flight Sql Not impl stopAcceptQuery");
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 507102fb0d2..dcc0d85cb47 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -38,6 +38,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.planner.OriginalPlanner;
+import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TQueryOptions;
@@ -380,6 +381,10 @@ public class StmtExecutorTest {
                 killCtx.kill(true);
                 minTimes = 0;
 
+                killCtx.getConnectType();
+                minTimes = 0;
+                result = ConnectType.MYSQL;
+
                 ConnectContext.get();
                 minTimes = 0;
                 result = ctx;
@@ -437,6 +442,10 @@ public class StmtExecutorTest {
                 killCtx.kill(true);
                 minTimes = 0;
 
+                killCtx.getConnectType();
+                minTimes = 0;
+                result = ConnectType.MYSQL;
+
                 ConnectContext.get();
                 minTimes = 0;
                 result = ctx;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to