This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit a55b5ea9caa3d2e75ab7f2a8e9ea8cb1f3c2830c
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Fri Mar 22 19:43:14 2024 +0800

    [bug](udaf) fix memory leak in the java udaf (#32630)

    fix memory leak in the java udaf
---
 .../aggregate_function_java_udaf.h | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 7a367623eaf..59711d513dd 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -59,18 +59,28 @@ public:
     AggregateJavaUdafData() = default;
     AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; }
 
-    ~AggregateJavaUdafData() {
+    ~AggregateJavaUdafData() = default;
+
+    Status close_and_delete_object() {
         JNIEnv* env = nullptr;
-        if (!JniUtil::GetJNIEnv(&env).ok()) {
+        Defer defer {[&]() {
+            if (env != nullptr) {
+                env->DeleteGlobalRef(executor_cl);
+                env->DeleteGlobalRef(executor_obj);
+            }
+        }};
+        Status st = JniUtil::GetJNIEnv(&env);
+        if (!st.ok()) {
             LOG(WARNING) << "Failed to get JNIEnv";
+            return st;
         }
         env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
-        Status st = JniUtil::GetJniExceptionMsg(env);
+        st = JniUtil::GetJniExceptionMsg(env);
         if (!st.ok()) {
             LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string();
+            return st;
         }
-        env->DeleteGlobalRef(executor_cl);
-        env->DeleteGlobalRef(executor_obj);
+        return Status::OK();
     }
 
     Status init_udaf(const TFunction& fn, const std::string& local_location) {
@@ -268,8 +278,8 @@ public:
     }
 
     void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
         if (_first_created) {
-            new (place) Data(argument_types.size());
             Status status = Status::OK();
             SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
                                                this->data(place).init_udaf(_fn, _local_location)),
@@ -279,16 +289,24 @@ public:
                         });
             _first_created = false;
             _exec_place = place;
+            if (UNLIKELY(!status.ok())) {
+                throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string());
+            }
         }
     }
 
     // To avoid multiple times JNI call, Here will destroy all data at once
     void destroy(AggregateDataPtr __restrict place) const noexcept override {
         if (place == _exec_place) {
-            static_cast<void>(this->data(_exec_place).destroy());
-            this->data(_exec_place).~Data();
+            Status status = Status::OK();
+            status = this->data(_exec_place).destroy();
+            status = this->data(_exec_place).close_and_delete_object();
             _first_created = true;
+            if (UNLIKELY(!status.ok())) {
+                LOG(WARNING) << "Failed to destroy function: " << status.to_string();
+            }
         }
+        this->data(place).~Data();
     }
 
     String get_name() const override { return _fn.name.function_name; }
@@ -372,7 +390,6 @@ public:
     // so it's can't call ~Data, only to change _destory_deserialize flag.
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                      Arena*) const override {
-        new (place) Data(argument_types.size());
         this->data(place).read(buf);
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org