This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 83208ee1a88 [pick](branch-2.1) pick #43960 #43929 #44177 (#44240) 83208ee1a88 is described below commit 83208ee1a8815ca95a9531853141b920c4992a6e Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Tue Nov 19 17:25:16 2024 +0800 [pick](branch-2.1) pick #43960 #43929 #44177 (#44240) pick #43960 #43929 #44177 --- be/src/common/config.cpp | 25 +++++++++++++++++-- be/src/common/config.h | 28 ++++++++++++++++++---- be/src/pipeline/exec/result_sink_operator.cpp | 19 ++++++++------- be/src/service/internal_service.cpp | 9 ++++--- .../data_types/serde/data_type_number_serde.cpp | 21 +++++++++++----- .../arrowflight/DorisFlightSqlProducer.java | 11 +++++++-- .../arrowflight/FlightSqlConnectProcessor.java | 8 ++++--- 7 files changed, 92 insertions(+), 29 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d5b67c2c128..c41f19a3e27 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060"); DEFINE_Int32(arrow_flight_sql_port, "-1"); -DEFINE_mString(public_access_ip, ""); -DEFINE_Int32(public_access_port, "-1"); +// If the external client cannot directly access priority_networks, set public_host to be accessible +// to external client. +// There are usually two usage scenarios: +// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network. +// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be +// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx. +// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP, +// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP. 
+DEFINE_mString(public_host, ""); + +// If the BE node is connected to the external network through a reverse proxy like Nginx +// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy +// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example: +// upstream arrowflight { +// server 10.16.10.8:8069; +// server 10.16.10.8:8068; +//} +// server { +// listen 8167 http2; +// listen [::]:8167 http2; +// server_name doris.arrowflight.com; +// } +DEFINE_Int32(arrow_flight_sql_proxy_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/common/config.h b/be/src/common/config.h index aca5b6b829a..7693af0f7ae 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port); // Default -1, do not start arrow flight sql server. DECLARE_Int32(arrow_flight_sql_port); -// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP. -// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result. -// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip. -DECLARE_mString(public_access_ip); -DECLARE_Int32(public_access_port); +// If the external client cannot directly access priority_networks, set public_host to be accessible +// to external client. +// There are usually two usage scenarios: +// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network. +// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be +// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx. +// 2. 
if priority_networks is an internal network IP, and BE node has its own independent external IP, +// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP. +DECLARE_mString(public_host); + +// If the BE node is connected to the external network through a reverse proxy like Nginx +// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy +// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example: +// upstream arrowflight { +// server 10.16.10.8:8069; +// server 10.16.10.8:8068; +//} +// server { +// listen 8167 http2; +// listen [::]:8167 http2; +// server_name doris.arrowflight.com; +// } +DECLARE_Int32(arrow_flight_sql_proxy_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index d2dfa89cdd6..0ef1164b2e5 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -71,6 +71,17 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true, state)); ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this()); + + _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } + if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + std::shared_ptr<arrow::Schema> arrow_schema; + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); + } return Status::OK(); } @@ -79,10 +90,6 @@ Status 
ResultSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast<ResultSinkOperatorX>(); - _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); - } // create writer based on sink type switch (p._sink_type) { case TResultSinkType::MYSQL_PROTOCAL: { @@ -96,10 +103,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { break; } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { - std::shared_ptr<arrow::Schema> arrow_schema; - RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, - state->timezone())); - _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile)); break; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a82ab9988b1..701fc6c018d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -923,6 +923,7 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( UniqueId(request->finst_id()).to_thrift(), &schema); if (!st.ok()) { + LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } @@ -931,9 +932,11 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro st = serialize_arrow_schema(&schema, &schema_str); if (st.ok()) { result->set_schema(std::move(schema_str)); - if (!config::public_access_ip.empty() && config::public_access_port != -1) { - result->set_be_arrow_flight_ip(config::public_access_ip); - result->set_be_arrow_flight_port(config::public_access_port); + if (!config::public_host.empty()) { + result->set_be_arrow_flight_ip(config::public_host); + 
} + if (config::arrow_flight_sql_proxy_port != -1) { + result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port); } } st.to_protobuf(result->mutable_status()); diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index efa41e346bf..f4fb6bbbb1f 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -78,12 +78,21 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column, const auto arrow_null_map = revert_null_map(null_map, start, end); auto arrow_null_map_data = arrow_null_map.empty() ? nullptr : arrow_null_map.data(); if constexpr (std::is_same_v<T, UInt8>) { - ARROW_BUILDER_TYPE& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder); - checkArrowStatus( - builder.AppendValues(reinterpret_cast<const uint8_t*>(col_data.data() + start), - end - start, - reinterpret_cast<const uint8_t*>(arrow_null_map_data)), - column.get_name(), array_builder->type()->name()); + auto* null_builder = dynamic_cast<arrow::NullBuilder*>(array_builder); + if (null_builder) { + for (size_t i = start; i < end; ++i) { + checkArrowStatus(null_builder->AppendNull(), column.get_name(), + null_builder->type()->name()); + } + } else { + ARROW_BUILDER_TYPE& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder); + checkArrowStatus( + builder.AppendValues(reinterpret_cast<const uint8_t*>(col_data.data() + start), + end - start, + reinterpret_cast<const uint8_t*>(arrow_null_map_data)), + column.get_name(), array_builder->type()->name()); + } + } else if constexpr (std::is_same_v<T, Int128>) { auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder); for (size_t i = start; i < end; ++i) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 
6f45f3faac7..758f30469bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -249,8 +249,15 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. // If it is different from the Doris BE node randomly routed by nginx, // data forwarding needs to be done inside the Doris BE node. - location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname, - flightSQLConnectProcessor.getPublicAccessAddr().port); + if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) { + location = Location.forGrpcInsecure( + flightSQLConnectProcessor.getPublicAccessAddr().hostname, + flightSQLConnectProcessor.getPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure( + flightSQLConnectProcessor.getPublicAccessAddr().hostname, + connectContext.getResultFlightServerAddr().port); + } } else { location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, connectContext.getResultFlightServerAddr().port); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 6724065f99a..20b377eb5c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -131,9 +131,11 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", DebugUtil.printId(tid), resultStatus)); } - if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) { - 
publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); - publicAccessAddr.port = pResult.getBeArrowFlightPort(); + if (pResult.hasBeArrowFlightIp()) { + publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); + } + if (pResult.hasBeArrowFlightPort()) { + publicAccessAddr.setPort(pResult.getBeArrowFlightPort()); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org