WZhuo commented on code in PR #437:
URL: https://github.com/apache/iceberg-cpp/pull/437#discussion_r2646514543
##########
src/iceberg/schema.cc:
##########
@@ -228,4 +229,43 @@ Result<std::vector<std::string>>
Schema::IdentifierFieldNames() const {
return names;
}
+Result<int32_t> Schema::HighestFieldId() const {
+ ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
+
+ if (id_to_field.get().empty()) {
+ return kInitialColumnId;
+ }
+
+ auto max_it = std::ranges::max_element(
+ id_to_field.get(),
+ [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
Review Comment:
Wrap this in a Lazy or a simple std::call_once? That avoids searching every time.
##########
src/iceberg/schema.cc:
##########
@@ -30,6 +30,7 @@
#include "iceberg/util/macros.h"
#include "iceberg/util/type_util.h"
#include "iceberg/util/visit_type.h"
+#include "table_metadata.h"
Review Comment:
```suggestion
#include "iceberg/table_metadata.h"
```
##########
src/iceberg/table_metadata.cc:
##########
@@ -608,7 +631,124 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+ if (schema_id == kLastAdded) {
+ if (!last_added_schema_id_.has_value()) {
+ return InvalidArgument("Cannot set last added schema: no schema has been
added");
+ }
+ return SetCurrentSchema(last_added_schema_id_.value());
+ }
+
+ if (metadata_.current_schema_id == schema_id) {
+ return {};
+ }
+
+ auto it = schemas_by_id_.find(schema_id);
+ if (it == schemas_by_id_.end()) {
+ return InvalidArgument("Cannot set current schema to unknown schema: {}",
schema_id);
+ }
+ const auto& schema = it->second;
+
+ // Rebuild all partition specs for the new current schema
+ std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+ for (const auto& spec : metadata_.partition_specs) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema,
*spec));
+ updated_specs.push_back(std::move(updated_spec));
+ }
+ metadata_.partition_specs = std::move(updated_specs);
+ specs_by_id_.clear();
+ for (const auto& spec : metadata_.partition_specs) {
+ specs_by_id_.emplace(spec->spec_id(), spec);
+ }
+
+ // Rebuild all sort orders for the new current schema
+ std::vector<std::shared_ptr<SortOrder>> updated_orders;
+ for (const auto& order : metadata_.sort_orders) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema,
*order));
+ updated_orders.push_back(std::move(updated_order));
+ }
+ metadata_.sort_orders = std::move(updated_orders);
+ sort_orders_by_id_.clear();
+ for (const auto& order : metadata_.sort_orders) {
+ sort_orders_by_id_.emplace(order->order_id(), order);
+ }
+
+ // Set the current schema ID
+ metadata_.current_schema_id = schema_id;
+
+ // Record the change
+ if (last_added_schema_id_.has_value() && last_added_schema_id_.value() ==
schema_id) {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+ } else {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector<int32_t>&
schema_ids) {
+ std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(),
schema_ids.end());
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ if (!schema_ids_to_remove.contains(current_schema_id)) {
+ return InvalidArgument("Cannot remove current schema: {}",
current_schema_id);
+ }
Review Comment:
```suggestion
ICEBERG_PRECHECK(!schema_ids_to_remove.contains(current_schema_id), "Cannot
remove current schema: {}", current_schema_id);
```
##########
src/iceberg/table_metadata.cc:
##########
@@ -608,7 +631,124 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+ if (schema_id == kLastAdded) {
+ if (!last_added_schema_id_.has_value()) {
+ return InvalidArgument("Cannot set last added schema: no schema has been
added");
+ }
+ return SetCurrentSchema(last_added_schema_id_.value());
+ }
+
+ if (metadata_.current_schema_id == schema_id) {
+ return {};
+ }
+
+ auto it = schemas_by_id_.find(schema_id);
+ if (it == schemas_by_id_.end()) {
+ return InvalidArgument("Cannot set current schema to unknown schema: {}",
schema_id);
+ }
+ const auto& schema = it->second;
+
+ // Rebuild all partition specs for the new current schema
+ std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+ for (const auto& spec : metadata_.partition_specs) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema,
*spec));
+ updated_specs.push_back(std::move(updated_spec));
+ }
+ metadata_.partition_specs = std::move(updated_specs);
+ specs_by_id_.clear();
+ for (const auto& spec : metadata_.partition_specs) {
+ specs_by_id_.emplace(spec->spec_id(), spec);
+ }
+
+ // Rebuild all sort orders for the new current schema
+ std::vector<std::shared_ptr<SortOrder>> updated_orders;
+ for (const auto& order : metadata_.sort_orders) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema,
*order));
+ updated_orders.push_back(std::move(updated_order));
+ }
+ metadata_.sort_orders = std::move(updated_orders);
+ sort_orders_by_id_.clear();
+ for (const auto& order : metadata_.sort_orders) {
+ sort_orders_by_id_.emplace(order->order_id(), order);
+ }
+
+ // Set the current schema ID
+ metadata_.current_schema_id = schema_id;
+
+ // Record the change
+ if (last_added_schema_id_.has_value() && last_added_schema_id_.value() ==
schema_id) {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+ } else {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector<int32_t>&
schema_ids) {
+ std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(),
schema_ids.end());
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ if (!schema_ids_to_remove.contains(current_schema_id)) {
+ return InvalidArgument("Cannot remove current schema: {}",
current_schema_id);
+ }
+
+ if (!schema_ids_to_remove.empty()) {
+ metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto&
schema) {
+ return !schema_ids_to_remove.contains(
+
schema->schema_id().value_or(Schema::kInitialSchemaId));
+ }) |
+
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+ changes_.push_back(
+
std::make_unique<table::RemoveSchemas>(std::move(schema_ids_to_remove)));
+ }
+
+ return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+ int32_t
new_last_column_id) {
+ if (new_last_column_id < metadata_.last_column_id) {
+ return InvalidArgument("Invalid last column ID: {} < {} (previous last
column ID)",
+ new_last_column_id, metadata_.last_column_id);
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+ auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+ if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end()) {
Review Comment:
It also checks whether metadata_.last_column_id equals new_last_column_id;
does that matter?
##########
src/iceberg/partition_spec.cc:
##########
@@ -155,6 +155,42 @@ Status PartitionSpec::Validate(const Schema& schema, bool
allow_missing_fields)
return {};
}
+Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {
+ std::unordered_set<std::string> partition_names;
+ for (const auto& partition_field : fields_) {
+ auto name = std::string(partition_field.name());
+ if (name.empty()) {
+ return InvalidArgument("Cannot use empty partition name: {}", name);
+ }
Review Comment:
```suggestion
ICEBERG_PRECHECK(!partition_field.empty(), "Cannot use empty partition
name: {}", name);
auto name = std::string(partition_field.name());
```
##########
src/iceberg/table_metadata.h:
##########
@@ -73,10 +73,13 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int8_t kDefaultTableFormatVersion = 2;
static constexpr int8_t kSupportedTableFormatVersion = 3;
static constexpr int8_t kMinFormatVersionRowLineage = 3;
+ static constexpr int8_t kMinFormatVersionDefaultValues = 3;
static constexpr int64_t kInitialSequenceNumber = 0;
static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;
+ static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions =
{};
Review Comment:
Not initialized?
##########
src/iceberg/schema.cc:
##########
@@ -228,4 +229,43 @@ Result<std::vector<std::string>>
Schema::IdentifierFieldNames() const {
return names;
}
+Result<int32_t> Schema::HighestFieldId() const {
+ ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
+
+ if (id_to_field.get().empty()) {
+ return kInitialColumnId;
+ }
+
+ auto max_it = std::ranges::max_element(
+ id_to_field.get(),
+ [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
+
+ return max_it->first;
+}
+
+bool Schema::SameSchema(const Schema& other) const { return fields_ ==
other.fields_; }
Review Comment:
Also need to compare identifier_fields.
##########
src/iceberg/table_metadata.cc:
##########
@@ -608,7 +631,124 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+ if (schema_id == kLastAdded) {
+ if (!last_added_schema_id_.has_value()) {
+ return InvalidArgument("Cannot set last added schema: no schema has been
added");
+ }
+ return SetCurrentSchema(last_added_schema_id_.value());
+ }
+
+ if (metadata_.current_schema_id == schema_id) {
+ return {};
+ }
+
+ auto it = schemas_by_id_.find(schema_id);
+ if (it == schemas_by_id_.end()) {
+ return InvalidArgument("Cannot set current schema to unknown schema: {}",
schema_id);
+ }
+ const auto& schema = it->second;
+
+ // Rebuild all partition specs for the new current schema
+ std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+ for (const auto& spec : metadata_.partition_specs) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema,
*spec));
+ updated_specs.push_back(std::move(updated_spec));
+ }
+ metadata_.partition_specs = std::move(updated_specs);
+ specs_by_id_.clear();
+ for (const auto& spec : metadata_.partition_specs) {
+ specs_by_id_.emplace(spec->spec_id(), spec);
+ }
+
+ // Rebuild all sort orders for the new current schema
+ std::vector<std::shared_ptr<SortOrder>> updated_orders;
+ for (const auto& order : metadata_.sort_orders) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema,
*order));
+ updated_orders.push_back(std::move(updated_order));
+ }
+ metadata_.sort_orders = std::move(updated_orders);
+ sort_orders_by_id_.clear();
+ for (const auto& order : metadata_.sort_orders) {
+ sort_orders_by_id_.emplace(order->order_id(), order);
+ }
+
+ // Set the current schema ID
+ metadata_.current_schema_id = schema_id;
+
+ // Record the change
+ if (last_added_schema_id_.has_value() && last_added_schema_id_.value() ==
schema_id) {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+ } else {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector<int32_t>&
schema_ids) {
+ std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(),
schema_ids.end());
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ if (!schema_ids_to_remove.contains(current_schema_id)) {
+ return InvalidArgument("Cannot remove current schema: {}",
current_schema_id);
+ }
+
+ if (!schema_ids_to_remove.empty()) {
+ metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto&
schema) {
+ return !schema_ids_to_remove.contains(
+
schema->schema_id().value_or(Schema::kInitialSchemaId));
+ }) |
+
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+ changes_.push_back(
+
std::make_unique<table::RemoveSchemas>(std::move(schema_ids_to_remove)));
+ }
+
+ return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+ int32_t
new_last_column_id) {
+ if (new_last_column_id < metadata_.last_column_id) {
+ return InvalidArgument("Invalid last column ID: {} < {} (previous last
column ID)",
+ new_last_column_id, metadata_.last_column_id);
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+ auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+ if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end()) {
+ // update last_added_schema_id if the schema was added in this set of
changes (since
+ // it is now the last)
+ bool is_new_schema =
+ last_added_schema_id_.has_value() &&
+ std::ranges::find_if(changes_, [new_schema_id](const auto& change) {
+ if (change->kind() != TableUpdate::Kind::kAddSchema) {
+ return false;
+ }
+ auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
+ return
add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
+ new_schema_id;
+ }) != changes_.cend();
Review Comment:
```suggestion
std::ranges::any_of(changes_, [new_schema_id](const auto& change) {
if (change->kind() != TableUpdate::Kind::kAddSchema) {
return false;
}
auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
return
add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
new_schema_id;
});
```
##########
src/iceberg/table_metadata.cc:
##########
@@ -608,7 +631,124 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+ if (schema_id == kLastAdded) {
+ if (!last_added_schema_id_.has_value()) {
+ return InvalidArgument("Cannot set last added schema: no schema has been
added");
+ }
+ return SetCurrentSchema(last_added_schema_id_.value());
+ }
+
+ if (metadata_.current_schema_id == schema_id) {
+ return {};
+ }
+
+ auto it = schemas_by_id_.find(schema_id);
+ if (it == schemas_by_id_.end()) {
+ return InvalidArgument("Cannot set current schema to unknown schema: {}",
schema_id);
+ }
+ const auto& schema = it->second;
+
+ // Rebuild all partition specs for the new current schema
+ std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+ for (const auto& spec : metadata_.partition_specs) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema,
*spec));
+ updated_specs.push_back(std::move(updated_spec));
+ }
+ metadata_.partition_specs = std::move(updated_specs);
+ specs_by_id_.clear();
+ for (const auto& spec : metadata_.partition_specs) {
+ specs_by_id_.emplace(spec->spec_id(), spec);
+ }
+
+ // Rebuild all sort orders for the new current schema
+ std::vector<std::shared_ptr<SortOrder>> updated_orders;
+ for (const auto& order : metadata_.sort_orders) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema,
*order));
+ updated_orders.push_back(std::move(updated_order));
+ }
+ metadata_.sort_orders = std::move(updated_orders);
+ sort_orders_by_id_.clear();
+ for (const auto& order : metadata_.sort_orders) {
+ sort_orders_by_id_.emplace(order->order_id(), order);
+ }
+
+ // Set the current schema ID
+ metadata_.current_schema_id = schema_id;
+
+ // Record the change
+ if (last_added_schema_id_.has_value() && last_added_schema_id_.value() ==
schema_id) {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+ } else {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector<int32_t>&
schema_ids) {
+ std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(),
schema_ids.end());
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ if (!schema_ids_to_remove.contains(current_schema_id)) {
Review Comment:
`schema_ids_to_remove` should not contain `current_schema_id`; otherwise
return an error.
##########
src/iceberg/table_metadata.cc:
##########
@@ -608,7 +631,124 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+ if (schema_id == kLastAdded) {
+ if (!last_added_schema_id_.has_value()) {
+ return InvalidArgument("Cannot set last added schema: no schema has been
added");
+ }
+ return SetCurrentSchema(last_added_schema_id_.value());
+ }
+
+ if (metadata_.current_schema_id == schema_id) {
+ return {};
+ }
+
+ auto it = schemas_by_id_.find(schema_id);
+ if (it == schemas_by_id_.end()) {
+ return InvalidArgument("Cannot set current schema to unknown schema: {}",
schema_id);
+ }
+ const auto& schema = it->second;
+
+ // Rebuild all partition specs for the new current schema
+ std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+ for (const auto& spec : metadata_.partition_specs) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema,
*spec));
+ updated_specs.push_back(std::move(updated_spec));
+ }
+ metadata_.partition_specs = std::move(updated_specs);
+ specs_by_id_.clear();
+ for (const auto& spec : metadata_.partition_specs) {
+ specs_by_id_.emplace(spec->spec_id(), spec);
+ }
+
+ // Rebuild all sort orders for the new current schema
+ std::vector<std::shared_ptr<SortOrder>> updated_orders;
+ for (const auto& order : metadata_.sort_orders) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema,
*order));
+ updated_orders.push_back(std::move(updated_order));
+ }
+ metadata_.sort_orders = std::move(updated_orders);
+ sort_orders_by_id_.clear();
+ for (const auto& order : metadata_.sort_orders) {
+ sort_orders_by_id_.emplace(order->order_id(), order);
+ }
+
+ // Set the current schema ID
+ metadata_.current_schema_id = schema_id;
+
+ // Record the change
+ if (last_added_schema_id_.has_value() && last_added_schema_id_.value() ==
schema_id) {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+ } else {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector<int32_t>&
schema_ids) {
+ std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(),
schema_ids.end());
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ if (!schema_ids_to_remove.contains(current_schema_id)) {
+ return InvalidArgument("Cannot remove current schema: {}",
current_schema_id);
+ }
+
+ if (!schema_ids_to_remove.empty()) {
+ metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto&
schema) {
+ return !schema_ids_to_remove.contains(
+
schema->schema_id().value_or(Schema::kInitialSchemaId));
+ }) |
+
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+ changes_.push_back(
+
std::make_unique<table::RemoveSchemas>(std::move(schema_ids_to_remove)));
+ }
+
+ return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+ int32_t
new_last_column_id) {
+ if (new_last_column_id < metadata_.last_column_id) {
+ return InvalidArgument("Invalid last column ID: {} < {} (previous last
column ID)",
+ new_last_column_id, metadata_.last_column_id);
+ }
Review Comment:
It can be replaced with the `ICEBERG_PRECHECK` macro.
##########
src/iceberg/table_metadata.cc:
##########
@@ -709,16 +919,24 @@ TableMetadataBuilder&
TableMetadataBuilder::UpgradeFormatVersion(
}
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(
- std::shared_ptr<Schema> schema, int32_t new_last_column_id) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ std::shared_ptr<Schema> const& schema, int32_t new_last_column_id) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id,
+ impl_->AddSchema(*schema,
new_last_column_id));
+ return SetCurrentSchema(schema_id);
}
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t
schema_id) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetCurrentSchema(schema_id));
+ return *this;
}
-TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema>
schema) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+TableMetadataBuilder& TableMetadataBuilder::AddSchema(
+ std::shared_ptr<Schema> const& schema) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id,
schema->HighestFieldId());
+ auto new_last_column_id = std::max(impl_->metadata().last_column_id,
highest_field_id);
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id,
+ impl_->AddSchema(*schema,
new_last_column_id));
Review Comment:
```suggestion
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSchema(*schema,
new_last_column_id));
```
`schema_id` is not used.
##########
src/iceberg/table_metadata.cc:
##########
@@ -608,7 +631,124 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+ if (schema_id == kLastAdded) {
+ if (!last_added_schema_id_.has_value()) {
+ return InvalidArgument("Cannot set last added schema: no schema has been
added");
+ }
+ return SetCurrentSchema(last_added_schema_id_.value());
+ }
+
+ if (metadata_.current_schema_id == schema_id) {
+ return {};
+ }
+
+ auto it = schemas_by_id_.find(schema_id);
+ if (it == schemas_by_id_.end()) {
+ return InvalidArgument("Cannot set current schema to unknown schema: {}",
schema_id);
+ }
+ const auto& schema = it->second;
+
+ // Rebuild all partition specs for the new current schema
+ std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+ for (const auto& spec : metadata_.partition_specs) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema,
*spec));
+ updated_specs.push_back(std::move(updated_spec));
+ }
+ metadata_.partition_specs = std::move(updated_specs);
+ specs_by_id_.clear();
+ for (const auto& spec : metadata_.partition_specs) {
+ specs_by_id_.emplace(spec->spec_id(), spec);
+ }
+
+ // Rebuild all sort orders for the new current schema
+ std::vector<std::shared_ptr<SortOrder>> updated_orders;
+ for (const auto& order : metadata_.sort_orders) {
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema,
*order));
+ updated_orders.push_back(std::move(updated_order));
+ }
+ metadata_.sort_orders = std::move(updated_orders);
+ sort_orders_by_id_.clear();
+ for (const auto& order : metadata_.sort_orders) {
+ sort_orders_by_id_.emplace(order->order_id(), order);
+ }
+
+ // Set the current schema ID
+ metadata_.current_schema_id = schema_id;
+
+ // Record the change
+ if (last_added_schema_id_.has_value() && last_added_schema_id_.value() ==
schema_id) {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+ } else {
+ changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+ }
+
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector<int32_t>&
schema_ids) {
+ std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(),
schema_ids.end());
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ if (!schema_ids_to_remove.contains(current_schema_id)) {
+ return InvalidArgument("Cannot remove current schema: {}",
current_schema_id);
+ }
+
+ if (!schema_ids_to_remove.empty()) {
+ metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto&
schema) {
+ return !schema_ids_to_remove.contains(
+
schema->schema_id().value_or(Schema::kInitialSchemaId));
+ }) |
+
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+ changes_.push_back(
+
std::make_unique<table::RemoveSchemas>(std::move(schema_ids_to_remove)));
+ }
+
+ return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+ int32_t
new_last_column_id) {
+ if (new_last_column_id < metadata_.last_column_id) {
+ return InvalidArgument("Invalid last column ID: {} < {} (previous last
column ID)",
+ new_last_column_id, metadata_.last_column_id);
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+ auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+ if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end()) {
+ // update last_added_schema_id if the schema was added in this set of
changes (since
+ // it is now the last)
+ bool is_new_schema =
+ last_added_schema_id_.has_value() &&
+ std::ranges::find_if(changes_, [new_schema_id](const auto& change) {
+ if (change->kind() != TableUpdate::Kind::kAddSchema) {
+ return false;
+ }
+ auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
+ return
add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
+ new_schema_id;
+ }) != changes_.cend();
+ last_added_schema_id_ =
+ is_new_schema ? std::make_optional(new_schema_id) : std::nullopt;
+ return new_schema_id;
+ }
+
+ auto new_schema = std::make_shared<Schema>(
+ std::vector<SchemaField>(schema.fields().begin(), schema.fields().end()),
+ new_schema_id);
Review Comment:
Missing `identifier_field_ids`.
```suggestion
auto new_schema = std::make_shared<Schema>(
schema.fields() | std::ranges::to<std::vector>(),
new_schema_id, schema.IdentifierFieldIds());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]