This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bb5a744d449 Use conf_vars in executor tests to avoid polluting config
instance (#65126)
bb5a744d449 is described below
commit bb5a744d4494d87817d641b02aa98278ea3a69b6
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Apr 13 13:28:22 2026 +0530
Use conf_vars in executor tests to avoid polluting config instance (#65126)
---
.../tests/unit/executors/test_base_executor.py | 216 +++++++++------------
1 file changed, 87 insertions(+), 129 deletions(-)
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py
b/airflow-core/tests/unit/executors/test_base_executor.py
index fa0f311d018..ae5e69b1a29 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import logging
-import textwrap
from datetime import timedelta
from unittest import mock
from uuid import UUID
@@ -43,6 +42,7 @@ from airflow.sdk import BaseOperator
from airflow.serialization.definitions.baseoperator import
SerializedBaseOperator
from airflow.utils.state import State, TaskInstanceState
+from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
@@ -413,170 +413,128 @@ class TestExecutorConf:
def test_executor_conf_get(self):
"""Test ExecutorConf.get() passes team_name to underlying
conf.get()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- result_backend = DEFAULT_VALUE
+ with conf_vars(
+ {
+ ("celery", "result_backend"): "DEFAULT_VALUE",
+ ("test_team=celery", "result_backend"): "TEAM_VALUE",
+ }
+ ):
+ # Test without team_name
+ executor_conf = ExecutorConf(team_name=None)
+ assert executor_conf.get("celery", "result_backend") ==
"DEFAULT_VALUE"
- [test_team=celery]
- result_backend = TEAM_VALUE
- """
- )
- conf.read_string(test_config)
-
- # Test without team_name
- executor_conf = ExecutorConf(team_name=None)
- assert executor_conf.get("celery", "result_backend") == "DEFAULT_VALUE"
-
- # Test with team_name
- team_executor_conf = ExecutorConf(team_name="test_team")
- assert team_executor_conf.get("celery", "result_backend") ==
"TEAM_VALUE"
+ # Test with team_name
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ assert team_executor_conf.get("celery", "result_backend") ==
"TEAM_VALUE"
def test_executor_conf_getboolean(self):
"""Test ExecutorConf.getboolean() passes team_name to underlying
conf.getboolean()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- ssl_active = true
-
- [test_team=celery]
- ssl_active = false
- """
- )
- conf.read_string(test_config)
-
- executor_conf = ExecutorConf(team_name=None)
- assert executor_conf.getboolean("celery", "ssl_active") is True
+ with conf_vars(
+ {
+ ("celery", "ssl_active"): "true",
+ ("test_team=celery", "ssl_active"): "false",
+ }
+ ):
+ executor_conf = ExecutorConf(team_name=None)
+ assert executor_conf.getboolean("celery", "ssl_active") is True
- team_executor_conf = ExecutorConf(team_name="test_team")
- assert team_executor_conf.getboolean("celery", "ssl_active") is False
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ assert team_executor_conf.getboolean("celery", "ssl_active") is
False
def test_executor_conf_getint(self):
"""Test ExecutorConf.getint() passes team_name to underlying
conf.getint()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- worker_concurrency = 16
+ with conf_vars(
+ {
+ ("celery", "worker_concurrency"): "16",
+ ("test_team=celery", "worker_concurrency"): "32",
+ }
+ ):
+ executor_conf = ExecutorConf(team_name=None)
+ assert executor_conf.getint("celery", "worker_concurrency") == 16
- [test_team=celery]
- worker_concurrency = 32
- """
- )
- conf.read_string(test_config)
-
- executor_conf = ExecutorConf(team_name=None)
- assert executor_conf.getint("celery", "worker_concurrency") == 16
-
- team_executor_conf = ExecutorConf(team_name="test_team")
- assert team_executor_conf.getint("celery", "worker_concurrency") == 32
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ assert team_executor_conf.getint("celery", "worker_concurrency")
== 32
def test_executor_conf_getjson(self):
"""Test ExecutorConf.getjson() passes team_name to underlying
conf.getjson()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- broker_transport_options = {"visibility_timeout": 3600}
-
- [test_team=celery]
- broker_transport_options = {"visibility_timeout": 7200}
- """
- )
- conf.read_string(test_config)
-
- executor_conf = ExecutorConf(team_name=None)
- assert executor_conf.getjson("celery", "broker_transport_options") ==
{"visibility_timeout": 3600}
+ with conf_vars(
+ {
+ ("celery", "broker_transport_options"):
'{"visibility_timeout": 3600}',
+ ("test_team=celery", "broker_transport_options"):
'{"visibility_timeout": 7200}',
+ }
+ ):
+ executor_conf = ExecutorConf(team_name=None)
+ assert executor_conf.getjson("celery", "broker_transport_options")
== {"visibility_timeout": 3600}
- team_executor_conf = ExecutorConf(team_name="test_team")
- assert team_executor_conf.getjson("celery",
"broker_transport_options") == {
- "visibility_timeout": 7200
- }
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ assert team_executor_conf.getjson("celery",
"broker_transport_options") == {
+ "visibility_timeout": 7200
+ }
def test_executor_conf_getsection(self):
"""Test ExecutorConf.getsection() passes team_name to underlying
conf.getsection()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- worker_concurrency = 16
- result_backend = DEFAULT_BACKEND
-
- [test_team=celery]
- worker_concurrency = 32
- result_backend = TEAM_BACKEND
- """
- )
- conf.read_string(test_config)
-
- executor_conf = ExecutorConf(team_name=None)
- section = executor_conf.getsection("celery")
- assert section["worker_concurrency"] == 16
- assert section["result_backend"] == "DEFAULT_BACKEND"
-
- team_executor_conf = ExecutorConf(team_name="test_team")
- team_section = team_executor_conf.getsection("celery")
- assert team_section["worker_concurrency"] == 32
- assert team_section["result_backend"] == "TEAM_BACKEND"
+ with conf_vars(
+ {
+ ("celery", "worker_concurrency"): "16",
+ ("celery", "result_backend"): "DEFAULT_BACKEND",
+ ("test_team=celery", "worker_concurrency"): "32",
+ ("test_team=celery", "result_backend"): "TEAM_BACKEND",
+ }
+ ):
+ executor_conf = ExecutorConf(team_name=None)
+ section = executor_conf.getsection("celery")
+ assert section["worker_concurrency"] == 16
+ assert section["result_backend"] == "DEFAULT_BACKEND"
+
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ team_section = team_executor_conf.getsection("celery")
+ assert team_section["worker_concurrency"] == 32
+ assert team_section["result_backend"] == "TEAM_BACKEND"
def test_executor_conf_has_option(self):
"""Test ExecutorConf.has_option() passes team_name to underlying
conf.has_option()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- result_backend = DEFAULT
-
- [test_team=celery]
- result_backend = TEAM
- team_specific_option = VALUE
- """
- )
- conf.read_string(test_config)
-
- executor_conf = ExecutorConf(team_name=None)
- assert executor_conf.has_option("celery", "result_backend") is True
- assert executor_conf.has_option("celery", "team_specific_option") is
False
-
- team_executor_conf = ExecutorConf(team_name="test_team")
- assert team_executor_conf.has_option("celery", "result_backend") is
True
- assert team_executor_conf.has_option("celery", "team_specific_option")
is True
+ with conf_vars(
+ {
+ ("celery", "result_backend"): "DEFAULT",
+ ("test_team=celery", "result_backend"): "TEAM",
+ ("test_team=celery", "team_specific_option"): "VALUE",
+ }
+ ):
+ executor_conf = ExecutorConf(team_name=None)
+ assert executor_conf.has_option("celery", "result_backend") is True
+ assert executor_conf.has_option("celery", "team_specific_option")
is False
+
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ assert team_executor_conf.has_option("celery", "result_backend")
is True
+ assert team_executor_conf.has_option("celery",
"team_specific_option") is True
def test_executor_conf_get_mandatory_value(self):
"""Test ExecutorConf.get_mandatory_value() passes team_name to
underlying conf.get_mandatory_value()."""
- from airflow.configuration import conf
from airflow.executors.base_executor import ExecutorConf
- test_config = textwrap.dedent(
- """
- [celery]
- broker_url = redis://localhost
-
- [test_team=celery]
- broker_url = redis://team-redis
- """
- )
- conf.read_string(test_config)
-
- executor_conf = ExecutorConf(team_name=None)
- assert executor_conf.get_mandatory_value("celery", "broker_url") ==
"redis://localhost"
-
- team_executor_conf = ExecutorConf(team_name="test_team")
- assert team_executor_conf.get_mandatory_value("celery", "broker_url")
== "redis://team-redis"
+ with conf_vars(
+ {
+ ("celery", "broker_url"): "redis://localhost",
+ ("test_team=celery", "broker_url"): "redis://team-redis",
+ }
+ ):
+ executor_conf = ExecutorConf(team_name=None)
+ assert executor_conf.get_mandatory_value("celery", "broker_url")
== "redis://localhost"
+
+ team_executor_conf = ExecutorConf(team_name="test_team")
+ assert team_executor_conf.get_mandatory_value("celery",
"broker_url") == "redis://team-redis"
class TestCallbackSupport: