Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3045612891
##########
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##########
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
assert resp2.status_code == 200
assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+ """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+ def _make_bag(self, max_size: int) -> DBDagBag:
+ return DBDagBag(max_cache_size=max_size)
+
+ def _make_model(self, version_id):
+ m = mock.MagicMock()
+ m.dag_version_id = version_id
+ m.dag = mock.MagicMock() # truthy — deserialization succeeds
+ return m
+
+ def test_cache_bounded_by_max_size(self):
+ """Inserting beyond max_size evicts the least-recently-used entry."""
+ bag = self._make_bag(max_size=3)
+ ids = [uuid4() for _ in range(4)]
+ for uid in ids:
+ bag._read_dag(self._make_model(uid))
+
+ assert len(bag._dags) == 3
+ assert ids[0] not in bag._dags # first inserted → LRU → evicted
+ assert ids[3] in bag._dags
+
+ def test_cache_hit_promotes_to_mru(self):
+ """A cache hit via get_serialized_dag_model promotes the entry to
MRU."""
+ bag = self._make_bag(max_size=3)
+ ids = [uuid4() for _ in range(3)]
+ models = {uid: self._make_model(uid) for uid in ids}
+ for uid in ids:
+ bag._read_dag(models[uid])
+
+ # Re-access ids[0] through get_serialized_dag_model to promote it
+ session = mock.MagicMock()
+ bag.get_serialized_dag_model(ids[0], session=session)
+ session.get.assert_not_called() # should be a pure cache hit
+
+ # Insert a 4th entry — ids[1] (now LRU) should be evicted, not ids[0]
+ bag._read_dag(self._make_model(uuid4()))
+
+ assert ids[0] in bag._dags # promoted to MRU, survives
+        assert ids[1] not in bag._dags  # was LRU after ids[0] promoted, evicted
+
+ def test_failed_deserialization_not_cached(self):
+        """Entries whose .dag property is falsy are not inserted into the cache."""
+ bag = self._make_bag(max_size=10)
+ m = mock.MagicMock()
+ m.dag_version_id = uuid4()
+ m.dag = None # deserialization failure
+
+ bag._read_dag(m)
Review Comment:
`test_failed_deserialization_not_cached` uses a bare `mock.MagicMock()` for
the serialized DAG model. For this specific test you only need a couple of
attributes; using a small stub object or
`Mock(spec_set=["dag_version_id","dag","load_op_links"])` makes the test
stricter and less likely to mask future API changes.
##########
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##########
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
assert resp2.status_code == 200
assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+ """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+ def _make_bag(self, max_size: int) -> DBDagBag:
+ return DBDagBag(max_cache_size=max_size)
+
+ def _make_model(self, version_id):
+ m = mock.MagicMock()
+ m.dag_version_id = version_id
+ m.dag = mock.MagicMock() # truthy — deserialization succeeds
+ return m
+
+ def test_cache_bounded_by_max_size(self):
+ """Inserting beyond max_size evicts the least-recently-used entry."""
+ bag = self._make_bag(max_size=3)
+ ids = [uuid4() for _ in range(4)]
+ for uid in ids:
+ bag._read_dag(self._make_model(uid))
+
+ assert len(bag._dags) == 3
+ assert ids[0] not in bag._dags # first inserted → LRU → evicted
+ assert ids[3] in bag._dags
+
+ def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to MRU."""
+ bag = self._make_bag(max_size=3)
+ ids = [uuid4() for _ in range(3)]
+ models = {uid: self._make_model(uid) for uid in ids}
+ for uid in ids:
+ bag._read_dag(models[uid])
+
+ # Re-access ids[0] through get_serialized_dag_model to promote it
+ session = mock.MagicMock()
+ bag.get_serialized_dag_model(ids[0], session=session)
+ session.get.assert_not_called() # should be a pure cache hit
Review Comment:
New tests use `session = mock.MagicMock()` without `spec`/`autospec`. Using
an autospecced `Session` (or at least a `Mock(spec_set=["get"])`) helps ensure
the test asserts against a realistic interface and prevents typos/incorrect
method names from passing silently.
##########
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##########
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
assert resp2.status_code == 200
assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+ """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+ def _make_bag(self, max_size: int) -> DBDagBag:
+ return DBDagBag(max_cache_size=max_size)
+
+ def _make_model(self, version_id):
+ m = mock.MagicMock()
+ m.dag_version_id = version_id
+ m.dag = mock.MagicMock() # truthy — deserialization succeeds
+ return m
Review Comment:
New tests create `MagicMock()` instances without `spec`/`autospec` (e.g. in
`_make_model`). Unspec'd mocks accept any attribute access, which can hide real
regressions if the production API changes. Prefer `create_autospec(...)`,
`Mock(spec_set=[...])`, or a small real stub object with only the needed
attributes.
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -44,14 +46,24 @@ class DBDagBag:
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {} # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+ self._max_dag_version_cache_size = max_cache_size # None = unbounded
+ self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
self.load_op_links = load_op_links
    def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
serialized_dag_model.load_op_links = self.load_op_links
if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+ version_id = serialized_dag_model.dag_version_id
+ self._dags[version_id] = serialized_dag_model
+ self._dags.move_to_end(version_id)
+ if (
+ self._max_dag_version_cache_size is not None
+ and len(self._dags) > self._max_dag_version_cache_size
+ ):
+ self._dags.popitem(last=False)
+ Stats.incr("dag_bag.cache.evictions")
+ Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)
Review Comment:
`DBDagBag` is always using `OrderedDict` + `move_to_end()` even when
`max_cache_size=None` (the intended scheduler/unbounded mode). That still adds
per-access bookkeeping overhead without any benefit. Consider using a plain
`dict` when unbounded (or guarding the `move_to_end`/eviction logic behind
`self._max_dag_version_cache_size is not None`) to keep the scheduler path as
cheap as before.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]