kevinjqliu commented on code in PR #330: URL: https://github.com/apache/iceberg-go/pull/330#discussion_r1997669352
########## table/transaction.go: ########## @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "encoding/json" + "errors" + "slices" + "sync" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "github.com/pterm/pterm" +) + +type schemaCompatVisitor struct { + provided *iceberg.Schema + + errorData pterm.TableData +} + +func checkSchemaCompat(requested, provided *iceberg.Schema) error { + sc := &schemaCompatVisitor{ + provided: provided, + errorData: pterm.TableData{{"", "Table Field", "Requested Field"}}, + } + + _, compat := iceberg.PreOrderVisit(requested, sc) + + return compat +} + +func checkArrowSchemaCompat(requested *iceberg.Schema, provided *arrow.Schema, downcastNanoToMicro bool) error { Review Comment: nit: perhaps the schema compatibility check could live somewhere other than transaction.go ########## table/arrow_utils.go: ########## @@ -1250,5 +1656,56 @@ func dataFileStatsFromParquetMetadata(pqmeta *metadata.FileMetaData, statsCols m nullValueCounts: nullValueCounts, nanValueCounts: nanValueCounts, splitOffsets: splitOffsets, + 
colAggs: colAggs, + } +} + +func parquetFilesToDataFiles(fileIO iceio.IO, meta *MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] { + return func(yield func(iceberg.DataFile, error) bool) { + defer func() { + if r := recover(); r != nil { + switch e := r.(type) { + case string: + yield(nil, fmt.Errorf("error encountered during parquet file conversion: %s", e)) + case error: + yield(nil, fmt.Errorf("error encountered during parquet file conversion: %w", e)) + } + } + }() + + for filePath := range paths { + inputFile := must(fileIO.Open(filePath)) + defer inputFile.Close() + + rdr := must(file.NewParquetReader(inputFile)) + defer rdr.Close() + + arrRdr := must(pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)) + arrSchema := must(arrRdr.Schema()) + + if hasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); hasIDs { + yield(nil, fmt.Errorf("%w: cannot add file %s because it has field-ids. add-files only supports the addition of files without field_ids", + iceberg.ErrNotImplemented, filePath)) + + return + } + + if err := checkArrowSchemaCompat(meta.CurrentSchema(), arrSchema, false); err != nil { + panic(err) + } + + statistics := dataFileStatsFromParquetMetadata(rdr.MetaData(), + must(computeStatsPlan(meta.CurrentSchema(), meta.props)), + must(parquetPathToIDMapping(meta.CurrentSchema()))) + + df, err := statistics.toDataFile(meta.CurrentSpec(), filePath, iceberg.ParquetFile, rdr.MetaData().GetSourceFileSize()) Review Comment: nit: i like that pyiceberg decouples the `DataFile` object from the stats https://github.com/apache/iceberg-python/blob/1c0e2b04c383e5d90118ffce2066dcdb95193c4a/pyiceberg/io/pyarrow.py#L2497-L2513 ########## table/snapshot_producers.go: ########## @@ -0,0 +1,388 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "bytes" + "fmt" + "io" + "maps" + "slices" + "sync/atomic" + "unicode/utf8" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/internal" + iceio "github.com/apache/iceberg-go/io" + "github.com/google/uuid" + "golang.org/x/sync/errgroup" +) + +type producerImpl interface { + processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) + existingManifests() ([]iceberg.ManifestFile, error) + deletedEntries() ([]iceberg.ManifestEntry, error) +} + +func newManifestFileName(num int, commit uuid.UUID) string { + return fmt.Sprintf("%s-m%d.avro", commit, num) +} + +func newManifestListFileName(snapshotID int64, attempt int, commit uuid.UUID) string { + // mimics behavior of java + // https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return fmt.Sprintf("snap-%d-%d-%s.avro", snapshotID, attempt, commit) +} + +func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &fastAppendFiles{base: prod} + + return prod +} + +type fastAppendFiles struct { + base *snapshotProducer +} + +func (fa *fastAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + return manifests, nil +} + +func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) { + existing := make([]iceberg.ManifestFile, 0) + if fa.base.parentSnapshotID > 0 { + previous, err := fa.base.txn.meta.SnapshotByID(fa.base.parentSnapshotID) + if err != nil { + return nil, fmt.Errorf("could not find parent snapshot %d", fa.base.parentSnapshotID) + } + + manifests, err := previous.Manifests(fa.base.io) + if err != nil { + return nil, err + } + + for _, m := range manifests { + if m.HasAddedFiles() || m.HasExistingFiles() || m.SnapshotID() == fa.base.snapshotID { + existing = append(existing, m) + } + } + } + + return existing, nil +} + +func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { + return nil, nil +} + +const ( + SnapshotMinFileGroupSize = 10_000 Review Comment: nit: I don't think this is used anywhere ########## table/transaction.go: ########## @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "context" + "encoding/json" + "errors" + "slices" + "sync" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "github.com/pterm/pterm" +) + +type schemaCompatVisitor struct { + provided *iceberg.Schema + + errorData pterm.TableData +} + +func checkSchemaCompat(requested, provided *iceberg.Schema) error { + sc := &schemaCompatVisitor{ + provided: provided, + errorData: pterm.TableData{{"", "Table Field", "Requested Field"}}, + } + + _, compat := iceberg.PreOrderVisit(requested, sc) + + return compat +} + +func checkArrowSchemaCompat(requested *iceberg.Schema, provided *arrow.Schema, downcastNanoToMicro bool) error { + mapping := requested.NameMapping() + providedSchema, err := ArrowSchemaToIceberg(provided, downcastNanoToMicro, mapping) + if err != nil { + return err + } + + return checkSchemaCompat(requested, providedSchema) +} + +func (sc *schemaCompatVisitor) isFieldCompat(lhs iceberg.NestedField) bool { + rhs, ok := sc.provided.FindFieldByID(lhs.ID) + if !ok { + if lhs.Required { + sc.errorData = append(sc.errorData, + []string{"❌", lhs.String(), "missing"}) + + return false + } + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), "missing"}) + + return true + } + + if lhs.Required && !rhs.Required { + sc.errorData = append(sc.errorData, + []string{"❌", lhs.String(), rhs.String()}) + + return false + } + + if lhs.Type.Equals(rhs.Type) { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + + // we only check that parent node is also of the same type + // we check the type of the child nodes as we traverse them later + switch lhs.Type.(type) { + case *iceberg.StructType: + if rhs, ok := rhs.Type.(*iceberg.StructType); ok { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + case *iceberg.ListType: + 
if rhs, ok := rhs.Type.(*iceberg.ListType); ok { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + case *iceberg.MapType: + if rhs, ok := rhs.Type.(*iceberg.MapType); ok { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + } + + if _, err := iceberg.PromoteType(rhs.Type, lhs.Type); err != nil { + sc.errorData = append(sc.errorData, + []string{"❌", lhs.String(), rhs.String()}) + + return false + } + + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true +} + +func (sc *schemaCompatVisitor) Schema(s *iceberg.Schema, v func() bool) bool { + if !v() { + pterm.DisableColor() + tbl := pterm.DefaultTable.WithHasHeader(true).WithData(sc.errorData) + tbl.Render() + txt, _ := tbl.Srender() + pterm.EnableColor() + panic("mismatch in fields:\n" + txt) + } + + return true +} + +func (sc *schemaCompatVisitor) Struct(st iceberg.StructType, v []func() bool) bool { + out := true + for _, res := range v { + out = res() && out + } + + return out +} + +func (sc *schemaCompatVisitor) Field(n iceberg.NestedField, v func() bool) bool { + return sc.isFieldCompat(n) && v() +} + +func (sc *schemaCompatVisitor) List(l iceberg.ListType, v func() bool) bool { + return sc.isFieldCompat(l.ElementField()) && v() +} + +func (sc *schemaCompatVisitor) Map(m iceberg.MapType, vk, vv func() bool) bool { + return sc.isFieldCompat(m.KeyField()) && sc.isFieldCompat(m.ValueField()) && vk() && vv() +} + +func (sc *schemaCompatVisitor) Primitive(p iceberg.PrimitiveType) bool { + return true +} + +type snapshotUpdate struct { + txn *Transaction + io io.WriteFileIO + snapshotProps iceberg.Properties +} + +func (s snapshotUpdate) fastAppend() *snapshotProducer { + return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) +} + +// func (s snapshotUpdate) mergeAppend() *snapshotProducer { Review Comment: nit: remove the commented-out code 
########## table/transaction.go: ########## @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "encoding/json" + "errors" + "slices" + "sync" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "github.com/pterm/pterm" +) + +type schemaCompatVisitor struct { + provided *iceberg.Schema + + errorData pterm.TableData +} + +func checkSchemaCompat(requested, provided *iceberg.Schema) error { + sc := &schemaCompatVisitor{ + provided: provided, + errorData: pterm.TableData{{"", "Table Field", "Requested Field"}}, + } + + _, compat := iceberg.PreOrderVisit(requested, sc) + + return compat +} + +func checkArrowSchemaCompat(requested *iceberg.Schema, provided *arrow.Schema, downcastNanoToMicro bool) error { + mapping := requested.NameMapping() + providedSchema, err := ArrowSchemaToIceberg(provided, downcastNanoToMicro, mapping) + if err != nil { + return err + } + + return checkSchemaCompat(requested, providedSchema) +} + +func (sc *schemaCompatVisitor) isFieldCompat(lhs iceberg.NestedField) bool { + rhs, ok := sc.provided.FindFieldByID(lhs.ID) + if !ok 
{ + if lhs.Required { + sc.errorData = append(sc.errorData, + []string{"❌", lhs.String(), "missing"}) + + return false + } + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), "missing"}) + + return true + } + + if lhs.Required && !rhs.Required { + sc.errorData = append(sc.errorData, + []string{"❌", lhs.String(), rhs.String()}) + + return false + } + + if lhs.Type.Equals(rhs.Type) { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + + // we only check that parent node is also of the same type + // we check the type of the child nodes as we traverse them later + switch lhs.Type.(type) { + case *iceberg.StructType: + if rhs, ok := rhs.Type.(*iceberg.StructType); ok { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + case *iceberg.ListType: + if rhs, ok := rhs.Type.(*iceberg.ListType); ok { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + case *iceberg.MapType: + if rhs, ok := rhs.Type.(*iceberg.MapType); ok { + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true + } + } + + if _, err := iceberg.PromoteType(rhs.Type, lhs.Type); err != nil { + sc.errorData = append(sc.errorData, + []string{"❌", lhs.String(), rhs.String()}) + + return false + } + + sc.errorData = append(sc.errorData, + []string{"✅", lhs.String(), rhs.String()}) + + return true +} + +func (sc *schemaCompatVisitor) Schema(s *iceberg.Schema, v func() bool) bool { + if !v() { + pterm.DisableColor() + tbl := pterm.DefaultTable.WithHasHeader(true).WithData(sc.errorData) + tbl.Render() + txt, _ := tbl.Srender() + pterm.EnableColor() + panic("mismatch in fields:\n" + txt) + } + + return true +} + +func (sc *schemaCompatVisitor) Struct(st iceberg.StructType, v []func() bool) bool { + out := true + for _, res := range v { + out = res() && out + } + + return out +} + +func (sc 
*schemaCompatVisitor) Field(n iceberg.NestedField, v func() bool) bool { + return sc.isFieldCompat(n) && v() +} + +func (sc *schemaCompatVisitor) List(l iceberg.ListType, v func() bool) bool { + return sc.isFieldCompat(l.ElementField()) && v() +} + +func (sc *schemaCompatVisitor) Map(m iceberg.MapType, vk, vv func() bool) bool { + return sc.isFieldCompat(m.KeyField()) && sc.isFieldCompat(m.ValueField()) && vk() && vv() +} + +func (sc *schemaCompatVisitor) Primitive(p iceberg.PrimitiveType) bool { + return true +} + +type snapshotUpdate struct { + txn *Transaction + io io.WriteFileIO + snapshotProps iceberg.Properties +} + +func (s snapshotUpdate) fastAppend() *snapshotProducer { + return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) +} + +// func (s snapshotUpdate) mergeAppend() *snapshotProducer { +// return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) +// } + +type Transaction struct { + tbl *Table + meta *MetadataBuilder + + reqs []Requirement + + mx sync.Mutex + committed bool +} + +func (t *Transaction) apply(updates []Update, reqs []Requirement) error { + t.mx.Lock() + defer t.mx.Unlock() + + if t.committed { + return errors.New("transaction has already been committed") + } + + current, err := t.meta.Build() + if err != nil { + return err + } + + for _, r := range reqs { + if err := r.Validate(current); err != nil { + return err + } + } + + existing := map[string]struct{}{} + for _, r := range t.reqs { + existing[r.GetType()] = struct{}{} + } + + for _, r := range reqs { + if _, ok := existing[r.GetType()]; !ok { + t.reqs = append(t.reqs, r) + } + } + + prevUpdates, prevLastUpdated := len(t.meta.updates), t.meta.lastUpdatedMS + for _, u := range updates { + if err := u.Apply(t.meta); err != nil { + return err + } + } + + if prevUpdates < len(t.meta.updates) { Review Comment: isn't `prevUpdates` always `len(t.meta.updates)`? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org