This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cb7bed8f07e [fix](arrow-flight-sql) Fix arrow flight result sink (#36827) cb7bed8f07e is described below commit cb7bed8f07eb847f6a59b102192a30b9316c47c0 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jun 26 14:14:11 2024 +0800 [fix](arrow-flight-sql) Fix arrow flight result sink (#36827) 1. Fetching the arrow flight result schema now uses the query id instead of the instance id. 2. Fetching the arrow flight result is a synchronous method, so it needs to wait until data is ready before returning the result (issue introduced by #36035 and #36667). TODO: waiting for data blocks the pipeline, so a request pool should be used to hold the requests that are waiting for data. --- be/src/pipeline/exec/result_sink_operator.cpp | 3 +-- be/src/runtime/buffer_control_block.cpp | 13 ++++++++----- be/src/runtime/buffer_control_block.h | 4 ++++ .../arrowflight/DorisFlightSqlProducer.java | 2 +- .../arrowflight/FlightSqlConnectProcessor.java | 22 +++++++++++----------- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 378fea18eea..0495e48b7dc 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) { case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr<arrow::Schema> arrow_schema; RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); + state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); break; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index a1a83b22840..845afb9a84b 100644 --- 
a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -151,10 +151,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, int num_rows = result->num_rows(); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } - // TODO: merge RocordBatch, ToStructArray -> Make again _arrow_flight_batch_queue.push_back(std::move(result)); @@ -162,6 +158,7 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, _instance_rows_in_queue.emplace_back(); _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + _arrow_data_arrival.notify_one(); _update_dependency(); return Status::OK(); } @@ -212,6 +209,10 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* return Status::Cancelled("Cancelled"); } + while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::seconds(1)); + } + if (_is_cancelled) { return Status::Cancelled("Cancelled"); } @@ -234,7 +235,7 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* _update_dependency(); return Status::OK(); } - return Status::InternalError("Abnormal Ending"); + return Status::InternalError("Get Arrow Batch Abnormal Ending"); } Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { @@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { _is_close = true; _status = exec_status; + _arrow_data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { @@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { void BufferControlBlock::cancel() { std::unique_lock<std::mutex> l(_lock); _is_cancelled = true; + _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } diff --git 
a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 12cbc72ff52..d8bb6e0f506 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -124,6 +124,10 @@ protected: // protects all subsequent data in this block std::mutex _lock; + // get arrow flight result is a sync method, need wait for data ready and return result. + // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data. + std::condition_variable _arrow_data_arrival; + std::deque<GetResultBatchCtx*> _waiting_rpc; // only used for FE using return rows to check limit diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 2c7aaae4f2a..af6d85c954e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -225,7 +225,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable } else { // Now only query stmt will pull results from BE. 
final ByteString handle = ByteString.copyFromUtf8( - DebugUtil.printId(connectContext.getFinstId()) + ":" + query); + DebugUtil.printId(connectContext.queryId()) + ":" + query); Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); if (schema == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index a4aa5a88c8f..f91d63ed90d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -102,13 +102,13 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC public Schema fetchArrowFlightSchema(int timeoutMs) { TNetworkAddress address = ctx.getResultInternalServiceAddr(); - TUniqueId tid = ctx.getFinstId(); + TUniqueId tid = ctx.queryId(); ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs(); - Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); try { InternalService.PFetchArrowFlightSchemaRequest request = InternalService.PFetchArrowFlightSchemaRequest.newBuilder() - .setFinstId(finstId) + .setFinstId(queryId) .build(); Future<InternalService.PFetchArrowFlightSchemaResult> future @@ -116,12 +116,12 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC InternalService.PFetchArrowFlightSchemaResult pResult; pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); if (pResult == null) { - throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", + throw new RuntimeException(String.format("fetch arrow flight schema timeout, 
queryId: %s", DebugUtil.printId(tid))); } Status resultStatus = new Status(pResult.getStatus()); if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", + throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", DebugUtil.printId(tid), resultStatus.toString())); } if (pResult.hasBeArrowFlightIp()) { @@ -138,7 +138,7 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC List<FieldVector> fieldVectors = root.getFieldVectors(); if (fieldVectors.size() != resultOutputExprs.size()) { throw new RuntimeException(String.format( - "Schema size %s' is not equal to arrow field size %s, finstId: %s.", + "Schema size %s' is not equal to arrow field size %s, queryId: %s.", fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); } return root.getSchema(); @@ -146,24 +146,24 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC throw new RuntimeException("Read Arrow Flight Schema failed.", e); } } else { - throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s", + throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid))); } } catch (RpcException e) { throw new RuntimeException(String.format( - "arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s", + "arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (InterruptedException e) { throw new RuntimeException(String.format( - "arrow flight schema future get interrupted exception, finstId: %s,backend: %s", + "arrow flight schema future get interrupted exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (ExecutionException e) { throw new RuntimeException(String.format( - "arrow flight schema future get execution 
exception, finstId: %s,backend: %s", + "arrow flight schema future get execution exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (TimeoutException e) { throw new RuntimeException(String.format( - "arrow flight schema fetch timeout, finstId: %s,backend: %s", + "arrow flight schema fetch timeout, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org