HappenLee commented on code in PR #58993:
URL: https://github.com/apache/doris/pull/58993#discussion_r2656313374
##########
be/src/udf/python/python_server.cpp:
##########
@@ -33,51 +34,130 @@
namespace doris {
-Status PythonServerManager::init(const std::vector<PythonVersion>& versions) {
+template <typename T>
+Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const
PythonVersion& version,
+ std::shared_ptr<T>* client,
+ const std::shared_ptr<arrow::Schema>&
data_schema) {
+ // Ensure process pool is initialized for this version
+ RETURN_IF_ERROR(ensure_pool_initialized(version));
+
+ ProcessPtr process;
+ RETURN_IF_ERROR(get_process(version, &process));
+
+ if constexpr (std::is_same_v<T, PythonUDAFClient>) {
+ RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema,
client));
+ } else {
+ RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
+ }
+
+ return Status::OK();
+}
+
+Status PythonServerManager::ensure_pool_initialized(const PythonVersion&
version) {
std::lock_guard<std::mutex> lock(_pools_mutex);
- for (const auto& version : versions) {
- if (_pools.find(version) != _pools.end()) continue;
- PythonUDFProcessPoolPtr new_pool =
std::make_unique<PythonUDFProcessPool>(
- version, config::max_python_process_nums,
config::min_python_process_nums);
- RETURN_IF_ERROR(new_pool->init());
- _pools[version] = std::move(new_pool);
+
+ // Check if already initialized
+ if (_initialized_versions.count(version)) return Status::OK();
+
+ std::vector<ProcessPtr>& pool = _process_pools[version];
+ int max_pool_size = config::max_python_process_num;
+
+ LOG(INFO) << "Initializing Python process pool for version " <<
version.to_string() << " with "
+ << max_pool_size << " processes";
+
+ std::vector<std::future<Status>> futures;
+ std::vector<ProcessPtr> temp_processes(max_pool_size);
+
+ for (int i = 0; i < max_pool_size; i++) {
+ futures.push_back(std::async(std::launch::async, [this, &version, i,
&temp_processes]() {
+ ProcessPtr process;
+ Status s = fork(version, &process);
+ if (s.ok()) {
+ temp_processes[i] = std::move(process);
+ }
+ return s;
+ }));
}
+
+ int success_count = 0;
+ int failure_count = 0;
+ for (int i = 0; i < max_pool_size; i++) {
+ Status s = futures[i].get();
+ if (s.ok() && temp_processes[i]) {
+ pool.push_back(std::move(temp_processes[i]));
+ success_count++;
+ } else {
+ failure_count++;
+ LOG(WARNING) << "Failed to create Python process " << (i + 1) <<
"/" << max_pool_size
+ << ": " << s.to_string();
+ }
+ }
+
+ if (pool.empty()) {
+ return Status::InternalError(
+ "Failed to initialize Python process pool: all {} process
creation attempts failed",
+ max_pool_size);
+ }
+
+ LOG(INFO) << "Python process pool initialized for version " <<
version.to_string()
+ << ": created " << success_count << " processes"
+ << (failure_count > 0 ? fmt::format(" ({} failed)",
failure_count) : "");
+
+ _initialized_versions.insert(version);
return Status::OK();
}
-template <typename T>
-Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const
PythonVersion& version,
- std::shared_ptr<T>* client) {
- PythonUDFProcessPoolPtr* pool = nullptr;
- {
- std::lock_guard<std::mutex> lock(_pools_mutex);
- if (_pools.find(version) == _pools.end()) {
- PythonUDFProcessPoolPtr new_pool =
std::make_unique<PythonUDFProcessPool>(
- version, config::max_python_process_nums,
config::min_python_process_nums);
- RETURN_IF_ERROR(new_pool->init());
- _pools[version] = std::move(new_pool);
+Status PythonServerManager::get_process(const PythonVersion& version,
ProcessPtr* process) {
+ std::lock_guard<std::mutex> lock(_pools_mutex);
+ std::vector<ProcessPtr>& pool = _process_pools[version];
+
+ if (pool.empty()) {
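Review Comment:
unlikely — suggest wrapping this condition in the `UNLIKELY(...)` branch-prediction hint, i.e. `if (UNLIKELY(pool.empty())) {`.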
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]