This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a4fdf7324a [Bug](javaudf) fix BE crash if javaudf is push down (#21139) a4fdf7324a is described below commit a4fdf7324ae6faca6b33e54add38bd786cf8d3b7 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jun 28 15:01:24 2023 +0800 [Bug](javaudf) fix BE crash if javaudf is push down (#21139) --- be/src/runtime/user_function_cache.cpp | 7 +- be/src/vec/functions/function_java_udf.cpp | 232 +++++++++++++++-------------- be/src/vec/functions/function_java_udf.h | 23 +-- be/src/vec/functions/function_jsonb.cpp | 41 +++-- be/src/vec/functions/function_rpc.cpp | 14 +- be/src/vec/functions/function_rpc.h | 1 - 6 files changed, 177 insertions(+), 141 deletions(-) diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 25e7405a0f..f7ec0890a6 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -140,8 +140,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std } std::vector<std::string> split_parts = strings::Split(file, "."); - if (split_parts.size() != 3) { - return Status::InternalError("user function's name should be function_id.checksum.so"); + if (split_parts.size() != 3 && split_parts.size() != 4) { + return Status::InternalError( + "user function's name should be function_id.checksum[.file_name].file_type"); } int64_t function_id = std::stol(split_parts[0]); std::string checksum = split_parts[1]; @@ -176,7 +177,7 @@ Status UserFunctionCache::_load_cached_lib() { auto st = _load_entry_from_lib(sub_dir, file.file_name); if (!st.ok()) { LOG(WARNING) << "load a library failed, dir=" << sub_dir - << ", file=" << file.file_name; + << ", file=" << file.file_name << ": " << st.to_string(); } return true; }; diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index fe96c92051..9305bae949 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -54,16 +54,24 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio if (env == nullptr) { return Status::InternalError("Failed to get/create JVM"); } - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_)); - executor_ctor_id_ = env->GetMethodID(executor_cl_, "<init>", EXECUTOR_CTOR_SIGNATURE); - RETURN_ERROR_IF_EXC(env); - executor_evaluate_id_ = env->GetMethodID(executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); - RETURN_ERROR_IF_EXC(env); - executor_close_id_ = env->GetMethodID(executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE); - RETURN_ERROR_IF_EXC(env); - - std::shared_ptr<JniContext> jni_ctx = - std::make_shared<JniContext>(_argument_types.size(), this); + if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) { + std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>(); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl)); + jni_env->executor_ctor_id = + env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + jni_env->executor_evaluate_id = + env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + jni_env->executor_close_id = + env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env); + } + JniEnv* jni_env = + reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>( + _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id); context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); // Add a scoped cleanup jni reference object. This cleans up local refs made below. @@ -99,7 +107,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio RETURN_IF_ERROR(jni_frame.push(env)); RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); - jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); + + jni_ctx->executor = + env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, ctor_params_bytes); jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); @@ -118,6 +128,8 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block, RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); JniContext* jni_ctx = reinterpret_cast<JniContext*>( context->get_function_state(FunctionContext::THREAD_LOCAL)); + JniEnv* jni_env = + reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); int arg_idx = 0; ColumnPtr data_cols[arguments.size()]; ColumnPtr null_cols[arguments.size()]; @@ -192,105 +204,105 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block, *(jni_ctx->output_null_value) = reinterpret_cast<int64_t>(null_col->get_data().data()); #ifndef EVALUATE_JAVA_UDF -#define EVALUATE_JAVA_UDF \ - if (data_col->is_column_string()) { \ - const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get()); \ - ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \ - ColumnString::Offsets& offsets = \ - const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ - int increase_buffer_size = 0; \ - int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - chars.resize(buffer_size); \ - offsets.resize(num_rows); \ - *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ - *(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data()); \ - jni_ctx->output_intermediate_state_ptr->row_idx = 0; \ - jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \ - nullptr); \ - while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ - increase_buffer_size++; \ - buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - chars.resize(buffer_size); \ - *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ - jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \ - nullptr); \ - } \ - } else if (data_col->is_numeric() || data_col->is_column_decimal()) { \ - data_col->resize(num_rows); \ - *(jni_ctx->output_value_buffer) = \ - reinterpret_cast<int64_t>(data_col->get_raw_data().data); \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \ - nullptr); \ - } else if (data_col->is_column_array()) { \ - ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get()); \ - ColumnNullable& array_nested_nullable = \ - assert_cast<ColumnNullable&>(array_col->get_data()); \ - auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \ - auto data_column = array_nested_nullable.get_nested_column_ptr(); \ - auto& offset_column = array_col->get_offsets_column(); \ - int increase_buffer_size = 0; \ - int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - offset_column.resize(num_rows); \ - *(jni_ctx->output_offsets_ptr) = \ - reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \ - data_column_null_map->resize(buffer_size); \ - auto& null_map_data = \ - assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \ - *(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data()); \ - jni_ctx->output_intermediate_state_ptr->row_idx = 0; \ - jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ - if (data_column->is_column_string()) { \ - ColumnString* str_col = assert_cast<ColumnString*>(data_column.get()); \ - ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \ - ColumnString::Offsets& offsets = \ - assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ - chars.resize(buffer_size); \ - offsets.resize(buffer_size); \ - *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ - *(jni_ctx->output_array_string_offsets_ptr) = \ - reinterpret_cast<int64_t>(offsets.data()); \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \ - nullptr); \ - while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ - increase_buffer_size++; \ - buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - null_map_data.resize(buffer_size); \ - chars.resize(buffer_size); \ - offsets.resize(buffer_size); \ - *(jni_ctx->output_array_null_ptr) = \ - reinterpret_cast<int64_t>(null_map_data.data()); \ - *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ - *(jni_ctx->output_array_string_offsets_ptr) = \ - reinterpret_cast<int64_t>(offsets.data()); \ - jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \ - executor_evaluate_id_, nullptr); \ - } \ - } else { \ - data_column->resize(buffer_size); \ - *(jni_ctx->output_value_buffer) = \ - reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \ - nullptr); \ - while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ - increase_buffer_size++; \ - buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - null_map_data.resize(buffer_size); \ - data_column->resize(buffer_size); \ - *(jni_ctx->output_array_null_ptr) = \ - reinterpret_cast<int64_t>(null_map_data.data()); \ - *(jni_ctx->output_value_buffer) = \ - reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ - jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ - env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \ - executor_evaluate_id_, nullptr); \ - } \ - } \ - } else { \ - return Status::InvalidArgument(strings::Substitute( \ - "Java UDF doesn't support return type $0 now !", return_type->get_name())); \ +#define EVALUATE_JAVA_UDF \ + if (data_col->is_column_string()) { \ + const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get()); \ + ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \ + ColumnString::Offsets& offsets = \ + const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ + int increase_buffer_size = 0; \ + int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + chars.resize(buffer_size); \ + offsets.resize(num_rows); \ + *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ + *(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data()); \ + jni_ctx->output_intermediate_state_ptr->row_idx = 0; \ + jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ + increase_buffer_size++; \ + buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + chars.resize(buffer_size); \ + *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ + jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + } \ + } else if (data_col->is_numeric() || data_col->is_column_decimal()) { \ + data_col->resize(num_rows); \ + *(jni_ctx->output_value_buffer) = \ + reinterpret_cast<int64_t>(data_col->get_raw_data().data); \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + } else if (data_col->is_column_array()) { \ + ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get()); \ + ColumnNullable& array_nested_nullable = \ + assert_cast<ColumnNullable&>(array_col->get_data()); \ + auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \ + auto data_column = array_nested_nullable.get_nested_column_ptr(); \ + auto& offset_column = array_col->get_offsets_column(); \ + int increase_buffer_size = 0; \ + int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + offset_column.resize(num_rows); \ + *(jni_ctx->output_offsets_ptr) = \ + reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \ + data_column_null_map->resize(buffer_size); \ + auto& null_map_data = \ + assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \ + *(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data()); \ + jni_ctx->output_intermediate_state_ptr->row_idx = 0; \ + jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ + if (data_column->is_column_string()) { \ + ColumnString* str_col = assert_cast<ColumnString*>(data_column.get()); \ + ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \ + ColumnString::Offsets& offsets = \ + assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ + chars.resize(buffer_size); \ + offsets.resize(buffer_size); \ + *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ + *(jni_ctx->output_array_string_offsets_ptr) = \ + reinterpret_cast<int64_t>(offsets.data()); \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ + increase_buffer_size++; \ + buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + null_map_data.resize(buffer_size); \ + chars.resize(buffer_size); \ + offsets.resize(buffer_size); \ + *(jni_ctx->output_array_null_ptr) = \ + reinterpret_cast<int64_t>(null_map_data.data()); \ + *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ + *(jni_ctx->output_array_string_offsets_ptr) = \ + reinterpret_cast<int64_t>(offsets.data()); \ + jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + } \ + } else { \ + data_column->resize(buffer_size); \ + *(jni_ctx->output_value_buffer) = \ + reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ + increase_buffer_size++; \ + buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + null_map_data.resize(buffer_size); \ + data_column->resize(buffer_size); \ + *(jni_ctx->output_array_null_ptr) = \ + reinterpret_cast<int64_t>(null_map_data.data()); \ + *(jni_ctx->output_value_buffer) = \ + reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ + jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ + env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \ + jni_env->executor_evaluate_id, nullptr); \ + } \ + } \ + } else { \ + return Status::InvalidArgument(strings::Substitute( \ + "Java UDF doesn't support return type $0 now !", return_type->get_name())); \ } #endif EVALUATE_JAVA_UDF; diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h index 1f47394c69..605d7c0198 100644 --- a/be/src/vec/functions/function_java_udf.h +++ b/be/src/vec/functions/function_java_udf.h @@ -23,6 +23,7 @@ #include <stdint.h> #include <memory> +#include <mutex> #include <ostream> #include "common/logging.h" @@ -83,13 +84,6 @@ private: const DataTypes _argument_types; const DataTypePtr _return_type; - /// Global class reference to the UdfExecutor Java class and related method IDs. Set in - /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed). - jclass executor_cl_; - jmethodID executor_ctor_id_; - jmethodID executor_evaluate_id_; - jmethodID executor_close_id_; - struct IntermediateState { size_t buffer_size; size_t row_idx; @@ -97,6 +91,15 @@ private: IntermediateState() : buffer_size(0), row_idx(0) {} }; + struct JniEnv { + /// Global class reference to the UdfExecutor Java class and related method IDs. Set in + /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed). + jclass executor_cl; + jmethodID executor_ctor_id; + jmethodID executor_evaluate_id; + jmethodID executor_close_id; + }; + struct JniContext { // Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext // The deconstruct sequence is not determined, it will core. @@ -124,9 +127,9 @@ private: // intermediate_state includes two parts: reserved / used buffer size and rows std::unique_ptr<IntermediateState> output_intermediate_state_ptr; - JniContext(int64_t num_args, JavaFunctionCall* parent) - : executor_cl_(parent->executor_cl_), - executor_close_id_(parent->executor_close_id_), + JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id) + : executor_cl_(executor_cl), + executor_close_id_(executor_close_id), input_values_buffer_ptr(new int64_t[num_args]), input_nulls_buffer_ptr(new int64_t[num_args]), input_offsets_ptrs(new int64_t[num_args]), diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp index fbe5f11313..ac62c59ef7 100644 --- a/be/src/vec/functions/function_jsonb.cpp +++ b/be/src/vec/functions/function_jsonb.cpp @@ -78,8 +78,10 @@ enum class JsonbParseErrorMode { FAIL = 0, RETURN_NULL, RETURN_VALUE, RETURN_INV template <NullalbeMode nullable_mode, JsonbParseErrorMode parse_error_handle_mode> class FunctionJsonbParseBase : public IFunction { private: - JsonbParser default_value_parser; - bool has_const_default_value = false; + struct FunctionJsonbParseState { + JsonbParser default_value_parser; + bool has_const_default_value = false; + }; public: static constexpr auto name = "json_parse"; @@ -152,20 +154,31 @@ public: bool use_default_implementation_for_nulls() const override { return false; } Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) { + std::shared_ptr<FunctionJsonbParseState> state = + std::make_shared<FunctionJsonbParseState>(); + context->set_function_state(FunctionContext::FRAGMENT_LOCAL, state); + } if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE) { if (context->is_col_constant(1)) { const auto default_value_col = context->get_constant_col(1)->column_ptr; const auto& default_value = default_value_col->get_data_at(0); JsonbErrType error = JsonbErrType::E_NONE; - if (!default_value_parser.parse(default_value.data, default_value.size)) { - error = default_value_parser.getErrorCode(); - return Status::InvalidArgument( - "invalid default json value: {} , error: {}", - std::string_view(default_value.data, default_value.size), - JsonbErrMsg::getErrMsg(error)); + if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) { + FunctionJsonbParseState* state = reinterpret_cast<FunctionJsonbParseState*>( + context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + + if (!state->default_value_parser.parse(default_value.data, + default_value.size)) { + error = state->default_value_parser.getErrorCode(); + return Status::InvalidArgument( + "invalid default json value: {} , error: {}", + std::string_view(default_value.data, default_value.size), + JsonbErrMsg::getErrMsg(error)); + } + state->has_const_default_value = true; } - has_const_default_value = true; } } return Status::OK(); @@ -257,10 +270,14 @@ public: continue; } case JsonbParseErrorMode::RETURN_VALUE: { - if (has_const_default_value) { + FunctionJsonbParseState* state = reinterpret_cast<FunctionJsonbParseState*>( + context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + if (state->has_const_default_value) { col_to->insert_data( - default_value_parser.getWriter().getOutput()->getBuffer(), - (size_t)default_value_parser.getWriter().getOutput()->getSize()); + state->default_value_parser.getWriter().getOutput()->getBuffer(), + (size_t)state->default_value_parser.getWriter() + .getOutput() + ->getSize()); } else { auto val = block.get_by_position(arguments[1]).column->get_data_at(i); if (parser.parse(val.data, val.size)) { diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index dbd92eeec2..44c5e1b369 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -90,16 +90,20 @@ FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types, : _argument_types(argument_types), _return_type(return_type), _tfn(fn) {} Status FunctionRPC::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { - _fn = std::make_unique<RPCFnImpl>(_tfn); - - if (!_fn->available()) { - return Status::InternalError("rpc env init error"); + if (scope == FunctionContext::FRAGMENT_LOCAL) { + std::shared_ptr<RPCFnImpl> fn = std::make_shared<RPCFnImpl>(_tfn); + if (!fn->available()) { + return Status::InternalError("rpc env init error"); + } + context->set_function_state(FunctionContext::FRAGMENT_LOCAL, fn); } return Status::OK(); } Status FunctionRPC::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count, bool dry_run) { - return _fn->vec_call(context, block, arguments, result, input_rows_count); + RPCFnImpl* fn = reinterpret_cast<RPCFnImpl*>( + context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + return fn->vec_call(context, block, arguments, result, input_rows_count); } } // namespace doris::vectorized diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h index 5623183470..d10b9be546 100644 --- a/be/src/vec/functions/function_rpc.h +++ b/be/src/vec/functions/function_rpc.h @@ -107,7 +107,6 @@ private: DataTypes _argument_types; DataTypePtr _return_type; TFunction _tfn; - std::unique_ptr<RPCFnImpl> _fn; }; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org