This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 83208ee1a88 [pick](branch-2.1) pick #43960 #43929 #44177 (#44240)
83208ee1a88 is described below

commit 83208ee1a8815ca95a9531853141b920c4992a6e
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Tue Nov 19 17:25:16 2024 +0800

    [pick](branch-2.1) pick #43960 #43929 #44177 (#44240)
    
    pick #43960 #43929 #44177
---
 be/src/common/config.cpp                           | 25 +++++++++++++++++--
 be/src/common/config.h                             | 28 ++++++++++++++++++----
 be/src/pipeline/exec/result_sink_operator.cpp      | 19 ++++++++-------
 be/src/service/internal_service.cpp                |  9 ++++---
 .../data_types/serde/data_type_number_serde.cpp    | 21 +++++++++++-----
 .../arrowflight/DorisFlightSqlProducer.java        | 11 +++++++--
 .../arrowflight/FlightSqlConnectProcessor.java     |  8 ++++---
 7 files changed, 92 insertions(+), 29 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index d5b67c2c128..c41f19a3e27 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060");
 
 DEFINE_Int32(arrow_flight_sql_port, "-1");
 
-DEFINE_mString(public_access_ip, "");
-DEFINE_Int32(public_access_port, "-1");
+// If the external client cannot directly access priority_networks, set 
public_host to a host that is accessible
+// to the external client.
+// There are usually two usage scenarios:
+// 1. in production environment, it is often inconvenient to expose Doris BE 
nodes to the external network.
+// However, a reverse proxy (such as Nginx) can be added to all Doris BE 
nodes, and the external client will be
+// randomly routed to a Doris BE node when connecting to Nginx. Set 
public_host to the host of Nginx.
+// 2. if priority_networks is an internal network IP, and BE node has its own 
independent external IP,
+// but Doris currently does not support modifying priority_networks, set 
public_host to the real external IP.
+DEFINE_mString(public_host, "");
+
+// If the BE node is connected to the external network through a reverse proxy 
like Nginx
+// and needs to use Arrow Flight SQL, add a server in Nginx that reverse 
proxies
+// `Nginx:arrow_flight_sql_proxy_port` to 
`BE_priority_networks:arrow_flight_sql_port`. For example:
+// upstream arrowflight {
+//    server 10.16.10.8:8069;
+//    server 10.16.10.8:8068;
+//}
+// server {
+//    listen 8167 http2;
+//    listen [::]:8167 http2;
+//    server_name doris.arrowflight.com;
+// }
+DEFINE_Int32(arrow_flight_sql_proxy_port, "-1");
 
 // the number of bthreads for brpc, the default value is set to -1,
 // which means the number of bthreads is #cpu-cores
diff --git a/be/src/common/config.h b/be/src/common/config.h
index aca5b6b829a..7693af0f7ae 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port);
 // Default -1, do not start arrow flight sql server.
 DECLARE_Int32(arrow_flight_sql_port);
 
-// If priority_networks is incorrect but cannot be modified, set 
public_access_ip as BE’s real IP.
-// For ADBC client fetch result, default is empty, the ADBC client uses the 
backend ip to fetch the result.
-// If ADBC client cannot access the backend ip, can set public_access_ip to 
modify the fetch result ip.
-DECLARE_mString(public_access_ip);
-DECLARE_Int32(public_access_port);
+// If the external client cannot directly access priority_networks, set 
public_host to a host that is accessible
+// to the external client.
+// There are usually two usage scenarios:
+// 1. in production environment, it is often inconvenient to expose Doris BE 
nodes to the external network.
+// However, a reverse proxy (such as Nginx) can be added to all Doris BE 
nodes, and the external client will be
+// randomly routed to a Doris BE node when connecting to Nginx. Set 
public_host to the host of Nginx.
+// 2. if priority_networks is an internal network IP, and BE node has its own 
independent external IP,
+// but Doris currently does not support modifying priority_networks, set 
public_host to the real external IP.
+DECLARE_mString(public_host);
+
+// If the BE node is connected to the external network through a reverse proxy 
like Nginx
+// and needs to use Arrow Flight SQL, add a server in Nginx that reverse 
proxies
+// `Nginx:arrow_flight_sql_proxy_port` to 
`BE_priority_networks:arrow_flight_sql_port`. For example:
+// upstream arrowflight {
+//    server 10.16.10.8:8069;
+//    server 10.16.10.8:8068;
+//}
+// server {
+//    listen 8167 http2;
+//    listen [::]:8167 http2;
+//    server_name doris.arrowflight.com;
+// }
+DECLARE_Int32(arrow_flight_sql_proxy_port);
 
 // the number of bthreads for brpc, the default value is set to -1,
 // which means the number of bthreads is #cpu-cores
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index d2dfa89cdd6..0ef1164b2e5 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -71,6 +71,17 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), p._result_sink_buffer_size_rows, 
&_sender, true, state));
     
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
+
+    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
+    }
+    if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+        std::shared_ptr<arrow::Schema> arrow_schema;
+        RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, 
&arrow_schema,
+                                                        state->timezone()));
+        _sender->register_arrow_schema(arrow_schema);
+    }
     return Status::OK();
 }
 
@@ -79,10 +90,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<ResultSinkOperatorX>();
-    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
-    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
-    }
     // create writer based on sink type
     switch (p._sink_type) {
     case TResultSinkType::MYSQL_PROTOCAL: {
@@ -96,10 +103,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
         break;
     }
     case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
-        std::shared_ptr<arrow::Schema> arrow_schema;
-        RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, 
&arrow_schema,
-                                                        state->timezone()));
-        _sender->register_arrow_schema(arrow_schema);
         _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
                 _sender.get(), _output_vexpr_ctxs, _profile));
         break;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index a82ab9988b1..701fc6c018d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -923,6 +923,7 @@ void 
PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
         auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
                 UniqueId(request->finst_id()).to_thrift(), &schema);
         if (!st.ok()) {
+            LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
@@ -931,9 +932,11 @@ void 
PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
         st = serialize_arrow_schema(&schema, &schema_str);
         if (st.ok()) {
             result->set_schema(std::move(schema_str));
-            if (!config::public_access_ip.empty() && 
config::public_access_port != -1) {
-                result->set_be_arrow_flight_ip(config::public_access_ip);
-                result->set_be_arrow_flight_port(config::public_access_port);
+            if (!config::public_host.empty()) {
+                result->set_be_arrow_flight_ip(config::public_host);
+            }
+            if (config::arrow_flight_sql_proxy_port != -1) {
+                
result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port);
             }
         }
         st.to_protobuf(result->mutable_status());
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index efa41e346bf..f4fb6bbbb1f 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -78,12 +78,21 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
     auto arrow_null_map = revert_null_map(null_map, start, end);
     auto arrow_null_map_data = arrow_null_map.empty() ? nullptr : 
arrow_null_map.data();
     if constexpr (std::is_same_v<T, UInt8>) {
-        ARROW_BUILDER_TYPE& builder = 
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
-        checkArrowStatus(
-                builder.AppendValues(reinterpret_cast<const 
uint8_t*>(col_data.data() + start),
-                                     end - start,
-                                     reinterpret_cast<const 
uint8_t*>(arrow_null_map_data)),
-                column.get_name(), array_builder->type()->name());
+        auto* null_builder = dynamic_cast<arrow::NullBuilder*>(array_builder);
+        if (null_builder) {
+            for (size_t i = start; i < end; ++i) {
+                checkArrowStatus(null_builder->AppendNull(), column.get_name(),
+                                 null_builder->type()->name());
+            }
+        } else {
+            ARROW_BUILDER_TYPE& builder = 
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+            checkArrowStatus(
+                    builder.AppendValues(reinterpret_cast<const 
uint8_t*>(col_data.data() + start),
+                                         end - start,
+                                         reinterpret_cast<const 
uint8_t*>(arrow_null_map_data)),
+                    column.get_name(), array_builder->type()->name());
+        }
+
     } else if constexpr (std::is_same_v<T, Int128>) {
         auto& string_builder = 
assert_cast<arrow::StringBuilder&>(*array_builder);
         for (size_t i = start; i < end; ++i) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 6f45f3faac7..758f30469bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -249,8 +249,15 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                         // The query results of Arrow Flight SQL will be 
randomly saved on a Doris BE node.
                         // If it is different from the Doris BE node randomly 
routed by nginx,
                         // data forwarding needs to be done inside the Doris 
BE node.
-                        location = 
Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-                                
flightSQLConnectProcessor.getPublicAccessAddr().port);
+                        if 
(flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
+                            location = Location.forGrpcInsecure(
+                                    
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
+                                    
flightSQLConnectProcessor.getPublicAccessAddr().port);
+                        } else {
+                            location = Location.forGrpcInsecure(
+                                    
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
+                                    
connectContext.getResultFlightServerAddr().port);
+                        }
                     } else {
                         location = 
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
                                 
connectContext.getResultFlightServerAddr().port);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 6724065f99a..20b377eb5c3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -131,9 +131,11 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
                 throw new RuntimeException(String.format("fetch arrow flight 
schema failed, queryId: %s, errmsg: %s",
                         DebugUtil.printId(tid), resultStatus));
             }
-            if (pResult.hasBeArrowFlightIp() && 
pResult.hasBeArrowFlightPort()) {
-                publicAccessAddr.hostname = 
pResult.getBeArrowFlightIp().toStringUtf8();
-                publicAccessAddr.port = pResult.getBeArrowFlightPort();
+            if (pResult.hasBeArrowFlightIp()) {
+                
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
+            }
+            if (pResult.hasBeArrowFlightPort()) {
+                publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
             }
             if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
                 RootAllocator rootAllocator = new 
RootAllocator(Integer.MAX_VALUE);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to