This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad3a7a8de [fix](exec) run exec_plan_fragment in pthread to avoid BE crash (#21343)
4ad3a7a8de is described below
commit 4ad3a7a8de9170c7d33e7d6956281898588c6003
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Jul 1 12:29:22 2023 +0800
[fix](exec) run exec_plan_fragment in pthread to avoid BE crash (#21343)
If a query plan has only one fragment, the FE calls the `exec_plan_fragment`
rpc on the BE. On the BE side, `exec_plan_fragment()` used to be executed
directly in a bthread, but it may call JNI methods such as
`AttachCurrentThread()`, which return an error when invoked from a bthread.
So I modify `exec_plan_fragment` to make sure it is always executed in the
pthread pool.
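
The offload-or-reject pattern used in this change is generic: the RPC handler
hands the real work to a pthread-backed pool via `try_offer()` and fails the
request immediately if the pool cannot accept it, instead of running
JNI-touching code on the calling bthread. Below is a minimal, self-contained
sketch of that pattern for illustration only; the `WorkerPool` class and all
names in it are assumptions made for this email, not the Doris
`_light_work_pool` / brpc API, and the printed message merely stands in for
the real fragment execution.

    // Illustrative sketch only: shows "offload to a pthread-backed worker or
    // reject immediately", the structure exec_plan_fragment now follows.
    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    class WorkerPool {
    public:
        WorkerPool(size_t threads, size_t max_queue) : _max_queue(max_queue) {
            for (size_t i = 0; i < threads; ++i) {
                _workers.emplace_back([this] { _run(); });
            }
        }
        ~WorkerPool() {
            {
                std::lock_guard<std::mutex> lock(_mu);
                _stop = true;
            }
            _cv.notify_all();
            for (auto& t : _workers) t.join();
        }
        // Mirrors the try_offer() idea: return false instead of blocking
        // when the queue is full, so the caller can fail the request fast.
        bool try_offer(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> lock(_mu);
                if (_queue.size() >= _max_queue) return false;
                _queue.push(std::move(task));
            }
            _cv.notify_one();
            return true;
        }

    private:
        void _run() {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(_mu);
                    _cv.wait(lock, [this] { return _stop || !_queue.empty(); });
                    if (_stop && _queue.empty()) return;
                    task = std::move(_queue.front());
                    _queue.pop();
                }
                // Runs on a plain pthread (std::thread), not on the caller's
                // context, which is the point of the offload.
                task();
            }
        }

        std::mutex _mu;
        std::condition_variable _cv;
        std::queue<std::function<void()>> _queue;
        std::vector<std::thread> _workers;
        size_t _max_queue;
        bool _stop = false;
    };

    int main() {
        WorkerPool pool(/*threads=*/2, /*max_queue=*/16);
        // Stand-in for the RPC handler body: offload, or report failure.
        bool offered = pool.try_offer([] {
            std::cout << "fragment executed on pthread "
                      << std::this_thread::get_id() << std::endl;
        });
        if (!offered) {
            std::cout << "fail to offer request to the work pool" << std::endl;
        }
        return 0;  // pool destructor joins the workers after the task runs
    }
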
---
 be/src/service/internal_service.cpp                 | 23 +++++++++--
 be/src/service/internal_service.h                   |  9 +++-
 regression-test/data/export_p2/test_outfile_p2.out  |  4 ++
 .../suites/export_p2/test_outfile_p2.groovy         | 48 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index e1ab48d2d8..7a936487c4 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -277,6 +277,20 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
                                               const PExecPlanFragmentRequest* request,
                                               PExecPlanFragmentResult* response,
                                               google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+        _exec_plan_fragment_in_pthread(controller, request, response, done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
+}
+
+void PInternalServiceImpl::_exec_plan_fragment_in_pthread(
+        google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request,
+        PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
     auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller);
     auto scope = OpentelemetryScope {span};
     brpc::ClosureGuard closure_guard(done);
@@ -285,7 +299,7 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     PFragmentRequestVersion version =
             request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
     try {
-        st = _exec_plan_fragment(request->request(), version, compact);
+        st = _exec_plan_fragment_impl(request->request(), version, compact);
     } catch (const Exception& e) {
         st = e.to_status();
     } catch (...) {
@@ -302,7 +316,7 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr
                                                  PExecPlanFragmentResult* response,
                                                  google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
-        exec_plan_fragment(controller, request, response, done);
+        _exec_plan_fragment_in_pthread(controller, request, response, done);
     });
     if (!ret) {
         LOG(WARNING) << "fail to offer request to the work pool";
@@ -427,8 +441,9 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
     }
 }
 
-Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
-                                                 PFragmentRequestVersion version, bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_request,
+                                                      PFragmentRequestVersion version,
+                                                      bool compact) {
     // Sometimes the BE do not receive the first heartbeat message and it receives request from FE
     // If BE execute this fragment, it will core when it wants to get some property from master info.
     if (ExecEnv::GetInstance()->master_info() == nullptr) {
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index d9ff2cea39..d6e825497e 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -186,8 +186,13 @@ public:
                                 google::protobuf::Closure* done) override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version,
-                               bool compact);
+    void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
+                                        const PExecPlanFragmentRequest* request,
+                                        PExecPlanFragmentResult* result,
+                                        google::protobuf::Closure* done);
+
+    Status _exec_plan_fragment_impl(const std::string& s_request, PFragmentRequestVersion version,
+                                    bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
diff --git a/regression-test/data/export_p2/test_outfile_p2.out b/regression-test/data/export_p2/test_outfile_p2.out
new file mode 100644
index 0000000000..ccb2d43e0e
--- /dev/null
+++ b/regression-test/data/export_p2/test_outfile_p2.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_1 --
+1 abc
+
diff --git a/regression-test/suites/export_p2/test_outfile_p2.groovy b/regression-test/suites/export_p2/test_outfile_p2.groovy
new file mode 100644
index 0000000000..429b7a88f7
--- /dev/null
+++ b/regression-test/suites/export_p2/test_outfile_p2.groovy
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_hdfs", "p2") {
+    String nameNodeHost = context.config.otherConfigs.get("extHiveHmsHost")
+    String hdfsPort = context.config.otherConfigs.get("extHdfsPort")
+    String fs = "hdfs://${nameNodeHost}:${hdfsPort}"
+    String user_name = "hadoop"
+
+    def table_outfile_name = "test_outfile_hdfs"
+    // create table and insert
+    sql """ DROP TABLE IF EXISTS ${table_outfile_name} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_outfile_name} (
+            `id` int(11) NULL,
+            `name` string NULL
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+    """
+
+    sql """insert into ${table_outfile_name} values(1, 'abc');"""
+
+    qt_sql_1 """select * from ${table_outfile_name} order by id"""
+
+    // use a simple sql to make sure there is only one fragment
+    // #21343
+    sql """select * from ${table_outfile_name} INTO OUTFILE '${fs}/user/outfile_test/'
+        FORMAT AS PARQUET PROPERTIES
+        (
+            'hadoop.username' = '${user_name}',
+            'fs.defaultFS'='${fs}'
+        );
+    """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]