mzzz-zzm commented on code in PR #983:
URL: https://github.com/apache/iceberg-go/pull/983#discussion_r3195702570
##########
table/conflict_validation.go:
##########
@@ -426,6 +427,159 @@ func validateNoConflictingDataFiles(ctx *conflictContext,
filter iceberg.Boolean
return validateAddedDataFilesMatchingFilter(ctx, filter)
}
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to the partitions touched
+// by equality-delete files. It builds an OR-of-equalities filter from
+// the provided partition tuples and delegates to
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection, manifest-summary pruning, and type-aware evaluation via
+// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
+// and future partition types, and correct across partition-spec
+// evolution because each concurrent manifest is projected against its
+// own spec ID.
+//
+// Callers are responsible for ensuring the table is partitioned
+// (i.e. at least one partition field exists) before calling this
+// function. For unpartitioned tables, call
+// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
+// directly.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext,
eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+ if level != IsolationSerializable {
+ return nil
+ }
+
+ if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
+ return nil
+ }
+
+ filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+ if err != nil {
+ return fmt.Errorf("building partition conflict filter: %w", err)
+ }
+
+ return validateNoConflictingDataFiles(ctx, filter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate).
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata)
(iceberg.BooleanExpression, error) {
+ terms := make([]iceberg.BooleanExpression, 0, len(files))
+ for _, f := range files {
+ p := f.Partition()
+ if len(p) == 0 {
+ return iceberg.AlwaysTrue{}, nil
+ }
+
+ spec := meta.PartitionSpecByID(int(f.SpecID()))
+ if spec == nil {
+ return nil, fmt.Errorf("partition spec ID %d not found
in metadata", f.SpecID())
+ }
+
+ // Build partition field ID → PartitionField lookup for this
spec.
+ partFieldByID := make(map[int]iceberg.PartitionField,
spec.NumFields())
+ for _, pf := range spec.Fields() {
+ partFieldByID[pf.FieldID] = pf
+ }
+
+ // Sort partition field IDs for deterministic expression order.
+ fieldIDs := make([]int, 0, len(p))
+ for id := range p {
+ fieldIDs = append(fieldIDs, id)
+ }
+ sort.Ints(fieldIDs)
+
+ conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+ for _, partFieldID := range fieldIDs {
+ pf, ok := partFieldByID[partFieldID]
+ if !ok {
+ return nil, fmt.Errorf("partition field ID %d
not found in spec %d", partFieldID, f.SpecID())
+ }
+
+ // Resolve to source schema field to obtain the
Reference name.
+ sourceField, ok :=
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+ if !ok {
+ return nil, fmt.Errorf("source field ID %d
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+ }
+
+ lit, err := anyToLiteral(p[partFieldID])
+ if err != nil {
+ return nil, fmt.Errorf("partition field %q:
%w", sourceField.Name, err)
+ }
+
+ conjuncts = append(conjuncts,
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name),
lit))
+ }
+
+ if len(conjuncts) == 1 {
+ terms = append(terms, conjuncts[0])
+ } else {
+ terms = append(terms, iceberg.NewAnd(conjuncts[0],
conjuncts[1], conjuncts[2:]...))
+ }
+ }
+
+ if len(terms) == 0 {
+ return iceberg.AlwaysTrue{}, nil
+ }
+
+ if len(terms) == 1 {
+ return terms[0], nil
+ }
+
+ return iceberg.NewOr(terms[0], terms[1], terms[2:]...), nil
+}
+
+// anyToLiteral converts a dynamically-typed partition value (as
+// stored in iceberg.DataFile.Partition()) to an iceberg.Literal.
+// The supported types mirror the iceberg.LiteralType constraint.
+func anyToLiteral(v any) (iceberg.Literal, error) {
+ switch val := v.(type) {
+ case bool:
+ return iceberg.NewLiteral(val), nil
+ case int32:
+ return iceberg.NewLiteral(val), nil
+ case int64:
+ return iceberg.NewLiteral(val), nil
+ case float32:
+ return iceberg.NewLiteral(val), nil
+ case float64:
+ return iceberg.NewLiteral(val), nil
+ case string:
+ return iceberg.NewLiteral(val), nil
+ case []byte:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Date:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Time:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Timestamp:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.TimestampNano:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Decimal:
Review Comment:
Fixed. Added `case iceberg.DecimalLiteral:` to `anyToLiteral` alongside the
existing `case iceberg.Decimal:`. The new arm converts the value to
`iceberg.Decimal` before wrapping it in a `Literal`, bridging the gap between
the two named types: `iceberg.Decimal(val)` is a valid conversion because
`DecimalLiteral` is declared as `type DecimalLiteral Decimal`.
The `TestAnyToLiteral_SupportedTypes` table now includes a `DecimalLiteral`
subtest (`iceberg.DecimalLiteral{Scale: 2}`); it fails without this fix and
passes with it.
Normalizing `convertAvroValueToIcebergType` to return `Decimal` consistently
is deferred; it touches callers beyond `anyToLiteral` and deserves its own PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]