This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bb5a744d449 Use conf_vars in executor tests to avoid polluting config instance (#65126)
bb5a744d449 is described below

commit bb5a744d4494d87817d641b02aa98278ea3a69b6
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Apr 13 13:28:22 2026 +0530

    Use conf_vars in executor tests to avoid polluting config instance (#65126)
---
 .../tests/unit/executors/test_base_executor.py     | 216 +++++++++------------
 1 file changed, 87 insertions(+), 129 deletions(-)

diff --git a/airflow-core/tests/unit/executors/test_base_executor.py b/airflow-core/tests/unit/executors/test_base_executor.py
index fa0f311d018..ae5e69b1a29 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import logging
-import textwrap
 from datetime import timedelta
 from unittest import mock
 from uuid import UUID
@@ -43,6 +42,7 @@ from airflow.sdk import BaseOperator
 from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
 from airflow.utils.state import State, TaskInstanceState
 
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
 
 
@@ -413,170 +413,128 @@ class TestExecutorConf:
 
     def test_executor_conf_get(self):
         """Test ExecutorConf.get() passes team_name to underlying conf.get()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            result_backend = DEFAULT_VALUE
+        with conf_vars(
+            {
+                ("celery", "result_backend"): "DEFAULT_VALUE",
+                ("test_team=celery", "result_backend"): "TEAM_VALUE",
+            }
+        ):
+            # Test without team_name
+            executor_conf = ExecutorConf(team_name=None)
+            assert executor_conf.get("celery", "result_backend") == "DEFAULT_VALUE"
 
-            [test_team=celery]
-            result_backend = TEAM_VALUE
-            """
-        )
-        conf.read_string(test_config)
-
-        # Test without team_name
-        executor_conf = ExecutorConf(team_name=None)
-        assert executor_conf.get("celery", "result_backend") == "DEFAULT_VALUE"
-
-        # Test with team_name
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        assert team_executor_conf.get("celery", "result_backend") == "TEAM_VALUE"
+            # Test with team_name
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            assert team_executor_conf.get("celery", "result_backend") == "TEAM_VALUE"
 
     def test_executor_conf_getboolean(self):
         """Test ExecutorConf.getboolean() passes team_name to underlying conf.getboolean()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            ssl_active = true
-
-            [test_team=celery]
-            ssl_active = false
-            """
-        )
-        conf.read_string(test_config)
-
-        executor_conf = ExecutorConf(team_name=None)
-        assert executor_conf.getboolean("celery", "ssl_active") is True
+        with conf_vars(
+            {
+                ("celery", "ssl_active"): "true",
+                ("test_team=celery", "ssl_active"): "false",
+            }
+        ):
+            executor_conf = ExecutorConf(team_name=None)
+            assert executor_conf.getboolean("celery", "ssl_active") is True
 
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        assert team_executor_conf.getboolean("celery", "ssl_active") is False
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            assert team_executor_conf.getboolean("celery", "ssl_active") is False
 
     def test_executor_conf_getint(self):
         """Test ExecutorConf.getint() passes team_name to underlying conf.getint()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            worker_concurrency = 16
+        with conf_vars(
+            {
+                ("celery", "worker_concurrency"): "16",
+                ("test_team=celery", "worker_concurrency"): "32",
+            }
+        ):
+            executor_conf = ExecutorConf(team_name=None)
+            assert executor_conf.getint("celery", "worker_concurrency") == 16
 
-            [test_team=celery]
-            worker_concurrency = 32
-            """
-        )
-        conf.read_string(test_config)
-
-        executor_conf = ExecutorConf(team_name=None)
-        assert executor_conf.getint("celery", "worker_concurrency") == 16
-
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        assert team_executor_conf.getint("celery", "worker_concurrency") == 32
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            assert team_executor_conf.getint("celery", "worker_concurrency") == 32
 
     def test_executor_conf_getjson(self):
         """Test ExecutorConf.getjson() passes team_name to underlying conf.getjson()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            broker_transport_options = {"visibility_timeout": 3600}
-
-            [test_team=celery]
-            broker_transport_options = {"visibility_timeout": 7200}
-            """
-        )
-        conf.read_string(test_config)
-
-        executor_conf = ExecutorConf(team_name=None)
-        assert executor_conf.getjson("celery", "broker_transport_options") == {"visibility_timeout": 3600}
+        with conf_vars(
+            {
+                ("celery", "broker_transport_options"): '{"visibility_timeout": 3600}',
+                ("test_team=celery", "broker_transport_options"): '{"visibility_timeout": 7200}',
+            }
+        ):
+            executor_conf = ExecutorConf(team_name=None)
+            assert executor_conf.getjson("celery", "broker_transport_options") == {"visibility_timeout": 3600}
 
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        assert team_executor_conf.getjson("celery", "broker_transport_options") == {
-            "visibility_timeout": 7200
-        }
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            assert team_executor_conf.getjson("celery", "broker_transport_options") == {
+                "visibility_timeout": 7200
+            }
 
     def test_executor_conf_getsection(self):
         """Test ExecutorConf.getsection() passes team_name to underlying conf.getsection()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            worker_concurrency = 16
-            result_backend = DEFAULT_BACKEND
-
-            [test_team=celery]
-            worker_concurrency = 32
-            result_backend = TEAM_BACKEND
-            """
-        )
-        conf.read_string(test_config)
-
-        executor_conf = ExecutorConf(team_name=None)
-        section = executor_conf.getsection("celery")
-        assert section["worker_concurrency"] == 16
-        assert section["result_backend"] == "DEFAULT_BACKEND"
-
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        team_section = team_executor_conf.getsection("celery")
-        assert team_section["worker_concurrency"] == 32
-        assert team_section["result_backend"] == "TEAM_BACKEND"
+        with conf_vars(
+            {
+                ("celery", "worker_concurrency"): "16",
+                ("celery", "result_backend"): "DEFAULT_BACKEND",
+                ("test_team=celery", "worker_concurrency"): "32",
+                ("test_team=celery", "result_backend"): "TEAM_BACKEND",
+            }
+        ):
+            executor_conf = ExecutorConf(team_name=None)
+            section = executor_conf.getsection("celery")
+            assert section["worker_concurrency"] == 16
+            assert section["result_backend"] == "DEFAULT_BACKEND"
+
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            team_section = team_executor_conf.getsection("celery")
+            assert team_section["worker_concurrency"] == 32
+            assert team_section["result_backend"] == "TEAM_BACKEND"
 
     def test_executor_conf_has_option(self):
         """Test ExecutorConf.has_option() passes team_name to underlying conf.has_option()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            result_backend = DEFAULT
-
-            [test_team=celery]
-            result_backend = TEAM
-            team_specific_option = VALUE
-            """
-        )
-        conf.read_string(test_config)
-
-        executor_conf = ExecutorConf(team_name=None)
-        assert executor_conf.has_option("celery", "result_backend") is True
-        assert executor_conf.has_option("celery", "team_specific_option") is False
-
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        assert team_executor_conf.has_option("celery", "result_backend") is True
-        assert team_executor_conf.has_option("celery", "team_specific_option") is True
+        with conf_vars(
+            {
+                ("celery", "result_backend"): "DEFAULT",
+                ("test_team=celery", "result_backend"): "TEAM",
+                ("test_team=celery", "team_specific_option"): "VALUE",
+            }
+        ):
+            executor_conf = ExecutorConf(team_name=None)
+            assert executor_conf.has_option("celery", "result_backend") is True
+            assert executor_conf.has_option("celery", "team_specific_option") is False
+
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            assert team_executor_conf.has_option("celery", "result_backend") is True
+            assert team_executor_conf.has_option("celery", "team_specific_option") is True
 
     def test_executor_conf_get_mandatory_value(self):
         """Test ExecutorConf.get_mandatory_value() passes team_name to underlying conf.get_mandatory_value()."""
-        from airflow.configuration import conf
         from airflow.executors.base_executor import ExecutorConf
 
-        test_config = textwrap.dedent(
-            """
-            [celery]
-            broker_url = redis://localhost
-
-            [test_team=celery]
-            broker_url = redis://team-redis
-            """
-        )
-        conf.read_string(test_config)
-
-        executor_conf = ExecutorConf(team_name=None)
-        assert executor_conf.get_mandatory_value("celery", "broker_url") == "redis://localhost"
-
-        team_executor_conf = ExecutorConf(team_name="test_team")
-        assert team_executor_conf.get_mandatory_value("celery", "broker_url") == "redis://team-redis"
+        with conf_vars(
+            {
+                ("celery", "broker_url"): "redis://localhost",
+                ("test_team=celery", "broker_url"): "redis://team-redis",
+            }
+        ):
+            executor_conf = ExecutorConf(team_name=None)
+            assert executor_conf.get_mandatory_value("celery", "broker_url") == "redis://localhost"
+
+            team_executor_conf = ExecutorConf(team_name="test_team")
+            assert team_executor_conf.get_mandatory_value("celery", "broker_url") == "redis://team-redis"
 
 
 class TestCallbackSupport:

Reply via email to