wgtmac commented on code in PR #548: URL: https://github.com/apache/iceberg-cpp/pull/548#discussion_r2999660372
########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". +class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; Review Comment: Let's use `std::string_view` instead of C-style string. ########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". +class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; + static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO"; Review Comment: It looks odd to use Java-style fully-qualified class names here. I haven't looked at other implementations yet; perhaps it is worth investigating their conventions as well. My initial idea is simply to use default keys like `"local"` and `"s3"` to locate the FileIO implementations. ########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". +class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; + static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO"; + + /// Factory function type for creating FileIO instances. + using Factory = std::function<Result<std::shared_ptr<FileIO>>( Review Comment: It is better to return unique_ptr by default. ########## src/iceberg/arrow/file_io_register.cc: ########## @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "iceberg/arrow/file_io_register.h" + +#include <mutex> + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/file_io_registry.h" +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +void RegisterFileIO() { + static std::once_flag flag; + std::call_once(flag, []() { + // Register Arrow local filesystem FileIO + FileIORegistry::Register( + FileIORegistry::kArrowLocalFileIO, + [](const std::string& /*warehouse*/, Review Comment: ```suggestion [](const std::string& /*name*/, ``` Same for below. ########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". +class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; + static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO"; + + /// Factory function type for creating FileIO instances. + using Factory = std::function<Result<std::shared_ptr<FileIO>>( + const std::string& warehouse, Review Comment: ```suggestion const std::string& name, ``` ########## src/iceberg/arrow/s3_properties.h: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <string> + +namespace iceberg::arrow { + +/// \brief S3 configuration property keys for ArrowS3FileIO. +/// +/// These constants define the property keys used to configure S3 access +/// via the Arrow filesystem integration, following the Iceberg spec for +/// S3 configuration properties. +struct S3Properties { Review Comment: Should we add a separate subdirectory for s3 and move all s3-related files there? ########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". 
+class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; + static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO"; + + /// Factory function type for creating FileIO instances. + using Factory = std::function<Result<std::shared_ptr<FileIO>>( + const std::string& warehouse, + const std::unordered_map<std::string, std::string>& properties)>; + + /// \brief Register a FileIO factory under the given name. + /// + /// \param name The implementation name (e.g., "org.apache.iceberg.arrow.ArrowFileIO") + /// \param factory The factory function that creates the FileIO instance. + static void Register(const std::string& name, Factory factory) { + std::lock_guard lock(Mutex()); + Registry()[name] = std::move(factory); + } + + /// \brief Load a FileIO implementation by name. + /// + /// \param name The implementation name to look up. + /// \param warehouse The warehouse location URI. + /// \param properties Configuration properties to pass to the factory. + /// \return A shared_ptr to the FileIO instance, or an error if not found. + static Result<std::shared_ptr<FileIO>> Load( + const std::string& name, const std::string& warehouse, Review Comment: Why do we need a two-step (name plus warehouse) design for the registry? Specifically, I don't quite understand why it has anything to do with `warehouse`. Isn't it much simpler to locate the FileIO by `name` alone? ########## src/iceberg/test/CMakeLists.txt: ########## @@ -138,6 +138,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES arrow_fs_file_io_test.cc + arrow_s3_file_io_test.cc Review Comment: Let's create a new `file_io_test` executable? ########## src/iceberg/arrow/file_io_register.h: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/arrow/file_io_register.h +/// \brief Provide functions to register Arrow FileIO implementations. + +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::arrow { + +/// \brief Register Arrow FileIO implementations (local and S3) into the +/// FileIORegistry. +/// +/// This function is idempotent and thread-safe. It registers: +/// - ArrowFileIO (local filesystem) +/// - ArrowS3FileIO (S3 filesystem) +/// +/// Must be called before using FileIORegistry::Load() with the built-in +/// implementation names (e.g., from RestCatalog::Make(config)). +ICEBERG_BUNDLE_EXPORT void RegisterFileIO(); Review Comment: Should this be a separate register per FileIO implementation? ########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". +class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; + static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO"; + + /// Factory function type for creating FileIO instances. + using Factory = std::function<Result<std::shared_ptr<FileIO>>( + const std::string& warehouse, + const std::unordered_map<std::string, std::string>& properties)>; + + /// \brief Register a FileIO factory under the given name. + /// + /// \param name The implementation name (e.g., "org.apache.iceberg.arrow.ArrowFileIO") + /// \param factory The factory function that creates the FileIO instance. + static void Register(const std::string& name, Factory factory) { + std::lock_guard lock(Mutex()); + Registry()[name] = std::move(factory); + } + + /// \brief Load a FileIO implementation by name. + /// + /// \param name The implementation name to look up. + /// \param warehouse The warehouse location URI. 
+ /// \param properties Configuration properties to pass to the factory. + /// \return A shared_ptr to the FileIO instance, or an error if not found. + static Result<std::shared_ptr<FileIO>> Load( + const std::string& name, const std::string& warehouse, + const std::unordered_map<std::string, std::string>& properties) { + Factory factory; + { + std::lock_guard lock(Mutex()); + auto it = Registry().find(name); + if (it == Registry().end()) { + return std::unexpected<Error>( + {.kind = ErrorKind::kNotFound, + .message = "FileIO implementation not found: " + name}); + } + factory = it->second; + } + // Invoke factory outside the lock to avoid blocking other Register/Load + // calls and to prevent deadlocks if the factory calls back into the registry. + return factory(warehouse, properties); + } + + private: + static std::unordered_map<std::string, Factory>& Registry() { Review Comment: Do we really need to add two wrappers for `registry` and `mutex`? We don't benefit too much from their lazy initialization. ########## src/iceberg/catalog/rest/rest_catalog.cc: ########## @@ -174,6 +175,40 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make( std::move(catalog_session), snapshot_mode)); } +Result<std::shared_ptr<RestCatalog>> RestCatalog::Make( + const RestCatalogProperties& config) { + // Get warehouse location to determine the appropriate FileIO type + auto warehouse = config.Get(RestCatalogProperties::kWarehouse); + if (warehouse.empty()) { + return InvalidArgument( + "Warehouse location is required when FileIO is not explicitly provided. " + "Set the 'warehouse' property to an S3 URI (s3://...) 
or local path."); + } + + // Check for user-specified io-impl property + auto io_impl = config.configs().find(FileIOProperties::kImpl); + std::string impl_name; + + if (io_impl != config.configs().end() && !io_impl->second.empty()) { Review Comment: nit: extract the URL scheme detection into a separate function and move it into a utility function of FileIO. ########## src/iceberg/catalog/rest/rest_catalog.h: ########## @@ -54,6 +54,31 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config, std::shared_ptr<FileIO> file_io); + /// \brief Create a RestCatalog instance with auto-detected FileIO. + /// + /// This overload automatically creates an appropriate FileIO based on the "io-impl" + /// property or the warehouse location URI scheme. + /// + /// FileIO selection logic: + /// 1. If "io-impl" property is set, use the specified implementation from + /// FileIORegistry. + /// 2. Otherwise, auto-detect based on warehouse URI: + /// - "s3://" -> ArrowS3FileIO + /// - Local path -> ArrowLocalFileIO + /// + /// Users can register custom FileIO implementations via FileIORegistry::Register(): + /// \code + /// FileIORegistry::Register("com.mycompany.MyFileIO", + /// [](const std::string& warehouse, const auto& props) { + /// return std::make_shared<MyFileIO>(warehouse, props); + /// }); + /// \endcode + /// + /// \param config the configuration for the RestCatalog, including warehouse location + /// and optional "io-impl" property + /// \return a shared_ptr to RestCatalog instance, or an error if FileIO creation fails + static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config); Review Comment: A cleaner approach is to make `file_io` a member of `RestCatalogProperties` and remove the above 2-arg Make function.
########## src/iceberg/arrow/arrow_s3_file_io.cc: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdlib> +#include <mutex> +#include <stdexcept> + +#include <arrow/filesystem/filesystem.h> +#ifdef ICEBERG_S3_ENABLED +# include <arrow/filesystem/s3fs.h> +# define ICEBERG_ARROW_HAS_S3 1 +#else +# define ICEBERG_ARROW_HAS_S3 0 +#endif + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/s3_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +namespace { + +Status EnsureS3Initialized() { +#if ICEBERG_ARROW_HAS_S3 + static std::once_flag init_flag; + static ::arrow::Status init_status = ::arrow::Status::OK(); + std::call_once(init_flag, []() { + ::arrow::fs::S3GlobalOptions options; + init_status = ::arrow::fs::InitializeS3(options); + }); + if (!init_status.ok()) { + return std::unexpected(Error{.kind = ::iceberg::arrow::ToErrorKind(init_status), + .message = init_status.ToString()}); + } + return {}; +#else + return NotImplemented("Arrow S3 support is not enabled"); +#endif +} + +#if ICEBERG_ARROW_HAS_S3 +/// \brief 
Configure S3Options from a properties map. +/// +/// \param properties The configuration properties map. +/// \return Configured S3Options. +Result<::arrow::fs::S3Options> ConfigureS3Options( + const std::unordered_map<std::string, std::string>& properties) { + ::arrow::fs::S3Options options; + + // Configure credentials + auto access_key_it = properties.find(S3Properties::kAccessKeyId); + auto secret_key_it = properties.find(S3Properties::kSecretAccessKey); + auto session_token_it = properties.find(S3Properties::kSessionToken); + + if (access_key_it != properties.end() && secret_key_it != properties.end()) { + if (session_token_it != properties.end()) { + options.ConfigureAccessKey(access_key_it->second, secret_key_it->second, + session_token_it->second); + } else { + options.ConfigureAccessKey(access_key_it->second, secret_key_it->second); + } + } else { + // Use default credential chain (environment, instance profile, etc.) + options.ConfigureDefaultCredentials(); + } + + // Configure region + auto region_it = properties.find(S3Properties::kRegion); + if (region_it != properties.end()) { + options.region = region_it->second; + } + + // Configure endpoint (for MinIO, LocalStack, etc.) 
+ auto endpoint_it = properties.find(S3Properties::kEndpoint); + if (endpoint_it != properties.end()) { + options.endpoint_override = endpoint_it->second; + } else { + // Fall back to AWS standard environment variables for endpoint override + const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3"); + if (s3_endpoint_env != nullptr) { + options.endpoint_override = s3_endpoint_env; + } else { + const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL"); + if (endpoint_env != nullptr) { + options.endpoint_override = endpoint_env; + } + } + } + + auto path_style_it = properties.find(S3Properties::kPathStyleAccess); + if (path_style_it != properties.end() && path_style_it->second == "true") { + options.force_virtual_addressing = false; + } + + // Configure SSL + auto ssl_it = properties.find(S3Properties::kSslEnabled); + if (ssl_it != properties.end() && ssl_it->second == "false") { + options.scheme = "http"; + } + + // Configure timeouts + auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs); + if (connect_timeout_it != properties.end()) { + try { + options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0; Review Comment: Can we use from_chars just like `ParseInteger` does? ########## src/iceberg/arrow/arrow_s3_file_io.cc: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdlib> +#include <mutex> +#include <stdexcept> + +#include <arrow/filesystem/filesystem.h> +#ifdef ICEBERG_S3_ENABLED +# include <arrow/filesystem/s3fs.h> +# define ICEBERG_ARROW_HAS_S3 1 +#else +# define ICEBERG_ARROW_HAS_S3 0 +#endif + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/s3_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +namespace { + +Status EnsureS3Initialized() { +#if ICEBERG_ARROW_HAS_S3 + static std::once_flag init_flag; + static ::arrow::Status init_status = ::arrow::Status::OK(); + std::call_once(init_flag, []() { + ::arrow::fs::S3GlobalOptions options; Review Comment: nit: add a TODO comment to support options supported by `::arrow::fs::S3GlobalOptions`. ########## src/iceberg/arrow/arrow_s3_file_io.cc: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdlib> +#include <mutex> +#include <stdexcept> + +#include <arrow/filesystem/filesystem.h> +#ifdef ICEBERG_S3_ENABLED +# include <arrow/filesystem/s3fs.h> +# define ICEBERG_ARROW_HAS_S3 1 +#else +# define ICEBERG_ARROW_HAS_S3 0 +#endif + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/s3_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +namespace { + +Status EnsureS3Initialized() { +#if ICEBERG_ARROW_HAS_S3 + static std::once_flag init_flag; + static ::arrow::Status init_status = ::arrow::Status::OK(); + std::call_once(init_flag, []() { + ::arrow::fs::S3GlobalOptions options; + init_status = ::arrow::fs::InitializeS3(options); + }); + if (!init_status.ok()) { + return std::unexpected(Error{.kind = ::iceberg::arrow::ToErrorKind(init_status), + .message = init_status.ToString()}); + } + return {}; +#else + return NotImplemented("Arrow S3 support is not enabled"); Review Comment: ```suggestion return NotSupported("Arrow S3 support is not enabled"); ``` ########## src/iceberg/arrow/arrow_s3_file_io.cc: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdlib> +#include <mutex> +#include <stdexcept> + +#include <arrow/filesystem/filesystem.h> +#ifdef ICEBERG_S3_ENABLED +# include <arrow/filesystem/s3fs.h> +# define ICEBERG_ARROW_HAS_S3 1 +#else +# define ICEBERG_ARROW_HAS_S3 0 +#endif + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/s3_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +namespace { + +Status EnsureS3Initialized() { +#if ICEBERG_ARROW_HAS_S3 + static std::once_flag init_flag; + static ::arrow::Status init_status = ::arrow::Status::OK(); + std::call_once(init_flag, []() { + ::arrow::fs::S3GlobalOptions options; + init_status = ::arrow::fs::InitializeS3(options); + }); + if (!init_status.ok()) { + return std::unexpected(Error{.kind = ::iceberg::arrow::ToErrorKind(init_status), + .message = init_status.ToString()}); + } + return {}; +#else + return NotImplemented("Arrow S3 support is not enabled"); +#endif +} + +#if ICEBERG_ARROW_HAS_S3 +/// \brief Configure S3Options from a properties map. +/// +/// \param properties The configuration properties map. +/// \return Configured S3Options. 
+Result<::arrow::fs::S3Options> ConfigureS3Options( + const std::unordered_map<std::string, std::string>& properties) { + ::arrow::fs::S3Options options; + + // Configure credentials + auto access_key_it = properties.find(S3Properties::kAccessKeyId); + auto secret_key_it = properties.find(S3Properties::kSecretAccessKey); + auto session_token_it = properties.find(S3Properties::kSessionToken); + + if (access_key_it != properties.end() && secret_key_it != properties.end()) { + if (session_token_it != properties.end()) { + options.ConfigureAccessKey(access_key_it->second, secret_key_it->second, + session_token_it->second); + } else { + options.ConfigureAccessKey(access_key_it->second, secret_key_it->second); + } + } else { + // Use default credential chain (environment, instance profile, etc.) + options.ConfigureDefaultCredentials(); + } + + // Configure region + auto region_it = properties.find(S3Properties::kRegion); + if (region_it != properties.end()) { + options.region = region_it->second; + } + + // Configure endpoint (for MinIO, LocalStack, etc.) 
+ auto endpoint_it = properties.find(S3Properties::kEndpoint); + if (endpoint_it != properties.end()) { + options.endpoint_override = endpoint_it->second; + } else { + // Fall back to AWS standard environment variables for endpoint override + const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3"); + if (s3_endpoint_env != nullptr) { + options.endpoint_override = s3_endpoint_env; + } else { + const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL"); + if (endpoint_env != nullptr) { + options.endpoint_override = endpoint_env; + } + } + } + + auto path_style_it = properties.find(S3Properties::kPathStyleAccess); + if (path_style_it != properties.end() && path_style_it->second == "true") { + options.force_virtual_addressing = false; + } + + // Configure SSL + auto ssl_it = properties.find(S3Properties::kSslEnabled); + if (ssl_it != properties.end() && ssl_it->second == "false") { + options.scheme = "http"; + } + + // Configure timeouts + auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs); + if (connect_timeout_it != properties.end()) { + try { + options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0; + } catch (const std::exception& e) { + return InvalidArgument("Invalid {}: '{}' ({})", S3Properties::kConnectTimeoutMs, + connect_timeout_it->second, e.what()); + } + } + + auto socket_timeout_it = properties.find(S3Properties::kSocketTimeoutMs); + if (socket_timeout_it != properties.end()) { + try { + options.request_timeout = std::stod(socket_timeout_it->second) / 1000.0; + } catch (const std::exception& e) { + return InvalidArgument("Invalid {}: '{}' ({})", S3Properties::kSocketTimeoutMs, + socket_timeout_it->second, e.what()); + } + } + + return options; +} +#endif + +} // namespace + +Result<std::unique_ptr<FileIO>> MakeS3FileIO( + const std::string& uri, + const std::unordered_map<std::string, std::string>& properties) { + if (!uri.starts_with("s3://")) { Review Comment: Define a constant for the magic `s3://`. 
`s3a` should be supported as well. ########## CMakeLists.txt: ########## @@ -45,6 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON) option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON) option(ICEBERG_BUILD_REST "Build rest catalog client" ON) option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF) +option(ICEBERG_S3 "Build with S3 support" ON) Review Comment: Should we disable it by default? ########## src/iceberg/arrow/s3_properties.h: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <string> + +namespace iceberg::arrow { + +/// \brief S3 configuration property keys for ArrowS3FileIO. +/// +/// These constants define the property keys used to configure S3 access +/// via the Arrow filesystem integration, following the Iceberg spec for +/// S3 configuration properties. +struct S3Properties { + /// AWS access key ID + static constexpr const char* kAccessKeyId = "s3.access-key-id"; Review Comment: Use std::string_view. Same for other places in this PR. 
########## CMakeLists.txt: ########## @@ -45,6 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON) option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON) option(ICEBERG_BUILD_REST "Build rest catalog client" ON) option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF) +option(ICEBERG_S3 "Build with S3 support" ON) Review Comment: It is worth noting that `ICEBERG_S3` should be disabled if `ICEBERG_BUILD_BUNDLE` is `OFF`. ########## src/iceberg/file_io_registry.h: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Registry for FileIO implementations. +/// +/// Provides a mechanism to register and load FileIO implementations by name. +/// This allows the REST catalog (and others) to resolve FileIO implementations +/// at runtime based on configuration properties like "io-impl". 
+class ICEBERG_EXPORT FileIORegistry { + public: + /// Well-known implementation names + static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO"; + static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO"; + + /// Factory function type for creating FileIO instances. + using Factory = std::function<Result<std::shared_ptr<FileIO>>( + const std::string& warehouse, + const std::unordered_map<std::string, std::string>& properties)>; + + /// \brief Register a FileIO factory under the given name. + /// + /// \param name The implementation name (e.g., "org.apache.iceberg.arrow.ArrowFileIO") + /// \param factory The factory function that creates the FileIO instance. + static void Register(const std::string& name, Factory factory) { + std::lock_guard lock(Mutex()); + Registry()[name] = std::move(factory); + } + + /// \brief Load a FileIO implementation by name. + /// + /// \param name The implementation name to look up. + /// \param warehouse The warehouse location URI. + /// \param properties Configuration properties to pass to the factory. + /// \return A shared_ptr to the FileIO instance, or an error if not found. + static Result<std::shared_ptr<FileIO>> Load( + const std::string& name, const std::string& warehouse, + const std::unordered_map<std::string, std::string>& properties) { + Factory factory; + { + std::lock_guard lock(Mutex()); + auto it = Registry().find(name); + if (it == Registry().end()) { + return std::unexpected<Error>( + {.kind = ErrorKind::kNotFound, + .message = "FileIO implementation not found: " + name}); + } + factory = it->second; + } + // Invoke factory outside the lock to avoid blocking other Register/Load + // calls and to prevent deadlocks if the factory calls back into the registry. 
+ return factory(warehouse, properties); + } + + private: + static std::unordered_map<std::string, Factory>& Registry() { + static std::unordered_map<std::string, Factory> registry; + return registry; + } + + static std::mutex& Mutex() { + static std::mutex mutex; + return mutex; + } +}; + +/// \brief Property keys for FileIO configuration. +struct FileIOProperties { + /// The FileIO implementation class name (e.g., "org.apache.iceberg.arrow.ArrowFileIO") + static constexpr const char* kImpl = "io-impl"; Review Comment: A relevant followup is to map and register known `"io-impl"` names from other impls (especially Java). ########## src/iceberg/test/arrow_s3_file_io_test.cc: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdlib> +#include <iostream> +#include <string> +#include <unordered_map> + +#include <gtest/gtest.h> + +#ifdef ICEBERG_S3_ENABLED +# include <arrow/filesystem/s3fs.h> +#endif + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/s3_properties.h" +#include "iceberg/test/matchers.h" + +#ifdef ICEBERG_S3_ENABLED +namespace { + +/// \brief GTest environment that finalizes Arrow S3 after all tests complete. 
+/// +/// Arrow's S3 initialization creates global state that must be cleaned up via +/// FinalizeS3() before the process exits. Without this, Arrow's static destructor +/// detects the missing finalization and causes a non-zero exit (which fails under +/// sanitizers). GTest Environment::TearDown() runs after all tests but before +/// static destructors, making it the safe place to finalize. +class ArrowS3TestEnvironment : public ::testing::Environment { + public: + void TearDown() override { + auto status = ::arrow::fs::FinalizeS3(); Review Comment: Should we add an equivalent function like `::iceberg::arrow::FinalizeS3()`? That way, users do not need to write `#include <arrow/filesystem/s3fs.h>`, and the test is closer to the real-world use case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
