This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new e9e5c5c5d fix(go/adbc/driver/snowflake): fix potential deadlocks in reader (#3870)
e9e5c5c5d is described below
commit e9e5c5c5d0ef248fc8cdfdd041e062a9d1df0e6f
Author: Matt Topol <[email protected]>
AuthorDate: Fri Jan 9 15:38:29 2026 -0500
fix(go/adbc/driver/snowflake): fix potential deadlocks in reader (#3870)
## Fix Critical Deadlocks and Race Conditions in Snowflake Record Reader
This PR addresses multiple critical concurrency issues in the Snowflake
driver's `recordReader` that could cause complete application hangs
under ordinary race conditions between the producer and consumer goroutines.
### Issues Fixed
*1. Critical Deadlock: `Release()` Blocking Forever*
*Problem:* When `Release()` was called while producer goroutines were
blocked on channel sends, a permanent deadlock occurred:
* `Release()` cancels context and attempts to drain channels
* Producer goroutines blocked on `ch <- rec` cannot see the cancellation
* Channels never close because producers never exit
* `Release()` blocks forever on `for rec := range ch`
*Fix:* Added a `done` channel that signals when all producer goroutines
have completed. `Release()` now waits for this signal before attempting
to drain channels.
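For illustration, here is a minimal, self-contained sketch of that handshake using only plain channels (the `produce` helper and `item` type are invented for the example and are not the driver's actual code): the producer uses a context-aware send and closes its channel on exit, the wrapper goroutine closes `done`, and teardown cancels, waits on `done`, then drains.
```go
package main

import (
	"context"
	"fmt"
)

type item int

// produce sends up to n items but never blocks past cancellation.
func produce(ctx context.Context, ch chan<- item, n int) {
	defer close(ch)
	for i := 0; i < n; i++ {
		select {
		case ch <- item(i):
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan item, 1)
	done := make(chan struct{})

	go func() {
		defer close(done) // signal that the producer goroutine has fully exited
		produce(ctx, ch, 10)
	}()

	// Release-style teardown: cancel first, wait for the producer, then drain.
	// The drain terminates because ch is guaranteed closed once done is closed.
	cancel()
	<-done
	for it := range ch {
		fmt.Println("discarding buffered record", it)
	}
}
```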
*2. Severe Deadlock: Non-Context-Aware Channel Sends*
*Problem:* Channel send operations at lines 694 and 732 checked context
before the send but not during:
```go
for rr.Next() && ctx.Err() == nil { // Context checked here
// ...
ch <- rec // But send blocks here without checking context
}
```
*Fix:* Wrapped all channel sends in `select` statements with context
awareness:
```go
select {
case chs[0] <- rec:
// Successfully sent
case <-ctx.Done():
rec.Release()
return ctx.Err()
}
```
*3. Critical Race Condition: Nil Channel Reads*
*Problem:* Channels were created asynchronously in goroutines after
`newRecordReader` returned. If `Next()` was called quickly after
creation, it could read from uninitialized (nil) channels, causing
infinite blocking.
*Fix:* Initialize all channels upfront before starting any goroutines:
```go
chs := make([]chan arrow.RecordBatch, len(batches))
for i := range chs {
chs[i] = make(chan arrow.RecordBatch, bufferSize)
}
```
*4. Goroutine Leaks on Initialization Errors*
*Problem:* Error paths only cleaned up the first channel, potentially
leaking goroutines if initialization failed after starting concurrent
operations.
*Fix:* Moved all error-prone initialization (GetStream, NewReader)
before goroutine creation, and added proper cleanup on errors.
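In sketch form (condensed from the diff below; the transformer setup and the remaining error paths are omitted):
```go
r, err := batches[0].GetStream(ctx)
if err != nil {
	return nil, errToAdbcErr(adbc.StatusIO, err)
}
rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc))
if err != nil {
	_ = r.Close() // release the stream opened above
	return nil, adbc.Error{Msg: err.Error(), Code: adbc.StatusInvalidState}
}
// Only after every fallible step succeeds are the channels, errgroup, and
// producer goroutines created, so a failure here cannot leak a goroutine.
```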
----------------------
#### Changes
* Added `done` channel to `reader` struct to signal goroutine completion
* Initialize all channels upfront to eliminate race conditions
* Use context-aware sends with `select` statements for all channel
operations
* Update `Release()` to wait on `done` channel before draining
* Reorganize initialization to handle errors before starting goroutines
* Signal completion by closing `done` channel after all producers finish
#### Reproduction Scenarios Prevented
*Deadlock #1:*
1. bufferSize = 1, producer generates 2 records quickly
2. Channel becomes full after first record
3. Producer blocks on send
4. Consumer calls Release() before Next()
5. Without fix: permanent deadlock
6. With fix: producer responds to cancellation, Release() completes
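A hypothetical regression-style test (illustrative only, not part of this PR) models that sequence with plain channels and the fixed send/teardown pattern; it uses a timeout so a regression shows up as a test failure rather than a hang:
```go
package snowflakereader_test

import (
	"context"
	"testing"
	"time"
)

func TestReleaseBeforeNextDoesNotHang(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int, 1) // bufferSize = 1
	done := make(chan struct{})

	go func() {
		defer close(done)
		defer close(ch)
		for i := 0; i < 2; i++ { // two records: the second send would block
			select {
			case ch <- i:
			case <-ctx.Done():
				return // producer responds to cancellation
			}
		}
	}()

	cancel() // the consumer "calls Release()" before ever reading
	select {
	case <-done:
		for range ch { // drain anything that was buffered
		}
	case <-time.After(5 * time.Second):
		t.Fatal("teardown deadlocked: producer never exited")
	}
}
```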
*Race Condition:*
1. Query returns 3 batches
2. First batch processes quickly
3. Next() advances to second channel
4. Without fix: reads from nil channel, blocks forever
5. With fix: channel already initialized, works correctly
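Step 4 hangs because a receive from a nil channel can never proceed; a tiny standalone demonstration of that Go behavior (unrelated to the driver code itself):
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var ch chan int // nil: the channel was never made
	select {
	case v := <-ch: // a receive on a nil channel blocks forever
		fmt.Println("never reached", v)
	case <-time.After(100 * time.Millisecond):
		fmt.Println("timed out: the nil-channel receive will never complete")
	}
}
```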
See #3730
---
go/adbc/driver/snowflake/record_reader.go | 92 ++++++++++++++++++++++---------
1 file changed, 66 insertions(+), 26 deletions(-)
diff --git a/go/adbc/driver/snowflake/record_reader.go b/go/adbc/driver/snowflake/record_reader.go
index 9e50303af..5ccde3a10 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -524,6 +524,7 @@ type reader struct {
err error
cancelFn context.CancelFunc
+ done chan struct{} // signals all producer goroutines have finished
}
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision) (array.RecordReader, error) {
@@ -631,35 +632,26 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
return array.NewRecordReader(schema, results)
}
- ch := make(chan arrow.RecordBatch, bufferSize)
- group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
- ctx, cancelFn := context.WithCancel(ctx)
- group.SetLimit(prefetchConcurrency)
-
- defer func() {
- if err != nil {
- close(ch)
- cancelFn()
- }
- }()
-
- chs := make([]chan arrow.RecordBatch, len(batches))
- rdr := &reader{
- refCount: 1,
- chs: chs,
- err: nil,
- cancelFn: cancelFn,
- }
-
+ // Handle empty batches case early
if len(batches) == 0 {
schema, err := rowTypesToArrowSchema(ctx, ld, useHighPrecision, maxTimestampPrecision)
if err != nil {
return nil, err
}
+ _, cancelFn := context.WithCancel(ctx)
+ rdr := &reader{
+ refCount: 1,
+ chs: nil,
+ err: nil,
+ cancelFn: cancelFn,
+ done: make(chan struct{}),
+ }
+ close(rdr.done) // No goroutines to wait for
rdr.schema, _ = getTransformer(schema, ld, useHighPrecision, maxTimestampPrecision)
return rdr, nil
}
+ // Do all error-prone initialization first, before starting goroutines
r, err := batches[0].GetStream(ctx)
if err != nil {
return nil, errToAdbcErr(adbc.StatusIO, err)
@@ -667,12 +659,32 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc))
if err != nil {
+ _ = r.Close() // Clean up the stream
return nil, adbc.Error{
Msg: err.Error(),
Code: adbc.StatusInvalidState,
}
}
+ // Now setup concurrency primitives after error-prone operations
+ group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
+ ctx, cancelFn := context.WithCancel(ctx)
+ group.SetLimit(prefetchConcurrency)
+
+ // Initialize all channels upfront to avoid race condition
+ chs := make([]chan arrow.RecordBatch, len(batches))
+ for i := range chs {
+ chs[i] = make(chan arrow.RecordBatch, bufferSize)
+ }
+
+ rdr := &reader{
+ refCount: 1,
+ chs: chs,
+ err: nil,
+ cancelFn: cancelFn,
+ done: make(chan struct{}),
+ }
+
var recTransform recordTransformer
rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision, maxTimestampPrecision)
@@ -682,7 +694,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
err = errors.Join(err, r.Close())
}()
if len(batches) > 1 {
- defer close(ch)
+ defer close(chs[0])
}
for rr.Next() && ctx.Err() == nil {
@@ -691,18 +703,25 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
if err != nil {
return err
}
- ch <- rec
+
+ // Use context-aware send to prevent deadlock
+ select {
+ case chs[0] <- rec:
+ // Successfully sent
+ case <-ctx.Done():
+ // Context cancelled, clean up and exit
+ rec.Release()
+ return ctx.Err()
+ }
}
return rr.Err()
})
- chs[0] = ch
-
lastChannelIndex := len(chs) - 1
go func() {
for i, b := range batches[1:] {
batch, batchIdx := b, i+1
- chs[batchIdx] = make(chan arrow.RecordBatch, bufferSize)
+ // Channels already initialized above, no need to create them here
group.Go(func() (err error) {
// close channels (except the last) so that Next can move on to the next channel properly
if batchIdx != lastChannelIndex {
@@ -729,7 +748,16 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
if err != nil {
return err
}
- chs[batchIdx] <- rec
+
+ // Use context-aware send to prevent deadlock
+ select {
+ case chs[batchIdx] <- rec:
+ // Successfully sent
+ case <-ctx.Done():
+ // Context cancelled, clean up and exit
+ rec.Release()
+ return ctx.Err()
+ }
}
return rr.Err()
@@ -744,6 +772,8 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
// don't close the last channel until after the group is finished,
// so that Next() can only return after reader.err may have been set
close(chs[lastChannelIndex])
+ // Signal that all producer goroutines have finished
+ close(rdr.done)
}()
return rdr, nil
@@ -795,7 +825,17 @@ func (r *reader) Release() {
r.rec.Release()
}
r.cancelFn()
+
+ // Wait for all producer goroutines to finish before draining channels
+ // This prevents deadlock where producers are blocked on sends
+ <-r.done
+
+ // Now safely drain remaining data from channels
+ // All channels should be closed at this point
for _, ch := range r.chs {
+ if ch == nil {
+ continue
+ }
for rec := range ch {
rec.Release()
}