Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3066476691
##########
shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml:
##########
@@ -292,6 +292,24 @@ metrics:
legacy_name: "-"
name_variables: []
+ - name: "dag_bag.cache.hits"
+ description: "Number of DBDagBag cache hits (DAG version found in memory)"
+ type: "counter"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "dag_bag.cache.misses"
+ description: "Number of DBDagBag cache misses (DAG version fetched from DB)"
Review Comment:
The implementation increments `dag_bag.cache.misses` before the DB lookup,
including cases where the DB does not contain the requested `DagVersion` /
`SerializedDagModel`. To match the actual behavior, update the description to reflect “not found in memory (DB lookup performed)” rather than “fetched from DB”.
```suggestion
description: "Number of DBDagBag cache misses (DAG version not found in memory; DB lookup performed)"
```
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -44,14 +48,31 @@ class DBDagBag:
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[UUID, SerializedDagModel] = {} # dag_version_id to dag
+ def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+ self._max_dag_version_cache_size = max_cache_size
+ self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
self.load_op_links = load_op_links
+ # Use a real lock only when bounded caching is enabled (API server mode).
+ # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+ # gets a no-op context manager to avoid any locking overhead.
+ self._lock: threading.Lock | contextlib.nullcontext[None] = (
Review Comment:
`threading.Lock` is a factory function at runtime (returning a `_thread.lock` instance), so using it as a type in a union is likely to fail strict type-checking. Prefer annotating `_lock` as a context manager type
(e.g., `contextlib.AbstractContextManager[None]` /
`collections.abc.ContextManager[None]`) and assign either `threading.Lock()` or
`contextlib.nullcontext()` to it.
```suggestion
self._lock: contextlib.AbstractContextManager[None] = (
```
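As a quick sanity check (a standalone sketch, not code from the PR): both `threading.Lock()` and `contextlib.nullcontext()` satisfy the suggested annotation at runtime, because `AbstractContextManager`'s subclass hook only requires `__enter__`/`__exit__`.

```python
import contextlib
import threading

# Either alternative can be assigned to an attribute annotated as
# contextlib.AbstractContextManager[None].
real_lock: contextlib.AbstractContextManager[None] = threading.Lock()
noop_lock: contextlib.AbstractContextManager[None] = contextlib.nullcontext()

# AbstractContextManager's __subclasshook__ accepts any object defining
# __enter__ and __exit__, so the isinstance checks pass for both.
for cm in (real_lock, noop_lock):
    assert isinstance(cm, contextlib.AbstractContextManager)
    with cm:  # both support the with-statement protocol
        pass
```

This keeps the call sites (`with self._lock:`) unchanged while satisfying strict checkers such as mypy in strict mode.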
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -44,14 +48,31 @@ class DBDagBag:
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[UUID, SerializedDagModel] = {} # dag_version_id to dag
+ def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+ self._max_dag_version_cache_size = max_cache_size
+ self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
self.load_op_links = load_op_links
+ # Use a real lock only when bounded caching is enabled (API server mode).
+ # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+ # gets a no-op context manager to avoid any locking overhead.
+ self._lock: threading.Lock | contextlib.nullcontext[None] = (
+ threading.Lock() if max_cache_size is not None else contextlib.nullcontext()
+ )
def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
serialized_dag_model.load_op_links = self.load_op_links
if dag := serialized_dag_model.dag:
- self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+ version_id = serialized_dag_model.dag_version_id
+ with self._lock:
+ self._dags[version_id] = serialized_dag_model
+ self._dags.move_to_end(version_id)
+ if (
+ self._max_dag_version_cache_size is not None
+ and len(self._dags) > self._max_dag_version_cache_size
+ ):
+ self._dags.popitem(last=False)
+ Stats.incr("dag_bag.cache.evictions")
+ Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)
Review Comment:
`Stats.incr`/`Stats.gauge` are executed while holding the cache lock,
increasing contention risk if metric emission performs non-trivial work (or
blocks internally). Capture the eviction flag and current size under the lock,
then emit metrics after releasing the lock to keep the critical section minimal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]