twuebi commented on code in PR #558:
URL: https://github.com/apache/iceberg-go/pull/558#discussion_r2334001345
##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
sourceIdToFields map[int][]PartitionField
}
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+ spec := PartitionSpec{
+ id: 0,
+ }
+ for _, opt := range opts {
+ if err := opt(&spec); err != nil {
+ return PartitionSpec{}, err
+ }
+ }
+ spec.initialize()
+
+ return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+ return func(p *PartitionSpec) error {
+ p.id = id
+
+ return nil
+ }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform
Transform, schema *Schema, fieldID *int) PartitionOption {
+ return func(p *PartitionSpec) error {
+ if schema == nil {
+ return errors.New("cannot add partition field with nil
schema")
+ }
+ field, ok := schema.FindFieldByID(sourceID)
+ if !ok {
+ return fmt.Errorf("cannot find source column with id:
%d in schema", sourceID)
+ }
+ err := addSpecFieldInternal(p, targetName, field, transform,
fieldID)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field
NestedField, transform Transform, fieldID *int) error {
+ if targetName == "" {
+ return errors.New("cannot use empty partition name")
+ }
+ for _, existingField := range p.fields {
+ if existingField.Name == targetName {
+ return errors.New("duplicate partition name: " +
targetName)
+ }
+ }
+ var fieldIDValue int
+ if fieldID == nil {
+ fieldIDValue = unassignedFieldID
+ } else {
+ fieldIDValue = *fieldID
+ }
+ if err := p.checkForRedundantPartitions(field.ID, transform); err !=
nil {
+ return err
+ }
+ unboundField := PartitionField{
+ SourceID: field.ID,
+ FieldID: fieldIDValue,
+ Name: targetName,
+ Transform: transform,
+ }
+ p.fields = append(p.fields, unboundField)
+
+ return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform
Transform) error {
+ for _, field := range p.fields {
+ if field.SourceID == sourceID && field.Transform.String() ==
transform.String() {
+ return fmt.Errorf("cannot add redundant partition with
source id %d and transform %s. A partition with the same source id and
transform already exists with name: %s", sourceID, transform, field.Name)
+ }
+ }
+
+ return nil
+}
+
+func (p *PartitionSpec) SetID(id int) {
+ p.id = id
+}
+
+func (ps *PartitionSpec) AssignPartitionFieldIds(lastAssignedFieldIDPtr *int)
error {
+ // This is set_field_ids from iceberg-rust
+ // Already assigned partition ids. If we see one of these during
iteration,
+ // we skip it.
+ assignedIds := make(map[int]struct{})
+ for _, field := range ps.fields {
+ if field.FieldID != unassignedFieldID {
+ if _, exists := assignedIds[field.FieldID]; exists {
+ return fmt.Errorf("duplicate field ID provided:
%d", field.FieldID)
+ }
+ assignedIds[field.FieldID] = struct{}{}
+ }
+ }
Review Comment:
I thought it'd be best to always call it from addPartitionSpec to make sure
it always runs. Putting it into the constructors / initialize of
PartitionSpec would also work, as long as the fields remain unexported and we
make sure to call it on unmarshal too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]