This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch wasm in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/wasm by this push: new 7c482a80d6c [Feature][UDF][WASM] support wasm udf (#30789) 7c482a80d6c is described below commit 7c482a80d6c16436035ef2891e100e617ce3ce29 Author: Benjaminwei <33219531+tap...@users.noreply.github.com> AuthorDate: Sun Feb 4 14:09:50 2024 +0800 [Feature][UDF][WASM] support wasm udf (#30789) --- be/cmake/thirdparty.cmake | 1 + be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + be/src/runtime/exec_env.h | 5 + be/src/runtime/exec_env_init.cpp | 6 + be/src/runtime/user_function_cache.cpp | 19 ++- be/src/runtime/user_function_cache.h | 5 +- be/src/util/wasm_manager.cpp | 88 ++++++++++ be/src/util/wasm_manager.h | 65 +++++++ be/src/vec/exprs/vectorized_agg_fn.cpp | 2 + be/src/vec/exprs/vectorized_fn_call.cpp | 9 + be/src/vec/functions/function_wasm.cpp | 189 +++++++++++++++++++++ be/src/vec/functions/function_wasm.h | 82 +++++++++ be/test/util/wasm_manager_test.cpp | 57 +++++++ be/test/util/wasm_test.cpp | 61 +++++++ .../ecosystem/udf/wasm-user-defined-function.md | 116 +++++++++++++ .../ecosystem/udf/wasm-user-defined-function.md | 114 +++++++++++++ fe/fe-core/pom.xml | 7 + .../apache/doris/analysis/CreateFunctionStmt.java | 27 +++ .../org/apache/doris/catalog/ScalarFunction.java | 3 +- gensrc/thrift/Types.thrift | 4 +- .../data/wasmudf_p0/test_wasmudf_float.out | 23 +++ .../data/wasmudf_p0/test_wasmudf_int.out | 37 ++++ .../suites/wasmudf_p0/test_wasmudf_float.groovy | 68 ++++++++ .../suites/wasmudf_p0/test_wasmudf_int.groovy | 73 ++++++++ regression-test/suites/wasmudf_p0/wat/f32_add.wat | 8 + regression-test/suites/wasmudf_p0/wat/i32_add.wat | 8 + 27 files changed, 1077 insertions(+), 6 deletions(-) diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake index 5c07f79d6e4..cdefd9f7247 100644 --- a/be/cmake/thirdparty.cmake +++ b/be/cmake/thirdparty.cmake @@ -156,6 +156,7 @@ add_thirdparty(gssapi_krb5) add_thirdparty(dragonbox_to_chars LIB64) add_thirdparty(streamvbyte LIB64) 
target_include_directories(dragonbox_to_chars INTERFACE "${THIRDPARTY_DIR}/include/dragonbox-1.1.3") +add_thirdparty(wasmtime) if (OS_MACOSX) add_thirdparty(bfd) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ff9233381cf..6e9852e553d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -965,6 +965,9 @@ DEFINE_mInt32(segcompaction_num_threads, "5"); // enable java udf and jdbc scannode DEFINE_Bool(enable_java_support, "true"); +// enable wasm udf +DEFINE_Bool(enable_wasm_support, "true"); + // Set config randomly to check more issues in github workflow DEFINE_Bool(enable_fuzzy_mode, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index ff703680336..66f862a8106 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1014,6 +1014,9 @@ DECLARE_mInt32(segcompaction_num_threads); // enable java udf and jdbc scannode DECLARE_Bool(enable_java_support); +// enable wasm udf +DECLARE_Bool(enable_wasm_support); + // Set config randomly to check more issues in github workflow DECLARE_Bool(enable_fuzzy_mode); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index b8120f1c731..9cbea709b8a 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -36,6 +36,7 @@ #include "olap/tablet_fwd.h" #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header #include "util/threadpool.h" +#include "util/wasm_manager.h" namespace doris { namespace vectorized { @@ -192,6 +193,9 @@ public: BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const { return _function_client_cache; } + std::shared_ptr<WasmFunctionManager> wasm_function_manager() const { + return _wasm_function_manager; + } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } @@ -334,6 +338,7 @@ private: 
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr; BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr; BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr; + std::shared_ptr<WasmFunctionManager> _wasm_function_manager = nullptr; std::shared_ptr<StreamLoadExecutor> _stream_load_executor; RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 9942262f9c9..8e6fda2f466 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -96,6 +96,7 @@ #include "util/threadpool.h" #include "util/thrift_rpc_helper.h" #include "util/timezone_utils.h" +#include "util/wasm_manager.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/sink/delta_writer_v2_pool.h" @@ -217,6 +218,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); _internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); _function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _wasm_function_manager = std::shared_ptr<WasmFunctionManager>(new WasmFunctionManager()); _stream_load_executor = StreamLoadExecutor::create_shared(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); @@ -557,6 +559,10 @@ void ExecEnv::destroy() { _memtable_memory_limiter.reset(); _delta_writer_v2_pool.reset(); _load_stream_stub_pool.reset(); + + // _wasm_function_manager should be destroyed. 
+ _wasm_function_manager.reset(); + SAFE_STOP(_storage_engine); SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); SAFE_SHUTDOWN(_s3_file_upload_thread_pool); diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index ab9d90846ab..96255cffb2c 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -144,6 +144,8 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std lib_type = LibType::SO; } else if (ends_with(file, ".jar")) { lib_type = LibType::JAR; + } else if (ends_with(file, ".wat")) { + lib_type = LibType::WAT; } else { return Status::InternalError( "unknown library file format. the file type is not end with xxx.jar or xxx.so : " + @@ -251,9 +253,10 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, if (entry->type == LibType::SO) { RETURN_IF_ERROR(_load_cache_entry_internal(entry)); - } else if (entry->type != LibType::JAR) { + } else if (entry->type != LibType::JAR && entry->type != LibType::WAT) { return Status::InvalidArgument( - "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar'!"); + "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar' and " + "'wat'!"); } return Status::OK(); } @@ -356,6 +359,8 @@ std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::st ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum; if (type == LibType::JAR) { ss << '.' << file_name; + } else if (type == LibType::WAT) { + ss << '.' 
<< file_name; } else { ss << ".so"; } @@ -370,6 +375,14 @@ Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, return Status::OK(); } +Status UserFunctionCache::get_watpath(int64_t fid, const std::string& url, + const std::string& checksum, std::string* libpath) { + std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; + RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::WAT)); + *libpath = entry->lib_file; + return Status::OK(); +} + std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std::string& file) { std::vector<std::string> result; @@ -393,4 +406,4 @@ std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std: return result; } -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index 5d1bff8b866..6d57c32cd5e 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -43,7 +43,7 @@ struct UserFunctionCacheEntry; // with id, this function library is valid. And when user wants to // change its implementation(URL), Doris will generate a new function // id. -enum class LibType { JAR, SO }; +enum class LibType { JAR, SO, WAT }; class UserFunctionCache { public: @@ -59,6 +59,9 @@ public: Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, std::string* libpath); + Status get_watpath(int64_t fid, const std::string& url, const std::string& checksum, + std::string* libpath); + private: Status _load_cached_lib(); Status _load_entry_from_lib(const std::string& dir, const std::string& file); diff --git a/be/src/util/wasm_manager.cpp b/be/src/util/wasm_manager.cpp new file mode 100644 index 00000000000..17630214bab --- /dev/null +++ b/be/src/util/wasm_manager.cpp @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/wasm_manager.h" + +namespace doris { + +WasmFunctionManager::WasmFunctionManager() { + engine = std::unique_ptr<wasmtime::Engine>(new wasmtime::Engine); + store = std::unique_ptr<wasmtime::Store>(new wasmtime::Store(*engine)); +} + +WasmFunctionManager::~WasmFunctionManager() { + engine.reset(); + store.reset(); +} + +bool WasmFunctionManager::RegisterFunction(std::string functionName, std::string functionHandler, + const std::string& watString) { + auto funcBody = funcs.find(functionName); + if (funcBody != funcs.end()) { + return false; + } + auto wasmRuntime = createInstanceAndFunction(watString, functionHandler); + funcs.emplace(functionName, wasmRuntime); + return true; +} + +bool WasmFunctionManager::RegisterFunction(std::string functionName, std::string functionHandler, + const wasmtime::Span<uint8_t>& wasm) { + auto funcBody = funcs.find(functionName); + if (funcBody != funcs.end()) { + return false; + } + auto wasmRuntime = createInstanceAndFunction(wasm, functionHandler); + funcs.emplace(functionName, wasmRuntime); + return true; +} + +WasmtimeRunInstance WasmFunctionManager::createInstanceAndFunction( + const std::string& watString, const std::string& functionHandler) { + auto module = wasmtime::Module::compile(*engine, watString).unwrap(); + 
auto instance = wasmtime::Instance::create(store.get(), module, {}).unwrap(); + auto function_obj = instance.get(store.get(), functionHandler); + wasmtime::Func* func = std::get_if<wasmtime::Func>(&*function_obj); + return WasmtimeRunInstance(*func, instance); +} + +WasmtimeRunInstance WasmFunctionManager::createInstanceAndFunction( + const wasmtime::Span<uint8_t> wasm, const std::string& functionHandler) { + auto module = wasmtime::Module::compile(*engine, wasm).unwrap(); + auto instance = wasmtime::Instance::create(store.get(), module, {}).unwrap(); + auto function_obj = instance.get(store.get(), functionHandler); + wasmtime::Func* func = std::get_if<wasmtime::Func>(&*function_obj); + return WasmtimeRunInstance(*func, instance); +} + +std::vector<wasmtime::Val> WasmFunctionManager::runElemFunc(const std::string functionName, + std::vector<wasmtime::Val> args) { + auto module = funcs.at(functionName); + auto results = module.func.call(store.get(), args).unwrap(); + return results; +} + +bool WasmFunctionManager::DeleteFunction(std::string functionName) { + auto funcBody = funcs.find(functionName); + if (funcBody == funcs.end()) { + return false; + } + funcs.erase(functionName); + return true; +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/util/wasm_manager.h b/be/src/util/wasm_manager.h new file mode 100644 index 00000000000..08c14f1165b --- /dev/null +++ b/be/src/util/wasm_manager.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cassert> +#include <fstream> +#include <iostream> +#include <limits> +#include <sstream> +#include <string> +#include <unordered_map> +#include <vector> +#include <wasmtime.hh> + +namespace doris { + +struct WasmtimeRunInstance { + wasmtime::Func func; + wasmtime::Instance instance; + WasmtimeRunInstance(const wasmtime::Func& func, const wasmtime::Instance& instance) + : func(func), instance(instance) {} +}; + +class WasmFunctionManager { +private: + // wasmtime + std::unique_ptr<wasmtime::Engine> engine; + std::unique_ptr<wasmtime::Store> store; + std::unordered_map<std::string, WasmtimeRunInstance> funcs; + + WasmFunctionManager(const WasmFunctionManager&); + WasmFunctionManager& operator=(const WasmFunctionManager&); + +public: + WasmFunctionManager(); + ~WasmFunctionManager(); + WasmtimeRunInstance createInstanceAndFunction(const std::string& watString, + const std::string& functionHandler); + WasmtimeRunInstance createInstanceAndFunction(const wasmtime::Span<uint8_t> wasm, + const std::string& functionHandler); + bool RegisterFunction(std::string functionName, std::string functionHandler, + const std::string& watString); + bool RegisterFunction(std::string functionName, std::string functionHandler, + const wasmtime::Span<uint8_t>& wasm); + std::vector<wasmtime::Val> runElemFunc(const std::string functionName, + std::vector<wasmtime::Val> args); + bool DeleteFunction(std::string functionName); +}; + +} // namespace doris diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp 
index 166ad9bc2b2..6d063a54813 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -150,6 +150,8 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, } } else if (_fn.binary_type == TFunctionBinaryType::RPC) { _function = AggregateRpcUdaf::create(_fn, argument_types, _data_type); + } else if (_fn.binary_type == TFunctionBinaryType::WASM_UDF) { + return Status::InternalError("WASM_UDF is not supported"); } else if (_fn.binary_type == TFunctionBinaryType::AGG_STATE) { if (argument_types.size() != 1) { return Status::InternalError("Agg state Function must input 1 argument but get {}", diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index bf38185f7df..a2f507b11bd 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -41,6 +41,7 @@ #include "vec/functions/function_agg_state.h" #include "vec/functions/function_java_udf.h" #include "vec/functions/function_rpc.h" +#include "vec/functions/function_wasm.h" #include "vec/functions/simple_function_factory.h" #include "vec/utils/util.hpp" @@ -70,6 +71,14 @@ Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, if (_fn.binary_type == TFunctionBinaryType::RPC) { _function = FunctionRPC::create(_fn, argument_template, _data_type); + } else if (_fn.binary_type == TFunctionBinaryType::WASM_UDF) { + if (config::enable_wasm_support) { + _function = FunctionWasm::create(_fn, argument_template, _data_type); + } else { + return Status::InternalError( + "Wasm UDF is not enabled, you can change be config enable_wasm_support to true " + "and restart be."); + } } else if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) { if (config::enable_java_support) { _function = JavaFunctionCall::create(_fn, argument_template, _data_type); diff --git a/be/src/vec/functions/function_wasm.cpp b/be/src/vec/functions/function_wasm.cpp new file mode 100644 
index 00000000000..c4b8f55c6c2 --- /dev/null +++ b/be/src/vec/functions/function_wasm.cpp @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/functions/function_wasm.h" + +#include <brpc/controller.h> +#include <fmt/format.h> + +#include <algorithm> +#include <memory> +#include <string> +#include <utility> + +#include "gutil/strings/substitute.h" +#include "runtime/exec_env.h" +#include "runtime/user_function_cache.h" +#include "vec/columns/column.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/serde/data_type_serde.h" +#include "vec/functions/function.h" + +namespace doris::vectorized { + +FunctionWasm::FunctionWasm(const TFunction& fn, const DataTypes& argument_types, + const DataTypePtr& return_type) + : _argument_types(argument_types), _return_type(return_type), _tfn(fn) { + _is_nullable = false; + for (const auto& type : argument_types) { + auto argument_type = type; + if (type->is_nullable()) { + argument_type = remove_nullable(type); + _is_nullable = true; + } + _not_nullable_argument_types.push_back(argument_type); + } +} + +Status FunctionWasm::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + if (scope == 
FunctionContext::FRAGMENT_LOCAL) { + string local_location; + auto* function_cache = UserFunctionCache::instance(); + RETURN_IF_ERROR(function_cache->get_watpath(_tfn.id, _tfn.hdfs_location, _tfn.checksum, + &local_location)); + std::shared_ptr<WasmFunctionManager> manager = std::make_shared<WasmFunctionManager>(); + context->set_function_state(FunctionContext::THREAD_LOCAL, manager); + std::ifstream wat_file; + wat_file.open(local_location.c_str()); + std::stringstream str_stream; + str_stream << wat_file.rdbuf(); + const std::string wasm_body = str_stream.str(); + manager->RegisterFunction(_tfn.name.function_name, _tfn.scalar_fn.symbol, wasm_body); + } + return Status::OK(); +} + +Status FunctionWasm::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count, bool dry_run) { + /** + * i32, a 32-bit integer (equivalent to C++’s signed long int) + * i64, a 64-bit integer (equivalent to C++’s signed long long int) + * f32, 32-bit float (equivalent to C++’s float) + * f64, 64-bit float (equivalent to C++’s double) + */ + int arg_size = arguments.size(); + ColumnPtr data_cols[arg_size]; + auto* manager = reinterpret_cast<WasmFunctionManager*>( + context->get_function_state(FunctionContext::THREAD_LOCAL)); + auto return_type = _return_type; + auto result_nullable = return_type->is_nullable(); + ColumnUInt8::MutablePtr null_map = nullptr; + + if (result_nullable) { + return_type = remove_nullable(_return_type); + null_map = ColumnUInt8::create(input_rows_count, 0); + memset(null_map->get_data().data(), 0, input_rows_count); + } + + auto result_col = return_type->create_column(); + result_col->resize(input_rows_count); + + // check type : defined datatype same with param datatype + for (size_t arg_idx = 0; arg_idx < arg_size; ++arg_idx) { + ColumnWithTypeAndName& column = block.get_by_position(arguments[arg_idx]); + DataTypePtr data_type = column.type; + if (data_type->is_nullable() && !_is_nullable) { + 
return Status::InternalError(fmt::format( + "Defined datatype is not nullable, but param datatype is nullable")); + } + + if (data_type->is_nullable()) { + data_type = remove_nullable(data_type); + } + + DCHECK(_not_nullable_argument_types[arg_idx]->equals(*data_type)) + << " input column's type is " + data_type->get_name() + << " does not equal to required type " + << _not_nullable_argument_types[arg_idx]->get_name(); + + auto data_col = column.column->convert_to_full_column_if_const(); + + data_cols[arg_idx] = data_col; + } + + // step1. process column value to wasm param + // step2. call wasm function + // step3. return wasm result to column value + // TODO: vec the code to call wasm fun + int row_size = data_cols[0]->size(); + for (size_t i = 0; i < row_size; ++i) { + std::vector<wasmtime::Val> params; + for (size_t arg_idx = 0; arg_idx < arg_size; ++arg_idx) { + WhichDataType which_type(_not_nullable_argument_types[arg_idx]); + if (data_cols[arg_idx]->is_null_at(i)) { + null_map->get_data()[i] = 1; + continue; + } + if (which_type.is_int32()) { + auto data_col = data_cols[arg_idx]; + if (data_col->is_nullable()) { + data_col = remove_nullable(data_col); + } + const auto* param_column = check_and_get_column<ColumnInt32>(data_col); + params.emplace_back(param_column->get_data()[i]); + } else if (which_type.is_int64()) { + auto data_col = data_cols[arg_idx]; + if (data_col->is_nullable()) { + data_col = remove_nullable(data_col); + } + const auto* param_column = check_and_get_column<ColumnInt64>(data_col); + params.emplace_back(param_column->get_data()[i]); + } else if (which_type.is_float32()) { + auto data_col = data_cols[arg_idx]; + if (data_col->is_nullable()) { + data_col = remove_nullable(data_col); + } + const auto* param_column = check_and_get_column<ColumnFloat32>(data_col); + params.emplace_back(param_column->get_data()[i]); + } else if (which_type.is_float64()) { + auto data_col = data_cols[arg_idx]; + if (data_col->is_nullable()) { + data_col = 
remove_nullable(data_col); + } + const auto* param_column = check_and_get_column<ColumnFloat64>(data_col); + params.emplace_back(param_column->get_data()[i]); + } + } + if (null_map->get_data()[i] == 1) { + continue; + } + auto rets = manager->runElemFunc(_tfn.name.function_name, params); + + auto ret = rets.at(0); + + if (ret.kind() == wasmtime::ValKind::I32) { + reinterpret_cast<ColumnInt32&>(*result_col).get_data()[i] = ret.i32(); + } else if (ret.kind() == wasmtime::ValKind::I64) { + reinterpret_cast<ColumnInt64&>(*result_col).get_data()[i] = ret.i64(); + } else if (ret.kind() == wasmtime::ValKind::F32) { + reinterpret_cast<ColumnFloat32&>(*result_col).get_data()[i] = ret.f32(); + } else if (ret.kind() == wasmtime::ValKind::F64) { + reinterpret_cast<ColumnFloat64&>(*result_col).get_data()[i] = ret.f64(); + } + } + + if (result_nullable) { + block.replace_by_position( + result, ColumnNullable::create(std::move(result_col), std::move(null_map))); + } else { + block.replace_by_position(result, std::move(result_col)); + } + + return Status::OK(); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/functions/function_wasm.h b/be/src/vec/functions/function_wasm.h new file mode 100644 index 00000000000..9744f33f947 --- /dev/null +++ b/be/src/vec/functions/function_wasm.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <fmt/format.h> +#include <stddef.h> + +#include <memory> +#include <string> +#include <vector> + +#include "common/status.h" +#include "udf/udf.h" +#include "util/wasm_manager.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/columns_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/functions/function.h" + +namespace doris::vectorized { + +class FunctionWasm : public IFunctionBase { +public: + FunctionWasm(const TFunction& fn, const DataTypes& argument_types, + const DataTypePtr& return_type); + + static FunctionBasePtr create(const TFunction& fn, const ColumnsWithTypeAndName& argument_types, + const DataTypePtr& return_type) { + DataTypes data_types(argument_types.size()); + for (size_t i = 0; i < argument_types.size(); ++i) { + data_types[i] = argument_types[i].type; + } + return std::make_shared<FunctionWasm>(fn, data_types, return_type); + } + + /// Get the main function name. 
+ String get_name() const override { + return fmt::format("{}: [{}/{}]", _tfn.name.function_name, _tfn.hdfs_location, + _tfn.scalar_fn.symbol); + } + + const DataTypes& get_argument_types() const override { return _argument_types; } + const DataTypePtr& get_return_type() const override { return _return_type; } + + PreparedFunctionPtr prepare(FunctionContext* context, const Block& sample_block, + const ColumnNumbers& arguments, size_t result) const override { + return nullptr; + } + + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + + Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count, bool dry_run = false) const override; + + bool is_use_default_implementation_for_constants() const override { return true; } + +private: + DataTypes _argument_types; + DataTypes _not_nullable_argument_types; + bool _is_nullable; + DataTypePtr _return_type; + TFunction _tfn; +}; +} // namespace doris::vectorized diff --git a/be/test/util/wasm_manager_test.cpp b/be/test/util/wasm_manager_test.cpp new file mode 100644 index 00000000000..51b131887d2 --- /dev/null +++ b/be/test/util/wasm_manager_test.cpp @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/wasm_manager.h" + +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> + +#include <fstream> +#include <string> + +#include "gtest/gtest_pred_impl.h" +#include "testutil/test_util.h" + +namespace doris { + +class WasmManagerTest : public ::testing::Test { + std::string readFile(const char* name) { + std::ifstream wat_file; + wat_file.open(name); + std::stringstream str_stream; + str_stream << wat_file.rdbuf(); + return str_stream.str(); + } +}; + +TEST_F(WasmManagerTest, TestExecAdd) { + std::string dir_path = GetCurrentRunningDir(); + std::string stat_path(dir_path); + stat_path += "/util/test_data/add.wat"; + WasmFunctionManager manager; + const std::string wasm_body = readFile(stat_path.c_str()); + const std::string wasm_function_name = "add"; + manager.RegisterFunction(wasm_function_name, wasm_function_name, wasm_body); + std::vector<wasmtime::Val> params; + auto params_size = 2; + params.reserve(params_size); + params.emplace_back(10); + params.emplace_back(20); + auto results = manager.runElemFunc(wasm_function_name, params); + EXPECT_EQ(results[0].i32(), 30); +} +} // namespace doris \ No newline at end of file diff --git a/be/test/util/wasm_test.cpp b/be/test/util/wasm_test.cpp new file mode 100644 index 00000000000..dabe7d17047 --- /dev/null +++ b/be/test/util/wasm_test.cpp @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> + +#include <fstream> +#include <iostream> +#include <sstream> +#include <string> +#include <wasmtime.hh> + +#include "gtest/gtest_pred_impl.h" +#include "testutil/test_util.h" + +namespace doris { +class WasmtimeTest : public ::testing::Test { + std::string readFile(const char* name) { + std::ifstream wat_file; + wat_file.open(name); + std::stringstream str_stream; + str_stream << wat_file.rdbuf(); + return str_stream.str(); + } +}; + +TEST_F(WasmtimeTest, TestLoadWasm) { + std::string dir_path = GetCurrentRunningDir(); + std::string stat_path(dir_path); + stat_path += "/util/test_data/add.wat"; + wasmtime::Engine engine; + wasmtime::Store store(engine); + const std::string wasm_body = readFile(stat_path.c_str()); + auto module = wasmtime::Module::compile(engine, wasm_body).unwrap(); + auto instance = wasmtime::Instance::create(store, module, {}).unwrap(); + + auto add = std::get<wasmtime::Func>(*instance.get(store, "add")); + std::vector<wasmtime::Val> params; + auto params_size = 2; + params.reserve(params_size); + params.emplace_back(10); + params.emplace_back(20); + auto results = add.call(store, params).unwrap(); + + EXPECT_EQ(results[0].i32(), 30); +} +} // namespace doris diff --git a/docs/en/docs/ecosystem/udf/wasm-user-defined-function.md b/docs/en/docs/ecosystem/udf/wasm-user-defined-function.md new file mode 100644 index 00000000000..7915ce0b2a5 --- /dev/null +++ b/docs/en/docs/ecosystem/udf/wasm-user-defined-function.md @@ -0,0 +1,116 @@ +--- +{ +"title": "WASM 
UDF", +"language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# WASM UDF + +<version since="1.2.0"> + +WASM UDF + +</version> +WASM (WebAssembly) UDF provides a way to write custom functions using WebAssembly in Doris. WASM UDF has some unique advantages and limitations compared to Java UDF: + +1. The advantages: + +* Cross-language compatibility: WebAssembly is a cross-language binary instruction set that allows compilation into UDFs using multiple programming languages. This way, users can choose a language they are familiar with without being restricted to a specific language. +* Performance: WebAssembly generally executes faster than interpreted languages. By compiling UDFs to WebAssembly, users can achieve higher performance, especially when it comes to computationally intensive tasks. +* Lightweight: WebAssembly's binary format is relatively small, so the overhead of transmission and loading is low, making it more portable in distributed computing environments. + +2. Restrictions on use: + +* Ecosystem: WebAssembly's ecosystem is relatively new and lacks some mature libraries and tools. Users may need to write some tools themselves to meet their own needs. 
+* Debugging difficulty: Debugging WebAssembly is relatively complex because it is a low-level binary format. Users may need more tools and skills to debug and optimize their UDFs than with high-level languages.
+* Security: Although WebAssembly is designed to run in a sandbox, users still need to be cautious about potential security risks. When loading a UDF you need to make sure it is from a trusted source.
+
+## Create UDF
+
+```sql
+CREATE FUNCTION
+name ([,...])
+[RETURNS] rettype
+PROPERTIES (["key"="value"][,...])
+```
+Instructions:
+
+1. `symbol` in PROPERTIES is the name of the function exported by the WASM module that implements the UDF (`add` in the sample below). This parameter must be set.
+2. `file` in PROPERTIES is the location of the wat file that contains the UDF. This parameter must be set.
+3. `type` in PROPERTIES is the UDF call type. The default is Native; set it to `WASM_UDF` when creating a WASM UDF.
+4. `always_nullable` in PROPERTIES indicates whether the UDF result may contain NULL values. It is an optional parameter. The default value is true.
+5. `name`: A function belongs to a DB, and its name has the form `dbName`.`funcName`. When `dbName` is not explicitly specified, the DB of the current session is used as `dbName`.
+
+Sample:
+```sql
+CREATE FUNCTION wasm_udf_add(int, int) RETURNS int PROPERTIES (
+    "file"="file:///path/to/wasm-udf-demo.wat",
+    "symbol"="add",
+    "always_nullable"="true",
+    "type"="WASM_UDF"
+);
+```
+
+* `"file"="http://IP:port/udf.wat"`: you can also use HTTP to download the wat file in a multi-machine environment.
+
+* `"always_nullable"` is an optional attribute. If NULL values are handled specially in the calculation so that the result is guaranteed not to be NULL, it can be set to false, which may improve the performance of the whole calculation.
+
+* If you use a local path, the wat file must be placed at that path on every FE and BE node.
+
+## Write a UDF
+
+Currently, UDAF and UDTF are not supported.
+
+```sql
+CREATE FUNCTION wasm_udf_add(int, int) RETURNS int PROPERTIES (
+    "file"="file:///path/to/wasm-udf-demo.wat",
+    "symbol"="add",
+    "always_nullable"="true",
+    "type"="WASM_UDF"
+);
+```
+
+The wat file for this function exports `add`, which takes two `i32` values and returns their sum:
+
+```wat
+(module
+  (func $add (param i32) (param i32) (result i32)
+    (local.get 0)
+    (local.get 1)
+    (i32.add)
+  )
+  (export "add" (func $add))
+)
+```
+
+## Use UDF
+
+Users must have the `SELECT` permission of the corresponding database to use UDF/UDAF.
+
+A UDF is used in the same way as an ordinary function. The only difference is that built-in functions have global scope, while a UDF is scoped to its DB. When the session is connected to a database, using the UDF name directly looks up the UDF in the current DB. Otherwise, the user needs to explicitly specify the UDF's database name, such as `dbName`.`funcName`.
+
+## Delete UDF
+
+When you no longer need a UDF, you can delete it with the `DROP FUNCTION` command.
+
+## Instructions
+1. Data types other than INT and FLOAT are not supported.
+
diff --git a/docs/zh-CN/docs/ecosystem/udf/wasm-user-defined-function.md b/docs/zh-CN/docs/ecosystem/udf/wasm-user-defined-function.md
new file mode 100644
index 00000000000..a785ba3ebed
--- /dev/null
+++ b/docs/zh-CN/docs/ecosystem/udf/wasm-user-defined-function.md
@@ -0,0 +1,114 @@
+---
+{
+"title": "WASM UDF",
+"language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# WASM UDF
+
+<version since="1.2.0">
+
+WASM (WebAssembly) UDF 提供了一种在 Doris 中使用 WebAssembly 编写自定义函数的方式。与 Java UDF 相比,WASM UDF 有一些独特的优势和限制:
+
+1. 优势:
+
+* 跨语言兼容性:WebAssembly 是一种跨语言的二进制指令集,允许使用多种编程语言编译成 UDF。这样,用户可以选择他们熟悉的语言,而不受特定语言的限制。
+* 性能:WebAssembly 的执行速度通常比解释型语言更快。通过将 UDF 编译为 WebAssembly,用户可以获得更高的性能,特别是在涉及计算密集型任务时。
+* 轻量级:WebAssembly 的二进制格式相对较小,因此传输和加载的开销较低,使得在分布式计算环境中更为轻便。
+
+2. 使用限制:
+
+* 生态系统:WebAssembly 的生态系统相对较新,缺少一些成熟的库和工具。用户可能需要自己编写一些工具来满足自己的需求。
+* 调试难度:WebAssembly 的调试相对复杂,因为它是一个低级的二进制格式。与高级语言相比,用户可能需要更多的工具和技能来调试和优化其 UDF。
+* 安全性:虽然 WebAssembly 被设计为在沙盒中运行,但用户仍需谨慎处理潜在的安全风险。在加载 UDF 时需要确保来自可信源。
+
+</version>
+
+## 创建 UDF
+
+```sql
+CREATE FUNCTION
+name ([,...])
+[RETURNS] rettype
+PROPERTIES (["key"="value"][,...])
+```
+说明:
+
+1. PROPERTIES中`symbol`表示的是 WASM 模块中导出的 UDF 函数名(例如 `add`),这个参数是必须设定的。
+2. PROPERTIES中`file`表示的是包含用户 UDF 的 wat 文件,这个参数是必须设定的。
+3. PROPERTIES中`type`表示的是 UDF 调用类型,默认为 Native,使用 WASM UDF 时传 WASM_UDF。
+4. PROPERTIES中`always_nullable`表示的是 UDF 返回结果中是否有可能出现 NULL 值,是可选参数,默认值为 true。
+5. 
name: 一个 function 是要归属于某个 DB 的,name 的形式为 `dbName`.`funcName`。当 `dbName` 没有明确指定的时候,就是使用当前 session 所在的 db 作为 `dbName`。
+
+示例:
+```sql
+CREATE FUNCTION wasm_udf_add(int, int) RETURNS int PROPERTIES (
+    "file"="file:///path/to/wasm-udf-demo.wat",
+    "symbol"="add",
+    "always_nullable"="true",
+    "type"="WASM_UDF"
+);
+```
+* "file"="http://IP:port/udf.wat", 当在多机环境时,也可以使用 http 的方式下载 wat 文件
+* "always_nullable" 可选属性, 如果在计算中对出现的 NULL 值有特殊处理,确定结果中不会返回 NULL,可以设为 false,这样在整个查询计算过程中性能可能更好些。
+* 如果使用**本地路径**方式,**FE、BE 节点都要在该路径下放置** wat 文件。
+
+## 编写 UDF 函数
+
+目前暂不支持 UDAF 和 UDTF。
+
+```sql
+CREATE FUNCTION wasm_udf_add(int, int) RETURNS int PROPERTIES (
+    "file"="file:///path/to/wasm-udf-demo.wat",
+    "symbol"="add",
+    "always_nullable"="true",
+    "type"="WASM_UDF"
+);
+```
+
+```wat
+(module
+  (func $add (param i32) (param i32) (result i32)
+    (local.get 0)
+    (local.get 1)
+    (i32.add)
+  )
+  (export "add" (func $add))
+)
+```
+
+
+## 使用 UDF
+
+用户使用 UDF 必须拥有对应数据库的 `SELECT` 权限。
+
+UDF 的使用与普通的函数方式一致,唯一的区别在于,内置函数的作用域是全局的,而 UDF 的作用域是 DB 内部。当链接 session 位于数据库内部时,直接使用 UDF 名字会在当前DB内部查找对应的 UDF。否则用户需要显式地指定 UDF 的数据库名字,例如 `dbName`.`funcName`。
+
+## 删除 UDF
+
+当你不再需要 UDF 函数时,你可以通过下述命令来删除一个 UDF 函数, 可以参考 `DROP FUNCTION`。
+
+## 使用须知
+1. 不支持除 INT,FLOAT 之外的数据类型。
+
+
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 29846ddf08f..d9ad8222bcd 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -777,6 +777,13 @@ under the License.
<groupId>org.immutables</groupId> <artifactId>value</artifactId> </dependency> + + <!-- for wasm --> + <dependency> + <groupId>io.github.kawamuray.wasmtime</groupId> + <artifactId>wasmtime-java</artifactId> + <version>0.19.0</version> + </dependency> </dependencies> <repositories> <!-- for huawei obs sdk --> diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index d498d1f75bc..b64f0c46709 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -46,6 +46,10 @@ import org.apache.doris.thrift.TFunctionBinaryType; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; +import io.github.kawamuray.wasmtime.Engine; +import io.github.kawamuray.wasmtime.Module; +import io.github.kawamuray.wasmtime.Store; +import io.github.kawamuray.wasmtime.WasmtimeException; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import org.apache.commons.codec.binary.Hex; @@ -349,6 +353,8 @@ public class CreateFunctionStmt extends DdlStmt { throw new AnalysisException("No 'symbol' in properties of java-udaf"); } analyzeJavaUdaf(symbol); + } else if (binaryType == TFunctionBinaryType.WASM_UDF) { + throw new AnalysisException("No Support asam-udaf"); } function = builder.initFnSymbol(initFnSymbol).updateFnSymbol(updateFnSymbol).mergeFnSymbol(mergeFnSymbol) .serializeFnSymbol(serializeFnSymbol).finalizeFnSymbol(finalizeFnSymbol) @@ -377,6 +383,8 @@ public class CreateFunctionStmt extends DdlStmt { checkRPCUdf(symbol); } else if (binaryType == TFunctionBinaryType.JAVA_UDF) { analyzeJavaUdf(symbol); + } else if (binaryType == TFunctionBinaryType.WASM_UDF) { + analyzeWasmUdf(); } URI location = URI.create(userFile); function = ScalarFunction.createUdf(binaryType, @@ 
-490,6 +498,25 @@ public class CreateFunctionStmt extends DdlStmt { } } + private void analyzeWasmUdf() throws AnalysisException { + try { + URL url = new URL(userFile); + try (Store<Void> store = Store.withoutData()) { + // TODO: Check the wasm function parameter type. wasmtime-java does not currently support obtaining parameters from function. + try (Engine engine = store.engine(); + Module module = Module.fromFile(engine, url.getFile())) { + return; + } catch (WasmtimeException e) { + throw new AnalysisException("Failed to compile file: " + userFile, e); + } + } catch (WasmtimeException e) { + throw new AnalysisException("Failed to create engine: " + userFile, e); + } + } catch (MalformedURLException e) { + throw new AnalysisException("Failed to load file: " + userFile); + } + } + private void checkMethodNonStaticAndPublic(String methoName, Method method, String udfClassName) throws AnalysisException { if (Modifier.isStatic(method.getModifiers())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java index 31d97e9b536..796684986d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java @@ -232,7 +232,8 @@ public class ScalarFunction extends Function { public TFunction toThrift(Type realReturnType, Type[] realArgTypes, Boolean[] realArgTypeNullables) { TFunction fn = super.toThrift(realReturnType, realArgTypes, realArgTypeNullables); fn.setScalarFn(new TScalarFunction()); - if (getBinaryType() == TFunctionBinaryType.JAVA_UDF || getBinaryType() == TFunctionBinaryType.RPC) { + if (getBinaryType() == TFunctionBinaryType.JAVA_UDF || getBinaryType() == TFunctionBinaryType.RPC + || getBinaryType() == TFunctionBinaryType.WASM_UDF) { fn.getScalarFn().setSymbol(symbolName); } else { fn.getScalarFn().setSymbol(""); diff --git a/gensrc/thrift/Types.thrift 
b/gensrc/thrift/Types.thrift index cde75a26012..e42429fd05a 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -314,7 +314,9 @@ enum TFunctionBinaryType { JAVA_UDF, - AGG_STATE + AGG_STATE, + + WASM_UDF, } // Represents a fully qualified function name. diff --git a/regression-test/data/wasmudf_p0/test_wasmudf_float.out b/regression-test/data/wasmudf_p0/test_wasmudf_float.out new file mode 100644 index 00000000000..02364f38750 --- /dev/null +++ b/regression-test/data/wasmudf_p0/test_wasmudf_float.out @@ -0,0 +1,23 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_default -- +111 11111.111 222222.33 1.234567834455677E7 1111112.0 +112 1234556.1 222222.33 2.2222222233333334E8 4.444444444444556E12 +113 8.765432E7 \N 6.666666666666666E9 \N + +-- !select -- +113.94756 + +-- !select -- +113.94756 + +-- !select -- +\N + +-- !select -- +\N + +-- !select -- +111 233333.44 +112 1456778.5 +113 \N + diff --git a/regression-test/data/wasmudf_p0/test_wasmudf_int.out b/regression-test/data/wasmudf_p0/test_wasmudf_int.out new file mode 100644 index 00000000000..b52c483c3f4 --- /dev/null +++ b/regression-test/data/wasmudf_p0/test_wasmudf_int.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_default -- +1 2 3 4 5 +2 4 6 8 10 +3 6 9 12 15 +4 8 12 16 20 +5 10 15 20 25 +6 12 18 24 30 +7 14 21 28 35 +8 16 24 32 40 +9 18 27 36 45 +10 20 30 40 50 + +-- !select -- +2 +4 +6 +8 +10 +12 +14 +16 +18 +20 + +-- !select -- +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N + diff --git a/regression-test/suites/wasmudf_p0/test_wasmudf_float.groovy b/regression-test/suites/wasmudf_p0/test_wasmudf_float.groovy new file mode 100644 index 00000000000..c1b7dbcb35c --- /dev/null +++ b/regression-test/suites/wasmudf_p0/test_wasmudf_float.groovy @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+// Regression suite for a FLOAT WASM UDF: creates a table, registers the "add" export of
+// wat/f32_add.wat as wasm_udf_float_test(FLOAT, FLOAT) and records the query results.
+suite("test_wasmudf_float") {
+    def tableName = "test_wasmudf_float"
+    // The wat fixture is resolved relative to the directory of this suite file.
+    def watPath = """${context.file.parent}/wat/f32_add.wat"""
+
+    log.info("Wat path: ${watPath}".toString())
+    try {
+        // Drop any function left behind by an earlier run before re-creating it.
+        try_sql("DROP FUNCTION IF EXISTS wasm_udf_float_test(FLOAT,FLOAT);")
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `user_id` INT NOT NULL COMMENT "",
+            `float_1` FLOAT NOT NULL COMMENT "",
+            `float_2` FLOAT COMMENT "",
+            `double_1` DOUBLE NOT NULL COMMENT "",
+            `double_2` DOUBLE COMMENT ""
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+
+
+        // Three rows; the last one has NULL in both nullable columns.
+        sql """ INSERT INTO ${tableName} (`user_id`,`float_1`,`float_2`,double_1,double_2) VALUES
+                (111,11111.11111,222222.3333333,12345678.34455677,1111111.999999999999),
+                (112,1234556.11111,222222.3333333,222222222.3333333333333,4444444444444.555555555555),
+                (113,87654321.11111,null,6666666666.6666666666,null)
+            """
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """
+
+        // Fail early when the wat fixture is not present.
+        File path = new File(watPath)
+        if (!path.exists()) {
+            throw new IllegalStateException("""${watPath} doesn't exist! """)
+        }
+
+        sql """ CREATE FUNCTION wasm_udf_float_test(FLOAT,FLOAT) RETURNS FLOAT PROPERTIES (
+            "file"="file://${watPath}",
+            "symbol"="add",
+            "type"="WASM_UDF",
+            "always_nullable"="true"
+        ); """
+
+        // Literal arguments with and without explicit casts, NULL arguments, then table columns.
+        qt_select """ SELECT wasm_udf_float_test(cast(2.83645 as float),cast(111.1111111 as float)) as result; """
+        qt_select """ SELECT wasm_udf_float_test(2.83645,111.1111111) as result ; """
+        qt_select """ SELECT wasm_udf_float_test(2.83645,null) as result ; """
+        qt_select """ SELECT wasm_udf_float_test(cast(2.83645 as float),null) as result ; """
+        qt_select """ SELECT user_id,wasm_udf_float_test(float_1, float_2) as sum FROM ${tableName} order by user_id; """
+
+    } finally {
+        // Clean up the function and the table whether or not the queries succeeded.
+        try_sql("DROP FUNCTION IF EXISTS wasm_udf_float_test(FLOAT,FLOAT);")
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}
diff --git a/regression-test/suites/wasmudf_p0/test_wasmudf_int.groovy b/regression-test/suites/wasmudf_p0/test_wasmudf_int.groovy
new file mode 100644
index 00000000000..fe5de589a1d
--- /dev/null
+++ b/regression-test/suites/wasmudf_p0/test_wasmudf_int.groovy
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression suite for an INT WASM UDF: creates a table, registers the "add" export of
+// wat/i32_add.wat as wasm_udf_int_add_test(int, int) and records the query results.
+suite("test_wasmudf_int") {
+    def tableName = "test_wasmudf_int"
+    // The wat fixture is resolved relative to the directory of this suite file.
+    def watPath = """${context.file.parent}/wat/i32_add.wat"""
+
+    log.info("Wat path: ${watPath}".toString())
+    try {
+        // Drop any function left behind by an earlier run before re-creating it.
+        try_sql("DROP FUNCTION IF EXISTS wasm_udf_int_add_test(int, int);")
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `user_id` INT NOT NULL COMMENT "",
+            `tinyint_col` TINYINT NOT NULL COMMENT "",
+            `smallint_col` SMALLINT NOT NULL COMMENT "",
+            `bigint_col` BIGINT NOT NULL COMMENT "",
+            `largeint_col` LARGEINT NOT NULL COMMENT ""
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+        // Build the VALUES list: rows (i, i*2, i*3, i*4, i*5) for i = 1..9 with a trailing comma,
+        // then the row for i = 10 without one so the statement stays valid.
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 10; i ++) {
+            sb.append("""
+                (${i},${i}*2,${i}*3,${i}*4,${i}*5),
+            """)
+        }
+        sb.append("""
+                (${i},${i}*2,${i}*3,${i}*4,${i}*5)
+            """)
+        sql """ INSERT INTO ${tableName} VALUES
+             ${sb.toString()}
+            """
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """
+
+        // Fail early when the wat fixture is not present.
+        File path = new File(watPath)
+        if (!path.exists()) {
+            throw new IllegalStateException("""${watPath} doesn't exist! """)
+        }
+
+        sql """ CREATE FUNCTION wasm_udf_int_add_test(int, int) RETURNS int PROPERTIES (
+            "file"="file://${watPath}",
+            "symbol"="add",
+            "type"="WASM_UDF",
+            "always_nullable"="true"
+        ); """
+
+        // Column arguments first, then a NULL second argument.
+        qt_select """ SELECT wasm_udf_int_add_test(user_id, user_id) result FROM ${tableName} ORDER BY result; """
+        qt_select """ SELECT wasm_udf_int_add_test(user_id, null) result FROM ${tableName} ORDER BY result; """
+
+
+
+    } finally {
+        // Clean up the function and the table whether or not the queries succeeded.
+        try_sql("DROP FUNCTION IF EXISTS wasm_udf_int_add_test(int, int);")
+        try_sql("DROP TABLE IF EXISTS ${tableName};")
+    }
+}
diff --git a/regression-test/suites/wasmudf_p0/wat/f32_add.wat b/regression-test/suites/wasmudf_p0/wat/f32_add.wat
new file mode 100644
index 00000000000..9c30e6d8084
--- /dev/null
+++ b/regression-test/suites/wasmudf_p0/wat/f32_add.wat
@@ -0,0 +1,8 @@
+(module
+  (func $add (param f32) (param f32) (result f32)
+    (local.get 0)
+    (local.get 1)
+    (f32.add)
+  )
+  (export "add" (func $add))
+)
diff --git a/regression-test/suites/wasmudf_p0/wat/i32_add.wat b/regression-test/suites/wasmudf_p0/wat/i32_add.wat
new file mode 100644
index 00000000000..0d1ed3f154a
--- /dev/null
+++ b/regression-test/suites/wasmudf_p0/wat/i32_add.wat
@@ -0,0 +1,8 @@
+(module
+  (func $add (param i32) (param i32) (result i32)
+    (local.get 0)
+    (local.get 1)
+    (i32.add)
+  )
+  (export "add" (func $add))
+)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org