wgtmac commented on code in PR #386:
URL: https://github.com/apache/iceberg-cpp/pull/386#discussion_r2613048404
##########
src/iceberg/table_metadata.h:
##########
@@ -458,14 +467,40 @@ enum class ICEBERG_EXPORT MetadataFileCodecType {
kGzip,
};
-/// \brief Utility class for table metadata
-struct ICEBERG_EXPORT TableMetadataUtil {
+struct ICEBERG_EXPORT CodecTypeUtil {
+ /// \brief Returns the MetadataFileCodecType corresponding to the given
string.
+ ///
+ /// \param name The string to parse.
+ /// \return The MetadataFileCodecType corresponding to the given string.
+ static Result<MetadataFileCodecType> CodecFromString(const std::string_view&
name);
Review Comment:
```suggestion
static Result<MetadataFileCodecType> CodecFromString(std::string_view
name);
```
It is recommended to pass `std::string_view` by value.
##########
src/iceberg/table_metadata.cc:
##########
@@ -220,30 +226,57 @@ Result<TableMetadataCache::SnapshotsMap>
TableMetadataCache::InitSnapshotMap(
std::ranges::to<SnapshotsMap>();
}
-// TableMetadataUtil implementation
+Result<MetadataFileCodecType> CodecTypeUtil::CodecFromString(
Review Comment:
Perhaps `TableMetadataUtil` should be renamed to `TableMetadataFileUtil` so
that we don't need to split out a separate `CodecTypeUtil`? `CodecTypeUtil` is
also confusing because it is tied to the codecs supported only by metadata
files, not the general codecs supported by data files (e.g. Parquet).
##########
src/iceberg/table_metadata.cc:
##########
@@ -48,6 +52,8 @@ const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
constexpr int32_t kLastAdded = -1;
} // namespace
+static constexpr std::string_view kMetadataFolderName = "metadata";
Review Comment:
Can we move it to the anonymous namespace above?
##########
src/iceberg/table_metadata.h:
##########
@@ -76,6 +83,8 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;
+ /// The location of the table metadata file
+ std::string metadata_file_location;
Review Comment:
I'm hesitant to introduce this field to `TableMetadata` because
`TableMetadata` is meant to carry only the fields that exist in the metadata
JSON. Can we revert this?
##########
src/iceberg/table_metadata.cc:
##########
@@ -258,13 +291,88 @@ Result<std::unique_ptr<TableMetadata>>
TableMetadataUtil::Read(
return TableMetadataFromJson(json);
}
+Status TableMetadataUtil::Write(FileIO& io, const TableMetadata* base,
+ TableMetadata& metadata) {
+ int version = -1;
+ if (base != nullptr && !base->metadata_file_location.empty()) {
+ // parse current version from location
+ version = ParseVersionFromLocation(base->metadata_file_location);
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(std::string new_file_location,
+ NewTableMetadataFilePath(metadata, version + 1));
+ ICEBERG_RETURN_UNEXPECTED(Write(io, new_file_location, metadata));
+ metadata.metadata_file_location = std::move(new_file_location);
+ return {};
+}
+
Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
const TableMetadata& metadata) {
auto json = ToJson(metadata);
ICEBERG_ASSIGN_OR_RAISE(auto json_string, ToJsonString(json));
return io.WriteFile(location, json_string);
}
+void TableMetadataUtil::DeleteRemovedMetadataFiles(FileIO& io, const
TableMetadata* base,
+ const TableMetadata&
metadata) {
+ if (!base) {
+ return;
+ }
+
+ bool delete_after_commit =
TableProperties::kMetadataDeleteAfterCommitEnabled.value();
+ if (auto it = metadata.properties.find(
+ TableProperties::kMetadataDeleteAfterCommitEnabled.key());
+ it != metadata.properties.end()) {
+ delete_after_commit =
+ StringUtils::EqualsIgnoreCase(it->second, "true") || it->second == "1";
+ }
+
+ if (delete_after_commit) {
+ auto current_files =
+ metadata.metadata_log |
+ std::ranges::to<std::unordered_set<MetadataLogEntry,
MetadataLogEntry::Hasher>>();
+ std::ranges::for_each(
+ base->metadata_log | std::views::filter([&current_files](const auto&
entry) {
+ return !current_files.contains(entry);
+ }),
+ [&io](const auto& entry) { auto status =
io.DeleteFile(entry.metadata_file); });
Review Comment:
```suggestion
[&io](const auto& entry) { std::ignore =
io.DeleteFile(entry.metadata_file); });
```
##########
src/iceberg/table_metadata.cc:
##########
@@ -258,13 +291,88 @@ Result<std::unique_ptr<TableMetadata>>
TableMetadataUtil::Read(
return TableMetadataFromJson(json);
}
+Status TableMetadataUtil::Write(FileIO& io, const TableMetadata* base,
+ TableMetadata& metadata) {
+ int version = -1;
+ if (base != nullptr && !base->metadata_file_location.empty()) {
+ // parse current version from location
+ version = ParseVersionFromLocation(base->metadata_file_location);
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(std::string new_file_location,
+ NewTableMetadataFilePath(metadata, version + 1));
+ ICEBERG_RETURN_UNEXPECTED(Write(io, new_file_location, metadata));
+ metadata.metadata_file_location = std::move(new_file_location);
+ return {};
+}
+
Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
const TableMetadata& metadata) {
auto json = ToJson(metadata);
ICEBERG_ASSIGN_OR_RAISE(auto json_string, ToJsonString(json));
return io.WriteFile(location, json_string);
}
+void TableMetadataUtil::DeleteRemovedMetadataFiles(FileIO& io, const
TableMetadata* base,
+ const TableMetadata&
metadata) {
+ if (!base) {
+ return;
+ }
+
+ bool delete_after_commit =
TableProperties::kMetadataDeleteAfterCommitEnabled.value();
+ if (auto it = metadata.properties.find(
+ TableProperties::kMetadataDeleteAfterCommitEnabled.key());
+ it != metadata.properties.end()) {
+ delete_after_commit =
+ StringUtils::EqualsIgnoreCase(it->second, "true") || it->second == "1";
+ }
+
+ if (delete_after_commit) {
+ auto current_files =
+ metadata.metadata_log |
+ std::ranges::to<std::unordered_set<MetadataLogEntry,
MetadataLogEntry::Hasher>>();
+ std::ranges::for_each(
+ base->metadata_log | std::views::filter([&current_files](const auto&
entry) {
+ return !current_files.contains(entry);
+ }),
+ [&io](const auto& entry) { auto status =
io.DeleteFile(entry.metadata_file); });
+ }
+}
+
+int TableMetadataUtil::ParseVersionFromLocation(
Review Comment:
```suggestion
int32_t TableMetadataUtil::ParseVersionFromLocation(
```
##########
src/iceberg/table_metadata.cc:
##########
@@ -258,13 +291,88 @@ Result<std::unique_ptr<TableMetadata>>
TableMetadataUtil::Read(
return TableMetadataFromJson(json);
}
+Status TableMetadataUtil::Write(FileIO& io, const TableMetadata* base,
+ TableMetadata& metadata) {
+ int version = -1;
+ if (base != nullptr && !base->metadata_file_location.empty()) {
+ // parse current version from location
+ version = ParseVersionFromLocation(base->metadata_file_location);
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(std::string new_file_location,
+ NewTableMetadataFilePath(metadata, version + 1));
+ ICEBERG_RETURN_UNEXPECTED(Write(io, new_file_location, metadata));
+ metadata.metadata_file_location = std::move(new_file_location);
+ return {};
+}
+
Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
const TableMetadata& metadata) {
auto json = ToJson(metadata);
ICEBERG_ASSIGN_OR_RAISE(auto json_string, ToJsonString(json));
return io.WriteFile(location, json_string);
}
+void TableMetadataUtil::DeleteRemovedMetadataFiles(FileIO& io, const
TableMetadata* base,
+ const TableMetadata&
metadata) {
+ if (!base) {
+ return;
+ }
+
+ bool delete_after_commit =
TableProperties::kMetadataDeleteAfterCommitEnabled.value();
+ if (auto it = metadata.properties.find(
Review Comment:
I was thinking of changing `TableMetadata::properties` to directly use
`std::shared_ptr<TableProperties>` so that downstream code can easily check any
property. I will put up a PR for this.
##########
src/iceberg/table_metadata.cc:
##########
@@ -258,13 +291,88 @@ Result<std::unique_ptr<TableMetadata>>
TableMetadataUtil::Read(
return TableMetadataFromJson(json);
}
+Status TableMetadataUtil::Write(FileIO& io, const TableMetadata* base,
+ TableMetadata& metadata) {
+ int version = -1;
+ if (base != nullptr && !base->metadata_file_location.empty()) {
+ // parse current version from location
+ version = ParseVersionFromLocation(base->metadata_file_location);
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(std::string new_file_location,
+ NewTableMetadataFilePath(metadata, version + 1));
+ ICEBERG_RETURN_UNEXPECTED(Write(io, new_file_location, metadata));
+ metadata.metadata_file_location = std::move(new_file_location);
+ return {};
+}
+
Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
const TableMetadata& metadata) {
auto json = ToJson(metadata);
ICEBERG_ASSIGN_OR_RAISE(auto json_string, ToJsonString(json));
return io.WriteFile(location, json_string);
}
+void TableMetadataUtil::DeleteRemovedMetadataFiles(FileIO& io, const
TableMetadata* base,
+ const TableMetadata&
metadata) {
+ if (!base) {
+ return;
+ }
+
+ bool delete_after_commit =
TableProperties::kMetadataDeleteAfterCommitEnabled.value();
+ if (auto it = metadata.properties.find(
+ TableProperties::kMetadataDeleteAfterCommitEnabled.key());
+ it != metadata.properties.end()) {
+ delete_after_commit =
+ StringUtils::EqualsIgnoreCase(it->second, "true") || it->second == "1";
+ }
+
+ if (delete_after_commit) {
+ auto current_files =
+ metadata.metadata_log |
+ std::ranges::to<std::unordered_set<MetadataLogEntry,
MetadataLogEntry::Hasher>>();
+ std::ranges::for_each(
+ base->metadata_log | std::views::filter([&current_files](const auto&
entry) {
+ return !current_files.contains(entry);
+ }),
+ [&io](const auto& entry) { auto status =
io.DeleteFile(entry.metadata_file); });
+ }
+}
+
+int TableMetadataUtil::ParseVersionFromLocation(
+ const std::string_view& metadata_location) {
+ size_t version_start = metadata_location.find_last_of('/') + 1;
Review Comment:
Is this correct? What if `/` does not exist?
##########
src/iceberg/table_metadata.h:
##########
@@ -458,14 +467,40 @@ enum class ICEBERG_EXPORT MetadataFileCodecType {
kGzip,
};
-/// \brief Utility class for table metadata
-struct ICEBERG_EXPORT TableMetadataUtil {
+struct ICEBERG_EXPORT CodecTypeUtil {
+ /// \brief Returns the MetadataFileCodecType corresponding to the given
string.
+ ///
+ /// \param name The string to parse.
+ /// \return The MetadataFileCodecType corresponding to the given string.
+ static Result<MetadataFileCodecType> CodecFromString(const std::string_view&
name);
+
/// \brief Get the codec type from the table metadata file name.
///
/// \param file_name The name of the table metadata file.
/// \return The codec type of the table metadata file.
static Result<MetadataFileCodecType> CodecFromFileName(std::string_view
file_name);
+ /// \brief Get the file extension from the codec type.
+ /// \param codec The codec name.
+ /// \return The file extension of the codec.
+ static Result<std::string> CodecNameToFileExtension(const std::string_view&
codec);
Review Comment:
```suggestion
static Result<std::string> CodecNameToFileExtension(std::string_view
codec);
```
##########
src/iceberg/table_metadata.h:
##########
@@ -58,6 +59,12 @@ struct ICEBERG_EXPORT MetadataLogEntry {
friend bool operator==(const MetadataLogEntry& lhs, const MetadataLogEntry&
rhs) {
return lhs.timestamp_ms == rhs.timestamp_ms && lhs.metadata_file ==
rhs.metadata_file;
}
+
+ struct Hasher {
Review Comment:
Why do we need to add this? Why not do this for `SnapshotLogEntry`?
##########
src/iceberg/table_metadata.h:
##########
@@ -476,13 +511,56 @@ struct ICEBERG_EXPORT TableMetadataUtil {
class FileIO& io, const std::string& location,
std::optional<size_t> length = std::nullopt);
+ /// \brief Write a new metadata file to storage.
+ ///
+ /// Serializes the table metadata to JSON and writes it to a new metadata
+ /// file. If no location is specified in the metadata, generates a new
+ /// file path based on the version number.
+ ///
+ /// \param io The FileIO instance for writing files
+ /// \param base The base metadata (can be null for new tables)
+ /// \param metadata The metadata to write, which will be updated with the
new location
+ static Status Write(FileIO& io, const TableMetadata* base, TableMetadata&
metadata);
+
+ /// \brief Delete removed metadata files based on retention policy.
+ ///
+ /// Removes obsolete metadata files that are no longer referenced in the
+ /// current metadata log, based on the metadata.delete-after-commit.enabled
+ /// property.
+ ///
+ /// \param io The FileIO instance for deleting files
+ /// \param base The previous metadata version
+ /// \param metadata The current metadata containing the updated log
+ static void DeleteRemovedMetadataFiles(FileIO& io, const TableMetadata* base,
+ const TableMetadata& metadata);
+
/// \brief Write the table metadata to a file.
///
/// \param io The file IO to use to write the table metadata.
/// \param location The location of the table metadata file.
/// \param metadata The table metadata to write.
static Status Write(FileIO& io, const std::string& location,
const TableMetadata& metadata);
+
+ private:
+ /// \brief Parse the version number from a metadata file location.
+ ///
+ /// Extracts the version number from a metadata file path which follows
+ /// the format: vvvvv-uuid.metadata.json where vvvvv is the zero-padded
+ /// version number.
+ ///
+ /// \param metadata_location The metadata file location string
+ /// \return The parsed version number, or -1 if parsing fails or the
+ /// location doesn't contain a version
+ static int ParseVersionFromLocation(const std::string_view&
metadata_location);
Review Comment:
```suggestion
static int ParseVersionFromLocation(std::string_view metadata_location);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]