This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e9e5c5c5d fix(go/adbc/driver/snowflake): fix potential deadlocks in 
reader (#3870)
e9e5c5c5d is described below

commit e9e5c5c5d0ef248fc8cdfdd041e062a9d1df0e6f
Author: Matt Topol <[email protected]>
AuthorDate: Fri Jan 9 15:38:29 2026 -0500

    fix(go/adbc/driver/snowflake): fix potential deadlocks in reader (#3870)
    
    ## Fix Critical Deadlocks and Race Conditions in Snowflake Record Reader
    
    This PR addresses multiple critical concurrency issues in the Snowflake
    driver's `recordReader` that could hang the application entirely when
    producer and consumer goroutines race during normal use.
    
    ### Issues Fixed
    
    *1. Critical Deadlock: `Release()` Blocking Forever*
    
    *Problem*: When `Release()` was called while producer goroutines were
    blocked on channel sends, a permanent deadlock occurred:
    
    * `Release()` cancels context and attempts to drain channels
    * Producer goroutines blocked on `ch <- rec` cannot see the cancellation
    * Channels never close because producers never exit
    * `Release()` blocks forever on `for rec := range ch`
    
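    *Fix:* Added a `done` channel that is closed once all producer goroutines
    have exited. `Release()` now cancels the context, waits on `done`, and
    only then drains the channels. This depends on the context-aware sends
    described in fix 2: without them, a producer blocked on a send would
    never observe the cancellation and `done` would never be closed.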
    
    *2. Severe Deadlock: Non-Context-Aware Channel Sends*
    
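    *Problem:* The channel sends at lines 694 and 732 of `record_reader.go`
    (line numbers before this change) were guarded by a context check before
    each send, but the send itself could still block indefinitely after the
    context was cancelled: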
    
    ```go
    for rr.Next() && ctx.Err() == nil {  // Context checked here
        // ...
        ch <- rec  // But send blocks here without checking context
    }
    ```
    
    *Fix:* Wrapped all channel sends in `select` statements with context
    awareness:
    
    ```go
    select {
    case chs[0] <- rec:
        // Successfully sent
    case <-ctx.Done():
        rec.Release()
        return ctx.Err()
    }
    ```
    
    *3. Critical Race Condition: Nil Channel Reads*
    
    *Problem:* Channels were created asynchronously in goroutines after
    `newRecordReader` returned. If `Next()` was called quickly after
    creation, it could read from uninitialized (nil) channels, causing
    infinite blocking.
    
    *Fix:* Initialize all channels upfront before starting any goroutines:
    
    ```go
    chs := make([]chan arrow.RecordBatch, len(batches))
    for i := range chs {
        chs[i] = make(chan arrow.RecordBatch, bufferSize)
    }
    ```
    
    *4. Goroutine Leaks on Initialization Errors*
    
    *Problem:* Error paths only cleaned up the first channel, potentially
    leaking goroutines if initialization failed after starting concurrent
    operations.
    
    *Fix:* Moved all error-prone initialization (GetStream, NewReader)
    before goroutine creation, and added proper cleanup on errors.
    
    ----------------------
    
    #### Changes
    
    * Added `done` channel to `reader` struct to signal goroutine completion
    * Initialize all channels upfront to eliminate race conditions
    * Use context-aware sends (`select` on the send and `ctx.Done()`) for all
    channel sends in producer goroutines
    * Update `Release()` to wait on `done` channel before draining
    * Reorganize initialization to handle errors before starting goroutines
    * Signal completion by closing `done` channel after all producers finish
    
    #### Reproduction Scenarios Prevented
    
    *Deadlock #1:*
    
    1. bufferSize = 1, producer generates 2 records quickly
    2. Channel becomes full after first record
    3. Producer blocks on send
    4. Consumer calls Release() before Next()
    5. Without fix: permanent deadlock
    6. With fix: producer responds to cancellation, Release() completes
    
    *Race Condition:*
    
    1. Query returns 3 batches
    2. First batch processes quickly
    3. Next() advances to second channel
    4. Without fix: reads from nil channel, blocks forever
    5. With fix: channel already initialized, works correctly
    
    See #3730
---
 go/adbc/driver/snowflake/record_reader.go | 92 ++++++++++++++++++++++---------
 1 file changed, 66 insertions(+), 26 deletions(-)

diff --git a/go/adbc/driver/snowflake/record_reader.go 
b/go/adbc/driver/snowflake/record_reader.go
index 9e50303af..5ccde3a10 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -524,6 +524,7 @@ type reader struct {
        err        error
 
        cancelFn context.CancelFunc
+       done     chan struct{} // signals all producer goroutines have finished
 }
 
 func newRecordReader(ctx context.Context, alloc memory.Allocator, ld 
gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int, 
useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision) 
(array.RecordReader, error) {
@@ -631,35 +632,26 @@ func newRecordReader(ctx context.Context, alloc 
memory.Allocator, ld gosnowflake
                return array.NewRecordReader(schema, results)
        }
 
-       ch := make(chan arrow.RecordBatch, bufferSize)
-       group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
-       ctx, cancelFn := context.WithCancel(ctx)
-       group.SetLimit(prefetchConcurrency)
-
-       defer func() {
-               if err != nil {
-                       close(ch)
-                       cancelFn()
-               }
-       }()
-
-       chs := make([]chan arrow.RecordBatch, len(batches))
-       rdr := &reader{
-               refCount: 1,
-               chs:      chs,
-               err:      nil,
-               cancelFn: cancelFn,
-       }
-
+       // Handle empty batches case early
        if len(batches) == 0 {
                schema, err := rowTypesToArrowSchema(ctx, ld, useHighPrecision, 
maxTimestampPrecision)
                if err != nil {
                        return nil, err
                }
+               _, cancelFn := context.WithCancel(ctx)
+               rdr := &reader{
+                       refCount: 1,
+                       chs:      nil,
+                       err:      nil,
+                       cancelFn: cancelFn,
+                       done:     make(chan struct{}),
+               }
+               close(rdr.done) // No goroutines to wait for
                rdr.schema, _ = getTransformer(schema, ld, useHighPrecision, 
maxTimestampPrecision)
                return rdr, nil
        }
 
+       // Do all error-prone initialization first, before starting goroutines
        r, err := batches[0].GetStream(ctx)
        if err != nil {
                return nil, errToAdbcErr(adbc.StatusIO, err)
@@ -667,12 +659,32 @@ func newRecordReader(ctx context.Context, alloc 
memory.Allocator, ld gosnowflake
 
        rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc))
        if err != nil {
+               _ = r.Close() // Clean up the stream
                return nil, adbc.Error{
                        Msg:  err.Error(),
                        Code: adbc.StatusInvalidState,
                }
        }
 
+       // Now setup concurrency primitives after error-prone operations
+       group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
+       ctx, cancelFn := context.WithCancel(ctx)
+       group.SetLimit(prefetchConcurrency)
+
+       // Initialize all channels upfront to avoid race condition
+       chs := make([]chan arrow.RecordBatch, len(batches))
+       for i := range chs {
+               chs[i] = make(chan arrow.RecordBatch, bufferSize)
+       }
+
+       rdr := &reader{
+               refCount: 1,
+               chs:      chs,
+               err:      nil,
+               cancelFn: cancelFn,
+               done:     make(chan struct{}),
+       }
+
        var recTransform recordTransformer
        rdr.schema, recTransform = getTransformer(rr.Schema(), ld, 
useHighPrecision, maxTimestampPrecision)
 
@@ -682,7 +694,7 @@ func newRecordReader(ctx context.Context, alloc 
memory.Allocator, ld gosnowflake
                        err = errors.Join(err, r.Close())
                }()
                if len(batches) > 1 {
-                       defer close(ch)
+                       defer close(chs[0])
                }
 
                for rr.Next() && ctx.Err() == nil {
@@ -691,18 +703,25 @@ func newRecordReader(ctx context.Context, alloc 
memory.Allocator, ld gosnowflake
                        if err != nil {
                                return err
                        }
-                       ch <- rec
+
+                       // Use context-aware send to prevent deadlock
+                       select {
+                       case chs[0] <- rec:
+                               // Successfully sent
+                       case <-ctx.Done():
+                               // Context cancelled, clean up and exit
+                               rec.Release()
+                               return ctx.Err()
+                       }
                }
                return rr.Err()
        })
 
-       chs[0] = ch
-
        lastChannelIndex := len(chs) - 1
        go func() {
                for i, b := range batches[1:] {
                        batch, batchIdx := b, i+1
-                       chs[batchIdx] = make(chan arrow.RecordBatch, bufferSize)
+                       // Channels already initialized above, no need to 
create them here
                        group.Go(func() (err error) {
                                // close channels (except the last) so that 
Next can move on to the next channel properly
                                if batchIdx != lastChannelIndex {
@@ -729,7 +748,16 @@ func newRecordReader(ctx context.Context, alloc 
memory.Allocator, ld gosnowflake
                                        if err != nil {
                                                return err
                                        }
-                                       chs[batchIdx] <- rec
+
+                                       // Use context-aware send to prevent 
deadlock
+                                       select {
+                                       case chs[batchIdx] <- rec:
+                                               // Successfully sent
+                                       case <-ctx.Done():
+                                               // Context cancelled, clean up 
and exit
+                                               rec.Release()
+                                               return ctx.Err()
+                                       }
                                }
 
                                return rr.Err()
@@ -744,6 +772,8 @@ func newRecordReader(ctx context.Context, alloc 
memory.Allocator, ld gosnowflake
                // don't close the last channel until after the group is 
finished,
                // so that Next() can only return after reader.err may have 
been set
                close(chs[lastChannelIndex])
+               // Signal that all producer goroutines have finished
+               close(rdr.done)
        }()
 
        return rdr, nil
@@ -795,7 +825,17 @@ func (r *reader) Release() {
                        r.rec.Release()
                }
                r.cancelFn()
+
+               // Wait for all producer goroutines to finish before draining 
channels
+               // This prevents deadlock where producers are blocked on sends
+               <-r.done
+
+               // Now safely drain remaining data from channels
+               // All channels should be closed at this point
                for _, ch := range r.chs {
+                       if ch == nil {
+                               continue
+                       }
                        for rec := range ch {
                                rec.Release()
                        }

Reply via email to