laskoviymishka commented on code in PR #1078:
URL: https://github.com/apache/iceberg-go/pull/1078#discussion_r3243482896
##########
io/gocloud/blob.go:
##########
@@ -102,23 +115,81 @@ type blobFileIO struct {
newRangeReader func(ctx context.Context, key string, offset, length
int64) (io.ReadCloser, error)
}
+// resolveBucket parses path into a bucket handle and object key. If the URI
+// references the primary bucket (or has no scheme), the primary bucket is
+// returned without any additional allocation. URIs that reference a different
+// bucket are resolved via BucketOpener and cached for reuse.
+func (bfs *blobFileIO) resolveBucket(path string) (*blob.Bucket, string,
error) {
+ _, after, hasScheme := strings.Cut(path, "://")
+ if !hasScheme {
+ // No scheme: treat as a key in the primary bucket (legacy
behavior).
+ key, err := bfs.keyExtractor(path)
+
+ return bfs.Bucket, key, err
+ }
+
+ bucketName, key, _ := strings.Cut(after, "/")
+ if key == "" {
+ return nil, "", fmt.Errorf("URI path is empty: %s", path)
+ }
+
+ // Fast path: primary bucket.
+ if bucketName == bfs.primaryBucket {
+ return bfs.Bucket, key, nil
+ }
+
+ // Secondary bucket: check cache first.
+ if bfs.bucketOpener == nil {
+ // No opener configured: fall back to primary bucket with
legacy key extraction.
+ // This preserves backward compatibility for callers that don't
set a BucketOpener.
+ key, err := bfs.keyExtractor(path)
+
+ return bfs.Bucket, key, err
+ }
+
+ bfs.mu.RLock()
+ b, ok := bfs.buckets[bucketName]
+ bfs.mu.RUnlock()
+ if ok {
+ return b, key, nil
+ }
+
+ // Open a new bucket handle.
+ b, err := bfs.bucketOpener(bfs.ctx, bucketName)
+ if err != nil {
+ return nil, "", fmt.Errorf("failed to open bucket %q: %w",
bucketName, err)
+ }
+
+ bfs.mu.Lock()
+ // Double-check: another goroutine may have opened it concurrently.
+ if existing, ok := bfs.buckets[bucketName]; ok {
+ bfs.mu.Unlock()
+ _ = b.Close()
+
+ return existing, key, nil
+ }
+ bfs.buckets[bucketName] = b
+ bfs.mu.Unlock()
+
+ return b, key, nil
+}
+
func (bfs *blobFileIO) preprocess(path string) (string, error) {
return bfs.keyExtractor(path)
}
func (bfs *blobFileIO) Open(path string) (icebergio.File, error) {
- var err error
- path, err = bfs.preprocess(path)
+ bucket, key, err := bfs.resolveBucket(path)
if err != nil {
return nil, &fs.PathError{Op: "open", Path: path, Err: err}
}
- if !fs.ValidPath(path) {
- return nil, &fs.PathError{Op: "open", Path: path, Err:
fs.ErrInvalid}
+ if !fs.ValidPath(key) {
+ return nil, &fs.PathError{Op: "open", Path: key, Err:
fs.ErrInvalid}
}
- key, name := path, filepath.Base(path)
+ name := filepath.Base(key)
- r, err := bfs.NewReader(bfs.ctx, key, nil)
+ r, err := bucket.NewReader(bfs.ctx, key, nil)
Review Comment:
I think there's a subtle issue — `Open` now routes through the resolved
`bucket` here, but `blobOpenFile.ReadAt` (line 49) still calls
`f.b.NewRangeReader(...)`, which dispatches to the embedded *primary* bucket.
So any cross-bucket file opened here fails on the first `ReadAt` with a 404 —
or worse, silently reads the wrong object if a same-named key exists in the
primary bucket — and `ReadAt` is the path Parquet column readers and the
puffin/DV reader actually take. `TestMultiBucketWriteAndRead` passes only
because `io.ReadAll` uses `Read`, not `ReadAt`.
Could we thread `bucket` into `blobOpenFile` and have `ReadAt` use
`f.bucket.NewRangeReader(...)`? A `ReadAt`-specific test would help lock it in.
wdyt?
##########
io/gocloud/blob.go:
##########
@@ -91,9 +92,21 @@ func defaultKeyExtractor(bucketName string) KeyExtractor {
}
}
+// BucketOpener creates a new blob.Bucket for the given bucket name.
+// Used to lazily open secondary buckets when a URI references a bucket
+// different from the primary one (e.g. write.metadata.path pointing to
+// a separate metadata bucket).
+type BucketOpener func(ctx context.Context, bucketName string) (*blob.Bucket,
error)
+
type blobFileIO struct {
*blob.Bucket
+ primaryBucket string
+ bucketOpener BucketOpener
+
+ mu sync.RWMutex
+ buckets map[string]*blob.Bucket
Review Comment:
These secondary buckets are never closed — `blobFileIO` only inherits
`Close()` from the embedded primary bucket. Each cached `*blob.Bucket` holds its
own HTTP transport / connection pool, so a long-running process that touches
many distinct metadata buckets will leak transports indefinitely.
I'd add a `Close()` on `blobFileIO` that walks `bfs.buckets` under the lock
and closes each secondary bucket before closing the primary one. What do you
think?
##########
io/gocloud/blob.go:
##########
@@ -159,37 +228,53 @@ func (bfs *blobFileIO) WriteFile(name string, content
[]byte) error {
// The caller must call Close on the returned Writer, even if the write is
// aborted.
func (bfs *blobFileIO) NewWriter(ctx context.Context, path string, overwrite
bool, opts *blob.WriterOptions) (w *blobWriteFile, err error) {
- path, err = bfs.preprocess(path)
+ bucket, key, err := bfs.resolveBucket(path)
if err != nil {
return nil, &fs.PathError{Op: "new writer", Path: path, Err:
err}
}
- if !fs.ValidPath(path) {
- return nil, &fs.PathError{Op: "new writer", Path: path, Err:
fs.ErrInvalid}
+ if !fs.ValidPath(key) {
+ return nil, &fs.PathError{Op: "new writer", Path: key, Err:
fs.ErrInvalid}
}
if !overwrite {
- if exists, err := bfs.Exists(ctx, path); err != nil || exists {
+ if exists, err := bucket.Exists(ctx, key); err != nil || exists
{
if err != nil {
- return nil, &fs.PathError{Op: "new writer",
Path: path, Err: err}
+ return nil, &fs.PathError{Op: "new writer",
Path: key, Err: err}
}
- return nil, &fs.PathError{Op: "new writer", Path: path,
Err: fs.ErrInvalid}
+ return nil, &fs.PathError{Op: "new writer", Path: key,
Err: fs.ErrInvalid}
}
}
- bw, err := bfs.Bucket.NewWriter(ctx, path, opts)
+ bw, err := bucket.NewWriter(ctx, key, opts)
if err != nil {
return nil, err
}
return &blobWriteFile{
Writer: bw,
- name: path,
+ name: key,
},
nil
}
func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor
KeyExtractor) icebergio.IO {
- return &blobFileIO{Bucket: bucket, keyExtractor: keyExtractor, ctx: ctx}
+ return &blobFileIO{
+ Bucket: bucket,
+ keyExtractor: keyExtractor,
+ ctx: ctx,
+ buckets: make(map[string]*blob.Bucket),
+ }
+}
+
+func createMultiBucketBlobFS(ctx context.Context, bucket *blob.Bucket,
primaryBucket string, opener BucketOpener) icebergio.IO {
+ return &blobFileIO{
+ Bucket: bucket,
+ primaryBucket: primaryBucket,
+ bucketOpener: opener,
+ buckets: make(map[string]*blob.Bucket),
+ keyExtractor: defaultKeyExtractor(primaryBucket),
+ ctx: ctx,
+ }
}
func (bfs *blobFileIO) WalkDir(root string, fn fs.WalkDirFunc) error {
Review Comment:
`WalkDir` is the one IO entry point that doesn't go through `resolveBucket`
— it still hardcodes `fs.WalkDir(bfs.Bucket, ...)`. So if `root` names a
secondary bucket (e.g., orphan-file enumeration on `write.metadata.path`), this
silently walks the primary warehouse bucket and returns the wrong listing.
Could we route this through `resolveBucket` too? Probably worth a small test
covering the cross-bucket root.
##########
io/gocloud/register.go:
##########
@@ -38,7 +39,13 @@ func registerS3Schemes() {
return nil, err
}
- return createBlobFS(ctx, bucket,
defaultKeyExtractor(parsed.Host)), nil
+ opener := func(ctx context.Context, bucketName string)
(*blob.Bucket, error) {
+ u := &url.URL{Scheme: parsed.Scheme, Host: bucketName}
+
+ return createS3Bucket(ctx, u, props)
+ }
+
+ return createMultiBucketBlobFS(ctx, bucket, parsed.Host,
opener), nil
Review Comment:
The fix lands for S3 here, but `registerGCSScheme` (line 64) and
`registerAzureSchemes` (line 76) still call `createBlobFS`, so a GCS or ADLS
user with `write.metadata.path` on a separate bucket still hits the mangled-key
bug we're fixing. PyIceberg and Java handle cross-bucket GCS transparently, so
tables written by them become unreadable from Go in that layout.
Could we either add `BucketOpener` impls for GCS/Azure here, or call out the
S3-only scope in the PR description with a follow-up tracked? wdyt?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]