This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cb7bed8f07e [fix](arrow-flight-sql) Fix arrow flight result sink (#36827)
cb7bed8f07e is described below

commit cb7bed8f07eb847f6a59b102192a30b9316c47c0
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Jun 26 14:14:11 2024 +0800

    [fix](arrow-flight-sql) Fix arrow flight result sink (#36827)
    
    1. Get the arrow flight result schema by query id instead of fragment instance id.
    2. Fetching the arrow flight result is a synchronous call, so it must wait
    until data is ready before returning the result; the problem was
    introduced by #36035 and #36667.
    TODO: waiting for data blocks the pipeline; use a request pool to hold
    requests that are waiting for data.
---
 be/src/pipeline/exec/result_sink_operator.cpp      |  3 +--
 be/src/runtime/buffer_control_block.cpp            | 13 ++++++++-----
 be/src/runtime/buffer_control_block.h              |  4 ++++
 .../arrowflight/DorisFlightSqlProducer.java        |  2 +-
 .../arrowflight/FlightSqlConnectProcessor.java     | 22 +++++++++++-----------
 5 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 378fea18eea..0495e48b7dc 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
     case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
         std::shared_ptr<arrow::Schema> arrow_schema;
         RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, 
&arrow_schema));
-        
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
-                                                               arrow_schema);
+        
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), 
arrow_schema);
         _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
                 _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
         break;
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index a1a83b22840..845afb9a84b 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -151,10 +151,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* 
state,
 
     int num_rows = result->num_rows();
 
-    if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
-    }
-
     // TODO: merge RocordBatch, ToStructArray -> Make again
 
     _arrow_flight_batch_queue.push_back(std::move(result));
@@ -162,6 +158,7 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* 
state,
     _instance_rows_in_queue.emplace_back();
     _instance_rows[state->fragment_instance_id()] += num_rows;
     _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
+    _arrow_data_arrival.notify_one();
     _update_dependency();
     return Status::OK();
 }
@@ -212,6 +209,10 @@ Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
         return Status::Cancelled("Cancelled");
     }
 
+    while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
+        _arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
+    }
+
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");
     }
@@ -234,7 +235,7 @@ Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
         _update_dependency();
         return Status::OK();
     }
-    return Status::InternalError("Abnormal Ending");
+    return Status::InternalError("Get Arrow Batch Abnormal Ending");
 }
 
 Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
@@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id, 
Status exec_status) {
 
     _is_close = true;
     _status = exec_status;
+    _arrow_data_arrival.notify_all();
 
     if (!_waiting_rpc.empty()) {
         if (_status.ok()) {
@@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id, 
Status exec_status) {
 void BufferControlBlock::cancel() {
     std::unique_lock<std::mutex> l(_lock);
     _is_cancelled = true;
+    _arrow_data_arrival.notify_all();
     for (auto& ctx : _waiting_rpc) {
         ctx->on_failure(Status::Cancelled("Cancelled"));
     }
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 12cbc72ff52..d8bb6e0f506 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -124,6 +124,10 @@ protected:
     // protects all subsequent data in this block
     std::mutex _lock;
 
+    // get arrow flight result is a sync method, need wait for data ready and 
return result.
+    // TODO, waiting for data will block pipeline, so use a request pool to 
save requests waiting for data.
+    std::condition_variable _arrow_data_arrival;
+
     std::deque<GetResultBatchCtx*> _waiting_rpc;
 
     // only used for FE using return rows to check limit
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 2c7aaae4f2a..af6d85c954e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -225,7 +225,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             } else {
                 // Now only query stmt will pull results from BE.
                 final ByteString handle = ByteString.copyFromUtf8(
-                        DebugUtil.printId(connectContext.getFinstId()) + ":" + 
query);
+                        DebugUtil.printId(connectContext.queryId()) + ":" + 
query);
                 Schema schema = 
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
                 if (schema == null) {
                     throw CallStatus.INTERNAL.withDescription("fetch arrow 
flight schema is null").toRuntimeException();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index a4aa5a88c8f..f91d63ed90d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -102,13 +102,13 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
 
     public Schema fetchArrowFlightSchema(int timeoutMs) {
         TNetworkAddress address = ctx.getResultInternalServiceAddr();
-        TUniqueId tid = ctx.getFinstId();
+        TUniqueId tid = ctx.queryId();
         ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
-        Types.PUniqueId finstId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
+        Types.PUniqueId queryId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         try {
             InternalService.PFetchArrowFlightSchemaRequest request =
                     InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
-                            .setFinstId(finstId)
+                            .setFinstId(queryId)
                             .build();
 
             Future<InternalService.PFetchArrowFlightSchemaResult> future
@@ -116,12 +116,12 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
             InternalService.PFetchArrowFlightSchemaResult pResult;
             pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
             if (pResult == null) {
-                throw new RuntimeException(String.format("fetch arrow flight 
schema timeout, finstId: %s",
+                throw new RuntimeException(String.format("fetch arrow flight 
schema timeout, queryId: %s",
                         DebugUtil.printId(tid)));
             }
             Status resultStatus = new Status(pResult.getStatus());
             if (resultStatus.getErrorCode() != TStatusCode.OK) {
-                throw new RuntimeException(String.format("fetch arrow flight 
schema failed, finstId: %s, errmsg: %s",
+                throw new RuntimeException(String.format("fetch arrow flight 
schema failed, queryId: %s, errmsg: %s",
                         DebugUtil.printId(tid), resultStatus.toString()));
             }
             if (pResult.hasBeArrowFlightIp()) {
@@ -138,7 +138,7 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
                     List<FieldVector> fieldVectors = root.getFieldVectors();
                     if (fieldVectors.size() != resultOutputExprs.size()) {
                         throw new RuntimeException(String.format(
-                                "Schema size %s' is not equal to arrow field 
size %s, finstId: %s.",
+                                "Schema size %s' is not equal to arrow field 
size %s, queryId: %s.",
                                 fieldVectors.size(), resultOutputExprs.size(), 
DebugUtil.printId(tid)));
                     }
                     return root.getSchema();
@@ -146,24 +146,24 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
                     throw new RuntimeException("Read Arrow Flight Schema 
failed.", e);
                 }
             } else {
-                throw new RuntimeException(String.format("get empty arrow 
flight schema, finstId: %s",
+                throw new RuntimeException(String.format("get empty arrow 
flight schema, queryId: %s",
                         DebugUtil.printId(tid)));
             }
         } catch (RpcException e) {
             throw new RuntimeException(String.format(
-                    "arrow flight schema fetch catch rpc exception, finstId: 
%s,backend: %s",
+                    "arrow flight schema fetch catch rpc exception, queryId: 
%s,backend: %s",
                     DebugUtil.printId(tid), address), e);
         } catch (InterruptedException e) {
             throw new RuntimeException(String.format(
-                    "arrow flight schema future get interrupted exception, 
finstId: %s,backend: %s",
+                    "arrow flight schema future get interrupted exception, 
queryId: %s,backend: %s",
                     DebugUtil.printId(tid), address), e);
         } catch (ExecutionException e) {
             throw new RuntimeException(String.format(
-                    "arrow flight schema future get execution exception, 
finstId: %s,backend: %s",
+                    "arrow flight schema future get execution exception, 
queryId: %s,backend: %s",
                     DebugUtil.printId(tid), address), e);
         } catch (TimeoutException e) {
             throw new RuntimeException(String.format(
-                    "arrow flight schema fetch timeout, finstId: %s,backend: 
%s",
+                    "arrow flight schema fetch timeout, queryId: %s,backend: 
%s",
                     DebugUtil.printId(tid), address), e);
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to