This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new aae84d22c feat(csharp/src/Drivers/Databricks): Primary Key and Foreign
Key Metadata Optimization (#2886)
aae84d22c is described below
commit aae84d22c1f74601ae81cf6b38455bbacfb7e65e
Author: eric-wang-1990 <[email protected]>
AuthorDate: Wed May 28 10:27:39 2025 -0700
feat(csharp/src/Drivers/Databricks): Primary Key and Foreign Key Metadata
Optimization (#2886)
## Arrow ADBC: Primary Key and Foreign Key Metadata Optimization
### Description
This PR adds support for optimizing Primary Key and Foreign Key metadata
queries in the C# Databricks ADBC driver. It introduces a new connection
parameter `adbc.databricks.enable_pk_fk` that allows users to control
whether the driver should make PK/FK metadata calls to the server or
return empty results for improved performance.
### Background
Primary Key and Foreign Key metadata queries can be expensive
operations, particularly in Databricks environments where they may not
be fully supported in certain catalogs. This implementation provides a
way to optimize these operations by:
1. Allowing users to disable PK/FK metadata calls entirely via
configuration
2. Automatically returning empty results for legacy catalogs (SPARK,
hive_metastore) where PK/FK metadata is not supported
3. Ensuring that empty results maintain schema compatibility with real
metadata responses
### Proposed Changes
- Add new connection parameter `adbc.databricks.enable_pk_fk` to control
PK/FK metadata behavior (default: true)
- Implement special handling for legacy catalogs (SPARK, hive_metastore)
to return empty results without server calls
- Modify method visibility in base classes to allow proper overriding in
derived classes
- Add comprehensive test coverage for the new functionality
### How is this tested?
Added unit tests that verify:
1. The correct behavior of the `ShouldReturnEmptyPkFkResult` method with
various combinations of settings
2. Schema compatibility between empty results and real metadata
responses
3. Proper handling of different catalog scenarios
These tests ensure that the optimization works correctly while
maintaining compatibility with client applications that expect
consistent schema structures.
---
.../Drivers/Apache/Hive2/HiveServer2Statement.cs | 4 +-
.../src/Drivers/Databricks/DatabricksConnection.cs | 18 ++++
.../src/Drivers/Databricks/DatabricksParameters.cs | 6 ++
.../src/Drivers/Databricks/DatabricksStatement.cs | 117 ++++++++++++++++++++-
csharp/test/Drivers/Databricks/StatementTests.cs | 98 +++++++++++++++++
5 files changed, 240 insertions(+), 3 deletions(-)
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
index 4079fce26..c4c7414c2 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
@@ -368,7 +368,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
};
}
- private async Task<QueryResult>
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
+ protected virtual async Task<QueryResult>
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
{
TGetCrossReferenceResp resp = await
Connection.GetCrossReferenceAsync(
CatalogName,
@@ -383,7 +383,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
return await GetQueryResult(resp.DirectResults, cancellationToken);
}
- private async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken
cancellationToken = default)
+ protected virtual async Task<QueryResult>
GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
{
TGetPrimaryKeysResp resp = await Connection.GetPrimaryKeysAsync(
CatalogName,
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 3cb0e544f..1282d0c8a 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -40,6 +40,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
private bool _applySSPWithQueries = false;
private bool _enableDirectResults = true;
private bool _enableMultipleCatalogSupport = true;
+ private bool _enablePKFK = true;
internal static TSparkGetDirectResults defaultGetDirectResults = new()
{
@@ -71,6 +72,18 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
private void ValidateProperties()
{
+ if (Properties.TryGetValue(DatabricksParameters.EnablePKFK, out
string? enablePKFKStr))
+ {
+ if (bool.TryParse(enablePKFKStr, out bool enablePKFKValue))
+ {
+ _enablePKFK = enablePKFKValue;
+ }
+ else
+ {
+ throw new ArgumentException($"Parameter
'{DatabricksParameters.EnablePKFK}' value '{enablePKFKStr}' could not be
parsed. Valid values are 'true', 'false'.");
+ }
+ }
+
if
(Properties.TryGetValue(DatabricksParameters.EnableMultipleCatalogSupport, out
string? enableMultipleCatalogSupportStr))
{
if (bool.TryParse(enableMultipleCatalogSupportStr, out bool
enableMultipleCatalogSupportValue))
@@ -204,6 +217,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// </summary>
internal bool EnableMultipleCatalogSupport =>
_enableMultipleCatalogSupport;
+ /// <summary>
+ /// Gets whether PK/FK metadata call is enabled
+ /// </summary>
+ public bool EnablePKFK => _enablePKFK;
+
/// <summary>
/// Gets a value indicating whether to retry requests that receive a
503 response with a Retry-After header.
/// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 85e33f62d..db62c04b2 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -155,6 +155,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// Default value is true if not specified.
/// </summary>
public const string EnableMultipleCatalogSupport =
"adbc.databricks.enable_multiple_catalog_support";
+
+ /// <summary>
+ /// Whether to enable primary key foreign key metadata call.
+ /// Default value is true if not specified.
+ /// </summary>
+ public const string EnablePKFK = "adbc.databricks.enable_pk_fk";
}
/// <summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
index 1fdeee285..cb92cdd5e 100644
--- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
@@ -16,7 +16,6 @@
*/
using System;
-using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
@@ -37,6 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
private bool canDecompressLz4;
private long maxBytesPerFile;
private bool enableMultipleCatalogSupport;
+ private bool enablePKFK;
public DatabricksStatement(DatabricksConnection connection)
: base(connection)
@@ -46,6 +46,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
canDecompressLz4 = connection.CanDecompressLz4;
maxBytesPerFile = connection.MaxBytesPerFile;
enableMultipleCatalogSupport =
connection.EnableMultipleCatalogSupport;
+ enablePKFK = connection.EnablePKFK;
}
protected override void SetStatementProperties(TExecuteStatementReq
statement)
@@ -386,5 +387,119 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
// Call the base implementation with the potentially modified
catalog name
return await base.GetColumnsAsync(cancellationToken);
}
+
+ /// <summary>
+ /// Determines whether PK/FK metadata queries
(GetPrimaryKeys/GetCrossReference) should return an empty result set without
hitting the server.
+ ///
+ /// Why:
+ /// - For certain catalog names (null, empty, "SPARK",
"hive_metastore"), Databricks does not support PK/FK metadata,
+ /// or these are legacy/synthesized catalogs that should gracefully
return empty results for compatibility.
+ /// - The EnablePKFK flag allows the client to globally disable PK/FK
metadata queries for performance or compatibility reasons.
+ ///
+ /// What it does:
+ /// - Returns true if PK/FK queries should return an empty result (and
not hit the server), based on:
+ /// - The EnablePKFK flag (if false, always return empty)
+ /// - The catalog name (SPARK, hive_metastore, null, or empty string)
+ /// - Returns false if the query should proceed to the server (for
valid, supported catalogs).
+ /// </summary>
+ internal bool ShouldReturnEmptyPkFkResult()
+ {
+ if (!enablePKFK)
+ return true;
+
+ // Handle special catalog cases
+ if (string.IsNullOrEmpty(CatalogName) ||
+ string.Equals(CatalogName, "SPARK",
StringComparison.OrdinalIgnoreCase) ||
+ string.Equals(CatalogName, "hive_metastore",
StringComparison.OrdinalIgnoreCase))
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ protected override async Task<QueryResult>
GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
+ {
+ if (ShouldReturnEmptyPkFkResult())
+ return EmptyPrimaryKeysResult();
+
+ return await base.GetPrimaryKeysAsync(cancellationToken);
+ }
+
+ private QueryResult EmptyPrimaryKeysResult()
+ {
+ var fields = new[]
+ {
+ new Field("TABLE_CAT", StringType.Default, true),
+ new Field("TABLE_SCHEM", StringType.Default, true),
+ new Field("TABLE_NAME", StringType.Default, true),
+ new Field("COLUMN_NAME", StringType.Default, true),
+ new Field("KEY_SEQ", Int16Type.Default, true),
+ new Field("PK_NAME", StringType.Default, true)
+ };
+ var schema = new Schema(fields, null);
+
+ var arrays = new IArrowArray[]
+ {
+ new StringArray.Builder().Build(), // TABLE_CAT
+ new StringArray.Builder().Build(), // TABLE_SCHEM
+ new StringArray.Builder().Build(), // TABLE_NAME
+ new StringArray.Builder().Build(), // COLUMN_NAME
+ new Int16Array.Builder().Build(), // KEY_SEQ
+ new StringArray.Builder().Build() // PK_NAME
+ };
+
+ return new QueryResult(0, new
HiveServer2Connection.HiveInfoArrowStream(schema, arrays));
+ }
+
+ protected override async Task<QueryResult>
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
+ {
+ if (ShouldReturnEmptyPkFkResult())
+ return EmptyCrossReferenceResult();
+
+ return await base.GetCrossReferenceAsync(cancellationToken);
+ }
+
+ private QueryResult EmptyCrossReferenceResult()
+ {
+ var fields = new[]
+ {
+ new Field("PKTABLE_CAT", StringType.Default, true),
+ new Field("PKTABLE_SCHEM", StringType.Default, true),
+ new Field("PKTABLE_NAME", StringType.Default, true),
+ new Field("PKCOLUMN_NAME", StringType.Default, true),
+ new Field("FKTABLE_CAT", StringType.Default, true),
+ new Field("FKTABLE_SCHEM", StringType.Default, true),
+ new Field("FKTABLE_NAME", StringType.Default, true),
+ new Field("FKCOLUMN_NAME", StringType.Default, true),
+ new Field("KEY_SEQ", Int16Type.Default, true),
+ new Field("UPDATE_RULE", Int16Type.Default, true),
+ new Field("DELETE_RULE", Int16Type.Default, true),
+ new Field("FK_NAME", StringType.Default, true),
+ new Field("PK_NAME", StringType.Default, true),
+ new Field("DEFERRABILITY", Int16Type.Default, true)
+ };
+ var schema = new Schema(fields, null);
+
+ var arrays = new IArrowArray[]
+ {
+ new StringArray.Builder().Build(), // PKTABLE_CAT
+ new StringArray.Builder().Build(), // PKTABLE_SCHEM
+ new StringArray.Builder().Build(), // PKTABLE_NAME
+ new StringArray.Builder().Build(), // PKCOLUMN_NAME
+ new StringArray.Builder().Build(), // FKTABLE_CAT
+ new StringArray.Builder().Build(), // FKTABLE_SCHEM
+ new StringArray.Builder().Build(), // FKTABLE_NAME
+ new StringArray.Builder().Build(), // FKCOLUMN_NAME
+ new Int16Array.Builder().Build(), // KEY_SEQ
+ new Int16Array.Builder().Build(), // UPDATE_RULE
+ new Int16Array.Builder().Build(), // DELETE_RULE
+ new StringArray.Builder().Build(), // FK_NAME
+ new StringArray.Builder().Build(), // PK_NAME
+ new Int16Array.Builder().Build() // DEFERRABILITY
+ };
+
+ return new QueryResult(0, new
HiveServer2Connection.HiveInfoArrowStream(schema, arrays));
+ }
}
}
diff --git a/csharp/test/Drivers/Databricks/StatementTests.cs
b/csharp/test/Drivers/Databricks/StatementTests.cs
index ef1a84b00..368a5e0d7 100644
--- a/csharp/test/Drivers/Databricks/StatementTests.cs
+++ b/csharp/test/Drivers/Databricks/StatementTests.cs
@@ -652,5 +652,103 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
Assert.True(expectedType.Equals(field.DataType), $"Field {index}
type mismatch");
Assert.True(expectedNullable == field.IsNullable, $"Field {index}
nullability mismatch");
}
+
+ [Theory]
+ [InlineData(false, "main", true)]
+ [InlineData(true, null, true)]
+ [InlineData(true, "", true)]
+ [InlineData(true, "SPARK", true)]
+ [InlineData(true, "hive_metastore", true)]
+ [InlineData(true, "main", false)]
+ public void ShouldReturnEmptyPkFkResult_WorksAsExpected(bool
enablePKFK, string? catalogName, bool expected)
+ {
+ // Arrange: create test configuration and connection
+ var testConfig =
(DatabricksTestConfiguration)TestConfiguration.Clone();
+ var connectionParams = new Dictionary<string, string>
+ {
+ [DatabricksParameters.EnablePKFK] =
enablePKFK.ToString().ToLowerInvariant()
+ };
+ using var connection = NewConnection(testConfig, connectionParams);
+ var statement = connection.CreateStatement();
+
+ // Set CatalogName using SetOption
+ if (catalogName != null)
+ {
+ statement.SetOption(ApacheParameters.CatalogName, catalogName);
+ }
+
+ // Act
+ var result =
((DatabricksStatement)statement).ShouldReturnEmptyPkFkResult();
+
+ // Assert
+ Assert.Equal(expected, result);
+ }
+
+ [SkippableFact]
+ public async Task PKFK_EmptyResult_SchemaMatches_RealMetadataResponse()
+ {
+ // Arrange: create test configuration and connection
+ var testConfig =
(DatabricksTestConfiguration)TestConfiguration.Clone();
+ var connectionParams = new Dictionary<string, string>
+ {
+ [DatabricksParameters.EnablePKFK] = "true"
+ };
+ using var connection = NewConnection(testConfig, connectionParams);
+ var statement = connection.CreateStatement();
+
+ // Get real PK metadata schema
+ statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+ statement.SetOption(ApacheParameters.CatalogName,
TestConfiguration.Metadata.Catalog);
+ statement.SetOption(ApacheParameters.SchemaName,
TestConfiguration.Metadata.Schema);
+ statement.SetOption(ApacheParameters.TableName,
TestConfiguration.Metadata.Table);
+ statement.SqlQuery = "GetPrimaryKeys";
+ var realPkResult = await statement.ExecuteQueryAsync();
+ Assert.NotNull(realPkResult.Stream);
+ var realPkSchema = realPkResult.Stream.Schema;
+
+ // Get empty PK result schema (using SPARK catalog which should
return empty)
+ statement.SetOption(ApacheParameters.CatalogName, "SPARK");
+ var emptyPkResult = await statement.ExecuteQueryAsync();
+ Assert.NotNull(emptyPkResult.Stream);
+ var emptyPkSchema = emptyPkResult.Stream.Schema;
+
+ // Verify PK schemas match
+ Assert.Equal(realPkSchema.FieldsList.Count,
emptyPkSchema.FieldsList.Count);
+ for (int i = 0; i < realPkSchema.FieldsList.Count; i++)
+ {
+ var realField = realPkSchema.FieldsList[i];
+ var emptyField = emptyPkSchema.FieldsList[i];
+ AssertField(emptyField, realField.Name, realField.DataType,
realField.IsNullable);
+ }
+
+ // Get real FK metadata schema
+ statement.SetOption(ApacheParameters.CatalogName,
TestConfiguration.Metadata.Catalog);
+ statement.SqlQuery = "GetCrossReference";
+ var realFkResult = await statement.ExecuteQueryAsync();
+ Assert.NotNull(realFkResult.Stream);
+ var realFkSchema = realFkResult.Stream.Schema;
+
+ // Get empty FK result schema
+ statement.SetOption(ApacheParameters.CatalogName, "SPARK");
+ var emptyFkResult = await statement.ExecuteQueryAsync();
+ Assert.NotNull(emptyFkResult.Stream);
+ var emptyFkSchema = emptyFkResult.Stream.Schema;
+
+ // Verify FK schemas match
+ Assert.Equal(realFkSchema.FieldsList.Count,
emptyFkSchema.FieldsList.Count);
+ for (int i = 0; i < realFkSchema.FieldsList.Count; i++)
+ {
+ var realField = realFkSchema.FieldsList[i];
+ var emptyField = emptyFkSchema.FieldsList[i];
+ AssertField(emptyField, realField.Name, realField.DataType,
realField.IsNullable);
+ }
+ }
+
+ private void AssertField(Field field, string expectedName, IArrowType
expectedType, bool expectedNullable)
+ {
+ Assert.True(expectedName.Equals(field.Name), $"Field name
mismatch: expected {expectedName}, got {field.Name}");
+ Assert.True(expectedType.Equals(field.DataType), $"Field type
mismatch: expected {expectedType}, got {field.DataType}");
+ Assert.True(expectedNullable == field.IsNullable, $"Field
nullability mismatch: expected {expectedNullable}, got {field.IsNullable}");
+ }
}
}