gty404 commented on code in PR #408:
URL: https://github.com/apache/iceberg-cpp/pull/408#discussion_r2660227532
##########
src/iceberg/table_metadata.cc:
##########
@@ -982,6 +994,158 @@ void
TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
changes_.push_back(std::make_unique<table::SetLocation>(std::string(location)));
}
+Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot>
snapshot) {
+ if (snapshot == nullptr) {
+ // change is a noop
+ return {};
+ }
+ ICEBERG_CHECK(!metadata_.schemas.empty(),
+ "Attempting to add a snapshot before a schema is added");
+ ICEBERG_CHECK(!metadata_.partition_specs.empty(),
+ "Attempting to add a snapshot before a partition spec is
added");
+ ICEBERG_CHECK(!metadata_.sort_orders.empty(),
+ "Attempting to add a snapshot before a sort order is added");
+
+ ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id),
+ "Snapshot already exists for id: {}", snapshot->snapshot_id);
+
+ ICEBERG_CHECK(
+ metadata_.format_version == 1 ||
+ snapshot->sequence_number > metadata_.last_sequence_number ||
+ !snapshot->parent_snapshot_id.has_value(),
+ "Cannot add snapshot with sequence number {} older than last sequence
number {}",
+ snapshot->sequence_number, metadata_.last_sequence_number);
+
+ metadata_.last_updated_ms = snapshot->timestamp_ms;
+ metadata_.last_sequence_number = snapshot->sequence_number;
+
+ metadata_.snapshots.push_back(snapshot);
+ snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot);
+
+ changes_.push_back(std::make_unique<table::AddSnapshot>(snapshot));
+
+ // Handle row lineage for format version >= 3
+ if (metadata_.format_version >= TableMetadata::kMinFormatVersionRowLineage) {
+ auto first_row_id = snapshot->FirstRowId();
+ ICEBERG_CHECK(first_row_id.has_value(),
+ "Cannot add a snapshot: first-row-id is null");
+ ICEBERG_CHECK(
+ first_row_id.value() >= metadata_.next_row_id,
+ "Cannot add a snapshot, first-row-id is behind table next-row-id: {} <
{}",
+ first_row_id.value(), metadata_.next_row_id);
+
+ metadata_.next_row_id += snapshot->AddedRows().value_or(0);
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id,
+ const std::string&
branch) {
+ // Check if ref already exists with the same snapshot ID
+ auto ref_it = metadata_.refs.find(branch);
+ if (ref_it != metadata_.refs.end() && ref_it->second->snapshot_id ==
snapshot_id) {
+ return {};
+ }
+
+ auto snapshot_it = snapshots_by_id_.find(snapshot_id);
+ ICEBERG_PRECHECK(snapshot_it != snapshots_by_id_.end(),
+ "Cannot set {} to unknown snapshot: {}", branch,
snapshot_id);
+ const auto& snapshot = snapshot_it->second;
+
+ // If ref exists, validate it's a branch and check if snapshot ID matches
+ if (ref_it != metadata_.refs.end()) {
+ const auto& ref = ref_it->second;
+ ICEBERG_CHECK(ref->type() == SnapshotRefType::kBranch,
+ "Cannot update branch: {} is a tag", branch);
+ if (ref->snapshot_id == snapshot_id) {
+ return {};
+ }
+ }
+
+ ICEBERG_CHECK(
+ metadata_.format_version == 1 ||
+ snapshot->sequence_number <= metadata_.last_sequence_number,
+ "Last sequence number {} is less than existing snapshot sequence number
{}",
+ metadata_.last_sequence_number, snapshot->sequence_number);
+
+ // Create new ref: either from existing ref or create new branch ref
+ std::shared_ptr<SnapshotRef> new_ref;
+ if (ref_it != metadata_.refs.end()) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto ref_result,
+ SnapshotRef::Builder::BuilderFrom(*ref_it->second,
snapshot_id).Build());
+ new_ref = std::make_shared<SnapshotRef>(std::move(ref_result));
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(auto ref_result,
+
SnapshotRef::Builder::BranchBuilder(snapshot_id).Build());
+ new_ref = std::make_shared<SnapshotRef>(std::move(ref_result));
+ }
+
+ return SetRef(branch, std::move(new_ref));
+}
+
+Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
+ std::shared_ptr<SnapshotRef> ref) {
+ auto existing_ref_it = metadata_.refs.find(name);
+ if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second ==
*ref) {
+ return {};
+ }
+
+ int64_t snapshot_id = ref->snapshot_id;
+ auto snapshot_it = snapshots_by_id_.find(snapshot_id);
+ ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(),
+ "Cannot set {} to unknown snapshot: {}", name, snapshot_id);
+ const auto& snapshot = snapshot_it->second;
+
+ // If snapshot was added in this set of changes, update last_updated_ms
+ if (std::ranges::any_of(changes_, [snapshot_id](const auto& change) {
+ if (change->kind() != TableUpdate::Kind::kAddSnapshot) {
+ return false;
+ }
+ const auto* add_snapshot =
+ internal::checked_cast<const table::AddSnapshot*>(change.get());
+ return add_snapshot->snapshot()->snapshot_id == snapshot_id;
+ })) {
+ metadata_.last_updated_ms = snapshot->timestamp_ms;
+ }
+
+ // If it's MAIN_BRANCH, update currentSnapshotId and add to snapshotLog
+ if (name == SnapshotRef::kMainBranch) {
+ metadata_.current_snapshot_id = ref->snapshot_id;
+ if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) {
+ metadata_.last_updated_ms =
+ TimePointMs{std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())};
+ }
+ metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms,
ref->snapshot_id);
+ }
+
+ // Set the ref
+ metadata_.refs[name] = ref;
+
+ // Extract properties from SnapshotRef for the change
+ std::optional<int32_t> min_snapshots_to_keep = std::nullopt;
+ std::optional<int64_t> max_snapshot_age_ms = std::nullopt;
+ std::optional<int64_t> max_ref_age_ms = std::nullopt;
+
+ if (ref->type() == SnapshotRefType::kBranch) {
+ const auto& branch = std::get<SnapshotRef::Branch>(ref->retention);
+ min_snapshots_to_keep = branch.min_snapshots_to_keep;
+ max_snapshot_age_ms = branch.max_snapshot_age_ms;
+ max_ref_age_ms = branch.max_ref_age_ms;
+ } else {
+ const auto& tag = std::get<SnapshotRef::Tag>(ref->retention);
+ max_ref_age_ms = tag.max_ref_age_ms;
+ }
+
+ changes_.push_back(std::make_unique<table::SetSnapshotRef>(
Review Comment:
How about changing the SetSnapshotRef constructor to accept a SnapshotRef
directly?
##########
src/iceberg/snapshot.cc:
##########
@@ -75,6 +78,24 @@ std::optional<std::string_view> Snapshot::operation() const {
return std::nullopt;
}
+std::optional<int64_t> Snapshot::FirstRowId() const {
+ auto it = summary.find("first-row-id");
Review Comment:
How about adding a new kFirstRowId constant to SnapshotSummaryFields?
##########
src/iceberg/snapshot.cc:
##########
@@ -141,4 +162,103 @@ Result<std::span<ManifestFile>>
SnapshotCache::DeleteManifests(
return std::span<ManifestFile>(cache.first.data() + delete_start,
delete_count);
}
+// SnapshotRef::Builder implementation
+
+SnapshotRef::Builder::Builder(SnapshotRefType type, int64_t snapshot_id)
+ : type_(type), snapshot_id_(snapshot_id) {}
+
+SnapshotRef::Builder SnapshotRef::Builder::TagBuilder(int64_t snapshot_id) {
+ return Builder(SnapshotRefType::kTag, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BranchBuilder(int64_t snapshot_id) {
+ return Builder(SnapshotRefType::kBranch, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFor(int64_t snapshot_id,
+ SnapshotRefType type) {
+ return Builder(type, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref)
{
+ Builder builder(ref.type(), ref.snapshot_id);
+ if (ref.type() == SnapshotRefType::kBranch) {
+ const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
+ builder.min_snapshots_to_keep_ = branch.min_snapshots_to_keep;
+ builder.max_snapshot_age_ms_ = branch.max_snapshot_age_ms;
+ builder.max_ref_age_ms_ = branch.max_ref_age_ms;
+ } else {
+ const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
+ builder.max_ref_age_ms_ = tag.max_ref_age_ms;
+ }
+ return builder;
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref,
+ int64_t snapshot_id) {
+ Builder builder(ref.type(), snapshot_id);
+ if (ref.type() == SnapshotRefType::kBranch) {
+ const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
+ builder.min_snapshots_to_keep_ = branch.min_snapshots_to_keep;
+ builder.max_snapshot_age_ms_ = branch.max_snapshot_age_ms;
+ builder.max_ref_age_ms_ = branch.max_ref_age_ms;
+ } else {
+ const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
+ builder.max_ref_age_ms_ = tag.max_ref_age_ms;
+ }
+ return builder;
+}
+
+SnapshotRef::Builder& SnapshotRef::Builder::MinSnapshotsToKeep(
+ std::optional<int32_t> value) {
+ if (type_ == SnapshotRefType::kTag && value.has_value()) {
Review Comment:
```suggestion
ICEBERG_BUILDER_CHECK(type_ != SnapshotRefType::kTag ||
!value.has_value(), "Tags do not support setting minSnapshotsToKeep");
```
##########
src/iceberg/snapshot.cc:
##########
@@ -75,6 +78,24 @@ std::optional<std::string_view> Snapshot::operation() const {
return std::nullopt;
}
+std::optional<int64_t> Snapshot::FirstRowId() const {
+ auto it = summary.find("first-row-id");
+ if (it == summary.end()) {
+ return std::nullopt;
+ }
+
+ return StringUtils::ParseInt<int64_t>(it->second);
+}
+
+std::optional<int64_t> Snapshot::AddedRows() const {
+ auto it = summary.find("added-rows");
Review Comment:
Use SnapshotSummaryFields::kAddedRows here instead of the string literal.
##########
src/iceberg/table_metadata.cc:
##########
Review Comment:
We also need to rebuild the SnapshotLog list in
TableMetadataBuilder::Impl::Build.
##########
src/iceberg/snapshot.cc:
##########
@@ -141,4 +162,103 @@ Result<std::span<ManifestFile>>
SnapshotCache::DeleteManifests(
return std::span<ManifestFile>(cache.first.data() + delete_start,
delete_count);
}
+// SnapshotRef::Builder implementation
+
+SnapshotRef::Builder::Builder(SnapshotRefType type, int64_t snapshot_id)
+ : type_(type), snapshot_id_(snapshot_id) {}
+
+SnapshotRef::Builder SnapshotRef::Builder::TagBuilder(int64_t snapshot_id) {
+ return Builder(SnapshotRefType::kTag, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BranchBuilder(int64_t snapshot_id) {
+ return Builder(SnapshotRefType::kBranch, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFor(int64_t snapshot_id,
+ SnapshotRefType type) {
+ return Builder(type, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref)
{
+ Builder builder(ref.type(), ref.snapshot_id);
Review Comment:
```suggestion
return BuilderFrom(ref, ref.snapshot_id);
```
##########
src/iceberg/snapshot.h:
##########
@@ -119,6 +120,67 @@ struct ICEBERG_EXPORT SnapshotRef {
return lhs.Equals(rhs);
}
+ /// \brief Builder class for constructing SnapshotRef objects
+ class ICEBERG_EXPORT Builder : public ErrorCollector {
+ public:
+ /// \brief Create a builder for a tag reference
+ /// \param snapshot_id The snapshot ID for the tag
+ /// \return A new Builder instance for a tag
+ static Builder TagBuilder(int64_t snapshot_id);
+
+ /// \brief Create a builder for a branch reference
+ /// \param snapshot_id The snapshot ID for the branch
+ /// \return A new Builder instance for a branch
+ static Builder BranchBuilder(int64_t snapshot_id);
+
+ /// \brief Create a builder from an existing SnapshotRef
+ /// \param ref The existing reference to copy properties from
+ /// \return A new Builder instance with properties from the existing ref
+ static Builder BuilderFrom(const SnapshotRef& ref);
+
+ /// \brief Create a builder from an existing SnapshotRef with a new
snapshot ID
+ /// \param ref The existing reference to copy properties from
+ /// \param snapshot_id The new snapshot ID to use
+ /// \return A new Builder instance with properties from the existing ref
but new
+ /// snapshot ID
+ static Builder BuilderFrom(const SnapshotRef& ref, int64_t snapshot_id);
+
+ /// \brief Create a builder for a specific type
+ /// \param snapshot_id The snapshot ID
+ /// \param type The type of reference (branch or tag)
+ /// \return A new Builder instance
+ static Builder BuilderFor(int64_t snapshot_id, SnapshotRefType type);
+
+ /// \brief Set the minimum number of snapshots to keep (branch only)
+ /// \param value The minimum number of snapshots to keep, or nullopt for
default
+ /// \return Reference to this builder for method chaining
+ Builder& MinSnapshotsToKeep(std::optional<int32_t> value);
+
+ /// \brief Set the maximum snapshot age in milliseconds (branch only)
+ /// \param value The maximum snapshot age in milliseconds, or nullopt for
default
+ /// \return Reference to this builder for method chaining
+ Builder& MaxSnapshotAgeMs(std::optional<int64_t> value);
+
+ /// \brief Set the maximum reference age in milliseconds
+ /// \param value The maximum reference age in milliseconds, or nullopt for
default
+ /// \return Reference to this builder for method chaining
+ Builder& MaxRefAgeMs(std::optional<int64_t> value);
+
+ /// \brief Build the SnapshotRef
+ /// \return A Result containing the SnapshotRef instance, or an error if
validation
+ /// failed
+ Result<SnapshotRef> Build() const;
+
+ private:
+ explicit Builder(SnapshotRefType type, int64_t snapshot_id);
+
+ SnapshotRefType type_;
+ int64_t snapshot_id_;
+ std::optional<int32_t> min_snapshots_to_keep_;
Review Comment:
Should we store Branch/Tag directly instead of the individual fields?
std::optional<SnapshotRef::Branch> branch_;
std::optional<SnapshotRef::Tag> tag_;
##########
src/iceberg/snapshot.cc:
##########
@@ -141,4 +162,103 @@ Result<std::span<ManifestFile>>
SnapshotCache::DeleteManifests(
return std::span<ManifestFile>(cache.first.data() + delete_start,
delete_count);
}
+// SnapshotRef::Builder implementation
+
+SnapshotRef::Builder::Builder(SnapshotRefType type, int64_t snapshot_id)
+ : type_(type), snapshot_id_(snapshot_id) {}
+
+SnapshotRef::Builder SnapshotRef::Builder::TagBuilder(int64_t snapshot_id) {
+ return Builder(SnapshotRefType::kTag, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BranchBuilder(int64_t snapshot_id) {
+ return Builder(SnapshotRefType::kBranch, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFor(int64_t snapshot_id,
+ SnapshotRefType type) {
+ return Builder(type, snapshot_id);
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref)
{
+ Builder builder(ref.type(), ref.snapshot_id);
+ if (ref.type() == SnapshotRefType::kBranch) {
+ const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
+ builder.min_snapshots_to_keep_ = branch.min_snapshots_to_keep;
+ builder.max_snapshot_age_ms_ = branch.max_snapshot_age_ms;
+ builder.max_ref_age_ms_ = branch.max_ref_age_ms;
+ } else {
+ const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
+ builder.max_ref_age_ms_ = tag.max_ref_age_ms;
+ }
+ return builder;
+}
+
+SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref,
+ int64_t snapshot_id) {
+ Builder builder(ref.type(), snapshot_id);
+ if (ref.type() == SnapshotRefType::kBranch) {
+ const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
+ builder.min_snapshots_to_keep_ = branch.min_snapshots_to_keep;
+ builder.max_snapshot_age_ms_ = branch.max_snapshot_age_ms;
+ builder.max_ref_age_ms_ = branch.max_ref_age_ms;
+ } else {
+ const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
+ builder.max_ref_age_ms_ = tag.max_ref_age_ms;
+ }
+ return builder;
+}
+
+SnapshotRef::Builder& SnapshotRef::Builder::MinSnapshotsToKeep(
+ std::optional<int32_t> value) {
+ if (type_ == SnapshotRefType::kTag && value.has_value()) {
+ return AddError(ErrorKind::kInvalidArgument,
+ "Tags do not support setting minSnapshotsToKeep");
+ }
+ if (value.has_value() && value.value() <= 0) {
Review Comment:
The check below can also be changed to use ICEBERG_BUILDER_CHECK.
##########
src/iceberg/update/snapshot_update.cc:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_update.h"
+
+#include <charconv>
+#include <chrono>
+#include <format>
+
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/manifest/rolling_manifest_writer.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/string_util.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
+ : PendingUpdate(std::move(transaction)) {
+ target_manifest_size_bytes_ =
+
transaction_->current().properties.Get(TableProperties::kManifestTargetSizeBytes);
+
+ // For format version 1, check if snapshot ID inheritance is enabled
+ if (transaction_->current().format_version == 1) {
+ can_inherit_snapshot_id_ = transaction_->current().properties.Get(
+ TableProperties::kSnapshotIdInheritanceEnabled);
+ }
+
+ // Generate commit UUID
+ commit_uuid_ = Uuid::GenerateV7().ToString();
+
+ // Initialize delete function if not set
+ if (!delete_func_) {
+ delete_func_ = [this](const std::string& path) {
+ return transaction_->table()->io()->DeleteFile(path);
+ };
+ }
+}
+
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
+ const std::vector<DataFile>& data_files, const
std::shared_ptr<PartitionSpec>& spec) {
+ if (data_files.empty()) {
+ return std::vector<ManifestFile>{};
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto current_schema,
transaction_->current().Schema());
+
+ int8_t format_version = transaction_->current().format_version;
+ std::optional<int64_t> snapshot_id =
+ snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt;
+
+ // Create factory function for rolling manifest writer
+ RollingManifestWriter::ManifestWriterFactory factory =
+ [this, spec, current_schema, format_version,
+ snapshot_id]() -> Result<std::unique_ptr<ManifestWriter>> {
+ std::string manifest_path = ManifestPath();
+
+ if (format_version == 1) {
Review Comment:
The format-version checks (format_version) are scattered across multiple
methods, and each version's logic lives in an if-else branch. How about
encapsulating the per-version logic in independent implementations, so the
if-else chains do not keep growing?
##########
src/iceberg/update/snapshot_update.cc:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_update.h"
+
+#include <charconv>
+#include <chrono>
+#include <format>
+
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/manifest/rolling_manifest_writer.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/string_util.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
+ : PendingUpdate(std::move(transaction)) {
+ target_manifest_size_bytes_ =
+
transaction_->current().properties.Get(TableProperties::kManifestTargetSizeBytes);
+
+ // For format version 1, check if snapshot ID inheritance is enabled
+ if (transaction_->current().format_version == 1) {
+ can_inherit_snapshot_id_ = transaction_->current().properties.Get(
+ TableProperties::kSnapshotIdInheritanceEnabled);
+ }
+
+ // Generate commit UUID
+ commit_uuid_ = Uuid::GenerateV7().ToString();
+
+ // Initialize delete function if not set
+ if (!delete_func_) {
+ delete_func_ = [this](const std::string& path) {
+ return transaction_->table()->io()->DeleteFile(path);
+ };
+ }
+}
+
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
+ const std::vector<DataFile>& data_files, const
std::shared_ptr<PartitionSpec>& spec) {
+ if (data_files.empty()) {
+ return std::vector<ManifestFile>{};
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto current_schema,
transaction_->current().Schema());
+
+ int8_t format_version = transaction_->current().format_version;
+ std::optional<int64_t> snapshot_id =
+ snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt;
+
+ // Create factory function for rolling manifest writer
+ RollingManifestWriter::ManifestWriterFactory factory =
+ [this, spec, current_schema, format_version,
+ snapshot_id]() -> Result<std::unique_ptr<ManifestWriter>> {
+ std::string manifest_path = ManifestPath();
+
+ if (format_version == 1) {
+ return ManifestWriter::MakeV1Writer(
+ snapshot_id, manifest_path, transaction_->table()->io(), spec,
current_schema);
+ } else if (format_version == 2) {
+ return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path,
+ transaction_->table()->io(), spec,
+ current_schema,
ManifestContent::kData);
+ } else { // format_version == 3
+ std::optional<int64_t> first_row_id =
+ transaction_->table()->metadata()->next_row_id;
+ return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id,
manifest_path,
+ transaction_->table()->io(), spec,
+ current_schema,
ManifestContent::kData);
+ }
+ };
+
+ // Create rolling manifest writer
+ RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_);
+
+ // Write all files
+ for (const auto& file : data_files) {
+ ICEBERG_RETURN_UNEXPECTED(
+ rolling_writer.WriteAddedEntry(std::make_shared<DataFile>(file)));
+ }
+
+ // Close the rolling writer
+ ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+
+ // Get all manifest files
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_files,
rolling_writer.ToManifestFiles());
+
+ return manifest_files;
+}
+
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
+ const std::vector<DataFile>& delete_files,
+ const std::shared_ptr<PartitionSpec>& spec) {
+ if (delete_files.empty()) {
+ return std::vector<ManifestFile>{};
+ }
+
+ int8_t format_version = transaction_->current().format_version;
+ if (format_version < 2) {
+ // Delete manifests are only supported in format version 2+
+ return std::vector<ManifestFile>{};
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto current_schema,
transaction_->current().Schema());
+
+ std::optional<int64_t> snapshot_id =
+ snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt;
+
+ // Create factory function for rolling manifest writer
+ RollingManifestWriter::ManifestWriterFactory factory =
+ [this, spec, current_schema, format_version,
+ snapshot_id]() -> Result<std::unique_ptr<ManifestWriter>> {
+ std::string manifest_path = ManifestPath();
+
+ if (format_version == 2) {
+ return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path,
+ transaction_->table()->io(), spec,
+ current_schema,
ManifestContent::kDeletes);
+ } else { // format_version == 3
+ std::optional<int64_t> first_row_id =
+ transaction_->table()->metadata()->next_row_id;
+ return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id,
manifest_path,
+ transaction_->table()->io(), spec,
+ current_schema,
ManifestContent::kDeletes);
+ }
+ };
+
+ // Create rolling manifest writer
+ RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_);
+
+ // Write all delete files
+ for (const auto& file : delete_files) {
+ ICEBERG_RETURN_UNEXPECTED(
+ rolling_writer.WriteAddedEntry(std::make_shared<DataFile>(file)));
+ }
+
+ // Close the rolling writer
+ ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+
+ // Get all manifest files
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_files,
rolling_writer.ToManifestFiles());
+
+ return manifest_files;
+}
+
+int64_t SnapshotUpdate::SnapshotId() {
+ if (snapshot_id_.has_value()) {
+ return *snapshot_id_;
+ }
+ snapshot_id_ = SnapshotUtil::GenerateSnapshotId(transaction_->current());
+ return *snapshot_id_;
+}
+
+Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+ // Get the latest snapshot for the target branch
+ std::shared_ptr<Snapshot> parent_snapshot;
+ std::optional<int64_t> parent_snapshot_id;
+ auto parent_snapshot_result =
+ SnapshotUtil::LatestSnapshot(transaction_->current(), target_branch_);
+ if (!parent_snapshot_result.has_value()) [[unlikely]] {
+ if (parent_snapshot_result.error().kind == ErrorKind::kNotFound) {
+ parent_snapshot_id = std::nullopt;
+ }
+ return std::unexpected<Error>(parent_snapshot_result.error());
+ } else {
+ parent_snapshot = *parent_snapshot_result;
+ parent_snapshot_id = parent_snapshot->snapshot_id;
+ }
+ int64_t sequence_number = transaction_->current().NextSequenceNumber();
+
+ ICEBERG_RETURN_UNEXPECTED(Validate(transaction_->current(),
parent_snapshot));
+
+ std::vector<ManifestFile> manifests = Apply(transaction_->current(),
parent_snapshot);
+
+ std::string manifest_list_path = ManifestListPath();
+ manifest_lists_.push_back(manifest_list_path);
+
+ // Create manifest list writer based on format version
+ int8_t format_version = transaction_->current().format_version;
+ int64_t snapshot_id = SnapshotId();
+ std::unique_ptr<ManifestListWriter> writer;
+
+ if (format_version == 1) {
+ ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV1Writer(
+ snapshot_id, parent_snapshot_id,
+ manifest_list_path,
transaction_->table()->io()));
+ } else if (format_version == 2) {
+ ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV2Writer(
+ snapshot_id, parent_snapshot_id,
sequence_number,
+ manifest_list_path,
transaction_->table()->io()));
+ } else { // format_version == 3
+ int64_t first_row_id = transaction_->current().next_row_id;
+ ICEBERG_ASSIGN_OR_RAISE(
+ writer, ManifestListWriter::MakeV3Writer(
+ snapshot_id, parent_snapshot_id, sequence_number,
first_row_id,
+ manifest_list_path, transaction_->table()->io()));
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+ // Get nextRowId and assignedRows for format version 3
+ std::optional<int64_t> next_row_id;
+ std::optional<int64_t> assigned_rows;
+ if (format_version >= 3) {
+ next_row_id = transaction_->current().next_row_id;
+ if (writer->next_row_id().has_value()) {
+ assigned_rows = writer->next_row_id().value() - next_row_id.value();
+ }
+ }
+
+ std::unordered_map<std::string, std::string> summary = Summary();
+ std::string op = operation();
+
+ if (!op.empty() && op == DataOperation::kReplace) {
+ auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords);
+ auto replaced_records_it =
summary.find(SnapshotSummaryFields::kDeletedRecords);
+ if (added_records_it != summary.end() && replaced_records_it !=
summary.end()) {
+ auto added_records =
StringUtils::ParseInt<int64_t>(added_records_it->second);
+ auto replaced_records =
StringUtils::ParseInt<int64_t>(replaced_records_it->second);
+ if (added_records.has_value() && replaced_records.has_value() &&
+ added_records.value() > replaced_records.value()) {
+ return InvalidArgument(
+ "Invalid REPLACE operation: {} added records > {} replaced
records",
+ added_records.value(), replaced_records.value());
+ }
+ }
+ }
+
+ summary = ComputeSummary(transaction_->current());
+
+ // Get current time
+ auto now = std::chrono::system_clock::now();
+ auto duration_since_epoch = now.time_since_epoch();
+ TimePointMs timestamp_ms =
std::chrono::time_point_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::time_point(duration_since_epoch));
+
+ // Get schema ID
+ std::optional<int32_t> schema_id = transaction_->current().current_schema_id;
+
+ // Create snapshot
+ staged_snapshot_ =
+ std::make_shared<Snapshot>(Snapshot{.snapshot_id = snapshot_id,
+ .parent_snapshot_id =
parent_snapshot_id,
+ .sequence_number = sequence_number,
+ .timestamp_ms = timestamp_ms,
+ .manifest_list = manifest_list_path,
+ .summary = std::move(summary),
+ .schema_id = schema_id});
+
+ // Return the new snapshot
+ return ApplyResult{.snapshot = staged_snapshot_,
+ .target_branch = target_branch_,
+ .stage_only = stage_only_};
+}
+
+Status SnapshotUpdate::Finalize() {
+ // Cleanup after successful commit
+ if (cleanup_after_commit()) {
+ auto cached_snapshot = SnapshotCache(staged_snapshot_.get());
Review Comment:
`staged_snapshot_` might be null here if an error occurs in the `Apply`
method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]