Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3066476691


##########
shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml:
##########
@@ -292,6 +292,24 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "dag_bag.cache.hits"
+    description: "Number of DBDagBag cache hits (DAG version found in memory)"
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "dag_bag.cache.misses"
+    description: "Number of DBDagBag cache misses (DAG version fetched from DB)"

Review Comment:
   The implementation increments `dag_bag.cache.misses` before the DB lookup, 
including cases where the DB does not contain the requested `DagVersion` / 
`SerializedDagModel`. To match behavior, update the description to reflect “not 
found in memory (DB lookup performed)” rather than “fetched from DB”.
   ```suggestion
       description: "Number of DBDagBag cache misses (DAG version not found in memory; DB lookup performed)"
   ```



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -44,14 +48,31 @@ class DBDagBag:
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
+        # Use a real lock only when bounded caching is enabled (API server mode).
+        # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+        # gets a no-op context manager to avoid any locking overhead.
+        self._lock: threading.Lock | contextlib.nullcontext[None] = (

Review Comment:
   On Python versions before 3.13, `threading.Lock` is a factory function at
runtime (returning a `_thread.lock`), so using `threading.Lock` as a type in a
union may trip strict type-checking. Prefer annotating `_lock` as a context
manager type (e.g., `contextlib.AbstractContextManager[None]` /
`typing.ContextManager[None]`) and assign either `threading.Lock()` or
`contextlib.nullcontext()` to it.
   ```suggestion
           self._lock: contextlib.AbstractContextManager[None] = (
   ```
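
A minimal sketch of the annotation idea, under stated assumptions: `make_lock` is a hypothetical helper, not part of the PR. It uses `Any` as the type parameter because, as far as I can tell, entering a `threading.Lock` yields a `bool` while entering `contextlib.nullcontext()` yields `None`, so a `[None]` parameter may not satisfy every type checker.

```python
import contextlib
import threading
from typing import Any


def make_lock(bounded: bool) -> contextlib.AbstractContextManager[Any]:
    """Return a real lock when bounded, otherwise a no-op context manager.

    Hypothetical helper: both branches support ``with``, so callers
    only need the context-manager protocol, not a concrete lock type.
    """
    return threading.Lock() if bounded else contextlib.nullcontext()


real = make_lock(True)
with real:
    held = real.locked()  # True while the real lock is held

with make_lock(False) as entered:
    noop_value = entered  # nullcontext() yields None on entry
```

Callers then write `with self._lock:` in both modes without caring which object sits behind the annotation.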



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -44,14 +48,31 @@ class DBDagBag:
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
+        # Use a real lock only when bounded caching is enabled (API server mode).
+        # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+        # gets a no-op context manager to avoid any locking overhead.
+        self._lock: threading.Lock | contextlib.nullcontext[None] = (
+            threading.Lock() if max_cache_size is not None else contextlib.nullcontext()
+        )
 
     def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
         serialized_dag_model.load_op_links = self.load_op_links
         if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+            version_id = serialized_dag_model.dag_version_id
+            with self._lock:
+                self._dags[version_id] = serialized_dag_model
+                self._dags.move_to_end(version_id)
+                if (
+                    self._max_dag_version_cache_size is not None
+                    and len(self._dags) > self._max_dag_version_cache_size
+                ):
+                    self._dags.popitem(last=False)
+                    Stats.incr("dag_bag.cache.evictions")
+                Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)

Review Comment:
   `Stats.incr`/`Stats.gauge` are executed while holding the cache lock, 
increasing contention risk if metric emission performs non-trivial work (or 
blocks internally). Capture the eviction flag and current size under the lock, 
then emit metrics after releasing the lock to keep the critical section minimal.
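
One way to apply this, sketched under assumptions: `BoundedCache`, `emit`, and `emitted` are hypothetical names, with `emit` standing in for `Stats.incr` / `Stats.gauge` and recording calls in a list. The eviction flag and size are captured under the lock and the metrics are emitted only after it is released.

```python
import threading
from collections import OrderedDict

# Hypothetical stand-in for Stats.incr / Stats.gauge: records each call.
emitted: list[tuple[str, int]] = []


def emit(name: str, value: int = 1) -> None:
    emitted.append((name, value))


class BoundedCache:
    """Illustrative LRU-style cache; not the DBDagBag implementation."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._items: OrderedDict[str, object] = OrderedDict()
        self._lock = threading.Lock()

    def put(self, key: str, value: object) -> int:
        with self._lock:
            # Critical section: only mutate the cache and record facts.
            self._items[key] = value
            self._items.move_to_end(key)
            evicted = len(self._items) > self._max_size
            if evicted:
                self._items.popitem(last=False)
            size = len(self._items)
        # The lock is released here, so slow or blocking metric
        # emission cannot delay other threads waiting on the cache.
        if evicted:
            emit("dag_bag.cache.evictions")
        emit("dag_bag.cache.size", size)
        return size


cache = BoundedCache(max_size=2)
for key in ("a", "b", "c"):
    cache.put(key, object())
```

The trade-off is that the gauge may briefly lag the true size if another thread mutates the cache between release and emission, which is usually acceptable for a sampled metric.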



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
