kevinjqliu commented on code in PR #356:
URL: https://github.com/apache/iceberg-go/pull/356#discussion_r2019181505
##########
table/table_test.go:
##########
@@ -732,6 +732,106 @@ func (t *TableWritingTestSuite) TestAddFilesReferencedCurrentSnapshotIgnoreDupli
 	t.Equal([]int32{0, 0, 0}, deleted)
 }
 
+type mockedCatalog struct{}
+
+func (m *mockedCatalog) LoadTable(ctx context.Context, ident table.Identifier, props iceberg.Properties) (*table.Table, error) {
+	return nil, nil
+}
+
+func (m *mockedCatalog) CommitTable(ctx context.Context, tbl *table.Table, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+	bldr, err := table.MetadataBuilderFromBase(tbl.Metadata())
+	if err != nil {
+		return nil, "", err
+	}
+
+	for _, u := range updates {
+		if err := u.Apply(bldr); err != nil {
+			return nil, "", err
+		}
+	}
+
+	meta, err := bldr.Build()
+	if err != nil {
+		return nil, "", err
+	}
+
+	return meta, "", nil
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFiles() {
+	fs := iceio.LocalFS{}
+
+	files := make([]string, 0)
+	for i := range 5 {
+		filePath := fmt.Sprintf("%s/replace_data_files_v%d/data-%d.parquet", t.location, t.formatVersion, i)
+		t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
+		files = append(files, filePath)
+	}
+
+	ident := table.Identifier{"default", "replace_data_files_v" + strconv.Itoa(t.formatVersion)}
+	meta, err := table.NewMetadata(t.tableSchemaPromotedTypes, iceberg.UnpartitionedSpec,
+		table.UnsortedSortOrder, t.location, iceberg.Properties{"format-version": strconv.Itoa(t.formatVersion)})
+	t.Require().NoError(err)
+
+	ctx := context.Background()
+
+	tbl := table.New(ident, meta, t.getMetadataLoc(), fs, &mockedCatalog{})
+	for i := range 5 {
+		tx := tbl.NewTransaction()
+		t.Require().NoError(tx.AddFiles(files[i:i+1], nil, false))
+		tbl, err = tx.Commit(ctx)
+		t.Require().NoError(err)
+	}
+
+	mflist, err := tbl.CurrentSnapshot().Manifests(tbl.FS())
+	t.Require().NoError(err)
+	t.Len(mflist, 5)
+
+	// create a parquet file that is essentially as if we merged two of
+	// the data files together
+	cols := make([]arrow.Column, 0, t.arrTablePromotedTypes.NumCols())
+	for i := range int(t.arrTablePromotedTypes.NumCols()) {
+		chkd := t.arrTablePromotedTypes.Column(i).Data()
+		duplicated := arrow.NewChunked(chkd.DataType(), append(chkd.Chunks(), chkd.Chunks()...))
+		defer duplicated.Release()
+
+		col := arrow.NewColumn(t.arrSchemaPromotedTypes.Fields()[i], duplicated)
+		defer col.Release()
+
+		cols = append(cols, *col)
+	}
+
+	combined := array.NewTable(t.arrSchemaPromotedTypes, cols, -1)
+	defer combined.Release()
+
+	combinedFilePath := fmt.Sprintf("%s/replace_data_files_v%d/combined.parquet", t.location, t.formatVersion)
+	t.writeParquet(fs, combinedFilePath, combined)
+
+	tx := tbl.NewTransaction()
+	t.Require().NoError(tx.ReplaceDataFiles(files[:2], []string{combinedFilePath}, nil))

Review Comment:
   `arrTablePromotedTypes` has 2 rows. We write it out 5 times, which gives 10 rows across 5 files. `combined` is 2 copies of `arrTablePromotedTypes`, i.e. 4 rows in 1 file.

   This replace operation swaps 2 files (4 rows) for the 1 `combined` file (4 rows), so the table should still have 10 rows, in 4 files total.
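To make that arithmetic easy to check, here is a back-of-the-envelope sketch (plain Go, just restating the counts from the comment above; none of these variables exist in the test):

```go
package main

import "fmt"

func main() {
	rowsPerFile := 2                        // arrTablePromotedTypes has 2 rows
	initialFiles := 5                       // five single-file AddFiles commits
	totalRows := rowsPerFile * initialFiles // 10 rows across 5 files

	combinedRows := 2 * rowsPerFile // combined.parquet duplicates the chunks: 4 rows

	// ReplaceDataFiles(files[:2], []string{combinedFilePath}, nil)
	// removes 2 files (4 rows) and adds 1 file (4 rows)
	filesAfter := initialFiles - 2 + 1
	rowsAfter := totalRows - 2*rowsPerFile + combinedRows

	fmt.Println(filesAfter, rowsAfter) // prints: 4 10
}
```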
##########
table/transaction.go:
##########
@@ -117,6 +127,101 @@ func (t *Transaction) SetProperties(props iceberg.Properties) error {
 	return nil
 }
 
+// ReplaceFiles is actually just an overwrite operation with multiple
+// files deleted and added.
+//
+// TODO: technically, this could be a REPLACE operation but we aren't performing
+// any validation here that there are no changes to the underlying data. A REPLACE
+// operation is only valid if the data is exactly the same as the previous snapshot.
+//
+// For now, we'll keep using an overwrite operation.
+func (t *Transaction) ReplaceDataFiles(filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error {
+	if len(filesToDelete) == 0 {
+		if len(filesToAdd) > 0 {
+			return t.AddFiles(filesToAdd, snapshotProps, true)

Review Comment:
   nit: maybe we should still check for duplicates, currently `ignoreDuplicates=true`
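A minimal sketch of that nit, assuming the third parameter of `AddFiles` is `ignoreDuplicates` (as the test's `tx.AddFiles(files[i:i+1], nil, false)` calls suggest) — this would slot into the top of `ReplaceDataFiles`:

```go
	if len(filesToDelete) == 0 {
		if len(filesToAdd) > 0 {
			// pass ignoreDuplicates=false so duplicate add paths
			// surface as an error instead of being silently accepted
			return t.AddFiles(filesToAdd, snapshotProps, false)
		}
	}
```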
##########
table/transaction.go:
##########
@@ -117,6 +127,101 @@ func (t *Transaction) SetProperties(props iceberg.Properties) error {
 	return nil
 }
 
+// ReplaceFiles is actually just an overwrite operation with multiple
+// files deleted and added.
+//
+// TODO: technically, this could be a REPLACE operation but we aren't performing
+// any validation here that there are no changes to the underlying data. A REPLACE
+// operation is only valid if the data is exactly the same as the previous snapshot.
+//
+// For now, we'll keep using an overwrite operation.
+func (t *Transaction) ReplaceDataFiles(filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error {
+	if len(filesToDelete) == 0 {
+		if len(filesToAdd) > 0 {
+			return t.AddFiles(filesToAdd, snapshotProps, true)
+		}
+	}
+
+	var (
+		setToDelete = make(map[string]struct{})
+		setToAdd    = make(map[string]struct{})
+	)
+
+	for _, f := range filesToDelete {
+		setToDelete[f] = struct{}{}
+	}
+
+	for _, f := range filesToAdd {
+		setToAdd[f] = struct{}{}
+	}
+
+	if len(setToDelete) != len(filesToDelete) {
+		return errors.New("file paths must be unique for ReplaceDataFiles")

Review Comment:
   nit:
   ```suggestion
   		return errors.New("delete file paths must be unique for ReplaceDataFiles")
   ```

##########
table/transaction.go:
##########
@@ -117,6 +127,101 @@ func (t *Transaction) SetProperties(props iceberg.Properties) error {
 	return nil
 }
 
+// ReplaceFiles is actually just an overwrite operation with multiple
+// files deleted and added.
+//
+// TODO: technically, this could be a REPLACE operation but we aren't performing
+// any validation here that there are no changes to the underlying data. A REPLACE
+// operation is only valid if the data is exactly the same as the previous snapshot.
+//
+// For now, we'll keep using an overwrite operation.
+func (t *Transaction) ReplaceDataFiles(filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error {
+	if len(filesToDelete) == 0 {
+		if len(filesToAdd) > 0 {
+			return t.AddFiles(filesToAdd, snapshotProps, true)
+		}
+	}
+
+	var (
+		setToDelete = make(map[string]struct{})
+		setToAdd    = make(map[string]struct{})
+	)
+
+	for _, f := range filesToDelete {
+		setToDelete[f] = struct{}{}
+	}
+
+	for _, f := range filesToAdd {
+		setToAdd[f] = struct{}{}
+	}
+
+	if len(setToDelete) != len(filesToDelete) {
+		return errors.New("file paths must be unique for ReplaceDataFiles")
+	}
+
+	if len(setToAdd) != len(filesToAdd) {
+		return errors.New("file paths must be unique for ReplaceDataFiles")

Review Comment:
   nit:
   ```suggestion
   		return errors.New("add file paths must be unique for ReplaceDataFiles")
   ```
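Both suggestions could also be folded into a single helper that names the offending list and the duplicated path. A hedged sketch — `ensureUniquePaths` and its error wording are mine, not from the PR (it uses `fmt.Errorf` rather than `errors.New`):

```go
// ensureUniquePaths returns an error naming the first duplicated path,
// with kind ("delete" or "add") to make the message unambiguous.
func ensureUniquePaths(kind string, paths []string) error {
	seen := make(map[string]struct{}, len(paths))
	for _, p := range paths {
		if _, ok := seen[p]; ok {
			return fmt.Errorf("%s file paths must be unique for ReplaceDataFiles, found duplicate: %s", kind, p)
		}
		seen[p] = struct{}{}
	}

	return nil
}
```

`ReplaceDataFiles` would then call `ensureUniquePaths("delete", filesToDelete)` and `ensureUniquePaths("add", filesToAdd)` instead of comparing map and slice lengths.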
##########
table/snapshot_producers.go:
##########
@@ -89,9 +97,147 @@ func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) {
 }
 
 func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
+	// for fast appends, there are no deleted entries
 	return nil, nil
 }
 
+type overwriteFiles struct {
+	base *snapshotProducer
+}
+
+func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
+	prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+	prod.producerImpl = &overwriteFiles{base: prod}
+
+	return prod
+}
+
+func (of *overwriteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+	// no post processing
+	return manifests, nil
+}
+
+func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
+	// determine if there are any existing manifest files
+	existingFiles := make([]iceberg.ManifestFile, 0)
+
+	snap := of.base.txn.meta.currentSnapshot()
+	if snap == nil {
+		return existingFiles, nil
+	}
+
+	manifestList, err := snap.Manifests(of.base.io)
+	if err != nil {
+		return existingFiles, err
+	}
+
+	for _, m := range manifestList {
+		entries, err := of.base.fetchManifestEntry(m, true)
+		if err != nil {
+			return existingFiles, err
+		}
+
+		foundDeleted := make([]iceberg.ManifestEntry, 0)
+		notDeleted := make([]iceberg.ManifestEntry, 0, len(entries))
+		for _, entry := range entries {
+			if _, ok := of.base.deletedFiles[entry.DataFile().FilePath()]; ok {
+				foundDeleted = append(foundDeleted, entry)
+			} else {
+				notDeleted = append(notDeleted, entry)
+			}
+		}
+
+		if len(foundDeleted) == 0 {
+			existingFiles = append(existingFiles, m)
+
+			continue
+		}
+
+		if len(notDeleted) == 0 {
+			continue
+		}
+
+		spec, err := of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID()))
+		if err != nil {
+			return existingFiles, err
+		}
+
+		wr, path, counter, err := of.base.newManifestWriter(*spec)
+		if err != nil {
+			return existingFiles, err
+		}
+		defer counter.W.(io.Closer).Close()
+
+		for _, entry := range notDeleted {
+			if err := wr.Existing(entry); err != nil {
+				return existingFiles, err
+			}
+		}
+
+		mf, err := wr.ToManifestFile(path, counter.Count)
+		if err != nil {
+			return existingFiles, err
+		}
+
+		existingFiles = append(existingFiles, mf)
+	}
+
+	return existingFiles, nil
+}
+
+func (of *overwriteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
+	// determine if we need to record any deleted entries
+	//
+	// with a full overwrite all the entries are considered deleted
+	// with partial overwrites we have to use the predicate to evaluate
+	// which entries are affected
+	if of.base.parentSnapshotID <= 0 {
+		return nil, nil
+	}
+
+	parent, err := of.base.txn.meta.SnapshotByID(of.base.parentSnapshotID)
+	if err != nil {
+		return nil, fmt.Errorf("%w: cannot overwrite empty table", err)
+	}
+
+	previousManifests, err := parent.Manifests(of.base.io)
+	if err != nil {
+		return nil, err
+	}
+
+	getEntries := func(m iceberg.ManifestFile) ([]iceberg.ManifestEntry, error) {
+		entries, err := of.base.fetchManifestEntry(m, true)
+		if err != nil {
+			return nil, err
+		}
+
+		result := make([]iceberg.ManifestEntry, 0, len(entries))
+		for _, entry := range entries {
+			_, ok := of.base.deletedFiles[entry.DataFile().FilePath()]
+			if ok && entry.DataFile().ContentType() == iceberg.EntryContentData {
+				seqNum := entry.SequenceNum()
+				result = append(result,
+					iceberg.NewManifestEntry(iceberg.EntryStatusDELETED,
+						&of.base.snapshotID, &seqNum, entry.FileSequenceNum(),
+						entry.DataFile()))
+			}
+		}
+
+		return result, nil
+	}
+
+	nWorkers := 5

Review Comment:
   nit: should we set this as a variable somewhere?

##########
table/snapshot_producers.go:
##########
@@ -89,9 +97,147 @@ func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) {
 }
 
 func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
+	// for fast appends, there are no deleted entries
 	return nil, nil
 }
 
+type overwriteFiles struct {
+	base *snapshotProducer
+}
+
+func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
+	prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)

Review Comment:
   nit: for `newOverwriteFilesProducer`, isn't the `op` always `OVERWRITE`?
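A hedged sketch covering both nits at once — hoist the worker count into a named constant and drop the `op` parameter, since this producer always commits an overwrite. `defaultDeleteWorkers` is a name I made up, and I'm assuming the package exposes an `OpOverwrite` constant for the OVERWRITE operation:

```go
// number of goroutines used to scan the parent snapshot's manifests
// for deleted entries (was the literal nWorkers := 5)
const defaultDeleteWorkers = 5

func newOverwriteFilesProducer(txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
	// an overwrite producer always commits an OVERWRITE snapshot,
	// so the operation doesn't need to be a parameter
	prod := createSnapshotProducer(OpOverwrite, txn, fs, commitUUID, snapshotProps)
	prod.producerImpl = &overwriteFiles{base: prod}

	return prod
}
```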
##########
table/table_test.go:
##########
@@ -732,6 +732,106 @@ func (t *TableWritingTestSuite) TestAddFilesReferencedCurrentSnapshotIgnoreDupli
 	t.Equal([]int32{0, 0, 0}, deleted)
 }
 
+type mockedCatalog struct{}
+
+func (m *mockedCatalog) LoadTable(ctx context.Context, ident table.Identifier, props iceberg.Properties) (*table.Table, error) {
+	return nil, nil
+}
+
+func (m *mockedCatalog) CommitTable(ctx context.Context, tbl *table.Table, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+	bldr, err := table.MetadataBuilderFromBase(tbl.Metadata())
+	if err != nil {
+		return nil, "", err
+	}
+
+	for _, u := range updates {
+		if err := u.Apply(bldr); err != nil {
+			return nil, "", err
+		}
+	}
+
+	meta, err := bldr.Build()
+	if err != nil {
+		return nil, "", err
+	}
+
+	return meta, "", nil
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFiles() {
+	fs := iceio.LocalFS{}
+
+	files := make([]string, 0)
+	for i := range 5 {
+		filePath := fmt.Sprintf("%s/replace_data_files_v%d/data-%d.parquet", t.location, t.formatVersion, i)
+		t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
+		files = append(files, filePath)
+	}
+
+	ident := table.Identifier{"default", "replace_data_files_v" + strconv.Itoa(t.formatVersion)}
+	meta, err := table.NewMetadata(t.tableSchemaPromotedTypes, iceberg.UnpartitionedSpec,
+		table.UnsortedSortOrder, t.location, iceberg.Properties{"format-version": strconv.Itoa(t.formatVersion)})
+	t.Require().NoError(err)
+
+	ctx := context.Background()
+
+	tbl := table.New(ident, meta, t.getMetadataLoc(), fs, &mockedCatalog{})
+	for i := range 5 {
+		tx := tbl.NewTransaction()
+		t.Require().NoError(tx.AddFiles(files[i:i+1], nil, false))
+		tbl, err = tx.Commit(ctx)
+		t.Require().NoError(err)
+	}
+
+	mflist, err := tbl.CurrentSnapshot().Manifests(tbl.FS())
+	t.Require().NoError(err)
+	t.Len(mflist, 5)
+
+	// create a parquet file that is essentially as if we merged two of
+	// the data files together
+	cols := make([]arrow.Column, 0, t.arrTablePromotedTypes.NumCols())
+	for i := range int(t.arrTablePromotedTypes.NumCols()) {
+		chkd := t.arrTablePromotedTypes.Column(i).Data()
+		duplicated := arrow.NewChunked(chkd.DataType(), append(chkd.Chunks(), chkd.Chunks()...))
+		defer duplicated.Release()
+
+		col := arrow.NewColumn(t.arrSchemaPromotedTypes.Fields()[i], duplicated)
+		defer col.Release()
+
+		cols = append(cols, *col)
+	}
+
+	combined := array.NewTable(t.arrSchemaPromotedTypes, cols, -1)
+	defer combined.Release()
+
+	combinedFilePath := fmt.Sprintf("%s/replace_data_files_v%d/combined.parquet", t.location, t.formatVersion)
+	t.writeParquet(fs, combinedFilePath, combined)
+
+	tx := tbl.NewTransaction()
+	t.Require().NoError(tx.ReplaceDataFiles(files[:2], []string{combinedFilePath}, nil))

Review Comment:
   Matches the snapshot summary
   ```
   "added-data-files": "1",
   "added-records": "4",
   "deleted-data-files": "2",
   "deleted-records": "4",
   "total-data-files": "4",
   "total-records": "10",
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org