morningman commented on code in PR #54304: URL: https://github.com/apache/doris/pull/54304#discussion_r2271423314
########## be/src/runtime/plugin/cloud_plugin_downloader.h: ########## @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "common/status.h" +#include "runtime/plugin/s3_plugin_downloader.h" + +namespace doris { + +/** + * CloudPluginDownloader is the unified entry point for plugin downloads in cloud mode. + * + * Architecture: + * 1. CloudPluginConfigProvider - Get S3 config from cloud mode + * 2. S3PluginDownloader - Execute S3 downloads + * 3. 
CloudPluginDownloader - Simple unified API + */ +class CloudPluginDownloader { +public: + // Plugin type enumeration + enum class PluginType { + JDBC_DRIVERS, // JDBC driver jar files + JAVA_UDF, // Java UDF jar files + CONNECTORS, // Trino connector tar.gz packages + HADOOP_CONF // Hadoop configuration files + }; + + // Download plugin from cloud storage (supports MD5 verification) + static Status download_from_cloud(PluginType plugin_type, const std::string& plugin_name, + const std::string& local_target_path, std::string* local_path, + const std::string& expected_md5 = ""); + + // Download with manually configured S3 parameters + static Status download_from_s3(const S3PluginDownloader::S3Config& s3_config, Review Comment: This method is not used? ########## fe/fe-core/src/main/java/org/apache/doris/common/plugin/PluginFileCache.java: ########## @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.plugin; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.util.concurrent.ConcurrentHashMap; + +/** + * PluginFileCache - Simple static memory cache for plugin file validation. + * <p> + * Caching logic: + * 1. Users do not provide MD5 -> Use it directly if there are files locally (most efficient) + * 2. User provides MD5 -> Quick check using cached MD5 (avoid recalculation) + */ +public class PluginFileCache { + private static final Logger LOG = LogManager.getLogger(PluginFileCache.class); + + // Cache: localPath -> FileInfo + private static final ConcurrentHashMap<String, FileInfo> CACHE = new ConcurrentHashMap<>(); Review Comment: I think you should use a Caffeine cache to expire unused entries automatically. Currently, there is no way to clear the cache. The BE cache has the same issue; BE has an LRU cache that can do this. ########## fe/fe-core/src/main/java/org/apache/doris/common/plugin/S3PluginDownloader.java: ########## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.plugin; + +import org.apache.doris.backup.Status; +import org.apache.doris.datasource.property.storage.S3Properties; +import org.apache.doris.fs.obj.S3ObjStorage; + +import com.google.common.base.Strings; +import org.apache.commons.codec.binary.Hex; + +import java.io.File; +import java.io.FileInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.util.HashMap; +import java.util.Map; + +/** + * S3PluginDownloader is an independent, generic S3 downloader. + * <p> + * Design principles: + * 1. Single responsibility: Only downloads files from S3, no business logic + * 2. Complete decoupling: No dependency on cloud mode or Doris-specific configuration + * 3. Reusable: Supports both cloud mode auto-configuration and manual S3 parameters + * 4. Features: Single file download, batch directory download, MD5 verification, retry mechanism + */ +public class S3PluginDownloader implements AutoCloseable { + private static final int MAX_RETRY_ATTEMPTS = 3; + private static final long RETRY_DELAY_MS = 1000; + + private final S3ObjStorage s3Storage; + + /** + * S3 configuration info (completely independent, no dependency on any Doris internal types) + */ + public static class S3Config { + public final String endpoint; + public final String region; + public final String bucket; + public final String accessKey; + public final String secretKey; + + public S3Config(String endpoint, String region, String bucket, + String accessKey, String secretKey) { + this.endpoint = endpoint; + this.region = region; + this.bucket = bucket; + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + @Override + public String toString() { + return String.format("S3Config{endpoint='%s', region='%s', bucket='%s', accessKey='%s'}", + endpoint, region, bucket, accessKey != null ? 
"***" : "null"); + } + } + + /** + * Constructor - pass in S3 configuration + */ + public S3PluginDownloader(S3Config config) { + this.s3Storage = createS3Storage(config); + } + + // ======================== Core Download Methods ======================== + + /** + * Download single file (supports MD5 verification and retry) + * + * @param remoteS3Path complete S3 path like "s3://bucket/path/to/file.jar" + * @param localPath local target file path + * @param expectedMd5 optional MD5 verification value, null to skip verification + * @return returns local file path on success + * @throws RuntimeException if download fails after all retries + */ + public String downloadFile(String remoteS3Path, String localPath, String expectedMd5) { + // Check if you need to download it first + if (PluginFileCache.isFileValid(localPath, expectedMd5)) { + return localPath; // The local file is valid and returns directly + } + + // Execute the download retry logic Review Comment: Should be protected by lock ########## be/src/runtime/plugin/s3_plugin_downloader.cpp: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/plugin/s3_plugin_downloader.h" + +#include <fmt/format.h> + +#include <filesystem> +#include <fstream> +#include <memory> +#include <string> +#include <thread> + +#include "common/logging.h" +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "io/fs/s3_file_system.h" +#include "runtime/plugin/plugin_file_cache.h" +#include "util/s3_util.h" + +namespace doris { + +std::string S3PluginDownloader::S3Config::to_string() const { + return fmt::format("S3Config{{endpoint='{}', region='{}', bucket='{}', access_key='{}'}}", + endpoint, region, bucket, access_key.empty() ? "null" : "***"); +} + +S3PluginDownloader::S3PluginDownloader(const S3Config& config) : config_(config) { + s3_fs_ = create_s3_filesystem(config_); + // Note: We don't throw here. The caller should check via download_file return status +} + +S3PluginDownloader::~S3PluginDownloader() = default; + +Status S3PluginDownloader::download_file(const std::string& remote_s3_path, + const std::string& local_target_path, + std::string* local_path, const std::string& expected_md5) { + // Check if S3 filesystem is initialized + if (!s3_fs_) { + return Status::InternalError("S3 filesystem not initialized"); + } + + // Check if download is needed first + if (PluginFileCache::is_file_valid(local_target_path, expected_md5)) { + *local_path = local_target_path; + return Status::OK(); // Local file is valid, return directly + } + + // Execute download retry logic Review Comment: This download logic needs to be protected by a lock to avoid concurrency issues. ########## be/src/runtime/plugin/s3_plugin_downloader.cpp: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/plugin/s3_plugin_downloader.h" + +#include <fmt/format.h> + +#include <filesystem> +#include <fstream> +#include <memory> +#include <string> +#include <thread> + +#include "common/logging.h" +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "io/fs/s3_file_system.h" +#include "runtime/plugin/plugin_file_cache.h" +#include "util/s3_util.h" + +namespace doris { + +std::string S3PluginDownloader::S3Config::to_string() const { + return fmt::format("S3Config{{endpoint='{}', region='{}', bucket='{}', access_key='{}'}}", + endpoint, region, bucket, access_key.empty() ? "null" : "***"); +} + +S3PluginDownloader::S3PluginDownloader(const S3Config& config) : config_(config) { + s3_fs_ = create_s3_filesystem(config_); + // Note: We don't throw here. 
The caller should check via download_file return status +} + +S3PluginDownloader::~S3PluginDownloader() = default; + +Status S3PluginDownloader::download_file(const std::string& remote_s3_path, + const std::string& local_target_path, + std::string* local_path, const std::string& expected_md5) { + // Check if S3 filesystem is initialized + if (!s3_fs_) { + return Status::InternalError("S3 filesystem not initialized"); + } + + // Check if download is needed first + if (PluginFileCache::is_file_valid(local_target_path, expected_md5)) { + *local_path = local_target_path; + return Status::OK(); // Local file is valid, return directly + } + + // Execute download retry logic + Status last_status; + for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; ++attempt) { + last_status = execute_download(remote_s3_path, local_target_path, expected_md5); + if (last_status.ok()) { + *local_path = local_target_path; + return Status::OK(); + } + if (attempt < MAX_RETRY_ATTEMPTS) { + sleep_for_retry(attempt); + } + } + + // All retries failed + return last_status; +} + +Status S3PluginDownloader::execute_download(const std::string& remote_s3_path, + const std::string& local_path, + const std::string& expected_md5) { + // Create parent directory + Status status = create_parent_directory(local_path); + RETURN_IF_ERROR(status); + + // Use S3FileSystem's public download method + status = s3_fs_->download(remote_s3_path, local_path); Review Comment: I think we need to remove the existing file (if any) before downloading the new one? ########## be/src/runtime/plugin/s3_plugin_downloader.h: ########## @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> + +#include "common/status.h" + +// Forward declarations +namespace doris::io { +class S3FileSystem; +} + +namespace doris { + +/** + * S3PluginDownloader is an independent S3 downloader with MD5 verification and retry. + */ +class S3PluginDownloader { +public: + // S3 configuration info + struct S3Config { + std::string endpoint; + std::string region; + std::string bucket; + std::string access_key; + std::string secret_key; + + S3Config(const std::string& endpoint, const std::string& region, const std::string& bucket, + const std::string& access_key, const std::string& secret_key) + : endpoint(endpoint), + region(region), + bucket(bucket), + access_key(access_key), + secret_key(secret_key) {} + + std::string to_string() const; + }; + + explicit S3PluginDownloader(const S3Config& config); + ~S3PluginDownloader(); + + // Download single file with MD5 verification and retry + Status download_file(const std::string& remote_s3_path, const std::string& local_target_path, + std::string* local_path, const std::string& expected_md5 = ""); + +private: + static constexpr int MAX_RETRY_ATTEMPTS = 3; + static constexpr int RETRY_DELAY_MS = 1000; + + S3Config config_; + std::shared_ptr<io::S3FileSystem> s3_fs_; + + // Execute single file download with retry logic + Status execute_download(const std::string& remote_s3_path, const std::string& 
local_path, Review Comment: Add `_` as prefix of "private" method and fields. eg: `_execute_download()` ########## be/src/runtime/plugin/s3_plugin_downloader.cpp: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/plugin/s3_plugin_downloader.h" + +#include <fmt/format.h> + +#include <filesystem> +#include <fstream> +#include <memory> +#include <string> +#include <thread> + +#include "common/logging.h" +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "io/fs/s3_file_system.h" +#include "runtime/plugin/plugin_file_cache.h" +#include "util/s3_util.h" + +namespace doris { + +std::string S3PluginDownloader::S3Config::to_string() const { + return fmt::format("S3Config{{endpoint='{}', region='{}', bucket='{}', access_key='{}'}}", + endpoint, region, bucket, access_key.empty() ? "null" : "***"); +} + +S3PluginDownloader::S3PluginDownloader(const S3Config& config) : config_(config) { + s3_fs_ = create_s3_filesystem(config_); + // Note: We don't throw here. 
The caller should check via download_file return status +} + +S3PluginDownloader::~S3PluginDownloader() = default; + +Status S3PluginDownloader::download_file(const std::string& remote_s3_path, + const std::string& local_target_path, + std::string* local_path, const std::string& expected_md5) { + // Check if S3 filesystem is initialized + if (!s3_fs_) { + return Status::InternalError("S3 filesystem not initialized"); + } + + // Check if download is needed first + if (PluginFileCache::is_file_valid(local_target_path, expected_md5)) { + *local_path = local_target_path; + return Status::OK(); // Local file is valid, return directly + } + + // Execute download retry logic + Status last_status; + for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; ++attempt) { + last_status = execute_download(remote_s3_path, local_target_path, expected_md5); + if (last_status.ok()) { + *local_path = local_target_path; + return Status::OK(); + } + if (attempt < MAX_RETRY_ATTEMPTS) { Review Comment: I don't think we need this retry logic; just letting the query retry is enough. Otherwise, if it is a network issue or the credential info is incorrect, the retry is wasteful. ########## be/src/runtime/plugin/s3_plugin_downloader.cpp: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/plugin/s3_plugin_downloader.h" + +#include <fmt/format.h> + +#include <filesystem> +#include <fstream> +#include <memory> +#include <string> +#include <thread> + +#include "common/logging.h" +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "io/fs/s3_file_system.h" +#include "runtime/plugin/plugin_file_cache.h" +#include "util/s3_util.h" + +namespace doris { + +std::string S3PluginDownloader::S3Config::to_string() const { + return fmt::format("S3Config{{endpoint='{}', region='{}', bucket='{}', access_key='{}'}}", + endpoint, region, bucket, access_key.empty() ? "null" : "***"); +} + +S3PluginDownloader::S3PluginDownloader(const S3Config& config) : config_(config) { + s3_fs_ = create_s3_filesystem(config_); + // Note: We don't throw here. The caller should check via download_file return status +} + +S3PluginDownloader::~S3PluginDownloader() = default; + +Status S3PluginDownloader::download_file(const std::string& remote_s3_path, + const std::string& local_target_path, + std::string* local_path, const std::string& expected_md5) { + // Check if S3 filesystem is initialized + if (!s3_fs_) { + return Status::InternalError("S3 filesystem not initialized"); + } + + // Check if download is needed first + if (PluginFileCache::is_file_valid(local_target_path, expected_md5)) { + *local_path = local_target_path; + return Status::OK(); // Local file is valid, return directly + } + + // Execute download retry logic + Status last_status; + for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; ++attempt) { + last_status = execute_download(remote_s3_path, local_target_path, expected_md5); + if (last_status.ok()) { + *local_path = local_target_path; + return Status::OK(); + } + if (attempt < MAX_RETRY_ATTEMPTS) { + sleep_for_retry(attempt); + } + } + + // All retries failed + return last_status; +} + +Status 
S3PluginDownloader::execute_download(const std::string& remote_s3_path, + const std::string& local_path, + const std::string& expected_md5) { + // Create parent directory + Status status = create_parent_directory(local_path); + RETURN_IF_ERROR(status); + + // Use S3FileSystem's public download method + status = s3_fs_->download(remote_s3_path, local_path); + RETURN_IF_ERROR(status); + + // MD5 verification and cache update + std::string actual_md5 = calculate_file_md5(local_path); + + // If user provided MD5, must verify consistency + if (!expected_md5.empty()) { + if (actual_md5.empty() || expected_md5 != actual_md5) { + std::filesystem::remove(local_path); // Delete invalid file + return Status::InvalidArgument("MD5 mismatch: expected={}, actual={}", expected_md5, + actual_md5); + } + } + + // Update cache + update_cache_after_download(local_path, actual_md5); + + LOG(INFO) << "Successfully downloaded " << remote_s3_path << " to " << local_path; + return Status::OK(); +} + +void S3PluginDownloader::sleep_for_retry(int attempt) { + try { + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS * attempt)); + } catch (const std::exception& e) { + throw std::runtime_error("Download interrupted: " + std::string(e.what())); + } +} + +void S3PluginDownloader::update_cache_after_download(const std::string& local_path, + const std::string& actual_md5) { + try { + std::filesystem::path file_path(local_path); + if (!std::filesystem::exists(file_path)) { + return; + } + + long file_size = std::filesystem::file_size(file_path); + PluginFileCache::update_cache(local_path, actual_md5, file_size); + } catch (const std::exception& e) { + // Ignore cache update failures - not critical + LOG(WARNING) << "Failed to update cache for " << local_path << ": " << e.what(); + } +} + +Status S3PluginDownloader::create_parent_directory(const std::string& file_path) { Review Comment: You can use local fs' `create_directory` method -- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
