This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e95a17ef809 [Enhancement](udf) clear cache when dropping function
(#60630)
e95a17ef809 is described below
commit e95a17ef8094cebd8085e39dbcc59dd29568651e
Author: linrrarity <[email protected]>
AuthorDate: Mon Mar 2 11:40:07 2026 +0800
[Enhancement](udf) clear cache when dropping function (#60630)
1. drop python udf file cache when dropping a function
2. Fixed the issue where the same-named module from two different paths
would incorrectly use the cache.
```sql
-- same1(x) -> x + 1
-- same2(x) -> x + 2
Doris> CREATE FUNCTION same1(INT)
-> RETURNS INT
-> PROPERTIES (
-> "type" = "PYTHON_UDF",
-> "file" =
"file:///mnt/disk7/linzhenqi/d2/same_name_py_test/1/1.zip",
-> "symbol" = "same_name.same_name",
-> "runtime_version" = "3.9.18",
-> "always_nullable" = "false"
-> );
Query OK, 0 rows affected (0.04 sec)
Doris> select same1(1);
+----------+
| same1(1) |
+----------+
| 2 |
+----------+
1 row in set (6.25 sec)
Doris> CREATE FUNCTION same2(INT)
-> RETURNS INT
-> PROPERTIES (
-> "type" = "PYTHON_UDF",
-> "file" =
"file:///mnt/disk7/linzhenqi/d2/same_name_py_test/2/2.zip",
-> "symbol" = "same_name.same_name",
-> "runtime_version" = "3.9.18",
-> "always_nullable" = "false"
-> );
Query OK, 0 rows affected (0.00 sec)
-- due to the same module name, it incorrectly hit the cache of same1.
Doris> select same2(1);
+----------+
| same2(1) |
+----------+
| 2 |
+----------+
1 row in set (0.09 sec)
```
---
be/src/agent/task_worker_pool.cpp | 14 +-
be/src/runtime/user_function_cache.cpp | 51 ++++++-
be/src/runtime/user_function_cache.h | 3 +
be/src/udf/python/python_server.cpp | 73 +++++++++
be/src/udf/python/python_server.h | 3 +
be/src/udf/python/python_server.py | 164 ++++++++++++++++++---
.../nereids/rules/analysis/ExpressionAnalyzer.java | 5 +-
.../trees/plans/commands/DropFunctionCommand.java | 35 ++++-
.../org/apache/doris/task/CleanUDFCacheTask.java | 5 +-
gensrc/thrift/AgentService.thrift | 1 +
.../data/pythonudaf_p0/test_pythonudaf_drop.out | 10 ++
.../data/pythonudf_p0/test_pythonudf_drop.out | 10 ++
.../data/pythonudtf_p0/test_pythonudtf_drop.out | 13 ++
.../pythonudaf_p0/test_pythonudaf_drop.groovy | 96 ++++++++++++
.../udaf_scripts/python_udaf_drop_a/drop_udaf.py | 48 ++++++
.../python_udaf_drop_a/python_udaf_drop_test.zip | Bin 0 -> 850 bytes
.../udaf_scripts/python_udaf_drop_b/drop_udaf.py | 47 ++++++
.../python_udaf_drop_b/python_udaf_drop_test.zip | Bin 0 -> 817 bytes
.../suites/pythonudf_p0/test_pythonudf_drop.groovy | 96 ++++++++++++
.../udf_scripts/python_udf_drop_a/drop_udf.py | 22 +++
.../python_udf_drop_a/python_udf_drop_test.zip | Bin 0 -> 2143 bytes
.../udf_scripts/python_udf_drop_b/drop_udf.py | 22 +++
.../python_udf_drop_b/python_udf_drop_test.zip | Bin 0 -> 2143 bytes
.../pythonudtf_p0/test_pythonudtf_drop.groovy | 130 ++++++++++++++++
.../udtf_scripts/python_udtf_drop_a/drop_udtf.py | 22 +++
.../python_udtf_drop_a/python_udtf_drop_test.zip | Bin 0 -> 662 bytes
.../udtf_scripts/python_udtf_drop_b/drop_udtf.py | 21 +++
.../python_udtf_drop_b/python_udtf_drop_test.zip | Bin 0 -> 664 bytes
28 files changed, 858 insertions(+), 33 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 4e0a9756c37..ed55f2769fe 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -88,6 +88,7 @@
#include "runtime/index_policy/index_policy_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/snapshot_loader.h"
+#include "runtime/user_function_cache.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
@@ -2396,12 +2397,17 @@ void clean_trash_callback(StorageEngine& engine, const
TAgentTaskRequest& req) {
}
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
+ const auto& clean_req = req.clean_udf_cache_req;
+
if (doris::config::enable_java_support) {
- LOG(INFO) << "clean udf cache start: " <<
req.clean_udf_cache_req.function_signature;
- static_cast<void>(
-
Jni::Util::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
- LOG(INFO) << "clean udf cache finish: " <<
req.clean_udf_cache_req.function_signature;
+
static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
+ }
+
+ if (clean_req.__isset.function_id && clean_req.function_id > 0) {
+
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
}
+
+ LOG(INFO) << "clean udf cache finish: function_signature=" <<
clean_req.function_signature;
}
void report_index_policy_callback(const ClusterInfo* cluster_info) {
diff --git a/be/src/runtime/user_function_cache.cpp
b/be/src/runtime/user_function_cache.cpp
index d54c2f473b4..0ba6786462d 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -42,6 +42,7 @@
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "runtime/plugin/cloud_plugin_downloader.h"
+#include "udf/python/python_server.h"
#include "util/defer_op.h"
#include "util/dynamic_util.h"
#include "util/md5.h"
@@ -111,7 +112,16 @@ UserFunctionCacheEntry::~UserFunctionCacheEntry() {
// delete library file if should_delete_library is set
if (should_delete_library.load()) {
- unlink(lib_file.c_str());
+ WARN_IF_ERROR(
+
io::global_local_filesystem()->delete_directory_or_file(lib_file),
+ "failed to delete unzipped directory of python udf library,
lib_file=" + lib_file);
+
+ if (type == LibType::PY_ZIP) {
+ // For Python UDF, we need to delete both the unzipped directory
and the original zip file.
+ std::string zip_file = lib_file + ".zip";
+
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory_or_file(zip_file),
+ "failed to delete zip file of python udf library,
lib_file=" + zip_file);
+ }
}
}
@@ -174,10 +184,20 @@ Status UserFunctionCache::_load_entry_from_lib(const
std::string& dir, const std
<< ", other_checksum info: = " <<
it->second->debug_string();
return Status::InternalError("duplicate function id");
}
+
+ std::string full_path = dir + "/" + file;
// create a cache entry and put it into entry map
- std::shared_ptr<UserFunctionCacheEntry> entry =
UserFunctionCacheEntry::create_shared(
- function_id, checksum, dir + "/" + file, lib_type);
+ std::shared_ptr<UserFunctionCacheEntry> entry =
+ UserFunctionCacheEntry::create_shared(function_id, checksum,
full_path, lib_type);
entry->is_downloaded = true;
+
+ // For Python UDF, _check_cache_is_python_udf has already unzipped the
file.
+ // Set lib_file to the unzipped directory.
+ if (lib_type == LibType::PY_ZIP) {
+ entry->lib_file = full_path.substr(0, full_path.size() - 4);
+ entry->is_unziped = true;
+ }
+
_entry_map[function_id] = entry;
return Status::OK();
@@ -547,4 +567,29 @@ Status
UserFunctionCache::_check_and_return_default_java_udf_url(const std::stri
return Status::OK();
}
+void UserFunctionCache::drop_function_cache(int64_t fid) {
+ std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
+ {
+ std::lock_guard<std::mutex> l(_cache_lock);
+ auto it = _entry_map.find(fid);
+ if (it == _entry_map.end()) {
+ return;
+ }
+ entry = it->second;
+ _entry_map.erase(it);
+ }
+
+ // For Python UDF, clear module cache in Python server before deleting
files
+ if (entry->type == LibType::PY_ZIP && !entry->lib_file.empty()) {
+ auto status =
PythonServerManager::instance().clear_module_cache(entry->lib_file);
+ if (!status.ok()) [[unlikely]] {
+ LOG(WARNING) << "drop_function_cache: failed to clear Python
module cache for "
+ << entry->lib_file << ": " << status.to_string();
+ }
+ }
+
+ // Mark for deletion, destructor will delete the files
+ entry->should_delete_library.store(true);
+}
+
} // namespace doris
diff --git a/be/src/runtime/user_function_cache.h
b/be/src/runtime/user_function_cache.h
index bfd99574491..e606899f969 100644
--- a/be/src/runtime/user_function_cache.h
+++ b/be/src/runtime/user_function_cache.h
@@ -62,6 +62,9 @@ public:
Status get_pypath(int64_t fid, const std::string& url, const std::string&
checksum,
std::string* libpath);
+ // Drop the cached function library by function id.
+ void drop_function_cache(int64_t fid);
+
#ifndef BE_TEST
private:
#endif
diff --git a/be/src/udf/python/python_server.cpp
b/be/src/udf/python/python_server.cpp
index 3fb5a9e4542..646e1e79039 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -28,6 +28,7 @@
#include <boost/process.hpp>
#include <fstream>
+#include "arrow/flight/client.h"
#include "common/config.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
@@ -333,6 +334,78 @@ void PythonServerManager::_refresh_memory_stats() {
}
}
+Status PythonServerManager::clear_module_cache(const std::string& location) {
+ if (location.empty()) {
+ return Status::InvalidArgument("Empty location for
clear_module_cache");
+ }
+
+ std::lock_guard<std::mutex> lock(_pools_mutex);
+
+ std::string body = fmt::format(R"({{"location": "{}"}})", location);
+
+ int success_count = 0;
+ int fail_count = 0;
+ bool has_active_process = false;
+
+ for (auto& [version, pool] : _process_pools) {
+ for (auto& process : pool) {
+ if (!process || !process->is_alive()) {
+ continue;
+ }
+ has_active_process = true;
+ try {
+ auto loc_result =
arrow::flight::Location::Parse(process->get_uri());
+ if (!loc_result.ok()) [[unlikely]] {
+ fail_count++;
+ continue;
+ }
+
+ auto client_result =
arrow::flight::FlightClient::Connect(*loc_result);
+ if (!client_result.ok()) [[unlikely]] {
+ fail_count++;
+ continue;
+ }
+ auto client = std::move(*client_result);
+
+ arrow::flight::Action action;
+ action.type = "clear_module_cache";
+ action.body = arrow::Buffer::FromString(body);
+
+ auto result_stream = client->DoAction(action);
+ if (!result_stream.ok()) {
+ fail_count++;
+ continue;
+ }
+
+ auto result = (*result_stream)->Next();
+ if (result.ok() && *result) {
+ success_count++;
+ } else {
+ fail_count++;
+ }
+
+ } catch (...) {
+ fail_count++;
+ }
+ }
+ }
+
+ if (!has_active_process) {
+ return Status::OK();
+ }
+
+ LOG(INFO) << "clear_module_cache completed for location=" << location
+ << ", success=" << success_count << ", failed=" << fail_count;
+
+ if (fail_count > 0) {
+ return Status::InternalError(
+ "clear_module_cache failed for location={}, success={},
failed={}", location,
+ success_count, fail_count);
+ }
+
+ return Status::OK();
+}
+
// Explicit template instantiation for UDF, UDAF and UDTF clients
template Status PythonServerManager::get_client<PythonUDFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
diff --git a/be/src/udf/python/python_server.h
b/be/src/udf/python/python_server.h
index 3ac22f71c53..6427cb7e63c 100644
--- a/be/src/udf/python/python_server.h
+++ b/be/src/udf/python/python_server.h
@@ -48,6 +48,9 @@ public:
Status get_process(const PythonVersion& version, ProcessPtr* process);
+ // Clear Python module cache for a specific UDF location across all
processes
+ Status clear_module_cache(const std::string& location);
+
Status ensure_pool_initialized(const PythonVersion& version);
void shutdown();
diff --git a/be/src/udf/python/python_server.py
b/be/src/udf/python/python_server.py
index 754325ae136..f759f2054a3 100644
--- a/be/src/udf/python/python_server.py
+++ b/be/src/udf/python/python_server.py
@@ -814,28 +814,37 @@ class ModuleUDFLoader(UDFLoader):
# Class-level lock dictionary for thread-safe module imports
# Using RLock allows the same thread to acquire the lock multiple times
- _import_locks: Dict[str, threading.RLock] = {}
+ # Key: (location, module_name) tuple to avoid conflicts between different
locations
+ _import_locks: Dict[Tuple[str, str], threading.RLock] = {}
_import_locks_lock = threading.Lock()
+ _module_cache: Dict[Tuple[str, str], Any] = {}
+ _module_cache_lock = threading.Lock()
@classmethod
- def _get_import_lock(cls, module_name: str) -> threading.RLock:
+ def _get_import_lock(cls, location: str, module_name: str) ->
threading.RLock:
"""
- Get or create a reentrant lock for the given module name.
+ Get or create a reentrant lock for the given location and module name.
Uses double-checked locking pattern for optimal performance:
- Fast path: return existing lock without acquiring global lock
- Slow path: create new lock under global lock protection
+
+ Args:
+ location: The directory path where the module is located
+ module_name: The full module name to import
"""
+ cache_key = (location, module_name)
+
# Fast path: check without lock (read-only, safe for most cases)
- if module_name in cls._import_locks:
- return cls._import_locks[module_name]
+ if cache_key in cls._import_locks:
+ return cls._import_locks[cache_key]
# Slow path: create lock under protection
with cls._import_locks_lock:
# Double-check: another thread might have created it while we
waited
- if module_name not in cls._import_locks:
- cls._import_locks[module_name] = threading.RLock()
- return cls._import_locks[module_name]
+ if cache_key not in cls._import_locks:
+ cls._import_locks[cache_key] = threading.RLock()
+ return cls._import_locks[cache_key]
def load(self) -> AdaptivePythonUDF:
"""
@@ -903,34 +912,50 @@ class ModuleUDFLoader(UDFLoader):
return package_name, module_name, func_name
def _get_or_import_module(self, location: str, full_module_name: str) ->
Any:
- """Get module from cache or import it (thread-safe)."""
- # Use a per-module lock to prevent race conditions during import
- import_lock = ModuleUDFLoader._get_import_lock(full_module_name)
+ """
+ Get module from cache or import it (thread-safe).
+
+ Uses a location-aware cache to prevent conflicts when different
locations
+ have modules with the same name.
+ """
+ cache_key = (location, full_module_name)
+
+ # Use a per-(location, module) lock to prevent race conditions during
import
+ import_lock = ModuleUDFLoader._get_import_lock(location,
full_module_name)
with import_lock:
- # Double-check pattern: verify module is still not loaded after
acquiring lock
- if full_module_name in sys.modules:
- cached_module = sys.modules[full_module_name]
- # Verify the cached module is valid (has __file__ or __path__
attribute)
- # This prevents using broken/incomplete modules from failed
imports
+ # Fast path: check location-aware cache first
+ if cache_key in ModuleUDFLoader._module_cache:
+ cached_module = ModuleUDFLoader._module_cache[cache_key]
if cached_module is not None and (
hasattr(cached_module, "__file__")
or hasattr(cached_module, "__path__")
):
return cached_module
else:
+ del ModuleUDFLoader._module_cache[cache_key]
+
+ # Before importing, clear any existing module with the same name
in sys.modules
+ # that might have been loaded from a different location
+ if full_module_name in sys.modules:
+ existing_module = sys.modules[full_module_name]
+ existing_file = getattr(existing_module, "__file__", None)
+ # Check if the existing module is from a different location
+ if existing_file and not existing_file.startswith(location):
del sys.modules[full_module_name]
- # Import the module (only one thread will reach here per module)
with temporary_sys_path(location):
try:
module = importlib.import_module(full_module_name)
+ # Store in location-aware cache
+ ModuleUDFLoader._module_cache[cache_key] = module
return module
- except Exception as e:
- # Clean up any partially-imported modules from sys.modules
- # This prevents broken modules from being cached
+ except Exception:
+ # Clean up any partially-imported modules
if full_module_name in sys.modules:
del sys.modules[full_module_name]
+ if cache_key in ModuleUDFLoader._module_cache:
+ del ModuleUDFLoader._module_cache[cache_key]
raise
def _extract_function(
@@ -2442,6 +2467,105 @@ class FlightServer(flight.FlightServerBase):
else:
raise ValueError(f"Unsupported client type:
{python_udf_meta.client_type}")
+ def do_action(
+ self,
+ context: flight.ServerCallContext,
+ action: flight.Action,
+ ):
+ """
+ Handle Flight actions for cache management.
+
+ Supported actions:
+ - "clear_module_cache": Clear Python module cache for a specific
location
+ Body: JSON with "location" field (the UDF cache directory path)
+ """
+ action_type = action.type
+
+ if action_type == "clear_module_cache":
+ yield from
self._handle_clear_module_cache(action.body.to_pybytes())
+ else:
+ raise flight.FlightUnavailableError(f"Unknown action:
{action_type}")
+
+ def _handle_clear_module_cache(self, body: bytes):
+ """
+ Clear Python module cache for a specific UDF location.
+
+ This removes modules from sys.modules that were loaded from the
specified
+ location, allowing fresh imports when a new UDF with the same module
name
+ is created.
+ """
+ try:
+ params = json.loads(body.decode("utf-8"))
+ location = params.get("location", "")
+
+ if not location:
+ yield flight.Result(b'{"success": false, "error": "empty
location"}')
+ return
+
+ cleared_modules = self._clear_modules_from_location(location)
+
+ result = {
+ "success": True,
+ "cleared_modules": cleared_modules,
+ "location": location,
+ }
+ yield flight.Result(json.dumps(result).encode("utf-8"))
+
+ except Exception as e:
+ logging.error("clear_module_cache failed: %s", e)
+ yield flight.Result(json.dumps({
+ "success": False,
+ "error": str(e)
+ }).encode("utf-8"))
+
+ def _clear_modules_from_location(self, location: str) -> list:
+ """
+ Clear module cache for the given location.
+
+ Acquires per-module import locks to ensure no concurrent import is
+ in progress for the modules being cleared, preventing race conditions
+ where sys.modules entries are removed mid-import.
+
+ Returns list of cleared module names.
+ """
+ cleared = []
+
+ with ModuleUDFLoader._module_cache_lock:
+ keys_to_remove = [
+ key for key in ModuleUDFLoader._module_cache
+ if key[0] == location
+ ]
+
+ # For each module, acquire its import lock before clearing.
+ # This ensures no concurrent _get_or_import_module is in progress
+ # for this (location, module_name) pair.
+ for key in keys_to_remove:
+ loc, module_name = key
+ import_lock = ModuleUDFLoader._get_import_lock(loc, module_name)
+
+ with import_lock:
+ with ModuleUDFLoader._module_cache_lock:
+ if key in ModuleUDFLoader._module_cache:
+ del ModuleUDFLoader._module_cache[key]
+
+ modules_to_remove = [
+ name for name, mod in sys.modules.items()
+ if name == module_name or name.startswith(module_name +
".")
+ or (
+ hasattr(mod, "__file__") and mod.__file__ is not None
+ and mod.__file__.startswith(location)
+ )
+ ]
+ for mod_name in modules_to_remove:
+ del sys.modules[mod_name]
+ if mod_name not in cleared:
+ cleared.append(mod_name)
+
+ if module_name not in cleared:
+ cleared.append(module_name)
+
+ return cleared
+
class UDAFOperationType(Enum):
"""Enum representing UDAF operation types."""
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
index 3c777e5e381..3d1faf318e6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
@@ -71,6 +71,7 @@ import
org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.functions.BoundFunction;
import org.apache.doris.nereids.trees.expressions.functions.FunctionBuilder;
import org.apache.doris.nereids.trees.expressions.functions.RewriteWhenAnalyze;
+import org.apache.doris.nereids.trees.expressions.functions.Udf;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import
org.apache.doris.nereids.trees.expressions.functions.agg.NullableAggregateFunction;
import
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
@@ -78,8 +79,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.StructElement;
import
org.apache.doris.nereids.trees.expressions.functions.udf.AliasUdfBuilder;
-import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdaf;
-import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdf;
import org.apache.doris.nereids.trees.expressions.functions.udf.UdfBuilder;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -588,7 +587,7 @@ public class ExpressionAnalyzer extends
SubExprAnalyzer<ExpressionRewriteContext
if (wantToParseSqlFromSqlCache) {
sqlCacheContext =
context.cascadesContext.getStatementContext().getSqlCacheContext();
if (builder instanceof AliasUdfBuilder
- || buildResult.second instanceof JavaUdf ||
buildResult.second instanceof JavaUdaf) {
+ || buildResult.second instanceof Udf) {
if (sqlCacheContext.isPresent()) {
sqlCacheContext.get().setCannotProcessExpression(true);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropFunctionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropFunctionCommand.java
index 28823b5d19f..6199b406746 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropFunctionCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropFunctionCommand.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSearchDesc;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -71,6 +73,34 @@ public class DropFunctionCommand extends Command implements
ForwardWithSync {
}
argsDef.analyze();
FunctionSearchDesc function = new FunctionSearchDesc(functionName,
argsDef.getArgTypes(), argsDef.isVariadic());
+
+ // Get function id before dropping, for cleaning cached library files
in BE
+ long functionId = -1;
+ try {
+ Function fn = null;
+ if (SetType.GLOBAL.equals(setType)) {
+ fn =
Env.getCurrentEnv().getGlobalFunctionMgr().getFunction(function);
+ } else {
+ String dbName = functionName.getDb();
+ if (dbName == null) {
+ dbName = ctx.getDatabase();
+ functionName.setDb(dbName);
+ }
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(dbName);
+ if (db != null) {
+ fn = db.getFunction(function);
+ }
+ }
+ if (fn != null) {
+ functionId = fn.getId();
+ } else {
+ LOG.warn("Function not found: {}, setType: {}",
function.getName(), setType);
+ }
+ } catch (AnalysisException e) {
+ LOG.warn("Function not found when getting function id: {}, error:
{}",
+ function.getName(), e.getMessage());
+ }
+
if (SetType.GLOBAL.equals(setType)) {
Env.getCurrentEnv().getGlobalFunctionMgr().dropFunction(function,
ifExists);
} else {
@@ -90,9 +120,10 @@ public class DropFunctionCommand extends Command implements
ForwardWithSync {
String functionSignature = getSignatureString();
AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backendsInfo.values()) {
- CleanUDFCacheTask cleanUDFCacheTask = new
CleanUDFCacheTask(backend.getId(), functionSignature);
+ CleanUDFCacheTask cleanUDFCacheTask = new
CleanUDFCacheTask(backend.getId(), functionSignature, functionId);
batchTask.addTask(cleanUDFCacheTask);
- LOG.info("clean udf cache in be {}, beId {}", backend.getHost(),
backend.getId());
+ LOG.info("clean udf cache in be {}, beId {}, functionId {}",
+ backend.getHost(), backend.getId(), functionId);
}
AgentTaskExecutor.submit(batchTask);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java
index 63ed5473bf1..85871ca202b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java
@@ -27,15 +27,18 @@ import org.apache.logging.log4j.Logger;
public class CleanUDFCacheTask extends AgentTask {
private static final Logger LOG =
LogManager.getLogger(CleanUDFCacheTask.class);
private String functionSignature;
+ private long functionId;
- public CleanUDFCacheTask(long backendId, String signature) {
+ public CleanUDFCacheTask(long backendId, String signature, long
functionId) {
super(null, backendId, TTaskType.CLEAN_UDF_CACHE, -1, -1, -1, -1, -1,
-1, -1);
this.functionSignature = signature;
+ this.functionId = functionId;
}
public TCleanUDFCacheReq toThrift() {
TCleanUDFCacheReq req = new TCleanUDFCacheReq();
req.setFunctionSignature(this.functionSignature);
+ req.setFunctionId(this.functionId);
return req;
}
}
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index 1ac8c2cfa88..e8044a06c9a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -162,6 +162,7 @@ struct TCleanTrashReq {}
struct TCleanUDFCacheReq {
1: optional string function_signature //function_name(arg_type)
+ 2: optional i64 function_id // function id for cleaning cached library
files
}
enum TCompressionType {
diff --git a/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
b/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
new file mode 100644
index 00000000000..79e35e30ee5
--- /dev/null
+++ b/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !py_udaf_drop_1 --
+6
+
+-- !py_udaf_drop_2 --
+6 7
+
+-- !py_udaf_drop_3 --
+6
+
diff --git a/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
b/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
new file mode 100644
index 00000000000..254ebe44809
--- /dev/null
+++ b/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !py_udf_drop_1 --
+11
+
+-- !py_udf_drop_2 --
+6 7
+
+-- !py_udf_drop_3 --
+8
+
diff --git a/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
b/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
new file mode 100644
index 00000000000..6f1159a95d1
--- /dev/null
+++ b/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !py_udtf_drop_1 --
+1
+2
+
+-- !py_udtf_drop_2 --
+1 2
+2 3
+
+-- !py_udtf_drop_3 --
+1
+2
+
diff --git a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
new file mode 100644
index 00000000000..993ca7adfa9
--- /dev/null
+++ b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('test_pythonudaf_drop') {
+ def runtime_version = '3.8.10'
+ def zipA =
"""${context.file.parent}/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip"""
+ def zipB =
"""${context.file.parent}/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip"""
+
+ scp_udf_file_to_all_be(zipA)
+ scp_udf_file_to_all_be(zipB)
+
+ sql '''DROP TABLE IF EXISTS py_udaf_drop_tbl'''
+ sql '''
+ CREATE TABLE py_udaf_drop_tbl (
+ v INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(v)
+ DISTRIBUTED BY HASH(v) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
+ '''
+ sql '''INSERT INTO py_udaf_drop_tbl VALUES (1), (2), (3);'''
+
+ try {
+ // Case 1: simple drop should make subsequent call fail
+ sql '''DROP FUNCTION IF EXISTS py_drop_sum_once(INT)'''
+ sql """
+ CREATE AGGREGATE FUNCTION py_drop_sum_once(INT) RETURNS BIGINT
PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${zipA}",
+ "symbol" = "drop_udaf.SumAgg",
+ "runtime_version" = "${runtime_version}"
+ )
+ """
+
+ qt_py_udaf_drop_1 '''SELECT py_drop_sum_once(v) FROM
py_udaf_drop_tbl;'''
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
+ test {
+ sql '''SELECT py_drop_sum_once(v) FROM py_udaf_drop_tbl;'''
+ exception 'Can not found function'
+ }
+
+ // Case 2: same module name, different file paths
+ sql '''DROP FUNCTION IF EXISTS py_drop_sum_a(INT)'''
+ sql '''DROP FUNCTION IF EXISTS py_drop_sum_b(INT)'''
+ sql """
+ CREATE AGGREGATE FUNCTION py_drop_sum_a(INT) RETURNS BIGINT
PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${zipA}",
+ "symbol" = "drop_udaf.SumAgg",
+ "runtime_version" = "${runtime_version}"
+ )
+ """
+ sql """
+ CREATE AGGREGATE FUNCTION py_drop_sum_b(INT) RETURNS BIGINT
PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${zipB}",
+ "symbol" = "drop_udaf.SumAgg",
+ "runtime_version" = "${runtime_version}"
+ )
+ """
+
+ qt_py_udaf_drop_2 '''SELECT py_drop_sum_a(v), py_drop_sum_b(v) FROM
py_udaf_drop_tbl;'''
+
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
+ test {
+ sql '''SELECT py_drop_sum_b(v) FROM py_udaf_drop_tbl;'''
+ exception 'Can not found function'
+ }
+
+ qt_py_udaf_drop_3 '''SELECT py_drop_sum_a(v) FROM py_udaf_drop_tbl;'''
+
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
+ test {
+ sql '''SELECT py_drop_sum_a(v) FROM py_udaf_drop_tbl;'''
+ exception 'Can not found function'
+ }
+ } finally {
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
+ }
+}
diff --git
a/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_a/drop_udaf.py
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_a/drop_udaf.py
new file mode 100644
index 00000000000..046fc3a0be3
--- /dev/null
+++
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_a/drop_udaf.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+
+
+class SumAgg:
+ def __init__(self):
+ self.sum = 0
+
+ def init(self):
+ self.sum = 0
+
+ @property
+ def aggregate_state(self):
+ return self.sum
+
+ # Doris python UDAF expects accumulate/merge/serialize/deserialize/finish
+ def accumulate(self, val):
+ if val is not None:
+ self.sum += val
+
+ def merge(self, other_state):
+ if other_state is not None:
+ self.sum += other_state
+
+ def serialize(self):
+ return pickle.dumps(self.sum)
+
+ def deserialize(self, data):
+ self.sum = pickle.loads(data)
+
+ def finish(self):
+ return self.sum
diff --git
a/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip
new file mode 100644
index 00000000000..b1cce418fd8
Binary files /dev/null and
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip
differ
diff --git
a/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_b/drop_udaf.py
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_b/drop_udaf.py
new file mode 100644
index 00000000000..6e1c2a3c6df
--- /dev/null
+++
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_b/drop_udaf.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+
+
class SumAgg:
    """Python UDAF: SUM over INT input, B-variant of the drop/cache test.

    Same protocol as the A-variant script (init/accumulate/merge/
    serialize/deserialize/finish); only `finish` differs so results
    reveal which script's cache was actually used.
    """

    def __init__(self):
        self.sum = 0

    def init(self):
        # Reset state before a new aggregation round.
        self.sum = 0

    @property
    def aggregate_state(self):
        # Raw intermediate state: the running total.
        return self.sum

    def accumulate(self, val):
        # Ignore NULLs, matching SQL SUM semantics.
        if val is None:
            return
        self.sum += val

    def merge(self, other_state):
        # Fold in a partial sum from another instance.
        if other_state is None:
            return
        self.sum += other_state

    def serialize(self):
        # Only the integer state is pickled.
        return pickle.dumps(self.sum)

    def deserialize(self, data):
        self.sum = pickle.loads(data)

    def finish(self):
        # +1 deliberately distinguishes this variant from the A script.
        return self.sum + 1
diff --git
a/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip
new file mode 100644
index 00000000000..6de9b733aa9
Binary files /dev/null and
b/regression-test/suites/pythonudaf_p0/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip
differ
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
new file mode 100644
index 00000000000..b34ecafb55b
--- /dev/null
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
suite("test_pythonudf_drop") {
    // Interpreter version every Python UDF in this suite is created with.
    def runtime_version = "3.8.10"
    // Two zips that contain the SAME module name (drop_udf) but different
    // implementations (x+1 vs x+2) under different paths, so a cache hit
    // keyed only by module name would return the wrong result.
    def zipA = """${context.file.parent}/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip"""
    def zipB = """${context.file.parent}/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip"""

    scp_udf_file_to_all_be(zipA)
    scp_udf_file_to_all_be(zipB)

    sql """DROP TABLE IF EXISTS py_udf_drop_tbl"""
    sql """
        CREATE TABLE py_udf_drop_tbl (
            id INT
        ) ENGINE=OLAP
        DUPLICATE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 1
        PROPERTIES("replication_num" = "1");
    """
    sql """INSERT INTO py_udf_drop_tbl VALUES (1), (2), (3);"""

    try {
        // Case 1: simple drop should make subsequent call fail
        sql """DROP FUNCTION IF EXISTS py_drop_once(INT)"""
        sql """
            CREATE FUNCTION py_drop_once(INT) RETURNS INT PROPERTIES (
                "type" = "PYTHON_UDF",
                "file" = "file://${zipA}",
                "symbol" = "drop_udf.evaluate",
                "runtime_version" = "${runtime_version}"
            )
        """

        qt_py_udf_drop_1 """SELECT py_drop_once(10);"""
        try_sql("DROP FUNCTION IF EXISTS py_drop_once(INT);")
        // After DROP, the FE must reject the call outright (not serve a
        // stale cached module on the BE side).
        test {
            sql """SELECT py_drop_once(10);"""
            exception "Can not found function"
        }

        // Case 2: same module name, different file paths
        sql """DROP FUNCTION IF EXISTS py_drop_a(INT)"""
        sql """DROP FUNCTION IF EXISTS py_drop_b(INT)"""
        sql """
            CREATE FUNCTION py_drop_a(INT) RETURNS INT PROPERTIES (
                "type" = "PYTHON_UDF",
                "file" = "file://${zipA}",
                "symbol" = "drop_udf.evaluate",
                "runtime_version" = "${runtime_version}"
            )
        """
        sql """
            CREATE FUNCTION py_drop_b(INT) RETURNS INT PROPERTIES (
                "type" = "PYTHON_UDF",
                "file" = "file://${zipB}",
                "symbol" = "drop_udf.evaluate",
                "runtime_version" = "${runtime_version}"
            )
        """

        // Both functions in one statement: if the module cache ignored the
        // file path, py_drop_b would wrongly reuse py_drop_a's code.
        qt_py_udf_drop_2 """SELECT py_drop_a(5), py_drop_b(5);"""

        try_sql("DROP FUNCTION IF EXISTS py_drop_b(INT);")
        test {
            sql """SELECT py_drop_b(5);"""
            exception "Can not found function"
        }

        // Dropping B must not evict A's cache entry — A still works.
        qt_py_udf_drop_3 """SELECT py_drop_a(7);"""

        try_sql("DROP FUNCTION IF EXISTS py_drop_a(INT);")
        test {
            sql """SELECT py_drop_a(1);"""
            exception "Can not found function"
        }
    } finally {
        // Best-effort cleanup so reruns start from a clean state.
        try_sql("DROP FUNCTION IF EXISTS py_drop_once(INT);")
        try_sql("DROP FUNCTION IF EXISTS py_drop_a(INT);")
        try_sql("DROP FUNCTION IF EXISTS py_drop_b(INT);")
    }
}
diff --git
a/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_a/drop_udf.py
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_a/drop_udf.py
new file mode 100644
index 00000000000..329d3b5e12a
--- /dev/null
+++
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_a/drop_udf.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
def evaluate(x):
    """Scalar UDF body (A-variant): return x + 1, propagating NULL."""
    return None if x is None else x + 1
diff --git
a/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip
new file mode 100644
index 00000000000..00724b8b40e
Binary files /dev/null and
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip
differ
diff --git
a/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_b/drop_udf.py
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_b/drop_udf.py
new file mode 100644
index 00000000000..a0eeb7a8445
--- /dev/null
+++
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_b/drop_udf.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
def evaluate(x):
    """Scalar UDF body (B-variant): return x + 2, propagating NULL.

    Differs from the A-variant (+1) so test results reveal which
    script's cache was used.
    """
    return None if x is None else x + 2
diff --git
a/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip
new file mode 100644
index 00000000000..43ee8734d46
Binary files /dev/null and
b/regression-test/suites/pythonudf_p0/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip
differ
diff --git a/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
b/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
new file mode 100644
index 00000000000..797b0074fcb
--- /dev/null
+++ b/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
suite("test_pythonudtf_drop") {
    // Interpreter version every Python UDTF in this suite is created with.
    def runtime_version = "3.8.10"
    // Two zips containing the SAME module name (drop_udtf) but different
    // implementations under different paths, to catch a module cache that
    // is keyed only by module name instead of file path.
    def zipA = """${context.file.parent}/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip"""
    def zipB = """${context.file.parent}/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip"""

    scp_udf_file_to_all_be(zipA)
    scp_udf_file_to_all_be(zipB)

    sql """DROP TABLE IF EXISTS py_udtf_drop_tbl"""
    sql """
        CREATE TABLE py_udtf_drop_tbl (
            v INT
        ) ENGINE=OLAP
        DUPLICATE KEY(v)
        DISTRIBUTED BY HASH(v) BUCKETS 1
        PROPERTIES("replication_num" = "1");
    """
    sql """INSERT INTO py_udtf_drop_tbl VALUES (1), (2);"""

    try {
        // Case 1: simple drop should make subsequent call fail
        sql """DROP FUNCTION IF EXISTS py_drop_t_once(INT)"""
        // NOTE: "CREATE TABLES FUNCTION" is Doris's UDTF creation syntax.
        sql """
            CREATE TABLES FUNCTION py_drop_t_once(INT)
            RETURNS ARRAY<INT>
            PROPERTIES (
                "type" = "PYTHON_UDF",
                "file" = "file://${zipA}",
                "symbol" = "drop_udtf.process",
                "runtime_version" = "${runtime_version}"
            )
        """

        qt_py_udtf_drop_1 """
            SELECT c
            FROM py_udtf_drop_tbl
            LATERAL VIEW py_drop_t_once(v) tmp AS c
            ORDER BY c;
        """
        try_sql("DROP FUNCTION IF EXISTS py_drop_t_once(INT);")
        // After DROP, the call must fail instead of hitting a stale cache.
        test {
            sql """
                SELECT c
                FROM py_udtf_drop_tbl
                LATERAL VIEW py_drop_t_once(v) tmp AS c;
            """
            exception "Can not found function"
        }

        // Case 2: same module name, different file paths
        sql """DROP FUNCTION IF EXISTS py_drop_t_a(INT)"""
        sql """DROP FUNCTION IF EXISTS py_drop_t_b(INT)"""
        sql """
            CREATE TABLES FUNCTION py_drop_t_a(INT)
            RETURNS ARRAY<INT>
            PROPERTIES (
                "type" = "PYTHON_UDF",
                "file" = "file://${zipA}",
                "symbol" = "drop_udtf.process",
                "runtime_version" = "${runtime_version}"
            )
        """
        sql """
            CREATE TABLES FUNCTION py_drop_t_b(INT)
            RETURNS ARRAY<INT>
            PROPERTIES (
                "type" = "PYTHON_UDF",
                "file" = "file://${zipB}",
                "symbol" = "drop_udtf.process",
                "runtime_version" = "${runtime_version}"
            )
        """

        // Both variants in one query: a path-insensitive cache would make
        // py_drop_t_b wrongly reuse py_drop_t_a's module (a yields x,
        // b yields x+1).
        qt_py_udtf_drop_2 """
            SELECT a_tmp.c, b_tmp.c
            FROM py_udtf_drop_tbl
            LATERAL VIEW py_drop_t_a(v) a_tmp AS c
            LATERAL VIEW py_drop_t_b(v) b_tmp AS c
            ORDER BY a_tmp.c, b_tmp.c;
        """

        try_sql("DROP FUNCTION IF EXISTS py_drop_t_b(INT);")
        test {
            sql """
                SELECT c
                FROM py_udtf_drop_tbl
                LATERAL VIEW py_drop_t_b(v) tmp AS c;
            """
            exception "Can not found function"
        }

        // Dropping B must not evict A's cache entry — A still works.
        qt_py_udtf_drop_3 """
            SELECT c
            FROM py_udtf_drop_tbl
            LATERAL VIEW py_drop_t_a(v) tmp AS c
            ORDER BY c;
        """

        try_sql("DROP FUNCTION IF EXISTS py_drop_t_a(INT);")
        test {
            sql """
                SELECT c
                FROM py_udtf_drop_tbl
                LATERAL VIEW py_drop_t_a(v) tmp AS c;
            """
            exception "Can not found function"
        }
    } finally {
        // Best-effort cleanup so reruns start from a clean state.
        try_sql("DROP FUNCTION IF EXISTS py_drop_t_once(INT);")
        try_sql("DROP FUNCTION IF EXISTS py_drop_t_a(INT);")
        try_sql("DROP FUNCTION IF EXISTS py_drop_t_b(INT);")
    }
}
diff --git
a/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_a/drop_udtf.py
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_a/drop_udtf.py
new file mode 100644
index 00000000000..0fa9f2686e2
--- /dev/null
+++
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_a/drop_udtf.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
def process(x):
    """UDTF body (A-variant): emit one single-column row (x,) per
    non-NULL input; NULL inputs produce no rows."""
    if x is None:
        return
    yield (x,)
diff --git
a/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip
new file mode 100644
index 00000000000..d012ab78017
Binary files /dev/null and
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip
differ
diff --git
a/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_b/drop_udtf.py
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_b/drop_udtf.py
new file mode 100644
index 00000000000..29cb7a24801
--- /dev/null
+++
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_b/drop_udtf.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
def process(x):
    """UDTF body (B-variant): emit one row (x + 1,) per non-NULL input.

    The +1 offset distinguishes this variant from the A script so the
    drop/cache regression test can tell which module actually ran.
    """
    if x is None:
        return
    yield (x + 1,)
diff --git
a/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip
new file mode 100644
index 00000000000..47846f97e0c
Binary files /dev/null and
b/regression-test/suites/pythonudtf_p0/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip
differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]