laskoviymishka commented on code in PR #1104:
URL: https://github.com/apache/iceberg-go/pull/1104#discussion_r3282065506
##########
catalog/hadoop/hadoop.go:
##########
@@ -561,6 +561,10 @@ func (c *Catalog) DropTable(_ context.Context, ident table.Identifier) error {
return os.RemoveAll(tablePath)
}
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) error {
Review Comment:
This delegates to `os.RemoveAll(tablePath)`, which means any data written to
`write.data.path` or `write.metadata.path` outside the table root survives
the purge. The interface godoc explicitly claims "files written outside the
root… are deleted", so Hadoop silently violates the interface's contract. At
minimum I'd either load the table and run the same union-walk + bulk-delete path
the other catalogs use, or call out the caveat in the Hadoop-specific godoc.
##########
catalog/internal/utils.go:
##########
@@ -241,3 +245,79 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
),
}, nil
}
+
+func normalizeURI(uri string) string {
+ if strings.HasPrefix(uri, "file:") {
+ // Clean "file:/", "file://", "file:///" to all consistently have "file:///" prefix
+ cleaned := strings.TrimPrefix(uri, "file:")
+ cleaned = strings.TrimPrefix(cleaned, "//")
+ cleaned = strings.TrimPrefix(cleaned, "/")
+
+ return "file:///" + cleaned
+ }
+
+ return uri
+}
+
+// PurgeTableFiles physically deletes all files under the table's warehouse location
+// and any referenced files written outside the location root (e.g., via write.data.path
+// or write.metadata.path properties).
Review Comment:
We're deleting data files unconditionally here, but Iceberg gates this on the
`gc.enabled` table property: Java's `CatalogUtil.dropTableData` and PyIceberg
both refuse to purge data when `gc.enabled=false`, because the same physical
files may be referenced by snapshotted clones or branched tables. On a
`gc.enabled=false` table this PR will silently corrupt any other table that
still reads those files. I'd read
`tbl.Metadata().Properties().Get("gc.enabled", "true")` at the top of
`PurgeTableFiles` and short-circuit (or at least skip the data-file portion)
when it's false. wdyt?
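A minimal sketch of the "skip the data-file portion" option, assuming the purge set can be split into metadata-side files (metadata JSON, manifest lists, manifests, statistics) and data files before anything is deleted. `Properties` is a local stand-in for `iceberg.Properties`, and `gcEnabled`/`filesToPurge` are hypothetical helpers:

```go
package main

import (
	"strconv"
	"strings"
)

// Properties stands in for iceberg.Properties (a map[string]string) in this sketch.
type Properties map[string]string

// gcEnabled reports whether data files may be deleted. A missing property
// means true; an unparseable value is treated as false, because keeping data
// is the safer failure mode for a destructive operation.
func gcEnabled(props Properties) bool {
	v, ok := props["gc.enabled"]
	if !ok {
		return true
	}
	enabled, err := strconv.ParseBool(strings.TrimSpace(v))
	if err != nil {
		return false
	}
	return enabled
}

// filesToPurge always includes the metadata-side files, but only includes
// data files when gc.enabled allows it.
func filesToPurge(props Properties, metadataFiles, dataFiles []string) []string {
	out := append([]string(nil), metadataFiles...)
	if gcEnabled(props) {
		out = append(out, dataFiles...)
	}
	return out
}
```

The split would also have to apply to the table-root walk, since data files under the table's `data/` directory would otherwise be picked up there regardless of the property.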
##########
catalog/internal/utils.go:
##########
@@ -241,3 +245,79 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
),
}, nil
}
Review Comment:
This only canonicalizes the `file:` prefix, so the LocalFS walk callback (which
yields bare OS paths like `/tmp/foo`) and the metadata-side URIs
(`file:///tmp/foo`) land in the set as two distinct keys. Today it's masked
because the second `Remove` hits `ErrNoSuchFile` and we swallow it, but the
union-dedup intent is broken, and for any cloud FS where `WalkDir` emits a
different scheme form than the metadata we'll get spurious `BulkRemovableIO`
errors. There's already a `normalizeFilePath` helper in
`table/orphan_cleanup.go` that does full URL normalization across schemes; I'd
reuse that instead of rolling our own here.
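To make the dedup problem concrete, here is a sketch of a key function that maps a bare POSIX path and the different `file:` spellings to a single key. It is a hypothetical helper (`canonicalKey`), not the existing `normalizeFilePath`, and it assumes POSIX-style local paths:

```go
package main

import (
	"net/url"
	"path"
	"strings"
)

// canonicalKey maps equivalent spellings of the same local file to one key,
// so "/tmp/foo", "file:/tmp/foo" and "file:///tmp/foo" dedup together.
func canonicalKey(p string) string {
	// Bare absolute paths, as yielded by a local directory walk.
	if strings.HasPrefix(p, "/") {
		return "file://" + path.Clean(p)
	}
	u, err := url.Parse(p)
	if err != nil || u.Scheme != "file" || u.Host != "" || u.Opaque != "" {
		// Non-local URIs pass through unchanged: collapsing "//" in an
		// object-store key could alias two distinct objects.
		return p
	}
	// Both "file:/x" and "file:///x" parse to an empty host and Path "/x".
	return "file://" + path.Clean(u.Path)
}
```

Object-store URIs are deliberately left alone because `a//b` and `a/b` are different S3 keys, which is part of why a proper cross-scheme helper is needed rather than a local `file:` special case.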
##########
catalog/internal/utils.go:
##########
@@ -241,3 +245,79 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
),
}, nil
}
+
+func normalizeURI(uri string) string {
+ if strings.HasPrefix(uri, "file:") {
+ // Clean "file:/", "file://", "file:///" to all consistently have "file:///" prefix
+ cleaned := strings.TrimPrefix(uri, "file:")
+ cleaned = strings.TrimPrefix(cleaned, "//")
+ cleaned = strings.TrimPrefix(cleaned, "/")
+
+ return "file:///" + cleaned
+ }
+
+ return uri
+}
+
+// PurgeTableFiles physically deletes all files under the table's warehouse location
+// and any referenced files written outside the location root (e.g., via write.data.path
Review Comment:
The godoc calls this "best-effort" but `errors.Join` propagates every
per-file failure and the catalogs wrap it as "dropped table but failed to purge
files". That's not best-effort — that's fail-loud, with no retry path because
the catalog row is already gone. Either match Java/PyIceberg (log per-file
failures, return nil so the catalog drop stays the source of truth) or keep
fail-loud and return a structured `PurgeResult{CatalogDropped bool, FileErrors
[]error}` so the caller can tell that the catalog state is durable. Right now
the doc and the code disagree, which is the worst of both.
##########
catalog/internal/utils.go:
##########
@@ -170,6 +173,7 @@ func getDefaultWarehouseLocation(dbname, tablename string, nsprops, catprops ice
// ([\w-]{36}) -> UUID (36 characters, including hyphens)
// (?:\.\w+)? -> optional codec name
// \.metadata\.json -> file extension
+// var tableMetadataFileNameRegex = regexp.MustCompile(`^(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json`)
Review Comment:
Stray commented-out `tableMetadataFileNameRegex` line that the diff left
behind. All three of us stopped on it. Worth a quick cleanup.
##########
catalog/catalog.go:
##########
@@ -167,6 +167,26 @@ type TransactionalCatalog interface {
CommitTransaction(ctx context.Context, commits []table.TableCommit) error
}
+// PurgeableTable is an optional interface that catalogs can implement
+// to support physical table deletion (catalog entry + underlying files).
+// Callers should check for this capability via a type assertion:
+//
+// if purger, ok := cat.(catalog.PurgeableTable); ok {
+// err := purger.PurgeTable(ctx, ident)
+// }
+//
+// For REST catalogs the purge is delegated server-side. For client-side
+// catalogs (SQL, Glue, Hive, Hadoop) the table is first dropped from
+// the catalog, then all files under the table's [table.Metadata.Location]
+// root, plus any referenced files written outside the root (e.g. via
+// write.data.path or write.metadata.path table properties), are deleted.
+// File-deletion errors are propagated to the caller, but because the
+// catalog entry is already removed at that point there is no automatic
+// retry path.
+type PurgeableTable interface {
Review Comment:
The catalogs that intentionally satisfy this interface would benefit from a
`var _ catalog.PurgeableTable = (*Catalog)(nil)` assertion (REST especially,
since it's not exercised in the same test fixture). Cheap compile-time guard
against someone renaming the method out from under the interface.
##########
cmd/iceberg/drop_test.go:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "context"
+ "errors"
+ "iter"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type mockCatalogForDrop struct {
+ catalogType catalog.Type
+ dropCalled bool
+ dropIdent table.Identifier
+ dropErr error
+ checkExists bool
+ checkExistsErr error
+}
+
+func (m *mockCatalogForDrop) CatalogType() catalog.Type {
+ return m.catalogType
+}
+
+func (m *mockCatalogForDrop) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+ return nil, "", nil
+}
+
+func (m *mockCatalogForDrop) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] {
+ return nil
+}
+
+func (m *mockCatalogForDrop) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) DropTable(ctx context.Context, identifier table.Identifier) error {
+ m.dropCalled = true
+ m.dropIdent = identifier
+
+ return m.dropErr
+}
+
+func (m *mockCatalogForDrop) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error) {
+ return m.checkExists, m.checkExistsErr
+}
+
+func (m *mockCatalogForDrop) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
+ return nil
+}
+
+func (m *mockCatalogForDrop) DropNamespace(ctx context.Context, namespace table.Identifier) error {
+ return nil
+}
+
+func (m *mockCatalogForDrop) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) {
+ return false, nil
+}
+
+func (m *mockCatalogForDrop) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (catalog.PropertiesUpdateSummary, error) {
+ return catalog.PropertiesUpdateSummary{}, nil
+}
+
+type mockPurgeableCatalog struct {
+ mockCatalogForDrop
+ purgeCalled bool
+ purgeIdent table.Identifier
+ purgeErr error
Review Comment:
`purgeErr` is declared on the mock but never used in a test case — I'd add a
`TestRunDropTablePurgeError` that returns an error here and asserts the
wrapping ("dropped table but failed to purge files") so the contract can't
silently regress.
##########
catalog/sql/sql_test.go:
##########
@@ -684,6 +686,106 @@ func (s *SqliteCatalogTestSuite) TestDropTableNotExist() {
}
}
+func (s *SqliteCatalogTestSuite) TestPurgeTable() {
+ tests := []struct {
+ cat *sqlcat.Catalog
+ tblID table.Identifier
+ }{
+ {s.getCatalogMemory(), s.randomTableIdentifier()},
+ {s.getCatalogSqlite(), s.randomHierarchicalIdentifier()},
+ }
+
+ for _, tt := range tests {
+ ns := catalog.NamespaceFromIdent(tt.tblID)
+ s.Require().NoError(tt.cat.CreateNamespace(context.Background(), ns, nil))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{
+ ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: true,
+ })
+ tbl, err := tt.cat.CreateTable(context.Background(), tt.tblID, schema)
+ s.Require().NoError(err)
+
+ // Append data to create data files and manifest files
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "foo", Type: arrow.BinaryTypes.String},
+ }, nil)
+
+ bldr := array.NewStringBuilder(memory.DefaultAllocator)
+ bldr.Append("bar")
+ arr := bldr.NewArray()
+
+ rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 1)
+ arrTable := array.NewTableFromRecords(arrowSchema, []arrow.RecordBatch{rec})
+
+ tx := tbl.NewTransaction()
+ s.Require().NoError(tx.AppendTable(context.Background(), arrTable, 1024, nil))
+ tbl, err = tx.Commit(context.Background())
+ s.Require().NoError(err)
+
+ arr.Release()
+ bldr.Release()
+ rec.Release()
+ arrTable.Release()
+
+ metaLoc := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+ tableLoc := strings.TrimPrefix(tbl.Location(), "file://")
+ s.FileExists(metaLoc)
+
+ // Create a dummy statistics file at an external path outside the table Location()
+ externalStatsPath := filepath.Join(s.warehouse, "external-path", "stats.puffin")
+ s.Require().NoError(os.MkdirAll(filepath.Dir(externalStatsPath), 0o755))
+ s.Require().NoError(os.WriteFile(externalStatsPath, []byte("dummy puffin data"), 0o644))
+ s.FileExists(externalStatsPath)
+
+ // Add this external statistics file to the table metadata JSON
+ metaBytes, err := os.ReadFile(metaLoc)
+ s.Require().NoError(err)
+ var metaMap map[string]any
+ s.Require().NoError(json.Unmarshal(metaBytes, &metaMap))
+ metaMap["statistics"] = []any{
+ map[string]any{
+ "snapshot-id": int64(1),
Review Comment:
Hard-coding `"snapshot-id": int64(1)` here is fragile — if it ever drifts
from `tbl.CurrentSnapshot().SnapshotID`, the test will still pass, but for the
wrong reason (the external file gets removed by the walk rather than by the
referenced-files path, which is what this test is supposed to exercise). I'd
read the snapshot id off the loaded table instead.
##########
cmd/iceberg/main.go:
##########
@@ -537,10 +538,21 @@ func runDrop(ctx context.Context, output Output, cat catalog.Catalog, cmd *DropC
os.Exit(1)
}
case cmd.Table != nil:
- err := cat.DropTable(ctx, catalog.ToIdentifier(cmd.Table.Identifier))
+ ident := catalog.ToIdentifier(cmd.Table.Identifier)
+ var err error
+ if cmd.Table.Purge {
+ if purger, ok := cat.(catalog.PurgeableTable); ok {
+ err = purger.PurgeTable(ctx, ident)
+ } else {
+ output.Error(fmt.Errorf("catalog %s does not support purge", cat.CatalogType()))
+ osExit(1)
Review Comment:
The purge branch uses `osExit(1)` (the swappable test hook), but the
namespace branch right next to it still calls bare `os.Exit(1)`. Functionally
fine today, but it's the kind of thing that makes future test additions land on
a confusing failure mode. I'd make both use `osExit` for consistency.
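For context, the swappable hook pattern looks roughly like this. Only `osExit` comes from the PR; `requirePurgeSupport` is a hypothetical helper for illustration:

```go
package main

import (
	"fmt"
	"os"
)

// osExit is a package-level hook so tests can intercept process exit.
var osExit = os.Exit

// requirePurgeSupport reports an error and exits when the catalog can't purge.
// It returns false so callers can bail out even when osExit is stubbed in tests.
func requirePurgeSupport(supported bool, catalogType string) bool {
	if !supported {
		fmt.Fprintf(os.Stderr, "catalog %s does not support purge\n", catalogType)
		osExit(1)
		return false
	}
	return true
}
```

A stubbed `osExit` returns instead of terminating, so code after the call keeps running in tests; an explicit return right after `osExit(1)` keeps the test path and the real path equivalent, which is another reason to route every exit through the hook.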
##########
table/orphan_cleanup.go:
##########
@@ -225,10 +225,10 @@ func (t Table) executeOrphanCleanup(ctx context.Context, cfg *orphanCleanupConfi
return result, nil
}
-// getReferencedFiles collects all files referenced by table metadata: previous metadata
+// GetReferencedFiles collects all files referenced by table metadata: previous metadata
// files, statistics and partition-statistics paths (Puffin, etc.), and all paths reachable
// from current snapshots (manifest lists, manifests, data files).
-func (t Table) getReferencedFiles(fs iceio.IO) (map[string]bool, error) {
+func (t Table) GetReferencedFiles(fs iceio.IO) (map[string]bool, error) {
Review Comment:
This is a public method on `Table` whose only caller is `catalog/internal`. It
takes an `iceio.IO` that it will nil-dereference (and panic on) if the table has
snapshots and `fs` is nil, and the returned `map[string]bool` has no documented
contract about path normalization. I'd consider either folding this into a
`Table.PurgeFiles(ctx)` that owns its own FS, or pushing it onto `Metadata` and
documenting the nil-fs case. Exporting it as-is locks in an awkward surface for
one internal caller.
Also worth a note: this walk doesn't include deletion-vector blob paths.
With #1100 just merged, V3 tables with DVs will leak `.dv` files on stores
where `ListableIO` isn't available.
--
This is an automated message from the Apache Git Service.