This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b64ae7811fc [pick](branch-3.0) #38215 #43281 #43960 (#44244)
b64ae7811fc is described below

commit b64ae7811fc63a4f0c34d39aefdf2f5d992410a8
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Tue Nov 26 11:25:04 2024 +0800

    [pick](branch-3.0) #38215 #43281 #43960 (#44244)
    
    [pick](branch-3.0) #38215 #43281 #43960
    
    ---------
    
    Co-authored-by: Guangdong Liu <liug...@gmail.com>
---
 be/src/common/config.cpp                           |   7 +
 be/src/common/config.h                             |   6 +
 .../pipeline/exec/memory_scratch_sink_operator.cpp |   2 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |   8 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  34 ++-
 be/src/runtime/buffer_control_block.cpp            | 229 ++++++++++++++--
 be/src/runtime/buffer_control_block.h              |  58 +++-
 be/src/runtime/result_buffer_mgr.cpp               |  76 +++---
 be/src/runtime/result_buffer_mgr.h                 |  30 ++-
 .../arrow_flight/arrow_flight_batch_reader.cpp     | 291 +++++++++++++++++++--
 .../arrow_flight/arrow_flight_batch_reader.h       |  69 ++++-
 be/src/service/arrow_flight/flight_sql_service.cpp |  57 ++--
 be/src/service/internal_service.cpp                |  61 ++++-
 be/src/service/internal_service.h                  |   5 +
 be/src/util/arrow/row_batch.cpp                    |  37 ++-
 be/src/util/arrow/row_batch.h                      |  12 +-
 be/src/util/arrow/utils.cpp                        |   3 +-
 be/src/util/doris_metrics.h                        |   5 +
 be/src/vec/runtime/vparquet_transformer.cpp        |   3 +-
 be/src/vec/sink/varrow_flight_result_writer.cpp    |  64 ++---
 be/src/vec/sink/varrow_flight_result_writer.h      |  16 +-
 be/test/runtime/result_buffer_mgr_test.cpp         |  13 +-
 .../arrowflight/DorisFlightSqlProducer.java        |  35 ++-
 .../arrowflight/FlightSqlConnectProcessor.java     |  14 +-
 gensrc/proto/internal_service.proto                |  16 ++
 .../data/arrow_flight_sql_p0/test_select.out       |   4 +
 regression-test/framework/pom.xml                  |   2 +-
 .../suites/arrow_flight_sql_p0/test_select.groovy  |  12 +
 28 files changed, 903 insertions(+), 266 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a4efb631664..fa5fde4ea41 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -65,6 +65,7 @@ DEFINE_Int32(brpc_port, "8060");
 DEFINE_Int32(arrow_flight_sql_port, "-1");
 
 DEFINE_mString(public_access_ip, "");
+DEFINE_Int32(public_access_port, "-1");
 
 // the number of bthreads for brpc, the default value is set to -1,
 // which means the number of bthreads is #cpu-cores
@@ -535,6 +536,8 @@ DEFINE_Int32(brpc_light_work_pool_threads, "-1");
 DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
 DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 DEFINE_mBool(enable_bthread_transmit_block, "true");
+DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1");
+DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1");
 
 //Enable brpc builtin services, see:
 
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
@@ -643,7 +646,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
 // result buffer cancelled time (unit: second)
 DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
 
+// arrow flight result sink buffer rows size, default 4096 * 8
 DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
+// The timeout for ADBC Client to wait for data using arrow flight reader.
+// If the query is very complex and no result is generated after this time, 
consider increasing this timeout.
+DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000");
 
 // the increased frequency of priority for remaining tasks in 
BlockingPriorityQueue
 DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b840f87f725..f72f02c3a15 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port);
 // For ADBC client fetch result, default is empty, the ADBC client uses the 
backend ip to fetch the result.
 // If ADBC client cannot access the backend ip, can set public_access_ip to 
modify the fetch result ip.
 DECLARE_mString(public_access_ip);
+DECLARE_Int32(public_access_port);
 
 // the number of bthreads for brpc, the default value is set to -1,
 // which means the number of bthreads is #cpu-cores
@@ -584,6 +585,8 @@ DECLARE_Int32(brpc_light_work_pool_threads);
 DECLARE_Int32(brpc_heavy_work_pool_max_queue_size);
 DECLARE_Int32(brpc_light_work_pool_max_queue_size);
 DECLARE_mBool(enable_bthread_transmit_block);
+DECLARE_Int32(brpc_arrow_flight_work_pool_threads);
+DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size);
 
 // The maximum amount of data that can be processed by a stream load
 DECLARE_mInt64(streaming_load_max_mb);
@@ -693,6 +696,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time);
 
 // arrow flight result sink buffer rows size, default 4096 * 8
 DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
+// The timeout for ADBC Client to wait for data using arrow flight reader.
+// If the query is very complex and no result is generated after this time, 
consider increasing this timeout.
+DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms);
 
 // the increased frequency of priority for remaining tasks in 
BlockingPriorityQueue
 DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp 
b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index b9f18c43e1e..874b6fd1ab1 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -104,7 +104,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     {
         SCOPED_TIMER(local_state._get_arrow_schema_timer);
         // After expr executed, use recaculated schema as final schema
-        RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema));
+        RETURN_IF_ERROR(get_arrow_schema_from_block(block, 
&block_arrow_schema, state->timezone()));
     }
     {
         SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer);
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index bc4e4c88d14..c65b9dda89d 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -72,9 +72,8 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
     if (state->query_options().enable_parallel_outfile) {
-        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->query_id(), _buf_size, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+        
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(),
 _buf_size,
+                                                                       
&_sender, state));
     }
     return vectorized::VExpr::open(_output_vexpr_ctxs, state);
 }
@@ -92,8 +91,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
         _sender = _parent->cast<ResultFileSinkOperatorX>()._sender;
     } else {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), p._buf_size, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+                state->fragment_instance_id(), p._buf_size, &_sender, state));
     }
     _sender->set_dependency(state->fragment_instance_id(), 
_dependency->shared_from_this());
 
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index b8faa4f76f7..f8196910021 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "result_sink_operator.h"
 
 #include <fmt/format.h>
+#include <sys/select.h>
 
 #include <memory>
 
@@ -45,15 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
     auto fragment_instance_id = state->fragment_instance_id();
 
+    auto& p = _parent->cast<ResultSinkOperatorX>();
     if (state->query_options().enable_parallel_result_sink) {
         _sender = _parent->cast<ResultSinkOperatorX>()._sender;
     } else {
-        auto& p = _parent->cast<ResultSinkOperatorX>();
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                fragment_instance_id, p._result_sink_buffer_size_rows, 
&_sender,
-                state->execution_timeout(), state->batch_size()));
+                fragment_instance_id, p._result_sink_buffer_size_rows, 
&_sender, state));
     }
     _sender->set_dependency(fragment_instance_id, 
_dependency->shared_from_this());
+
+    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
+    }
+    if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+        std::shared_ptr<arrow::Schema> arrow_schema;
+        RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, 
&arrow_schema,
+                                                        state->timezone()));
+        _sender->register_arrow_schema(arrow_schema);
+    }
     return Status::OK();
 }
 
@@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<ResultSinkOperatorX>();
-    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
-    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
-    }
     // create writer based on sink type
     switch (p._sink_type) {
     case TResultSinkType::MYSQL_PROTOCAL: {
@@ -79,16 +86,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
         break;
     }
     case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
-        std::shared_ptr<arrow::Schema> arrow_schema;
-        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, 
&arrow_schema));
-        if (state->query_options().enable_parallel_result_sink) {
-            
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), 
arrow_schema);
-        } else {
-            
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
-                                                                   
arrow_schema);
-        }
         _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
-                _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
+                _sender.get(), _output_vexpr_ctxs, _profile));
         break;
     }
     default:
@@ -133,8 +132,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) {
 
     if (state->query_options().enable_parallel_result_sink) {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->query_id(), _result_sink_buffer_size_rows, &_sender,
-                state->execution_timeout(), state->batch_size()));
+                state->query_id(), _result_sink_buffer_size_rows, &_sender, 
state));
     }
     return vectorized::VExpr::open(_output_vexpr_ctxs, state);
 }
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 61ea5ef080d..98feb85ad6b 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -33,9 +33,11 @@
 #include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
 #include "pipeline/dependency.h"
-#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
+#include "util/string_util.h"
 #include "util/thrift_util.h"
+#include "vec/core/block.h"
 
 namespace doris {
 
@@ -93,14 +95,80 @@ void GetResultBatchCtx::on_data(const 
std::unique_ptr<TFetchDataResult>& t_resul
     delete this;
 }
 
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, 
int batch_size)
+void GetArrowResultBatchCtx::on_failure(const Status& status) {
+    DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
+    status.to_protobuf(result->mutable_status());
+    delete this;
+}
+
+void GetArrowResultBatchCtx::on_close(int64_t packet_seq) {
+    Status status;
+    status.to_protobuf(result->mutable_status());
+    result->set_packet_seq(packet_seq);
+    result->set_eos(true);
+    delete this;
+}
+
+void GetArrowResultBatchCtx::on_data( // serialize `block` into `result`, record status, delete this
+        const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version,
+        segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone,
+        RuntimeProfile::Counter* serialize_batch_ns_timer,
+        RuntimeProfile::Counter* uncompressed_bytes_counter,
+        RuntimeProfile::Counter* compressed_bytes_counter) {
+    Status st = Status::OK();
+    if (block != nullptr) { // a null block is answered with an empty batch, never dereferenced
+        size_t uncompressed_bytes = 0, compressed_bytes = 0;
+        SCOPED_TIMER(serialize_batch_ns_timer);
+        st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes,
+                              &compressed_bytes, fragement_transmission_compression_type, false);
+        COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes);
+        COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes);
+        if (st.ok()) {
+            result->set_packet_seq(packet_seq);
+            result->set_eos(false);
+            if (packet_seq == 0) {
+                result->set_timezone(timezone);
+            }
+        } else {
+            result->clear_block();
+            result->set_packet_seq(packet_seq);
+            LOG(WARNING) << "PFetchArrowDataResult serialize failed, errmsg=" << st;
+        }
+    } else {
+        result->set_empty_batch(true);
+        result->set_packet_seq(packet_seq);
+        result->set_eos(false);
+    }
+
+    /// The size limit of proto buffer message is 2G
+    if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
+        st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong());
+        result->clear_block();
+    }
+    st.to_protobuf(result->mutable_status());
+    delete this;
+}
+
+BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, 
RuntimeState* state)
         : _fragment_id(id),
           _is_close(false),
           _is_cancelled(false),
           _buffer_limit(buffer_size),
           _packet_num(0),
-          _batch_size(batch_size) {
+          _batch_size(state->batch_size()),
+          _timezone(state->timezone()),
+          _timezone_obj(state->timezone_obj()),
+          _be_exec_version(state->be_exec_version()),
+          _fragement_transmission_compression_type(
+                  state->fragement_transmission_compression_type()),
+          _profile("BufferControlBlock " + print_id(_fragment_id)) {
     _query_statistics = std::make_unique<QueryStatistics>();
+    _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime");
+    _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", 
TUnit::BYTES);
+    _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", 
TUnit::BYTES);
+    _mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::QUERY,
+            fmt::format("BufferControlBlock#FragmentInstanceId={}", 
print_id(_fragment_id)));
 }
 
 BufferControlBlock::~BufferControlBlock() {
@@ -148,36 +216,44 @@ Status BufferControlBlock::add_batch(RuntimeState* state,
 }
 
 Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
-                                           
std::shared_ptr<arrow::RecordBatch>& result) {
+                                           std::shared_ptr<vectorized::Block>& 
result) {
     std::unique_lock<std::mutex> l(_lock);
 
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");
     }
 
-    int num_rows = result->num_rows();
-
-    // TODO: merge RocordBatch, ToStructArray -> Make again
+    if (_waiting_arrow_result_batch_rpc.empty()) {
+        // TODO: Merge result into block to reduce rpc times
+        int num_rows = result->rows();
+        _arrow_flight_result_batch_queue.push_back(std::move(result));
+        _instance_rows_in_queue.emplace_back();
+        _instance_rows[state->fragment_instance_id()] += num_rows;
+        _instance_rows_in_queue.back()[state->fragment_instance_id()] += 
num_rows;
+        _arrow_data_arrival
+                .notify_one(); // Only valid for 
get_arrow_batch(std::shared_ptr<vectorized::Block>,)
+    } else {
+        auto* ctx = _waiting_arrow_result_batch_rpc.front();
+        _waiting_arrow_result_batch_rpc.pop_front();
+        ctx->on_data(result, _packet_num, _be_exec_version,
+                     _fragement_transmission_compression_type, _timezone, 
_serialize_batch_ns_timer,
+                     _uncompressed_bytes_counter, _compressed_bytes_counter);
+        _packet_num++;
+    }
 
-    _arrow_flight_batch_queue.push_back(std::move(result));
-    _instance_rows_in_queue.emplace_back();
-    _instance_rows[state->fragment_instance_id()] += num_rows;
-    _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
-    _arrow_data_arrival.notify_one();
     _update_dependency();
     return Status::OK();
 }
 
 void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
     std::lock_guard<std::mutex> l(_lock);
+    Defer defer {[&]() { _update_dependency(); }};
     if (!_status.ok()) {
         ctx->on_failure(_status);
-        _update_dependency();
         return;
     }
     if (_is_cancelled) {
         ctx->on_failure(Status::Cancelled("Cancelled"));
-        _update_dependency();
         return;
     }
     if (!_fe_result_batch_queue.empty()) {
@@ -191,54 +267,132 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* 
ctx) {
 
         ctx->on_data(result, _packet_num);
         _packet_num++;
-        _update_dependency();
         return;
     }
     if (_is_close) {
         ctx->on_close(_packet_num, _query_statistics.get());
-        _update_dependency();
         return;
     }
     // no ready data, push ctx to waiting list
     _waiting_rpc.push_back(ctx);
-    _update_dependency();
 }
 
-Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
+Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
+                                           cctz::time_zone& timezone_obj) { // waits for data, close or cancel
     std::unique_lock<std::mutex> l(_lock);
+    Defer defer {[&]() { _update_dependency(); }};
     if (!_status.ok()) {
         return _status;
     }
     if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
+        return Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id)));
     }

-    while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
-        _arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
+    while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) {
+        _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
     }

     if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
+        return Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id)));
     }

-    if (!_arrow_flight_batch_queue.empty()) {
-        *result = std::move(_arrow_flight_batch_queue.front());
-        _arrow_flight_batch_queue.pop_front();
+    if (!_arrow_flight_result_batch_queue.empty()) {
+        *result = std::move(_arrow_flight_result_batch_queue.front());
+        _arrow_flight_result_batch_queue.pop_front();
+        timezone_obj = _timezone_obj;
+
         for (auto it : _instance_rows_in_queue.front()) {
             _instance_rows[it.first] -= it.second;
         }
         _instance_rows_in_queue.pop_front();
         _packet_num++;
-        _update_dependency();
         return Status::OK();
     }

     // normal path end
     if (_is_close) {
-        _update_dependency();
+        std::stringstream ss;
+        _profile.pretty_print(&ss);
+        VLOG_NOTICE << fmt::format(
+                "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
+                "packet_num={}, peak_memory_usage={}, profile={}",
+                print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+                _mem_tracker->peak_consumption(), ss.str());
+        return Status::OK();
+    }
+    return Status::InternalError(
+            fmt::format("Get Arrow Batch Abnormal Ending ({})", print_id(_fragment_id)));
+}
+
+void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { // non-blocking, remote reader
+    std::unique_lock<std::mutex> l(_lock);
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    Defer defer {[&]() { _update_dependency(); }};
+    if (!_status.ok()) {
+        ctx->on_failure(_status);
+        return;
+    }
+    if (_is_cancelled) {
+        ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id))));
+        return;
+    }
+    // data ready: hand the oldest queued block to ctx; otherwise close or park ctx below
+    if (!_arrow_flight_result_batch_queue.empty()) {
+        auto block = std::move(_arrow_flight_result_batch_queue.front());
+        _arrow_flight_result_batch_queue.pop_front();
+        for (auto it : _instance_rows_in_queue.front()) {
+            _instance_rows[it.first] -= it.second;
+        }
+        _instance_rows_in_queue.pop_front();
+
+        ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type,
+                     _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter,
+                     _compressed_bytes_counter);
+        _packet_num++;
+        return;
+    }
+
+    // normal path end
+    if (_is_close) {
+        ctx->on_close(_packet_num);
+        std::stringstream ss;
+        _profile.pretty_print(&ss);
+        VLOG_NOTICE << fmt::format(
+                "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
+                "packet_num={}, peak_memory_usage={}, profile={}",
+                print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+                _mem_tracker->peak_consumption(), ss.str());
+        return;
+    }
+    // no ready data, push ctx to waiting list
+    _waiting_arrow_result_batch_rpc.push_back(ctx);
+}
+
+void BufferControlBlock::register_arrow_schema(const 
std::shared_ptr<arrow::Schema>& arrow_schema) {
+    std::lock_guard<std::mutex> l(_lock);
+    _arrow_schema = arrow_schema;
+}
+
+Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) {
+    std::unique_lock<std::mutex> l(_lock);
+    if (!_status.ok()) {
+        return _status;
+    }
+    if (_is_cancelled) {
+        return Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id)));
+    }
+
+    // set by register_arrow_schema(), which the result sink calls from its local-state init
+    if (_arrow_schema != nullptr) {
+        *arrow_schema = _arrow_schema;
         return Status::OK();
     }
-    return Status::InternalError("Get Arrow Batch Abnormal Ending");
+
+    if (_is_close) {
+        return Status::RuntimeError(fmt::format("Closed ({})", print_id(_fragment_id)));
+    }
+    return Status::InternalError(
+            fmt::format("Get Arrow Schema Abnormal Ending ({})", print_id(_fragment_id)));
 }
 
 Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
@@ -272,18 +426,37 @@ Status BufferControlBlock::close(const TUniqueId& id, 
Status exec_status) {
         }
         _waiting_rpc.clear();
     }
+
+    if (!_waiting_arrow_result_batch_rpc.empty()) {
+        if (_status.ok()) {
+            for (auto& ctx : _waiting_arrow_result_batch_rpc) {
+                ctx->on_close(_packet_num);
+            }
+        } else {
+            for (auto& ctx : _waiting_arrow_result_batch_rpc) {
+                ctx->on_failure(_status);
+            }
+        }
+        _waiting_arrow_result_batch_rpc.clear();
+    }
     return Status::OK();
 }
 
 void BufferControlBlock::cancel() {
     std::unique_lock<std::mutex> l(_lock);
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
     _is_cancelled = true;
     _arrow_data_arrival.notify_all();
     for (auto& ctx : _waiting_rpc) {
         ctx->on_failure(Status::Cancelled("Cancelled"));
     }
     _waiting_rpc.clear();
+    for (auto& ctx : _waiting_arrow_result_batch_rpc) {
+        ctx->on_failure(Status::Cancelled("Cancelled"));
+    }
+    _waiting_arrow_result_batch_rpc.clear();
     _update_dependency();
+    _arrow_flight_result_batch_queue.clear();
 }
 
 void BufferControlBlock::set_dependency(
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 8b45552b2fa..a75b670836d 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <arrow/type.h>
+#include <cctz/time_zone.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
 #include <stdint.h>
@@ -52,7 +54,12 @@ namespace pipeline {
 class Dependency;
 } // namespace pipeline
 
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
 class PFetchDataResult;
+class PFetchArrowDataResult;
 
 struct GetResultBatchCtx {
     brpc::Controller* cntl = nullptr;
@@ -69,18 +76,44 @@ struct GetResultBatchCtx {
                  bool eos = false);
 };
 
+struct GetArrowResultBatchCtx {
+    brpc::Controller* cntl = nullptr;
+    PFetchArrowDataResult* result = nullptr;
+    google::protobuf::Closure* done = nullptr;
+
+    GetArrowResultBatchCtx(brpc::Controller* cntl_, PFetchArrowDataResult* 
result_,
+                           google::protobuf::Closure* done_)
+            : cntl(cntl_), result(result_), done(done_) {}
+
+    void on_failure(const Status& status);
+    void on_close(int64_t packet_seq);
+    void on_data(const std::shared_ptr<vectorized::Block>& block, int64_t 
packet_seq,
+                 int be_exec_version,
+                 segment_v2::CompressionTypePB 
fragement_transmission_compression_type,
+                 std::string timezone, RuntimeProfile::Counter* 
serialize_batch_ns_timer,
+                 RuntimeProfile::Counter* uncompressed_bytes_counter,
+                 RuntimeProfile::Counter* compressed_bytes_counter);
+};
+
 // buffer used for result customer and producer
 class BufferControlBlock {
 public:
-    BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size);
+    BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* 
state);
     ~BufferControlBlock();
 
     Status init();
     Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& 
result);
-    Status add_arrow_batch(RuntimeState* state, 
std::shared_ptr<arrow::RecordBatch>& result);
+    Status add_arrow_batch(RuntimeState* state, 
std::shared_ptr<vectorized::Block>& result);
 
     void get_batch(GetResultBatchCtx* ctx);
-    Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);
+    // for ArrowFlightBatchLocalReader
+    Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
+                           cctz::time_zone& timezone_obj);
+    // for ArrowFlightBatchRemoteReader
+    void get_arrow_batch(GetArrowResultBatchCtx* ctx);
+
+    void register_arrow_schema(const std::shared_ptr<arrow::Schema>& 
arrow_schema);
+    Status find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema);
 
     // close buffer block, set _status to exec_status and set _is_close to 
true;
     // called because data has been read or error happened.
@@ -89,6 +122,7 @@ public:
     void cancel();
 
     [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
+    [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return 
_mem_tracker; }
 
     void update_return_rows(int64_t num_rows) {
         // _query_statistics may be null when the result sink init failed
@@ -106,7 +140,7 @@ protected:
     void _update_dependency();
 
     using FeResultQueue = std::list<std::unique_ptr<TFetchDataResult>>;
-    using ArrowFlightResultQueue = 
std::list<std::shared_ptr<arrow::RecordBatch>>;
+    using ArrowFlightResultQueue = 
std::list<std::shared_ptr<vectorized::Block>>;
 
     // result's query id
     TUniqueId _fragment_id;
@@ -118,7 +152,9 @@ protected:
 
     // blocking queue for batch
     FeResultQueue _fe_result_batch_queue;
-    ArrowFlightResultQueue _arrow_flight_batch_queue;
+    ArrowFlightResultQueue _arrow_flight_result_batch_queue;
+    // for arrow flight
+    std::shared_ptr<arrow::Schema> _arrow_schema;
 
     // protects all subsequent data in this block
     std::mutex _lock;
@@ -128,6 +164,7 @@ protected:
     std::condition_variable _arrow_data_arrival;
 
     std::deque<GetResultBatchCtx*> _waiting_rpc;
+    std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;
 
     // only used for FE using return rows to check limit
     std::unique_ptr<QueryStatistics> _query_statistics;
@@ -137,6 +174,17 @@ protected:
     std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
 
     int _batch_size;
+    std::string _timezone;
+    cctz::time_zone _timezone_obj;
+    int _be_exec_version;
+    segment_v2::CompressionTypePB _fragement_transmission_compression_type;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+
+    // only used for ArrowFlightBatchRemoteReader
+    RuntimeProfile _profile;
+    RuntimeProfile::Counter* _serialize_batch_ns_timer = nullptr;
+    RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr;
+    RuntimeProfile::Counter* _compressed_bytes_counter = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index ccbf0c3ff67..ecc3d56773c 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -67,8 +67,8 @@ Status ResultBufferMgr::init() {
 }
 
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int 
buffer_size,
-                                      std::shared_ptr<BufferControlBlock>* 
sender, int exec_timout,
-                                      int batch_size) {
+                                      std::shared_ptr<BufferControlBlock>* 
sender,
+                                      RuntimeState* state) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " 
<< query_id;
@@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& 
query_id, int buffer_size
 
     std::shared_ptr<BufferControlBlock> control_block = nullptr;
 
-    control_block = std::make_shared<BufferControlBlock>(query_id, 
buffer_size, batch_size);
+    control_block = std::make_shared<BufferControlBlock>(query_id, 
buffer_size, state);
 
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
@@ -87,7 +87,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& 
query_id, int buffer_size
         // otherwise in some case may block all fragment handle threads
         // details see issue https://github.com/apache/doris/issues/16203
         // add extra 5s for avoid corner case
-        int64_t max_timeout = time(nullptr) + exec_timout + 5;
+        int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5;
         cancel_at_time(max_timeout, query_id);
     }
     *sender = control_block;
@@ -105,27 +105,19 @@ std::shared_ptr<BufferControlBlock> 
ResultBufferMgr::find_control_block(const TU
     return {};
 }
 
-void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
-                                            const 
std::shared_ptr<arrow::Schema>& arrow_schema) {
-    std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
-    _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema));
-}
-
-std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const 
TUniqueId& query_id) {
-    std::shared_lock<std::shared_mutex> rlock(_arrow_schema_map_lock);
-    auto iter = _arrow_schema_map.find(query_id);
-
-    if (_arrow_schema_map.end() != iter) {
-        return iter->second;
+Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id,
+                                          std::shared_ptr<arrow::Schema>* 
schema) {
+    std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
+    if (cb == nullptr) {
+        return Status::InternalError(
+                "no arrow schema for this query, maybe query has been 
canceled, finst_id={}",
+                print_id(finst_id));
     }
-
-    return nullptr;
+    return cb->find_arrow_schema(schema);
 }
 
 void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* 
ctx) {
-    TUniqueId tid;
-    tid.__set_hi(finst_id.hi());
-    tid.__set_lo(finst_id.lo());
+    TUniqueId tid = UniqueId(finst_id).to_thrift();
     std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
     if (cb == nullptr) {
         ctx->on_failure(Status::InternalError("no result for this query, 
tid={}", print_id(tid)));
@@ -134,16 +126,43 @@ void ResultBufferMgr::fetch_data(const PUniqueId& 
finst_id, GetResultBatchCtx* c
     cb->get_batch(ctx);
 }
 
+Status ResultBufferMgr::find_mem_tracker(const TUniqueId& finst_id,
+                                         std::shared_ptr<MemTrackerLimiter>* 
mem_tracker) {
+    std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
+    if (cb == nullptr) {
+        return Status::InternalError(
+                "no result for this query, maybe query has been canceled, 
finst_id={}",
+                print_id(finst_id));
+    }
+    *mem_tracker = cb->mem_tracker();
+    return Status::OK();
+}
+
 Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
-                                         std::shared_ptr<arrow::RecordBatch>* 
result) {
+                                         std::shared_ptr<vectorized::Block>* 
result,
+                                         cctz::time_zone& timezone_obj) {
     std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
     if (cb == nullptr) {
-        return Status::InternalError("no result for this query, finst_id={}", 
print_id(finst_id));
+        return Status::InternalError(
+                "no result for this query, maybe query has been canceled, 
finst_id={}",
+                print_id(finst_id));
     }
-    RETURN_IF_ERROR(cb->get_arrow_batch(result));
+    RETURN_IF_ERROR(cb->get_arrow_batch(result, timezone_obj));
     return Status::OK();
 }
 
+void ResultBufferMgr::fetch_arrow_data(const PUniqueId& finst_id, 
GetArrowResultBatchCtx* ctx) {
+    TUniqueId tid = UniqueId(finst_id).to_thrift();
+    std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
+    if (cb == nullptr) {
+        ctx->on_failure(Status::InternalError(
+                "no result for this query, maybe query has been canceled, 
finst_id={}",
+                print_id(tid)));
+        return;
+    }
+    cb->get_arrow_batch(ctx);
+}
+
 void ResultBufferMgr::cancel(const TUniqueId& query_id) {
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
@@ -154,15 +173,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) {
             _buffer_map.erase(iter);
         }
     }
-
-    {
-        std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
-        auto arrow_schema_iter = _arrow_schema_map.find(query_id);
-
-        if (_arrow_schema_map.end() != arrow_schema_iter) {
-            _arrow_schema_map.erase(arrow_schema_iter);
-        }
-    }
 }
 
 void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& 
query_id) {
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index 8bac69c23ac..1efa0a544f1 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -17,7 +17,9 @@
 
 #pragma once
 
+#include <cctz/time_zone.h>
 #include <gen_cpp/Types_types.h>
+#include <gen_cpp/segment_v2.pb.h>
 
 #include <ctime>
 #include <map>
@@ -41,8 +43,14 @@ namespace doris {
 
 class BufferControlBlock;
 struct GetResultBatchCtx;
+struct GetArrowResultBatchCtx;
 class PUniqueId;
+class RuntimeState;
+class MemTrackerLimiter;
 class Thread;
+namespace vectorized {
+class Block;
+} // namespace vectorized
 
 // manage all result buffer control block in one backend
 class ResultBufferMgr {
@@ -58,17 +66,18 @@ public:
     // the returned sender do not need release
     // sender is not used when call cancel or unregister
     Status create_sender(const TUniqueId& query_id, int buffer_size,
-                         std::shared_ptr<BufferControlBlock>* sender, int 
exec_timeout,
-                         int batch_size);
+                         std::shared_ptr<BufferControlBlock>* sender, 
RuntimeState* state);
 
     // fetch data result to FE
     void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
-    // fetch data result to Arrow Flight Server
-    Status fetch_arrow_data(const TUniqueId& finst_id, 
std::shared_ptr<arrow::RecordBatch>* result);
-
-    void register_arrow_schema(const TUniqueId& query_id,
-                               const std::shared_ptr<arrow::Schema>& 
arrow_schema);
-    std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& 
query_id);
+    // fetch data result to Arrow Flight Client
+    Status fetch_arrow_data(const TUniqueId& finst_id, 
std::shared_ptr<vectorized::Block>* result,
+                            cctz::time_zone& timezone_obj);
+    // fetch data result to Other BE forwards to Client
+    void fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* 
ctx);
+    Status find_mem_tracker(const TUniqueId& finst_id,
+                            std::shared_ptr<MemTrackerLimiter>* mem_tracker);
+    Status find_arrow_schema(const TUniqueId& query_id, 
std::shared_ptr<arrow::Schema>* schema);
 
     // cancel
     void cancel(const TUniqueId& fragment_id);
@@ -79,7 +88,6 @@ public:
 private:
     using BufferMap = std::unordered_map<TUniqueId, 
std::shared_ptr<BufferControlBlock>>;
     using TimeoutMap = std::map<time_t, std::vector<TUniqueId>>;
-    using ArrowSchemaMap = std::unordered_map<TUniqueId, 
std::shared_ptr<arrow::Schema>>;
 
     std::shared_ptr<BufferControlBlock> find_control_block(const TUniqueId& 
query_id);
 
@@ -91,10 +99,6 @@ private:
     std::shared_mutex _buffer_map_lock;
     // buffer block map
     BufferMap _buffer_map;
-    // lock for arrow schema map
-    std::shared_mutex _arrow_schema_map_lock;
-    // for arrow flight
-    ArrowSchemaMap _arrow_schema_map;
 
     // lock for timeout map
     std::mutex _timeout_lock;
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp 
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index a07e479d759..e935aff996d 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -17,53 +17,294 @@
 
 #include "service/arrow_flight/arrow_flight_batch_reader.h"
 
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
 #include <arrow/status.h>
+#include <arrow/type.h>
+#include <gen_cpp/internal_service.pb.h>
 
-#include "arrow/builder.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/result_buffer_mgr.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/arrow/block_convertor.h"
 #include "util/arrow/row_batch.h"
 #include "util/arrow/utils.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
 
-namespace doris {
-namespace flight {
+namespace doris::flight {
 
-std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const {
-    return schema_;
+ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase(
+        const std::shared_ptr<QueryStatement>& statement)
+        : _statement(statement) {}
+
+std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const {
+    return _schema;
+}
+
+arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const 
std::string& msg) {
+    std::string status_msg =
+            fmt::format("ArrowFlightBatchReader {}, packet_seq={}, 
result={}:{}, finistId={}", msg,
+                        _packet_seq, _statement->result_addr.hostname, 
_statement->result_addr.port,
+                        print_id(_statement->query_id));
+    LOG(WARNING) << status_msg;
+    return arrow::Status::Invalid(status_msg);
 }
 
-ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> 
statement,
-                                               std::shared_ptr<arrow::Schema> 
schema)
-        : statement_(std::move(statement)), schema_(std::move(schema)) {}
+ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
+    VLOG_NOTICE << fmt::format(
+            "ArrowFlightBatchReader finished, packet_seq={}, 
result_addr={}:{}, finistId={}, "
+            "convert_arrow_batch_timer={}, deserialize_block_timer={}, 
peak_memory_usage={}",
+            _packet_seq, _statement->result_addr.hostname, 
_statement->result_addr.port,
+            print_id(_statement->query_id), _convert_arrow_batch_timer, 
_deserialize_block_timer,
+            _mem_tracker->peak_consumption());
+}
 
-arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> 
ArrowFlightBatchReader::Create(
-        const std::shared_ptr<QueryStatement>& statement_) {
+ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader(
+        const std::shared_ptr<QueryStatement>& statement,
+        const std::shared_ptr<arrow::Schema>& schema,
+        const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
+        : ArrowFlightBatchReaderBase(statement) {
+    _schema = schema;
+    _mem_tracker = mem_tracker;
+}
+
+arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> 
ArrowFlightBatchLocalReader::Create(
+        const std::shared_ptr<QueryStatement>& statement) {
+    DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost());
     // Make sure that FE send the fragment to BE and creates the 
BufferControlBlock before returning ticket
     // to the ADBC client, so that the schema and control block can be found.
-    auto schema = 
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
-    if (schema == nullptr) {
-        ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
-                "Client not found arrow flight schema, maybe query has been 
canceled, queryid: {}",
-                print_id(statement_->query_id))));
+    std::shared_ptr<arrow::Schema> schema;
+    RETURN_ARROW_STATUS_IF_ERROR(
+            
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, 
&schema));
+    std::shared_ptr<MemTrackerLimiter> mem_tracker;
+    
RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker(
+            statement->query_id, &mem_tracker));
+
+    std::shared_ptr<ArrowFlightBatchLocalReader> result(
+            new ArrowFlightBatchLocalReader(statement, schema, mem_tracker));
+    return result;
+}
+
+arrow::Status 
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) 
{
+    // parameter *out not nullptr
+    *out = nullptr;
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    std::shared_ptr<vectorized::Block> result;
+    auto st = 
ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, 
&result,
+                                                                     
_timezone_obj);
+    st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed");
+    ARROW_RETURN_NOT_OK(to_arrow_status(st));
+    if (result == nullptr) {
+        // eof, normal path end
+        return arrow::Status::OK();
+    }
+
+    {
+        // convert one batch
+        SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
+        st = convert_to_arrow_batch(*result, _schema, 
arrow::default_memory_pool(), out,
+                                    _timezone_obj);
+        st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch 
failed");
+        ARROW_RETURN_NOT_OK(to_arrow_status(st));
+    }
+
+    _packet_seq++;
+    if (*out != nullptr) {
+        VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << 
(*out)->num_rows() << ", "
+                    << (*out)->num_columns() << ", packet_seq: " << 
_packet_seq;
     }
-    std::shared_ptr<ArrowFlightBatchReader> result(new 
ArrowFlightBatchReader(statement_, schema));
+    return arrow::Status::OK();
+}
+
+ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
+        const std::shared_ptr<QueryStatement>& statement,
+        const std::shared_ptr<PBackendService_Stub>& stub)
+        : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), 
_block(nullptr) {
+    _mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::QUERY,
+            fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", 
print_id(_statement->query_id)));
+}
+
+arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> 
ArrowFlightBatchRemoteReader::Create(
+        const std::shared_ptr<QueryStatement>& statement) {
+    std::shared_ptr<PBackendService_Stub> stub =
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                    statement->result_addr);
+    if (!stub) {
+        std::string msg = fmt::format(
+                "ArrowFlightBatchRemoteReader get rpc stub failed, 
result_addr={}:{}, finistId={}",
+                statement->result_addr.hostname, statement->result_addr.port,
+                print_id(statement->query_id));
+        LOG(WARNING) << msg;
+        return arrow::Status::Invalid(msg);
+    }
+
+    std::shared_ptr<ArrowFlightBatchRemoteReader> result(
+            new ArrowFlightBatchRemoteReader(statement, stub));
+    ARROW_RETURN_NOT_OK(result->init_schema());
     return result;
 }
 
-arrow::Status 
ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
-    // *out not nullptr
+arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() {
+    Status st;
+    auto request = std::make_shared<PFetchArrowFlightSchemaRequest>();
+    auto* pfinst_id = request->mutable_finst_id();
+    pfinst_id->set_hi(_statement->query_id.hi);
+    pfinst_id->set_lo(_statement->query_id.lo);
+    auto callback = 
DummyBrpcCallback<PFetchArrowFlightSchemaResult>::create_shared();
+    auto closure = AutoReleaseClosure<
+            PFetchArrowFlightSchemaRequest,
+            
DummyBrpcCallback<PFetchArrowFlightSchemaResult>>::create_unique(request, 
callback);
+    
callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms);
+    callback->cntl_->ignore_eovercrowded();
+
+    _brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), 
closure->request_.get(),
+                                          closure->response_.get(), 
closure.get());
+    closure.release();
+    callback->join();
+
+    if (callback->cntl_->Failed()) {
+        if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
+                    _brpc_stub, _statement->result_addr.hostname, 
_statement->result_addr.port)) {
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                    callback->cntl_->remote_side());
+        }
+        auto error_code = callback->cntl_->ErrorCode();
+        auto error_text = callback->cntl_->ErrorText();
+        return _return_invalid_status(fmt::format("fetch schema error: {}, 
error_text: {}",
+                                                  berror(error_code), 
error_text));
+    }
+    st = Status::create(callback->response_->status());
+    ARROW_RETURN_NOT_OK(to_arrow_status(st));
+
+    if (callback->response_->has_schema() && 
!callback->response_->schema().empty()) {
+        auto input =
+                
arrow::io::BufferReader::FromString(std::string(callback->response_->schema()));
+        ARROW_ASSIGN_OR_RAISE(auto reader,
+                              arrow::ipc::RecordBatchStreamReader::Open(
+                                      input.get(), 
arrow::ipc::IpcReadOptions::Defaults()));
+        _schema = reader->schema();
+    } else {
+        return _return_invalid_status(fmt::format("fetch schema error: not 
find schema"));
+    }
+    return arrow::Status::OK();
+}
+
+arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
+    DCHECK(_block == nullptr);
+    while (true) {
+        // if `continue` occurs, data is invalid, continue fetch, block is 
nullptr.
+        // if `break` occurs, fetch data successfully (block is not nullptr) 
or fetch eos.
+        Status st;
+        auto request = std::make_shared<PFetchArrowDataRequest>();
+        auto* pfinst_id = request->mutable_finst_id();
+        pfinst_id->set_hi(_statement->query_id.hi);
+        pfinst_id->set_lo(_statement->query_id.lo);
+        auto callback = 
DummyBrpcCallback<PFetchArrowDataResult>::create_shared();
+        auto closure = AutoReleaseClosure<
+                PFetchArrowDataRequest,
+                
DummyBrpcCallback<PFetchArrowDataResult>>::create_unique(request, callback);
+        
callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms);
+        callback->cntl_->ignore_eovercrowded();
+
+        _brpc_stub->fetch_arrow_data(closure->cntl_.get(), 
closure->request_.get(),
+                                     closure->response_.get(), closure.get());
+        closure.release();
+        callback->join();
+
+        if (callback->cntl_->Failed()) {
+            if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
+                        _brpc_stub, _statement->result_addr.hostname,
+                        _statement->result_addr.port)) {
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                        callback->cntl_->remote_side());
+            }
+            auto error_code = callback->cntl_->ErrorCode();
+            auto error_text = callback->cntl_->ErrorText();
+            return _return_invalid_status(fmt::format("fetch data error={}, 
error_text: {}",
+                                                      berror(error_code), 
error_text));
+        }
+        st = Status::create(callback->response_->status());
+        ARROW_RETURN_NOT_OK(to_arrow_status(st));
+
+        DCHECK(callback->response_->has_packet_seq());
+        if (_packet_seq != callback->response_->packet_seq()) {
+            return _return_invalid_status(
+                    fmt::format("fetch data receive packet failed, expect: {}, 
receive: {}",
+                                _packet_seq, 
callback->response_->packet_seq()));
+        }
+        _packet_seq++;
+
+        if (callback->response_->has_eos() && callback->response_->eos()) {
+            break;
+        }
+
+        if (callback->response_->has_empty_batch() && 
callback->response_->empty_batch()) {
+            continue;
+        }
+
+        DCHECK(callback->response_->has_block());
+        if (callback->response_->block().ByteSizeLong() == 0) {
+            continue;
+        }
+
+        std::call_once(_timezone_once_flag, [this, callback] {
+            DCHECK(callback->response_->has_timezone());
+            
TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), 
_timezone_obj);
+        });
+
+        {
+            SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
+            _block = vectorized::Block::create_shared();
+            st = _block->deserialize(callback->response_->block());
+            ARROW_RETURN_NOT_OK(to_arrow_status(st));
+            break;
+        }
+
+        const auto rows = _block->rows();
+        if (rows == 0) {
+            _block = nullptr;
+            continue;
+        }
+    }
+    return arrow::Status::OK();
+}
+
+arrow::Status ArrowFlightBatchRemoteReader::init_schema() {
+    ARROW_RETURN_NOT_OK(_fetch_schema());
+    DCHECK(_schema != nullptr);
+    return arrow::Status::OK();
+}
+
+arrow::Status 
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* 
out) {
+    // parameter *out not nullptr
     *out = nullptr;
-    auto st = 
ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, 
out);
-    if (UNLIKELY(!st.ok())) {
-        LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + 
st.to_string();
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    ARROW_RETURN_NOT_OK(_fetch_data());
+    if (_block == nullptr) {
+        // eof, normal path end, last _fetch_data return block is nullptr
+        return arrow::Status::OK();
+    }
+    {
+        // convert one batch
+        SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
+        auto st = convert_to_arrow_batch(*_block, _schema, 
arrow::default_memory_pool(), out,
+                                         _timezone_obj);
+        st.prepend("ArrowFlightBatchRemoteReader convert block to arrow batch 
failed");
         ARROW_RETURN_NOT_OK(to_arrow_status(st));
     }
+    _block = nullptr;
+
     if (*out != nullptr) {
-        VLOG_NOTICE << "ArrowFlightBatchReader read next: " << 
(*out)->num_rows() << ", "
-                    << (*out)->num_columns();
+        VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " << 
(*out)->num_rows() << ", "
+                    << (*out)->num_columns() << ", packet_seq: " << 
_packet_seq;
     }
     return arrow::Status::OK();
 }
 
-} // namespace flight
-} // namespace doris
+} // namespace doris::flight
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h 
b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
index e0279cbb70d..612ebc8063c 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
@@ -17,40 +17,91 @@
 
 #pragma once
 
+#include <cctz/time_zone.h>
 #include <gen_cpp/Types_types.h>
 
 #include <memory>
+#include <utility>
 
 #include "arrow/record_batch.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
 namespace flight {
 
 struct QueryStatement {
 public:
     TUniqueId query_id;
+    TNetworkAddress result_addr; // BE brpc ip & port
     std::string sql;
 
-    QueryStatement(const TUniqueId& query_id_, const std::string& sql_)
-            : query_id(query_id_), sql(sql_) {}
+    QueryStatement(TUniqueId query_id_, TNetworkAddress result_addr_, 
std::string sql_)
+            : query_id(std::move(query_id_)),
+              result_addr(std::move(result_addr_)),
+              sql(std::move(sql_)) {}
+};
+
+class ArrowFlightBatchReaderBase : public arrow::RecordBatchReader {
+public:
+    // RecordBatchReader force override
+    [[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override;
+
+protected:
+    ArrowFlightBatchReaderBase(const std::shared_ptr<QueryStatement>& 
statement);
+    ~ArrowFlightBatchReaderBase() override;
+    arrow::Status _return_invalid_status(const std::string& msg);
+
+    std::shared_ptr<QueryStatement> _statement;
+    std::shared_ptr<arrow::Schema> _schema;
+    cctz::time_zone _timezone_obj;
+    std::atomic<int64_t> _packet_seq = 0;
+
+    std::atomic<int64_t> _convert_arrow_batch_timer = 0;
+    std::atomic<int64_t> _deserialize_block_timer = 0;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 };
 
-class ArrowFlightBatchReader : public arrow::RecordBatchReader {
+class ArrowFlightBatchLocalReader : public ArrowFlightBatchReaderBase {
 public:
-    static arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> Create(
+    static arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> Create(
             const std::shared_ptr<QueryStatement>& statement);
 
-    [[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override;
+    arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override;
 
+private:
+    ArrowFlightBatchLocalReader(const std::shared_ptr<QueryStatement>& 
statement,
+                                const std::shared_ptr<arrow::Schema>& schema,
+                                const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker);
+};
+
+class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase {
+public:
+    static arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> Create(
+            const std::shared_ptr<QueryStatement>& statement);
+
+    // create arrow RecordBatchReader must initialize the schema.
+    // so when creating arrow RecordBatchReader, fetch result data once,
+    // which will return Block and some necessary information, and extract 
arrow schema from Block.
+    arrow::Status init_schema();
     arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override;
 
 private:
-    std::shared_ptr<QueryStatement> statement_;
-    std::shared_ptr<arrow::Schema> schema_;
+    ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>& 
statement,
+                                 const std::shared_ptr<PBackendService_Stub>& 
stub);
 
-    ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement,
-                           std::shared_ptr<arrow::Schema> schema);
+    arrow::Status _fetch_schema();
+    arrow::Status _fetch_data();
+
+    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
+    std::once_flag _timezone_once_flag;
+    std::shared_ptr<vectorized::Block> _block;
 };
 
 } // namespace flight
+
 } // namespace doris
diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp 
b/be/src/service/arrow_flight/flight_sql_service.cpp
index 60b665c62fc..90ee3edfbea 100644
--- a/be/src/service/arrow_flight/flight_sql_service.cpp
+++ b/be/src/service/arrow_flight/flight_sql_service.cpp
@@ -19,15 +19,17 @@
 
 #include <arrow/status.h>
 
+#include <memory>
+
 #include "arrow/flight/sql/server.h"
+#include "gutil/strings/split.h"
 #include "service/arrow_flight/arrow_flight_batch_reader.h"
 #include "service/arrow_flight/flight_sql_info.h"
 #include "service/backend_options.h"
 #include "util/arrow/utils.h"
 #include "util/uid_util.h"
 
-namespace doris {
-namespace flight {
+namespace doris::flight {
 
 class FlightSqlServer::Impl {
 private:
@@ -41,14 +43,21 @@ private:
         return arrow::flight::Ticket {std::move(ticket)};
     }
 
-    arrow::Result<std::pair<std::string, std::string>> decode_ticket(const 
std::string& ticket) {
-        auto divider = ticket.find(':');
-        if (divider == std::string::npos) {
-            return arrow::Status::Invalid("Malformed ticket");
+    arrow::Result<std::shared_ptr<QueryStatement>> decode_ticket(const 
std::string& ticket) {
+        std::vector<string> fields = strings::Split(ticket, "&");
+        if (fields.size() != 4) {
+            return arrow::Status::Invalid(fmt::format("Malformed ticket, size: 
{}", fields.size()));
         }
-        std::string query_id = ticket.substr(0, divider);
-        std::string sql = ticket.substr(divider + 1);
-        return std::make_pair(std::move(sql), std::move(query_id));
+
+        TUniqueId queryid;
+        parse_id(fields[0], &queryid);
+        TNetworkAddress result_addr;
+        result_addr.hostname = fields[1];
+        result_addr.port = std::stoi(fields[2]);
+        std::string sql = fields[3];
+        std::shared_ptr<QueryStatement> statement =
+                std::make_shared<QueryStatement>(queryid, result_addr, sql);
+        return statement;
     }
 
 public:
@@ -59,18 +68,21 @@ public:
     arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> 
DoGetStatement(
             const arrow::flight::ServerCallContext& context,
             const arrow::flight::sql::StatementQueryTicket& command) {
-        ARROW_ASSIGN_OR_RAISE(auto pair, 
decode_ticket(command.statement_handle));
-        const std::string& sql = pair.first;
-        const std::string query_id = pair.second;
-        TUniqueId queryid;
-        parse_id(query_id, &queryid);
-
-        auto statement = std::make_shared<QueryStatement>(queryid, sql);
-
-        std::shared_ptr<ArrowFlightBatchReader> reader;
-        ARROW_ASSIGN_OR_RAISE(reader, 
ArrowFlightBatchReader::Create(statement));
-
-        return std::make_unique<arrow::flight::RecordBatchStream>(reader);
+        ARROW_ASSIGN_OR_RAISE(auto statement, 
decode_ticket(command.statement_handle));
+        // if IP:BrpcPort in the Ticket is not current BE node,
+        // pulls the query result Block from the BE node specified by 
IP:BrpcPort,
+        // converts it to Arrow Batch and returns it to ADBC client.
+        // use brpc to transmit blocks between BEs.
+        if (statement->result_addr.hostname == BackendOptions::get_localhost() 
&&
+            statement->result_addr.port == config::brpc_port) {
+            std::shared_ptr<ArrowFlightBatchLocalReader> reader;
+            ARROW_ASSIGN_OR_RAISE(reader, 
ArrowFlightBatchLocalReader::Create(statement));
+            return std::make_unique<arrow::flight::RecordBatchStream>(reader);
+        } else {
+            std::shared_ptr<ArrowFlightBatchRemoteReader> reader;
+            ARROW_ASSIGN_OR_RAISE(reader, 
ArrowFlightBatchRemoteReader::Create(statement));
+            return std::make_unique<arrow::flight::RecordBatchStream>(reader);
+        }
     }
 };
 
@@ -135,5 +147,4 @@ Status FlightSqlServer::join() {
     return Status::OK();
 }
 
-} // namespace flight
-} // namespace doris
+} // namespace doris::flight
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index ae84081813f..be99278ab54 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -157,6 +157,11 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::N
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
 
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT);
+
 bthread_key_t btls_key;
 
 static void thread_context_deleter(void* d) {
@@ -200,7 +205,14 @@ PInternalService::PInternalService(ExecEnv* exec_env)
                            config::brpc_light_work_pool_max_queue_size != -1
                                    ? config::brpc_light_work_pool_max_queue_size
                                    : std::max(10240, CpuInfo::num_cores() * 320),
-                           "brpc_light") {
+                           "brpc_light"),
+          _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1
+                                          ? config::brpc_arrow_flight_work_pool_threads
+                                          : std::max(512, CpuInfo::num_cores() * 16),
+                                  config::brpc_arrow_flight_work_pool_max_queue_size != -1
+                                          ? config::brpc_arrow_flight_work_pool_max_queue_size
+                                          : std::max(20480, CpuInfo::num_cores() * 640),
+                                  "brpc_arrow_flight") {
     REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
                          [this]() { return _heavy_work_pool.get_queue_size(); });
     REGISTER_HOOK_METRIC(light_work_pool_queue_size,
@@ -219,6 +231,15 @@ PInternalService::PInternalService(ExecEnv* exec_env)
     REGISTER_HOOK_METRIC(light_work_max_threads,
                          []() { return config::brpc_light_work_pool_threads; });
 
+    REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
+                         [this]() { return _arrow_flight_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
+                         [this]() { return _arrow_flight_work_pool.get_active_threads(); });
+    REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
+                         []() { return config::brpc_arrow_flight_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
+                         []() { return config::brpc_arrow_flight_work_pool_threads; });
+
     _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
     _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);
 
@@ -242,6 +263,11 @@ PInternalService::~PInternalService() {
     DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
     DEREGISTER_HOOK_METRIC(light_work_max_threads);
 
+    DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
+    DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);
+
     CHECK_EQ(0, bthread_key_delete(btls_key));
     CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
 }
@@ -650,6 +676,22 @@ void PInternalService::fetch_data(google::protobuf::RpcController* controller,
     }
 }
 
+void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller,
+                                        const PFetchArrowDataRequest* request,
+                                        PFetchArrowDataResult* result,
+                                        google::protobuf::Closure* done) {
+    bool ret = _arrow_flight_work_pool.try_offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto* cntl = static_cast<brpc::Controller*>(controller);
+        auto* ctx = new GetArrowResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx);
+    });
+    if (!ret) {
+        offer_failed(result, done, _arrow_flight_work_pool);
+        return;
+    }
+}
+
 void PInternalService::outfile_write_success(google::protobuf::RpcController* controller,
                                              const POutfileWriteSuccessRequest* request,
                                              POutfileWriteSuccessResult* result,
@@ -857,23 +899,22 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
                                                  google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([request, result, done]() {
         brpc::ClosureGuard closure_guard(done);
-        std::shared_ptr<arrow::Schema> schema =
-                ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
-                        UniqueId(request->finst_id()).to_thrift());
-        if (schema == nullptr) {
-            LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled";
-            auto st = Status::NotFound(
-                    "FE not found arrow flight schema, maybe query has been canceled");
+        std::shared_ptr<arrow::Schema> schema;
+        auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
+                UniqueId(request->finst_id()).to_thrift(), &schema);
+        if (!st.ok()) {
+            LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
 
         std::string schema_str;
-        auto st = serialize_arrow_schema(&schema, &schema_str);
+        st = serialize_arrow_schema(&schema, &schema_str);
         if (st.ok()) {
             result->set_schema(std::move(schema_str));
-            if (config::public_access_ip != "") {
+            if (!config::public_access_ip.empty() && config::public_access_port != -1) {
                 result->set_be_arrow_flight_ip(config::public_access_ip);
+                result->set_be_arrow_flight_port(config::public_access_port);
             }
         }
         st.to_protobuf(result->mutable_status());
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index b3ab1c5a647..66a0f867393 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -97,6 +97,10 @@ public:
     void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request,
                     PFetchDataResult* result, google::protobuf::Closure* done) override;
 
+    void fetch_arrow_data(google::protobuf::RpcController* controller,
+                          const PFetchArrowDataRequest* request, PFetchArrowDataResult* result,
+                          google::protobuf::Closure* done) override;
+
     void outfile_write_success(google::protobuf::RpcController* controller,
                                const POutfileWriteSuccessRequest* request,
                                POutfileWriteSuccessResult* result,
@@ -271,6 +275,7 @@ protected:
     // otherwise as light interface
     FifoThreadPool _heavy_work_pool;
     FifoThreadPool _light_work_pool;
+    FifoThreadPool _arrow_flight_work_pool;
 };
 
 // `StorageEngine` mixin for `PInternalService`
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index 084765e5aaa..dd11d5ae46f 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -46,7 +46,8 @@ namespace doris {
 
 using strings::Substitute;
 
-Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result) {
+Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
+                             const std::string& timezone) {
     switch (type.type) {
     case TYPE_NULL:
         *result = arrow::null();
@@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
         break;
     case TYPE_DATETIMEV2:
         if (type.scale > 3) {
-            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
         } else if (type.scale > 0) {
-            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI);
+            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI, timezone);
         } else {
-            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
+            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND, timezone);
         }
         break;
     case TYPE_DECIMALV2:
@@ -120,7 +121,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
     case TYPE_ARRAY: {
         DCHECK_EQ(type.children.size(), 1);
         std::shared_ptr<arrow::DataType> item_type;
-        static_cast<void>(convert_to_arrow_type(type.children[0], &item_type));
+        RETURN_IF_ERROR(convert_to_arrow_type(type.children[0], &item_type, timezone));
         *result = std::make_shared<arrow::ListType>(item_type);
         break;
     }
@@ -128,8 +129,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
         DCHECK_EQ(type.children.size(), 2);
         std::shared_ptr<arrow::DataType> key_type;
         std::shared_ptr<arrow::DataType> val_type;
-        static_cast<void>(convert_to_arrow_type(type.children[0], &key_type));
-        static_cast<void>(convert_to_arrow_type(type.children[1], &val_type));
+        RETURN_IF_ERROR(convert_to_arrow_type(type.children[0], &key_type, timezone));
+        RETURN_IF_ERROR(convert_to_arrow_type(type.children[1], &val_type, timezone));
         *result = std::make_shared<arrow::MapType>(key_type, val_type);
         break;
     }
@@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
         std::vector<std::shared_ptr<arrow::Field>> fields;
         for (size_t i = 0; i < type.children.size(); i++) {
             std::shared_ptr<arrow::DataType> field_type;
-            static_cast<void>(convert_to_arrow_type(type.children[i], &field_type));
+            RETURN_IF_ERROR(convert_to_arrow_type(type.children[i], &field_type, timezone));
             fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
                                                             type.contains_nulls[i]));
         }
@@ -156,19 +157,14 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
     return Status::OK();
 }
 
-Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field) {
-    std::shared_ptr<arrow::DataType> type;
-    RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type));
-    *field = arrow::field(desc->col_name(), type, desc->is_nullable());
-    return Status::OK();
-}
-
-Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr<arrow::Schema>* result) {
+Status get_arrow_schema_from_block(const vectorized::Block& block,
+                                   std::shared_ptr<arrow::Schema>* result,
+                                   const std::string& timezone) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (const auto& type_and_name : block) {
         std::shared_ptr<arrow::DataType> arrow_type;
         RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(),
-                                              &arrow_type));
+                                              &arrow_type, timezone));
         fields.push_back(std::make_shared<arrow::Field>(type_and_name.name, arrow_type,
                                                         type_and_name.type->is_nullable()));
     }
@@ -176,13 +172,14 @@ Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr<arrow::S
     return Status::OK();
 }
 
-Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                      std::shared_ptr<arrow::Schema>* result) {
+Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
+                                       std::shared_ptr<arrow::Schema>* result,
+                                       const std::string& timezone) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
         std::shared_ptr<arrow::DataType> arrow_type;
         auto root_expr = output_vexpr_ctxs.at(i)->root();
-        RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
+        RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone));
         auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty()
                                   ? root_expr->expr_label()
                                   : fmt::format("{}_{}", root_expr->data_type()->get_name(), i);
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index 5dd76ff66d7..d10bd54b2ae 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -41,12 +41,16 @@ namespace doris {
 
 class RowDescriptor;
 
-Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result);
+Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
+                             const std::string& timezone);
 
-Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr<arrow::Schema>* result);
+Status get_arrow_schema_from_block(const vectorized::Block& block,
+                                   std::shared_ptr<arrow::Schema>* result,
+                                   const std::string& timezone);
 
-Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                      std::shared_ptr<arrow::Schema>* result);
+Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
+                                       std::shared_ptr<arrow::Schema>* result,
+                                       const std::string& timezone);
 
 Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);
 
diff --git a/be/src/util/arrow/utils.cpp b/be/src/util/arrow/utils.cpp
index 5ccff849034..742f5bd0fc3 100644
--- a/be/src/util/arrow/utils.cpp
+++ b/be/src/util/arrow/utils.cpp
@@ -33,9 +33,10 @@ Status to_doris_status(const arrow::Status& status) {
 }
 
 arrow::Status to_arrow_status(const Status& status) {
-    if (status.ok()) {
+    if (LIKELY(status.ok())) {
         return arrow::Status::OK();
     } else {
+        LOG(WARNING) << status.to_string();
         // The length of exception msg returned to the ADBC Client cannot larger than 8192,
         // otherwise ADBC Client will receive:
         // `INTERNAL: http2 exception Header size exceeded max allowed size (8192)`.
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 3006461059c..69516773deb 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -218,6 +218,11 @@ public:
     UIntGauge* heavy_work_max_threads = nullptr;
     UIntGauge* light_work_max_threads = nullptr;
 
+    UIntGauge* arrow_flight_work_pool_queue_size = nullptr;
+    UIntGauge* arrow_flight_work_active_threads = nullptr;
+    UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr;
+    UIntGauge* arrow_flight_work_max_threads = nullptr;
+
     UIntGauge* flush_thread_pool_queue_size = nullptr;
     UIntGauge* flush_thread_pool_thread_num = nullptr;
 
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index 1969858349f..f0810d6c7ce 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         std::shared_ptr<arrow::DataType> type;
-        RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
+        RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
+                                              _state->timezone()));
         if (_parquet_schemas != nullptr) {
             std::shared_ptr<arrow::Field> field =
                     arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp
index b23d1668465..c54c27a8484 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -19,21 +19,16 @@
 
 #include "runtime/buffer_control_block.h"
 #include "runtime/runtime_state.h"
-#include "util/arrow/block_convertor.h"
-#include "util/arrow/row_batch.h"
+#include "runtime/thread_context.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr_context.h"
 
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
 
-VArrowFlightResultWriter::VArrowFlightResultWriter(
-        BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs,
-        RuntimeProfile* parent_profile, const std::shared_ptr<arrow::Schema>& arrow_schema)
-        : _sinker(sinker),
-          _output_vexpr_ctxs(output_vexpr_ctxs),
-          _parent_profile(parent_profile),
-          _arrow_schema(arrow_schema) {}
+VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker,
+                                                   const VExprContextSPtrs& output_vexpr_ctxs,
+                                                   RuntimeProfile* parent_profile)
+        : _sinker(sinker), _output_vexpr_ctxs(output_vexpr_ctxs), _parent_profile(parent_profile) {}
 
 Status VArrowFlightResultWriter::init(RuntimeState* state) {
     _init_profile();
@@ -41,13 +36,11 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) {
         return Status::InternalError("sinker is NULL pointer.");
     }
     _is_dry_run = state->query_options().dry_run_query;
-    _timezone_obj = state->timezone_obj();
     return Status::OK();
 }
 
 void VArrowFlightResultWriter::_init_profile() {
     _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
-    _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
     _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
     _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
     _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
@@ -66,29 +59,31 @@ Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block)
     RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                        input_block, &block));
 
-    // convert one batch
-    std::shared_ptr<arrow::RecordBatch> result;
-    auto num_rows = block.rows();
-    // arrow::RecordBatch without `nbytes()` in C++
-    uint64_t bytes_sent = block.bytes();
     {
-        SCOPED_TIMER(_convert_tuple_timer);
-        RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(),
-                                               &result, _timezone_obj));
-    }
-    {
-        SCOPED_TIMER(_result_send_timer);
-        // If this is a dry run task, no need to send data block
-        if (!_is_dry_run) {
-            status = _sinker->add_arrow_batch(state, result);
-        }
-        if (status.ok()) {
-            _written_rows += num_rows;
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_sinker->mem_tracker());
+        std::unique_ptr<vectorized::MutableBlock> mutable_block =
+                vectorized::MutableBlock::create_unique(block.clone_empty());
+        RETURN_IF_ERROR(mutable_block->merge_ignore_overflow(std::move(block)));
+        std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared();
+        output_block->swap(mutable_block->to_block());
+
+        auto num_rows = output_block->rows();
+        // Count the Block's bytes: arrow::RecordBatch has no `nbytes()` in C++.
+        uint64_t bytes_sent = output_block->bytes();
+        {
+            SCOPED_TIMER(_result_send_timer);
+            // If this is a dry run task, no need to send data block
             if (!_is_dry_run) {
-                _bytes_sent += bytes_sent;
+                status = _sinker->add_arrow_batch(state, output_block);
+            }
+            if (status.ok()) {
+                _written_rows += num_rows;
+                if (!_is_dry_run) {
+                    _bytes_sent += bytes_sent;
+                }
+            } else {
+                LOG(WARNING) << "append result batch to sink failed, status: " << status;
             }
-        } else {
-            LOG(WARNING) << "append result batch to sink failed.";
         }
     }
     return status;
@@ -100,5 +95,4 @@ Status VArrowFlightResultWriter::close(Status st) {
     return Status::OK();
 }
 
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h
index ab2578421c8..c87518de5e1 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -17,13 +17,6 @@
 
 #pragma once
 
-#include <arrow/type.h>
-#include <cctz/time_zone.h>
-#include <stddef.h>
-
-#include <memory>
-#include <vector>
-
 #include "common/status.h"
 #include "runtime/result_writer.h"
 #include "util/runtime_profile.h"
@@ -39,8 +32,7 @@ class Block;
 class VArrowFlightResultWriter final : public ResultWriter {
 public:
     VArrowFlightResultWriter(BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs,
-                             RuntimeProfile* parent_profile,
-                             const std::shared_ptr<arrow::Schema>& arrow_schema);
+                             RuntimeProfile* parent_profile);
 
     Status init(RuntimeState* state) override;
 
@@ -58,8 +50,6 @@ private:
     RuntimeProfile* _parent_profile = nullptr; // parent profile from result sink. not owned
     // total time cost on append batch operation
     RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
-    // tuple convert timer, child timer of _append_row_batch_timer
-    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
     // file write timer, child timer of _append_row_batch_timer
     RuntimeProfile::Counter* _result_send_timer = nullptr;
     // number of sent rows
@@ -70,10 +60,6 @@ private:
     bool _is_dry_run = false;
 
     uint64_t _bytes_sent = 0;
-
-    std::shared_ptr<arrow::Schema> _arrow_schema;
-
-    cctz::time_zone _timezone_obj;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/test/runtime/result_buffer_mgr_test.cpp b/be/test/runtime/result_buffer_mgr_test.cpp
index 152c155ef0a..4ab9186c5fa 100644
--- a/be/test/runtime/result_buffer_mgr_test.cpp
+++ b/be/test/runtime/result_buffer_mgr_test.cpp
@@ -34,6 +34,8 @@ protected:
     virtual void SetUp() {}
+    // Must be protected, not private: each TEST_F body is a subclass of this fixture.
+    RuntimeState _state;
 
 private:
 };
 
 TEST_F(ResultBufferMgrTest, create_normal) {
@@ -43,7 +45,7 @@ TEST_F(ResultBufferMgrTest, create_normal) {
     query_id.hi = 100;
 
     std::shared_ptr<BufferControlBlock> control_block1;
-    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
+    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
 }
 
 TEST_F(ResultBufferMgrTest, create_same_buffer) {
@@ -53,9 +55,9 @@ TEST_F(ResultBufferMgrTest, create_same_buffer) {
     query_id.hi = 100;
 
     std::shared_ptr<BufferControlBlock> control_block1;
-    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
+    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
     std::shared_ptr<BufferControlBlock> control_block2;
-    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, false).ok());
+    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, &_state).ok());
 
     EXPECT_EQ(control_block1.get(), control_block1.get());
 }
@@ -67,7 +69,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_normal) {
     query_id.hi = 100;
 
     std::shared_ptr<BufferControlBlock> control_block1;
-    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
+    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
 
     TFetchDataResult* result = new TFetchDataResult();
     result->result_batch.rows.push_back("hello test");
@@ -85,7 +87,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_no_block) {
     query_id.hi = 100;
 
     std::shared_ptr<BufferControlBlock> control_block1;
-    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
+    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
 
     TFetchDataResult* result = new TFetchDataResult();
     query_id.lo = 11;
@@ -101,7 +103,7 @@ TEST_F(ResultBufferMgrTest, normal_cancel) {
     query_id.hi = 100;
 
     std::shared_ptr<BufferControlBlock> control_block1;
-    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
+    EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
 
     EXPECT_TRUE(buffer_mgr.cancel(query_id).ok());
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 5d431b386b7..37168081350 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
 import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
+import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Any;
@@ -224,24 +225,40 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
                     }
                 } else {
                     // Now only query stmt will pull results from BE.
-                    final ByteString handle;
-                    if (connectContext.getSessionVariable().enableParallelResultSink()) {
-                        handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query);
-                    } else {
-                        // only one instance
-                        handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
-                    }
                     Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
                     if (schema == null) {
                         throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
                                 .toRuntimeException();
                     }
+
+                    TUniqueId queryId = connectContext.queryId();
+                    if (!connectContext.getSessionVariable().enableParallelResultSink()) {
+                        // only one instance
+                        queryId = connectContext.getFinstId();
+                    }
+                    // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
+                    final ByteString handle = ByteString.copyFromUtf8(
+                            DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+                                    + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
                     TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
                             .build();
                     Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
                     // TODO Support multiple endpoints.
-                    Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
-                            connectContext.getResultFlightServerAddr().port);
+                    Location location;
+                    if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
+                        // In a production environment, it is often inconvenient to expose Doris BE nodes
+                        // to the external network.
+                        // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
+                        // and the external client will be randomly routed to a Doris BE node when connecting to nginx.
+                        // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
+                        // If it is different from the Doris BE node randomly routed by nginx,
+                        // data forwarding needs to be done inside the Doris BE node.
+                        location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
+                                flightSQLConnectProcessor.getPublicAccessAddr().port);
+                    } else {
+                        location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+                                connectContext.getResultFlightServerAddr().port);
+                    }
                     List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
                     // TODO Set in BE callback after query end, Client will not callback.
                     return new FlightInfo(schema, descriptor, endpoints, -1, -1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index b812bf81d8a..febadbef0ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -53,11 +53,12 @@ import java.util.concurrent.TimeoutException;
 
 /**
  * Process one flgiht sql connection.
- *
+ * <p>
  * Must use try-with-resources.
  */
 public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
     private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
+    private final TNetworkAddress publicAccessAddr = new TNetworkAddress();
 
     public FlightSqlConnectProcessor(ConnectContext context) {
         super(context);
@@ -66,6 +67,10 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
         context.setReturnResultFromLocal(true);
     }
 
+    public TNetworkAddress getPublicAccessAddr() {
+        return publicAccessAddr;
+    }
+
     public void prepare(MysqlCommand command) {
         // set status of query to OK.
         ctx.getState().reset();
@@ -130,10 +135,11 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
             Status resultStatus = new Status(pResult.getStatus());
             if (resultStatus.getErrorCode() != TStatusCode.OK) {
                 throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
-                        DebugUtil.printId(tid), resultStatus.toString()));
+                        DebugUtil.printId(tid), resultStatus));
             }
-            if (pResult.hasBeArrowFlightIp()) {
-                ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8();
+            if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) {
+                publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
+                publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
             }
             if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
                 RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 29868e5ff12..254e0740943 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -283,6 +283,20 @@ message PFetchDataResult {
     optional bool empty_batch = 6;
 };
 
+message PFetchArrowDataRequest {
+    optional PUniqueId finst_id = 1;
+};
+
+message PFetchArrowDataResult {
+    optional PStatus status = 1;
+    // valid when status is ok
+    optional int64 packet_seq = 2;
+    optional bool eos = 3;
+    optional PBlock block = 4;
+    optional bool empty_batch = 5;
+    optional string timezone = 6;
+};
+
 message PFetchArrowFlightSchemaRequest {
     optional PUniqueId finst_id = 1;
 };
@@ -292,6 +306,7 @@ message PFetchArrowFlightSchemaResult {
     // valid when status is ok
     optional bytes schema = 2;
     optional bytes be_arrow_flight_ip = 3;
+    optional int32 be_arrow_flight_port = 4;
 };
 
 message KeyTuple {
@@ -981,6 +996,7 @@ service PBackendService {
     rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns 
(PExecPlanFragmentResult);
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns 
(PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
+    rpc fetch_arrow_data(PFetchArrowDataRequest) returns 
(PFetchArrowDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns 
(PTabletWriterOpenResult);
     rpc open_load_stream(POpenLoadStreamRequest) returns 
(POpenLoadStreamResponse);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns 
(PTabletWriterAddBlockResult);
diff --git a/regression-test/data/arrow_flight_sql_p0/test_select.out 
b/regression-test/data/arrow_flight_sql_p0/test_select.out
index d643597bbaf..f2f4b86bbf5 100644
--- a/regression-test/data/arrow_flight_sql_p0/test_select.out
+++ b/regression-test/data/arrow_flight_sql_p0/test_select.out
@@ -2,3 +2,7 @@
 -- !arrow_flight_sql --
 777    4
 
+-- !arrow_flight_sql_datetime --
+333    plsql333        2024-07-21 12:00:00.123456      2024-07-21 12:00:00.0
+222    plsql222        2024-07-20 12:00:00.123456      2024-07-20 12:00:00.0
+111    plsql111        2024-07-19 12:00:00.123456      2024-07-19 12:00:00.0
\ No newline at end of file
diff --git a/regression-test/framework/pom.xml 
b/regression-test/framework/pom.xml
index aded781c08b..6b749bf0fd1 100644
--- a/regression-test/framework/pom.xml
+++ b/regression-test/framework/pom.xml
@@ -379,7 +379,7 @@ under the License.
         <dependency>
             <groupId>org.apache.doris</groupId>
             <artifactId>flink-doris-connector-1.16</artifactId>
-            <version>1.6.1</version>
+            <version>24.0.0</version>
         </dependency>
         <!-- flink end -->
         <dependency>
diff --git a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy 
b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy
index 55b3c301e24..950fb4af7e9 100644
--- a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy
+++ b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy
@@ -28,4 +28,16 @@ suite("test_select", "arrow_flight_sql") {
     sql """INSERT INTO ${tableName} VALUES(111, "plsql333")"""
     
     qt_arrow_flight_sql "select sum(id) as a, count(1) as b from ${tableName}"
+
+    tableName = "test_select_datetime"
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        create table ${tableName} (id int, name varchar(20), f_datetime_p 
datetime(6), f_datetime datetime) DUPLICATE key(`id`) distributed by hash 
(`id`) buckets 4
+        properties ("replication_num"="1");
+        """
+    sql """INSERT INTO ${tableName} VALUES(111, "plsql111","2024-07-19 
12:00:00.123456","2024-07-19 12:00:00")"""
+    sql """INSERT INTO ${tableName} VALUES(222, "plsql222","2024-07-20 
12:00:00.123456","2024-07-20 12:00:00")"""
+    sql """INSERT INTO ${tableName} VALUES(333, "plsql333","2024-07-21 
12:00:00.123456","2024-07-21 12:00:00")"""
+
+    qt_arrow_flight_sql_datetime "select * from ${tableName} order by id desc"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to