Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3045612891


##########
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##########
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, 
session, dag_maker, test_c
         assert resp2.status_code == 200
 
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy — deserialization succeeds
+        return m
+
+    def test_cache_bounded_by_max_size(self):
+        """Inserting beyond max_size evicts the least-recently-used entry."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(4)]
+        for uid in ids:
+            bag._read_dag(self._make_model(uid))
+
+        assert len(bag._dags) == 3
+        assert ids[0] not in bag._dags  # first inserted → LRU → evicted
+        assert ids[3] in bag._dags
+
+    def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to 
MRU."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(3)]
+        models = {uid: self._make_model(uid) for uid in ids}
+        for uid in ids:
+            bag._read_dag(models[uid])
+
+        # Re-access ids[0] through get_serialized_dag_model to promote it
+        session = mock.MagicMock()
+        bag.get_serialized_dag_model(ids[0], session=session)
+        session.get.assert_not_called()  # should be a pure cache hit
+
+        # Insert a 4th entry — ids[1] (now LRU) should be evicted, not ids[0]
+        bag._read_dag(self._make_model(uuid4()))
+
+        assert ids[0] in bag._dags  # promoted to MRU, survives
+        assert ids[1] not in bag._dags  # was LRU after ids[0] promoted, 
evicted
+
+    def test_failed_deserialization_not_cached(self):
+        """Entries whose .dag property is falsy are not inserted into the 
cache."""
+        bag = self._make_bag(max_size=10)
+        m = mock.MagicMock()
+        m.dag_version_id = uuid4()
+        m.dag = None  # deserialization failure
+
+        bag._read_dag(m)

Review Comment:
   `test_failed_deserialization_not_cached` uses a bare `mock.MagicMock()` for 
the serialized DAG model. For this specific test you only need a couple of 
attributes; using a small stub object or 
`Mock(spec_set=["dag_version_id","dag","load_op_links"])` makes the test 
stricter and less likely to mask future API changes.



##########
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##########
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, 
session, dag_maker, test_c
         assert resp2.status_code == 200
 
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy — deserialization succeeds
+        return m
+
+    def test_cache_bounded_by_max_size(self):
+        """Inserting beyond max_size evicts the least-recently-used entry."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(4)]
+        for uid in ids:
+            bag._read_dag(self._make_model(uid))
+
+        assert len(bag._dags) == 3
+        assert ids[0] not in bag._dags  # first inserted → LRU → evicted
+        assert ids[3] in bag._dags
+
+    def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to 
MRU."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(3)]
+        models = {uid: self._make_model(uid) for uid in ids}
+        for uid in ids:
+            bag._read_dag(models[uid])
+
+        # Re-access ids[0] through get_serialized_dag_model to promote it
+        session = mock.MagicMock()
+        bag.get_serialized_dag_model(ids[0], session=session)
+        session.get.assert_not_called()  # should be a pure cache hit

Review Comment:
   New tests use `session = mock.MagicMock()` without `spec`/`autospec`. Using 
an autospecced `Session` (or at least a `Mock(spec_set=["get"])`) helps ensure 
the test asserts against a realistic interface and prevents typos/incorrect 
method names from passing silently.



##########
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##########
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, 
session, dag_maker, test_c
         assert resp2.status_code == 200
 
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy — deserialization succeeds
+        return m

Review Comment:
   New tests create `MagicMock()` instances without `spec`/`autospec` (e.g. in 
`_make_model`). Mocks created without a spec accept any attribute access, which 
can hide real regressions if the production API changes. Prefer 
`create_autospec(...)`, `Mock(spec_set=[...])`, or a small real stub object 
with only the needed attributes.



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -44,14 +46,24 @@ class DBDagBag:
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to 
dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None 
= None) -> None:
+        self._max_dag_version_cache_size = max_cache_size  # None = unbounded
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
 
     def _read_dag(self, serialized_dag_model: SerializedDagModel) -> 
SerializedDAG | None:
         serialized_dag_model.load_op_links = self.load_op_links
         if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = 
serialized_dag_model
+            version_id = serialized_dag_model.dag_version_id
+            self._dags[version_id] = serialized_dag_model
+            self._dags.move_to_end(version_id)
+            if (
+                self._max_dag_version_cache_size is not None
+                and len(self._dags) > self._max_dag_version_cache_size
+            ):
+                self._dags.popitem(last=False)
+                Stats.incr("dag_bag.cache.evictions")
+            Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)

Review Comment:
   `DBDagBag` always uses `OrderedDict` + `move_to_end()`, even when 
`max_cache_size=None` (the intended scheduler/unbounded mode). That still adds 
per-access bookkeeping overhead without any benefit. Consider using a plain 
`dict` when unbounded (or guarding the `move_to_end`/eviction logic behind 
`self._max_dag_version_cache_size is not None`) to keep the scheduler path as 
cheap as before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to