This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2813c41c1b93ea020d5788d4bb08c775abb6b90f
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jun 20 20:41:58 2023 -0700

    Fix hashing of dag_dependencies in serialized dag (#32037)

    (cherry picked from commit c76d57a67f1091e49cc9a8255054970e05ab1de5)
---
 airflow/serialization/serialized_objects.py |  4 +--
 tests/models/test_serialized_dag.py         | 46 ++++++++++++++++++++++++++++-
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 04c427309c..058da502a8 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1240,7 +1240,7 @@ class SerializedDAG(DAG, BaseSerialization):
                 for dep in SerializedBaseOperator.detect_dependencies(task)
             }
             dag_deps.update(DependencyDetector.detect_dag_dependencies(dag))
-            serialized_dag["dag_dependencies"] = [x.__dict__ for x in dag_deps]
+            serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
             serialized_dag["_task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group)
 
             # Edge info in the JSON exactly matches our internal structure
@@ -1446,7 +1446,7 @@ class TaskGroupSerialization(BaseSerialization):
         return group
 
 
-@dataclass(frozen=True)
+@dataclass(frozen=True, order=True)
 class DagDependency:
     """Dataclass for representing dependencies between DAGs.
     These are calculated during serialization and attached to serialized DAGs.
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index b425cd8f65..61be12676f 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -20,13 +20,17 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pendulum
 import pytest
 
-from airflow import DAG, example_dags as example_dags_module
+from airflow import DAG, Dataset, example_dags as example_dags_module
 from airflow.models import DagBag
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel as SDM
+from airflow.operators.bash import BashOperator
 from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.settings import json
+from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.session import create_session
 from tests.test_utils import db
 from tests.test_utils.asserts import assert_queries_count
@@ -196,3 +200,43 @@ class TestSerializedDagModel:
         expected_dependencies = {dag_id: [] for dag_id in example_dags}
 
         assert SDM.get_dag_dependencies() == expected_dependencies
+
+    def test_order_of_deps_is_consistent(self):
+        """
+        Previously the 'dag_dependencies' node in serialized dag was converted to list from set.
+        This caused the order, and thus the hash value, to be unreliable, which could produce
+        excessive dag parsing.
+        """
+        first_dag_hash = None
+        for r in range(10):
+            with DAG(
+                dag_id="example",
+                start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+                schedule=[
+                    Dataset("1"),
+                    Dataset("2"),
+                    Dataset("3"),
+                    Dataset("4"),
+                    Dataset("5"),
+                ],
+            ) as dag6:
+                BashOperator(
+                    task_id="any",
+                    outlets=[Dataset("0*"), Dataset("6*")],
+                    bash_command="sleep 5",
+                )
+            deps_order = [x["dependency_id"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]]
+            # in below assert, 0 and 6 both come at end because "source" is different for them and source
+            # is the first field in DagDependency class
+            assert deps_order == ["1", "2", "3", "4", "5", "0*", "6*"]
+
+            # for good measure, let's check that the dag hash is consistent
+            dag_json = json.dumps(SerializedDAG.to_dict(dag6), sort_keys=True).encode("utf-8")
+            this_dag_hash = md5(dag_json).hexdigest()
+
+            # set first dag hash on first pass
+            if first_dag_hash is None:
+                first_dag_hash = this_dag_hash
+
+            # dag hash should not change without change in structure (we're in a loop)
+            assert this_dag_hash == first_dag_hash
