This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 17b6ca9df feat(csharp/src/Apache.Arrow.Adbc/Tracing): allow
ActivitySource tags to be set from TracingConnection (#3218)
17b6ca9df is described below
commit 17b6ca9df872e6087a2d69411a152de62d891d0c
Author: Bruce Irschick <[email protected]>
AuthorDate: Thu Jul 31 13:03:37 2025 -0700
feat(csharp/src/Apache.Arrow.Adbc/Tracing): allow ActivitySource tags to be
set from TracingConnection (#3218)
Provides a virtual `GetActivitySourceTags(properties)` method that derived
connections can override to supply tags when creating the `ActivitySource`.
Also adds the `ActivitySourceName` property so that an `ActivityListener` can
filter on the activity source name.
---
.../src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs | 11 ++-
.../Apache.Arrow.Adbc/Tracing/TracingConnection.cs | 9 ++-
.../src/Drivers/Databricks/DatabricksConnection.cs | 9 +++
.../Tracing/TracingTests.cs | 89 ++++++++++++++++++++--
4 files changed, 108 insertions(+), 10 deletions(-)
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
index f474820cd..b332590a5 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
@@ -16,8 +16,8 @@
*/
using System;
+using System.Collections.Generic;
using System.Diagnostics;
-using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
@@ -33,8 +33,11 @@ namespace Apache.Arrow.Adbc.Tracing
/// Constructs a new <see cref="ActivityTrace"/> object. If <paramref
name="activitySourceName"/> is set, it provides the
/// activity source name, otherwise the current assembly name is used
as the activity source name.
/// </summary>
- /// <param name="activitySourceName"></param>
- public ActivityTrace(string? activitySourceName = default, string?
activitySourceVersion = default, string? traceParent = default)
+ /// <param name="activitySourceName">The name of the ActivitySource
object</param>
+ /// <param name="activitySourceVersion">The version of the component
publishing the tracing info.</param>
+ /// <param name="traceParent">The trace parent context, which is used
to link the activity to a distributed trace.</param>
+ /// <param name="tags">The tags associated with the activity.</param>
+ public ActivityTrace(string? activitySourceName = default, string?
activitySourceVersion = default, string? traceParent = default,
IEnumerable<KeyValuePair<string, object?>>? tags = default)
{
activitySourceName ??= GetType().Assembly.GetName().Name!;
// It's okay to have a null version.
@@ -46,7 +49,7 @@ namespace Apache.Arrow.Adbc.Tracing
TraceParent = traceParent;
// This is required to be disposed
- ActivitySource = new(activitySourceName, activitySourceVersion);
+ ActivitySource = new(activitySourceName, activitySourceVersion,
tags);
}
/// <summary>
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs
b/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs
index aaa5ac6af..cd580a6ef 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs
@@ -27,7 +27,7 @@ namespace Apache.Arrow.Adbc.Tracing
protected TracingConnection(IReadOnlyDictionary<string, string>
properties)
{
properties.TryGetValue(AdbcOptions.Telemetry.TraceParent, out
string? traceParent);
- _trace = new ActivityTrace(this.AssemblyName,
this.AssemblyVersion, traceParent);
+ _trace = new ActivityTrace(AssemblyName, AssemblyVersion,
traceParent, GetActivitySourceTags(properties));
}
string? IActivityTracer.TraceParent => _trace.TraceParent;
@@ -38,6 +38,13 @@ namespace Apache.Arrow.Adbc.Tracing
public abstract string AssemblyName { get; }
+ public string ActivitySourceName => _trace.ActivitySourceName;
+
+ public virtual IEnumerable<KeyValuePair<string, object?>>?
GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
+ {
+ return null;
+ }
+
protected void SetTraceParent(string? traceParent)
{
_trace.TraceParent = traceParent;
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 37263183c..b5c11588f 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -76,6 +76,15 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
ValidateProperties();
}
+ public override IEnumerable<KeyValuePair<string, object?>>?
GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
+ {
+ IEnumerable<KeyValuePair<string, object?>>? tags =
base.GetActivitySourceTags(properties);
+ // TODO: Add any additional tags specific to Databricks connection
+ //tags ??= [];
+ //tags.Concat([new("key", "value")]);
+ return tags;
+ }
+
protected override TCLIService.IAsync
CreateTCLIServiceClient(TProtocol protocol)
{
return new ThreadSafeClient(new TCLIService.Client(protocol));
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
index beee438eb..a61582f06 100644
--- a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
@@ -20,6 +20,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Apache.Arrow.Adbc.Tracing;
+using Apache.Arrow.Ipc;
using OpenTelemetry;
using OpenTelemetry.Trace;
using Xunit;
@@ -29,6 +30,10 @@ namespace Apache.Arrow.Adbc.Tests.Tracing
{
public class TracingTests(ITestOutputHelper? outputHelper) : IDisposable
{
+ private const string SourceTagName = "sourceTagName";
+ private const string SourceTagValue = "sourceTagValue";
+ private const string TraceParent =
"00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01";
+
private readonly ITestOutputHelper? _outputHelper = outputHelper;
private bool _disposed;
@@ -143,12 +148,10 @@ namespace Apache.Arrow.Adbc.Tests.Tracing
testClass.MethodWithActivity(eventNameWithoutParent);
Assert.True(exportedActivities.Count() > 0);
- const string traceParent =
"00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01";
-
const int withParentCountExpected = 10;
for (int i = 0; i < withParentCountExpected; i++)
{
- testClass.MethodWithActivity(eventNameWithParent, traceParent);
+ testClass.MethodWithActivity(eventNameWithParent, TraceParent);
}
testClass.MethodWithActivity(eventNameWithoutParent);
@@ -169,13 +172,61 @@ namespace Apache.Arrow.Adbc.Tests.Tracing
else if
(exportedActivity.OperationName.Contains(eventNameWithParent))
{
withParentCount++;
- Assert.Equal(traceParent, exportedActivity.ParentId);
+ Assert.Equal(TraceParent, exportedActivity.ParentId);
}
}
Assert.Equal(2, withoutParentCount);
Assert.Equal(withParentCountExpected, withParentCount);
}
+ [Fact]
+ internal void CanListenAndFilterActivitySourceTagsUsingActivityTrace()
+ {
+ string activitySourceName = NewName();
+ Queue<Activity> exportedActivities = new();
+ using (ActivityListener activityListener = new()
+ {
+ ShouldListenTo = source =>
+ {
+ return source.Name == activitySourceName
+ && source.Tags?.Any(t => t.Key == SourceTagName &&
t.Value?.Equals(SourceTagValue) == true) == true;
+ },
+ Sample = (ref ActivityCreationOptions<ActivityContext>
options) => ActivitySamplingResult.AllDataAndRecorded,
+ ActivityStopped = activity =>
exportedActivities.Enqueue(activity)
+ })
+ {
+ ActivitySource.AddActivityListener(activityListener);
+
+ var testClass = new TraceProducer(activitySourceName);
+ testClass.MethodWithActivity();
+ }
+ Assert.Single(exportedActivities);
+ }
+
+ [Fact]
+ internal void
CanListenAndFilterActivitySourceTagsUsingTracingConnection()
+ {
+ string activitySourceName = NewName();
+ Queue<Activity> exportedActivities = new();
+ var testClass = new MyTracingConnection(new Dictionary<string,
string>(), activitySourceName);
+ using (ActivityListener activityListener = new()
+ {
+ ShouldListenTo = source =>
+ {
+ return source.Name == testClass.ActivitySourceName
+ && source.Tags?.Any(t => t.Key == SourceTagName &&
t.Value?.Equals(SourceTagValue) == true) == true;
+ },
+ Sample = (ref ActivityCreationOptions<ActivityContext>
options) => ActivitySamplingResult.AllDataAndRecorded,
+ ActivityStopped = activity =>
exportedActivities.Enqueue(activity)
+ })
+ {
+ ActivitySource.AddActivityListener(activityListener);
+
+ testClass.MethodWithActivity();
+ }
+ Assert.Single(exportedActivities);
+ }
+
internal static string NewName() =>
Guid.NewGuid().ToString().Replace("-", "").ToLower();
protected virtual void Dispose(bool disposing)
@@ -203,7 +254,8 @@ namespace Apache.Arrow.Adbc.Tests.Tracing
internal TraceProducer(string? activitySourceName = default,
string? traceParent = default)
{
- _trace = new ActivityTrace(activitySourceName, traceParent:
traceParent);
+ IEnumerable<KeyValuePair<string, object?>>? tags =
[new(SourceTagName, SourceTagValue)];
+ _trace = new ActivityTrace(activitySourceName, traceParent:
traceParent, tags: tags);
}
internal void MethodWithNoInstrumentation()
@@ -273,6 +325,33 @@ namespace Apache.Arrow.Adbc.Tests.Tracing
}
}
+ private class MyTracingConnection(IReadOnlyDictionary<string, string>
properties, string assemblyName) : TracingConnection(properties)
+ {
+ public override string AssemblyVersion => "1.0.0";
+ public override string AssemblyName { get; } = assemblyName;
+
+ public override IEnumerable<KeyValuePair<string, object?>>?
GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
+ {
+ return [new KeyValuePair<string, object?>(SourceTagName,
SourceTagValue)];
+ }
+
+ public void MethodWithActivity()
+ {
+ this.TraceActivity(activity =>
+ {
+ activity?.AddTag("exampleTag", "exampleValue")
+ .AddBaggage("exampleBaggage", "exampleBaggageValue")
+ .AddEvent("exampleEvent", [new KeyValuePair<string,
object?>("eventTag", "eventValue")])
+ .AddLink(TraceParent, [new KeyValuePair<string,
object?>("linkTag", "linkValue")]);
+ });
+ }
+
+ public override AdbcStatement CreateStatement() => throw new
NotImplementedException();
+ public override IArrowArrayStream GetObjects(GetObjectsDepth
depth, string? catalogPattern, string? dbSchemaPattern, string?
tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern)
=> throw new NotImplementedException();
+ public override Schema GetTableSchema(string? catalog, string?
dbSchema, string tableName) => throw new NotImplementedException();
+ public override IArrowArrayStream GetTableTypes() => throw new
NotImplementedException();
+ }
+
internal class ActivityQueueExporter(Queue<Activity>
exportedActivities) : BaseExporter<Activity>
{
private Queue<Activity> ExportedActivities { get; } =
exportedActivities;