This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a1df7631 fix(csharp/src/Drivers/Databricks): Make mandatory token exchange blocking and fix concurrent handling (#3715)
9a1df7631 is described below

commit 9a1df763188b2a9ff05726e2930e1cc3d9e4dc5b
Author: eric-wang-1990 <[email protected]>
AuthorDate: Mon Nov 17 12:19:17 2025 -0800

    fix(csharp/src/Drivers/Databricks): Make mandatory token exchange blocking and fix concurrent handling (#3715)
    
    ## Summary
    - Changed token exchange from a non-blocking background task to a
    blocking (awaited) operation that completes before the request is sent
    - Fixed concurrent token exchange handling for multiple different tokens
    - Ensures requests use the exchanged Databricks token instead of the
    original token that would fail authentication
    - Simplified implementation with proper concurrent request handling
    
    ## Problem
    The previous non-blocking implementation had two issues:
    
    1. **First request would fail**: The handler would send the first
    request with the original non-Databricks token (e.g., Azure AD, AWS IAM)
    while starting token exchange in the background. Since this is
    **mandatory** token exchange, the non-Databricks token would fail
    authentication with Databricks.
    
    2. **Different tokens not handled correctly**: When multiple requests
    came in with different tokens, the second token would not be exchanged
    properly if it arrived during the first token's exchange. The second
    request would wait for the first token's exchange, then return without
    starting its own exchange.
    
    ## Solution
    - Token exchange now blocks the request until completion using direct
    `await`
    - **Serialize token exchanges**: When a request with a different token
    arrives during an ongoing exchange, it waits for that exchange to
    complete, then starts its own exchange
    - Removed `Task.Run` background thread offloading - exchange happens on
    the calling thread
    - Removed `_pendingTokenTask` tracking complexity
    - Removed `Dispose` method (no longer needed without background tasks)
    - Removed `_lastSeenToken` check from `NeedsTokenExchange` to avoid race
    conditions
    - Falls back to original token if exchange fails, with improved error
    message
    - **Single-token cache** prevents unbounded memory growth (only most
    recent token is cached)
    
    ## Implementation Details
    
    The key insight is to **serialize all token exchanges**:
    
    ```csharp
    // 1. Wait for any ongoing exchange to complete (could be for a different token)
    if (_pendingExchange != null)
    {
        await _pendingExchange;
    }
    
    // 2. Check if our token was already processed
    if (_lastSeenToken == bearerToken)
    {
        return;  // Already exchanged
    }
    
    // 3. Start exchange for our token
    _pendingExchange = DoExchangeAsync(bearerToken, cancellationToken);
    await _pendingExchange;
    ```
    
    This ensures:
    - Only one exchange runs at a time
    - Each different token gets exchanged
    - Concurrent requests with the same token share the exchange
    - No race conditions
    
    ## Test plan
    - [x] Test with external token to verify exchange blocks until
    completion before first request
    - [x] Verify subsequent requests reuse the cached exchanged token
    - [x] Test with different external tokens to verify each is exchanged
    properly
    - [x] Test concurrent requests with the same token to verify only one
    exchange happens
    - [x] Test token exchange failure scenario to confirm fallback behavior
    - [x] All 11 unit tests pass
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../MandatoryTokenExchangeDelegatingHandler.cs     | 125 ++++++++++-----------
 ...MandatoryTokenExchangeDelegatingHandlerTests.cs |  41 ++++---
 2 files changed, 88 insertions(+), 78 deletions(-)

diff --git 
a/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs 
b/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
index edda03f53..e571d45a1 100644
--- 
a/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
+++ 
b/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
@@ -25,7 +25,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
 {
     /// <summary>
     /// HTTP message handler that performs mandatory token exchange for 
non-Databricks tokens.
-    /// Uses a non-blocking approach to exchange tokens in the background.
+    /// Blocks requests while exchanging tokens to ensure the exchanged token 
is used.
+    /// Falls back to the original token if the exchange fails.
     /// </summary>
     internal class MandatoryTokenExchangeDelegatingHandler : DelegatingHandler
     {
@@ -34,8 +35,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
         private readonly ITokenExchangeClient _tokenExchangeClient;
         private string? _currentToken;
         private string? _lastSeenToken;
-
-        protected Task? _pendingTokenTask = null;
+        private Task? _pendingExchange = null;
 
         /// <summary>
         /// Initializes a new instance of the <see 
cref="MandatoryTokenExchangeDelegatingHandler"/> class.
@@ -59,18 +59,6 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
         /// <returns>True if token exchange is needed, false 
otherwise.</returns>
         private bool NeedsTokenExchange(string bearerToken)
         {
-            // If we already started exchange for this token, no need to check 
again
-            if (_lastSeenToken == bearerToken)
-            {
-                return false;
-            }
-
-            // If we already have a pending token task, don't start another 
exchange
-            if (_pendingTokenTask != null)
-            {
-                return false;
-            }
-
             // If we can't parse the token as JWT, default to use existing 
token
             if (!JwtTokenDecoder.TryGetIssuer(bearerToken, out string issuer))
             {
@@ -86,56 +74,87 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
         }
 
         /// <summary>
-        /// Starts token exchange in the background if needed.
+        /// Performs token exchange if needed.
         /// </summary>
         /// <param name="bearerToken">The bearer token to potentially 
exchange.</param>
         /// <param name="cancellationToken">A cancellation token.</param>
-        private void StartTokenExchangeIfNeeded(string bearerToken, 
CancellationToken cancellationToken)
+        private async Task PerformTokenExchangeIfNeeded(string bearerToken, 
CancellationToken cancellationToken)
         {
-            if (_lastSeenToken == bearerToken)
+            // Check if we need exchange (no lock needed for this check)
+            bool needsExchange = NeedsTokenExchange(bearerToken);
+
+            if (!needsExchange)
             {
+                lock (_tokenLock)
+                {
+                    _lastSeenToken = bearerToken;
+                }
                 return;
             }
 
-            bool needsExchange;
+            // Wait for any pending exchange to complete first (could be for a 
different token)
+            Task? exchangeToAwait = null;
             lock (_tokenLock)
             {
-                needsExchange = NeedsTokenExchange(bearerToken);
-
-                _lastSeenToken = bearerToken;
+                if (_pendingExchange != null)
+                {
+                    exchangeToAwait = _pendingExchange;
+                }
             }
 
-            if (!needsExchange)
+            if (exchangeToAwait != null)
             {
-                return;
+                await exchangeToAwait;
             }
 
-            // Start token exchange in the background
-            _pendingTokenTask = Task.Run(async () =>
+            // Now check if we need to exchange our token
+            lock (_tokenLock)
             {
-                try
+                // If this token was already processed (by us or another 
concurrent request)
+                if (_lastSeenToken == bearerToken)
                 {
-                    TokenExchangeResponse response = await 
_tokenExchangeClient.ExchangeTokenAsync(
-                        bearerToken,
-                        _identityFederationClientId,
-                        cancellationToken);
-
-                    lock (_tokenLock)
-                    {
-                        _currentToken = response.AccessToken;
-                    }
+                    return;
                 }
-                catch (Exception ex)
+
+                // Start new exchange for our token
+                _lastSeenToken = bearerToken;
+                _pendingExchange = DoExchangeAsync(bearerToken, 
cancellationToken);
+                exchangeToAwait = _pendingExchange;
+            }
+
+            await exchangeToAwait;
+        }
+
+        /// <summary>
+        /// Performs the actual token exchange operation.
+        /// </summary>
+        /// <param name="bearerToken">The bearer token to exchange.</param>
+        /// <param name="cancellationToken">A cancellation token.</param>
+        private async Task DoExchangeAsync(string bearerToken, 
CancellationToken cancellationToken)
+        {
+            try
+            {
+                TokenExchangeResponse response = await 
_tokenExchangeClient.ExchangeTokenAsync(
+                    bearerToken,
+                    _identityFederationClientId,
+                    cancellationToken);
+
+                lock (_tokenLock)
                 {
-                    System.Diagnostics.Debug.WriteLine($"Mandatory token 
exchange failed: {ex.Message}");
+                    _currentToken = response.AccessToken;
                 }
-            }, cancellationToken).ContinueWith(_ =>
+            }
+            catch (Exception ex)
+            {
+                System.Diagnostics.Debug.WriteLine($"Mandatory token exchange 
failed: {ex.Message}. Continuing with original token.");
+            }
+            finally
             {
                 lock (_tokenLock)
                 {
-                    _pendingTokenTask = null;
+                    _pendingExchange = null;
                 }
-            }, TaskScheduler.Default);
+            }
         }
 
         /// <summary>
@@ -149,7 +168,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
             string? bearerToken = request.Headers.Authorization?.Parameter;
             if (!string.IsNullOrEmpty(bearerToken))
             {
-                StartTokenExchangeIfNeeded(bearerToken!, cancellationToken);
+                await PerformTokenExchangeIfNeeded(bearerToken!, 
cancellationToken);
 
                 string tokenToUse;
                 lock (_tokenLock)
@@ -163,27 +182,5 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
             return await base.SendAsync(request, cancellationToken);
         }
 
-        protected override void Dispose(bool disposing)
-        {
-            if (disposing)
-            {
-                // Wait for any pending token task to complete to avoid 
leaking tasks
-                if (_pendingTokenTask != null)
-                {
-                    try
-                    {
-                        // Try to wait for the task to complete, but don't 
block indefinitely
-                        _pendingTokenTask.Wait(TimeSpan.FromSeconds(10));
-                    }
-                    catch (Exception ex)
-                    {
-                        // Log any exceptions during disposal
-                        System.Diagnostics.Debug.WriteLine($"Exception during 
token task cleanup: {ex.Message}");
-                    }
-                }
-            }
-
-            base.Dispose(disposing);
-        }
     }
 }
diff --git 
a/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
 
b/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
index 13fe196e8..5da898ded 100644
--- 
a/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
+++ 
b/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
@@ -16,6 +16,7 @@
 */
 
 using System;
+using System.Linq;
 using System.Net;
 using System.Net.Http;
 using System.Text;
@@ -125,7 +126,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
         }
 
         [Fact]
-        public async Task 
SendAsync_WithExternalToken_StartsTokenExchangeInBackground()
+        public async Task 
SendAsync_WithExternalToken_BlocksUntilTokenExchangeCompletes()
         {
             var tokenExchangeDelay = TimeSpan.FromMilliseconds(500);
             var handler = new MandatoryTokenExchangeDelegatingHandler(
@@ -165,23 +166,20 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
 
             var httpClient = new HttpClient(handler);
 
-            // First request should use original token and start background 
exchange
+            // First request should block until token exchange completes, then 
use exchanged token
             var startTime = DateTime.UtcNow;
             var response = await httpClient.SendAsync(request);
             var requestDuration = DateTime.UtcNow - startTime;
 
             Assert.Equal(expectedResponse, response);
-            Assert.True(requestDuration < tokenExchangeDelay,
-                $"Request took {requestDuration.TotalMilliseconds}ms, which is 
longer than the token exchange delay of 
{tokenExchangeDelay.TotalMilliseconds}ms");
+            Assert.True(requestDuration >= tokenExchangeDelay,
+                $"Request took {requestDuration.TotalMilliseconds}ms, which is 
shorter than the token exchange delay of 
{tokenExchangeDelay.TotalMilliseconds}ms. Expected blocking behavior.");
 
             Assert.NotNull(capturedRequest);
             Assert.Equal("Bearer", 
capturedRequest.Headers.Authorization?.Scheme);
-            Assert.Equal(_externalToken, 
capturedRequest.Headers.Authorization?.Parameter); // First request uses 
original token
+            Assert.Equal(_exchangedToken, 
capturedRequest.Headers.Authorization?.Parameter); // First request uses 
exchanged token
 
-            // Wait for background task to complete
-            await Task.Delay(tokenExchangeDelay + 
TimeSpan.FromMilliseconds(1000));
-
-            // Make a second request - this should use the exchanged token
+            // Make a second request - this should also use the exchanged 
token (cached)
             var request2 = new HttpRequestMessage(HttpMethod.Get, 
"https://example.com/2";);
             request2.Headers.Authorization = new 
System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", _externalToken);
             HttpRequestMessage? capturedRequest2 = null;
@@ -198,8 +196,9 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
 
             Assert.NotNull(capturedRequest2);
             Assert.Equal("Bearer", 
capturedRequest2.Headers.Authorization?.Scheme);
-            Assert.Equal(_exchangedToken, 
capturedRequest2.Headers.Authorization?.Parameter); // Second request uses 
exchanged token
+            Assert.Equal(_exchangedToken, 
capturedRequest2.Headers.Authorization?.Parameter); // Second request uses 
cached exchanged token
 
+            // Token exchange should only be called once
             _mockTokenExchangeClient.Verify(
                 x => x.ExchangeTokenAsync(_externalToken, 
_identityFederationClientId, It.IsAny<CancellationToken>()),
                 Times.Once);
@@ -396,11 +395,15 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
                 ExpiryTime = DateTime.UtcNow.AddHours(1)
             };
 
-            // Add a small delay to token exchange to simulate concurrent 
access
+            var exchangeCallCount = 0;
+            var capturedRequests = new 
System.Collections.Concurrent.ConcurrentBag<HttpRequestMessage>();
+
+            // Add a delay to token exchange to ensure concurrent requests 
arrive while exchange is in progress
             _mockTokenExchangeClient
                 .Setup(x => x.ExchangeTokenAsync(_externalToken, 
_identityFederationClientId, It.IsAny<CancellationToken>()))
                 .Returns(async () =>
                 {
+                    Interlocked.Increment(ref exchangeCallCount);
                     await Task.Delay(200);
                     return tokenExchangeResponse;
                 });
@@ -410,6 +413,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
                     "SendAsync",
                     ItExpr.IsAny<HttpRequestMessage>(),
                     ItExpr.IsAny<CancellationToken>())
+                .Callback<HttpRequestMessage, CancellationToken>((req, ct) => 
capturedRequests.Add(req))
                 .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK));
 
             var httpClient = new HttpClient(handler);
@@ -424,13 +428,22 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
 
             await Task.WhenAll(tasks);
 
-            // Wait for any background token exchange to complete
-            await Task.Delay(1000);
-
             // Token exchange should only be called once despite concurrent 
requests
             _mockTokenExchangeClient.Verify(
                 x => x.ExchangeTokenAsync(_externalToken, 
_identityFederationClientId, It.IsAny<CancellationToken>()),
                 Times.Once);
+
+            Assert.Equal(1, exchangeCallCount);
+
+            // All requests should have been sent
+            Assert.Equal(3, capturedRequests.Count);
+
+            // All concurrent requests should use the exchanged token
+            // (they all wait for the same _pendingExchange task)
+            foreach (var request in capturedRequests)
+            {
+                Assert.Equal(_exchangedToken, 
request.Headers.Authorization?.Parameter);
+            }
         }
 
         [Fact]

Reply via email to