This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad3a7a8de [fix](exec) run exec_plan_fragment in pthread to avoid BE 
crash (#21343)
4ad3a7a8de is described below

commit 4ad3a7a8de9170c7d33e7d6956281898588c6003
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Jul 1 12:29:22 2023 +0800

    [fix](exec) run exec_plan_fragment in pthread to avoid BE crash (#21343)
    
    If a query plan has only one fragment, FE calls the `exec_plan_fragment`
    RPC on BE.
    On the BE side, `exec_plan_fragment()` is executed directly in a bthread,
    but it may call JNI methods such as `AttachCurrentThread()`, which return
    an error when called from a bthread.
    
    So I modified `exec_plan_fragment` to make sure it is always executed in
    a pthread pool.
---
 be/src/service/internal_service.cpp                | 23 +++++++++--
 be/src/service/internal_service.h                  |  9 +++-
 regression-test/data/export_p2/test_outfile_p2.out |  4 ++
 .../suites/export_p2/test_outfile_p2.groovy        | 48 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index e1ab48d2d8..7a936487c4 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -277,6 +277,20 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
                                               const PExecPlanFragmentRequest* 
request,
                                               PExecPlanFragmentResult* 
response,
                                               google::protobuf::Closure* done) 
{
+    bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+        _exec_plan_fragment_in_pthread(controller, request, response, done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
+}
+
+void PInternalServiceImpl::_exec_plan_fragment_in_pthread(
+        google::protobuf::RpcController* controller, const 
PExecPlanFragmentRequest* request,
+        PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
     auto span = telemetry::start_rpc_server_span("exec_plan_fragment", 
controller);
     auto scope = OpentelemetryScope {span};
     brpc::ClosureGuard closure_guard(done);
@@ -285,7 +299,7 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     PFragmentRequestVersion version =
             request->has_version() ? request->version() : 
PFragmentRequestVersion::VERSION_1;
     try {
-        st = _exec_plan_fragment(request->request(), version, compact);
+        st = _exec_plan_fragment_impl(request->request(), version, compact);
     } catch (const Exception& e) {
         st = e.to_status();
     } catch (...) {
@@ -302,7 +316,7 @@ void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr
                                                       PExecPlanFragmentResult* 
response,
                                                       
google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
-        exec_plan_fragment(controller, request, response, done);
+        _exec_plan_fragment_in_pthread(controller, request, response, done);
     });
     if (!ret) {
         LOG(WARNING) << "fail to offer request to the work pool";
@@ -427,8 +441,9 @@ void 
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
     }
 }
 
-Status PInternalServiceImpl::_exec_plan_fragment(const std::string& 
ser_request,
-                                                 PFragmentRequestVersion 
version, bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& 
ser_request,
+                                                      PFragmentRequestVersion 
version,
+                                                      bool compact) {
     // Sometimes the BE do not receive the first heartbeat message and it 
receives request from FE
     // If BE execute this fragment, it will core when it wants to get some 
property from master info.
     if (ExecEnv::GetInstance()->master_info() == nullptr) {
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index d9ff2cea39..d6e825497e 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -186,8 +186,13 @@ public:
                                     google::protobuf::Closure* done) override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request, 
PFragmentRequestVersion version,
-                               bool compact);
+    void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* 
controller,
+                                        const PExecPlanFragmentRequest* 
request,
+                                        PExecPlanFragmentResult* result,
+                                        google::protobuf::Closure* done);
+
+    Status _exec_plan_fragment_impl(const std::string& s_request, 
PFragmentRequestVersion version,
+                                    bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, 
PConstantExprResult* response);
 
diff --git a/regression-test/data/export_p2/test_outfile_p2.out 
b/regression-test/data/export_p2/test_outfile_p2.out
new file mode 100644
index 0000000000..ccb2d43e0e
--- /dev/null
+++ b/regression-test/data/export_p2/test_outfile_p2.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_1 --
+1      abc
+
diff --git a/regression-test/suites/export_p2/test_outfile_p2.groovy 
b/regression-test/suites/export_p2/test_outfile_p2.groovy
new file mode 100644
index 0000000000..429b7a88f7
--- /dev/null
+++ b/regression-test/suites/export_p2/test_outfile_p2.groovy
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_hdfs", "p2") {
+    String nameNodeHost = context.config.otherConfigs.get("extHiveHmsHost")
+    String hdfsPort = context.config.otherConfigs.get("extHdfsPort")
+    String fs = "hdfs://${nameNodeHost}:${hdfsPort}"
+    String user_name = "hadoop"
+
+    def table_outfile_name = "test_outfile_hdfs"
+    // create table and insert
+    sql """ DROP TABLE IF EXISTS ${table_outfile_name} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${table_outfile_name} (
+        `id` int(11) NULL,
+        `name` string NULL
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+    """
+
+    sql """insert into ${table_outfile_name} values(1, 'abc');"""
+
+    qt_sql_1 """select * from ${table_outfile_name} order by id"""
+
+    // use a simple sql to make sure there is only one fragment
+    // #21343
+    sql """select * from ${table_outfile_name} INTO OUTFILE 
'${fs}/user/outfile_test/'
+        FORMAT AS PARQUET PROPERTIES
+        (
+            'hadoop.username' = '${user_name}',
+            'fs.defaultFS'='${fs}'
+        );
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to