Copilot commented on code in PR #63568:
URL: https://github.com/apache/airflow/pull/63568#discussion_r3066479624
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -142,6 +145,38 @@
tracer = trace.get_tracer(__name__)
+class detail_span:
+ """Context manager and decorator that creates a child span when detail
level > 1."""
+
+ def __init__(self, *args, **kwargs):
+ self._args = args
+ self._kwargs = kwargs
+ self._ctx = None
+
+ def _make_ctx(self):
+ parent_span = trace.get_current_span()
+ config_level = get_task_span_detail_level(span=parent_span)
+ if config_level > 1:
+ return tracer.start_as_current_span(*self._args, **self._kwargs)
+ return trace.INVALID_SPAN
+
+ def __enter__(self):
+ self._ctx = self._make_ctx()
+ return self._ctx.__enter__()
Review Comment:
`_make_ctx()` returns `trace.INVALID_SPAN` when detail level ≤ 1, but
`INVALID_SPAN` is a span object, not a context manager. Calling
`self._ctx.__enter__()` will raise at runtime (and the added tests using `with
detail_span(...)` will fail). Return a no-op context manager instead (e.g.,
`contextlib.nullcontext(trace.INVALID_SPAN)`), so `__enter__/__exit__` always
exist and `as span` still yields `INVALID_SPAN`.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -142,6 +145,38 @@
tracer = trace.get_tracer(__name__)
+class detail_span:
+ """Context manager and decorator that creates a child span when detail
level > 1."""
Review Comment:
`detail_span` is a class but uses a lowercase name, which is inconsistent
with standard class naming conventions and makes it easy to confuse with a
function/decorator factory. Consider renaming the class to `DetailSpan` (or
`_DetailSpan`) and exposing a `detail_span(...)` function that returns an
instance, preserving the existing call sites (`with detail_span(...)`,
`@detail_span(...)`).
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1872,26 +1958,43 @@ def main():
end_date=datetime.now(tz=timezone.utc),
)
)
+ span.record_exception(reschedule)
+ span.set_status(
+ Status(StatusCode.ERROR, description=f"Exception:
{type(reschedule).__name__}")
+ )
sys.exit(0)
- with BundleVersionLock(
- bundle_name=ti.bundle_instance.name,
- bundle_version=ti.bundle_instance.version,
- ):
- state, _, error = run(ti, context, log)
- context["exception"] = error
- finalize(ti, state, context, log, error)
- except KeyboardInterrupt:
+
+ with detail_span("run") as span:
Review Comment:
`span` can be referenced before assignment if an exception occurs before
`_make_task_span(...)` is entered (e.g., `get_startup_details()` fails).
Additionally, `span` is re-bound inside `with detail_span("run") as span:`,
so the exception handlers may record on the wrong span depending on where the
error occurs. Initialize a safe default span up-front (e.g.,
`trace.INVALID_SPAN`) and avoid shadowing by using distinct names like
`task_span` / `run_span`.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1872,26 +1958,43 @@ def main():
end_date=datetime.now(tz=timezone.utc),
)
)
+ span.record_exception(reschedule)
+ span.set_status(
+ Status(StatusCode.ERROR, description=f"Exception:
{type(reschedule).__name__}")
+ )
sys.exit(0)
- with BundleVersionLock(
- bundle_name=ti.bundle_instance.name,
- bundle_version=ti.bundle_instance.version,
- ):
- state, _, error = run(ti, context, log)
- context["exception"] = error
- finalize(ti, state, context, log, error)
- except KeyboardInterrupt:
+
+ with detail_span("run") as span:
+ with BundleVersionLock(
+ bundle_name=ti.bundle_instance.name,
+ bundle_version=ti.bundle_instance.version,
+ ):
+ state, _, error = run(ti, context, log)
+ if error:
+ span.record_exception(error)
+ span.set_status(
+ Status(StatusCode.ERROR, description=f"Exception:
{type(error).__name__}")
+ )
+ context["exception"] = error
+ span.set_attribute("state", state.value if state else
"unknown")
+ finalize(ti, state, context, log, error)
+ except KeyboardInterrupt as e:
log.exception("Ctrl-c hit")
+ span.record_exception(e)
+ span.set_status(Status(StatusCode.ERROR, description=f"Exception:
{type(e).__name__}"))
sys.exit(2)
- except Exception:
+ except Exception as e:
log.exception("Top level error")
+ span.record_exception(e)
Review Comment:
`span` can be referenced before assignment if an exception occurs before
`_make_task_span(...)` is entered (e.g., `get_startup_details()` fails).
Additionally, `span` is re-bound inside `with detail_span("run") as span:`,
so the exception handlers may record on the wrong span depending on where the
error occurs. Initialize a safe default span up-front (e.g.,
`trace.INVALID_SPAN`) and avoid shadowing by using distinct names like
`task_span` / `run_span`.
##########
shared/observability/src/airflow_shared/observability/traces/__init__.py:
##########
@@ -82,6 +88,28 @@ def new_task_run_carrier(dag_run_context_carrier):
return carrier
+def build_trace_state_entries(task_span_detail_level) -> list[tuple[str, str]]:
+ trace_state_entries = []
+ if task_span_detail_level is not None:
+ try:
+ level = int(task_span_detail_level)
+ except Exception:
+ level = None
+ if level:
+ trace_state_entries.append((TASK_SPAN_DETAIL_LEVEL_KEY,
str(level)))
+ return trace_state_entries
+
+
+def get_task_span_detail_level(span: Span):
+ span_ctx = span.get_span_context()
+ trace_state = span_ctx.trace_state
+ try:
+ return int(trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY,
default=DEFAULT_TASK_SPAN_DETAIL_LEVEL))
+ except Exception:
+ log.warning("%s config in dag run conf must be integer.",
TASK_SPAN_DETAIL_LEVEL_KEY)
+ return DEFAULT_TASK_SPAN_DETAIL_LEVEL
Review Comment:
`TraceState.get(...)` in common OpenTelemetry Python versions does not
accept a `default=` keyword, so this can raise `TypeError` and break span
processing even in the happy path. Fetch the value with
`trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY)` and apply the default in Python
when it is `None` (then parse to int). Also, narrowing the exception types
(e.g., `ValueError`, `TypeError`) would prevent masking unexpected issues.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1078,7 +1085,7 @@ def _emit_dagrun_span(self, state: DagRunState):
name=f"dag_run.{self.dag_id}",
start_time=int((self.queued_at or self.start_date or
timezone.utcnow()).timestamp() * 1e9),
attributes=attributes,
- context=context.Context(),
+ context=context.Context(), # maybe need to make optional!!!
Review Comment:
The inline comment `# maybe need to make optional!!!` reads like an
unresolved review note and adds ambiguity in production code. Please either
remove it, or replace it with a concrete TODO explaining the specific concern
and what condition would make this optional (ideally with a ticket/reference).
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3580,3 +3580,25 @@ def
test_clear_task_instances_resets_context_carrier(dag_maker, session):
assert ti.context_carrier["traceparent"] != original_ti_traceparent
assert dag_run.context_carrier["traceparent"] != original_dr_traceparent
+
+
+@pytest.mark.db_test
+def test_clear_task_instances_preserves_detail_level(dag_maker, session):
+ """clear_task_instances should produce a new context_carrier that keeps
the detail level from dag run conf."""
+ from airflow._shared.observability.traces import (
+ TASK_SPAN_DETAIL_LEVEL_KEY,
+ get_task_span_detail_level,
+ )
+
+ with dag_maker("test_clear_preserves_level"):
+ EmptyOperator(task_id="t1")
+ dag_run = dag_maker.create_dagrun(conf={TASK_SPAN_DETAIL_LEVEL_KEY: 2})
+ ti = dag_run.get_task_instance("t1", session=session)
+ ti.state = TaskInstanceState.SUCCESS
+ session.flush()
+
+ clear_task_instances([ti], session)
+
+ new_ctx = TraceContextTextMapPropagator().extract(dag_run.context_carrier)
+ span = otel_trace.get_current_span(new_ctx)
+ assert get_task_span_detail_level(span) == 2
Review Comment:
This new test references `otel_trace` but does not import it in the shown
diff. This will fail at runtime with `NameError`. Use `from opentelemetry
import trace` and call `trace.get_current_span(new_ctx)`, or add the missing
import for `otel_trace` if that’s the established pattern in this file.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4722,6 +4727,111 @@ def test_operator_failures_metrics_emitted(self,
create_runtime_ti, mock_supervi
mock_stats.incr.assert_any_call("ti_failures", tags=stats_tags)
+class TestDetailSpan:
+ """Tests for the detail_span decorator / context manager."""
+
+ def _make_provider_with_detail_level(self, level: int):
+ """Return (provider, tracer, carrier) where the carrier encodes the
given detail level."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ t = provider.get_tracer("test")
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=level)
+ return provider, t, exporter, carrier
Review Comment:
Helper `_make_provider_with_detail_level` is added but not used by the new
tests in this class, which increases noise and makes the test intent harder to
scan. Either refactor the tests to use this helper (to reduce duplication) or
remove it.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1861,8 +1947,8 @@ def main():
try:
try:
startup_details = get_startup_details()
- span = _make_task_span(msg=startup_details)
- stack.enter_context(span)
+ span_ctx_mgr = _make_task_span(msg=startup_details)
+ span = stack.enter_context(span_ctx_mgr)
ti, context, log = startup(msg=startup_details)
Review Comment:
`span` can be referenced before assignment if an exception occurs before
`_make_task_span(...)` is entered (e.g., `get_startup_details()` fails).
Additionally, `span` is re-bound inside `with detail_span("run") as span:`,
so the exception handlers may record on the wrong span depending on where the
error occurs. Initialize a safe default span up-front (e.g.,
`trace.INVALID_SPAN`) and avoid shadowing by using distinct names like
`task_span` / `run_span`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]