ferhatelmas commented on code in PR #723:
URL: https://github.com/apache/iceberg-go/pull/723#discussion_r2830650003


##########
table/transaction.go:
##########
@@ -435,6 +435,254 @@ func (t *Transaction) ReplaceDataFiles(ctx 
context.Context, filesToDelete, files
        return t.apply(updates, reqs)
 }
 
+// validateDataFilePartitionData verifies that DataFile partition values match
+// the current partition spec fields by ID without reading file contents.
+func validateDataFilePartitionData(df iceberg.DataFile, spec 
*iceberg.PartitionSpec) error {
+       partitionData := df.Partition()
+       if partitionData == nil {
+               partitionData = map[int]any{}
+       }
+
+       expectedFieldIDs := make(map[int]string)
+       for field := range spec.Fields() {
+               expectedFieldIDs[field.FieldID] = field.Name
+               if _, ok := partitionData[field.FieldID]; !ok {
+                       return fmt.Errorf("missing partition value for field id 
%d (%s)", field.FieldID, field.Name)
+               }
+       }
+
+       for fieldID := range partitionData {
+               if _, ok := expectedFieldIDs[fieldID]; !ok {
+                       return fmt.Errorf("unknown partition field id %d for 
spec id %d", fieldID, spec.ID())
+               }
+       }
+
+       return nil
+}
+
+// validateDataFilesToAdd performs metadata-only validation for caller-provided
+// DataFiles and returns a set of paths that passed validation.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile, 
operation string) (map[string]struct{}, error) {
+       currentSpec, err := t.meta.CurrentSpec()
+       if err != nil || currentSpec == nil {
+               return nil, fmt.Errorf("could not get current partition spec: 
%w", err)
+       }
+
+       expectedSpecID := int32(currentSpec.ID())
+       setToAdd := make(map[string]struct{}, len(dataFiles))
+
+       for i, df := range dataFiles {
+               if df == nil {
+                       return nil, fmt.Errorf("nil data file at index %d for 
%s", i, operation)
+               }
+
+               path := df.FilePath()
+               if path == "" {
+                       return nil, fmt.Errorf("data file path cannot be empty 
for %s", operation)
+               }
+
+               if _, ok := setToAdd[path]; ok {
+                       return nil, fmt.Errorf("add data file paths must be 
unique for %s", operation)
+               }
+               setToAdd[path] = struct{}{}
+
+               if df.ContentType() != iceberg.EntryContentData {
+                       return nil, fmt.Errorf("adding files other than data 
files is not yet implemented: file %s has content type %s for %s", path, 
df.ContentType(), operation)
+               }
+
+               switch df.FileFormat() {
+               case iceberg.ParquetFile, iceberg.OrcFile, iceberg.AvroFile:
+               default:
+                       return nil, fmt.Errorf("data file %s has invalid file 
format %s for %s", path, df.FileFormat(), operation)
+               }
+
+               if df.SpecID() != expectedSpecID {
+                       return nil, fmt.Errorf("data file %s has invalid 
partition spec id %d for %s: expected %d",
+                               path, df.SpecID(), operation, expectedSpecID)
+               }
+
+               if err := validateDataFilePartitionData(df, currentSpec); err 
!= nil {
+                       return nil, fmt.Errorf("data file %s has invalid 
partition data for %s: %w", path, operation, err)
+               }
+       }
+
+       return setToAdd, nil
+}
+
+// AddDataFiles adds pre-built DataFiles to the table without scanning them 
from storage.
+// This is useful for clients who have already constructed DataFile objects 
with metadata,
+// avoiding the need to read files to extract schema and statistics.
+//
+// Unlike AddFiles, this method does not read files from storage. It validates 
only metadata
+// that can be checked without opening files (for example spec-id and 
partition field IDs).
+//
+// Callers are responsible for ensuring each DataFile is valid and consistent 
with the table.
+// Supplying incorrect DataFile metadata can produce an invalid snapshot and 
break reads.
+func (t *Transaction) AddDataFiles(ctx context.Context, dataFiles 
[]iceberg.DataFile, snapshotProps iceberg.Properties) error {
+       if len(dataFiles) == 0 {
+               return nil
+       }
+
+       setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles")
+       if err != nil {
+               return err
+       }
+
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       if s := t.meta.currentSnapshot(); s != nil {
+               referenced := make([]string, 0)
+               for df, err := range s.dataFiles(fs, nil) {
+                       if err != nil {
+                               return err
+                       }
+
+                       if _, ok := setToAdd[df.FilePath()]; ok {
+                               referenced = append(referenced, df.FilePath())
+                       }
+               }
+
+               if len(referenced) > 0 {
+                       return fmt.Errorf("cannot add files that are already 
referenced by table, files: %s", referenced)

Review Comment:
   `referenced` is a slice, not a string. `%v` would be better, or it could be 
converted to a string with `strings.Join`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to