This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 58fb2fb0938 fix/celery-ssl-skip-empty-key-cert (#64767)
58fb2fb0938 is described below
commit 58fb2fb0938225aa928553071b393e382de34fdc
Author: Daniel Seo <[email protected]>
AuthorDate: Sun Apr 12 20:37:20 2026 -0400
fix/celery-ssl-skip-empty-key-cert (#64767)
Co-authored-by: hseo36 <[email protected]>
---
providers/celery/provider.yaml | 13 ++-
.../providers/celery/executors/default_celery.py | 60 ++++++----
.../airflow/providers/celery/get_provider_info.py | 11 +-
.../unit/celery/executors/test_celery_executor.py | 129 +++++++++++++++++++++
scripts/ci/prek/known_airflow_exceptions.txt | 2 +-
5 files changed, 189 insertions(+), 26 deletions(-)
diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml
index b89ea4e16ae..6b6c1007ad3 100644
--- a/providers/celery/provider.yaml
+++ b/providers/celery/provider.yaml
@@ -265,16 +265,25 @@ config:
type: string
example: ~
default: "False"
+ ssl_mutual_tls:
+ description: |
+ Whether to require mutual TLS (client certificate authentication).
+ When True (default), SSL_KEY and SSL_CERT must be set.
+ Set to False for one-way TLS (server verification only).
+ version_added: ~
+ type: boolean
+ example: ~
+ default: "True"
ssl_key:
description: |
- Path to the client key.
+ Path to the client key. Required when SSL_MUTUAL_TLS is True.
version_added: ~
type: string
example: ~
default: ""
ssl_cert:
description: |
- Path to the client certificate.
+ Path to the client certificate. Required when SSL_MUTUAL_TLS is True.
version_added: ~
type: string
example: ~
diff --git
a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
index f0ef8185d1e..9dd94036734 100644
--- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
+++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
@@ -141,36 +141,54 @@ def get_default_celery_config(team_conf) -> dict[str,
Any]:
try:
if celery_ssl_active:
+ ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS",
fallback=True)
+ ssl_key = team_conf.get("celery", "SSL_KEY")
+ ssl_cert = team_conf.get("celery", "SSL_CERT")
+ ssl_cacert = team_conf.get("celery", "SSL_CACERT")
+
+ if ssl_mutual_tls and (not ssl_key or not ssl_cert):
+ raise ValueError(
+ "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or
SSL_CERT are not set. "
+ "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for
one-way TLS."
+ )
+
+ if not ssl_cacert:
+ log.info("SSL_CACERT is not set. Using system CA certificates
for server verification.")
+
+ if not ssl_mutual_tls and (ssl_key or ssl_cert):
+ log.warning(
+ "SSL_MUTUAL_TLS is False but SSL_KEY/SSL_CERT are
configured. "
+ "Client certificates will not be used. "
+ "Set SSL_MUTUAL_TLS=True if you intend to use mutual TLS."
+ )
+
if broker_url and re.search(r"amqps?://", broker_url):
- broker_use_ssl = {
- "keyfile": team_conf.get("celery", "SSL_KEY"),
- "certfile": team_conf.get("celery", "SSL_CERT"),
- "ca_certs": team_conf.get("celery", "SSL_CACERT"),
- "cert_reqs": ssl.CERT_REQUIRED,
- }
+ broker_use_ssl = {"cert_reqs": ssl.CERT_REQUIRED}
+ if ssl_cacert:
+ broker_use_ssl["ca_certs"] = ssl_cacert
+ if ssl_mutual_tls:
+ broker_use_ssl["keyfile"] = ssl_key
+ broker_use_ssl["certfile"] = ssl_cert
elif broker_url and re.search("rediss?://|sentinel://",
broker_url):
- broker_use_ssl = {
- "ssl_keyfile": team_conf.get("celery", "SSL_KEY"),
- "ssl_certfile": team_conf.get("celery", "SSL_CERT"),
- "ssl_ca_certs": team_conf.get("celery", "SSL_CACERT"),
- "ssl_cert_reqs": ssl.CERT_REQUIRED,
- }
+ broker_use_ssl = {"ssl_cert_reqs": ssl.CERT_REQUIRED}
+ if ssl_cacert:
+ broker_use_ssl["ssl_ca_certs"] = ssl_cacert
+ if ssl_mutual_tls:
+ broker_use_ssl["ssl_keyfile"] = ssl_key
+ broker_use_ssl["ssl_certfile"] = ssl_cert
else:
- raise AirflowException(
+ raise ValueError(
"The broker you configured does not support SSL_ACTIVE to
be True. "
"Please use RabbitMQ or Redis if you would like to use SSL
for broker."
)
config["broker_use_ssl"] = broker_use_ssl
- except AirflowConfigException:
- raise AirflowException(
- "AirflowConfigException: SSL_ACTIVE is True, please ensure
SSL_KEY, SSL_CERT and SSL_CACERT are set"
- )
+ except ValueError:
+ raise
except Exception as e:
- raise AirflowException(
- f"Exception: There was an unknown Celery SSL Error. Please ensure
you want to use SSL and/or have "
- f"all necessary certs and key ({e})."
- )
+ raise RuntimeError(
+ f"Unknown Celery SSL error. Please ensure you want to use SSL and
have all necessary certs and key ({e})."
+ ) from e
# Warning for not recommended backends
match_not_recommended_backend = re.search("rediss?://|amqp://|rpc://",
result_backend)
diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py
b/providers/celery/src/airflow/providers/celery/get_provider_info.py
index 071071133bd..ce59a55918f 100644
--- a/providers/celery/src/airflow/providers/celery/get_provider_info.py
+++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py
@@ -169,15 +169,22 @@ def get_provider_info():
"example": None,
"default": "False",
},
+ "ssl_mutual_tls": {
+ "description": "Whether to require mutual TLS (client
certificate authentication).\nWhen True (default), SSL_KEY and SSL_CERT must be
set.\nSet to False for one-way TLS (server verification only).\n",
+ "version_added": None,
+ "type": "boolean",
+ "example": None,
+ "default": "True",
+ },
"ssl_key": {
- "description": "Path to the client key.\n",
+ "description": "Path to the client key. Required when
SSL_MUTUAL_TLS is True.\n",
"version_added": None,
"type": "string",
"example": None,
"default": "",
},
"ssl_cert": {
- "description": "Path to the client certificate.\n",
+ "description": "Path to the client certificate.
Required when SSL_MUTUAL_TLS is True.\n",
"version_added": None,
"type": "string",
"example": None,
diff --git
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index d69df9ec5e7..b1fee33cb75 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -872,6 +872,64 @@ class TestAmqpsSslConfig:
assert broker_ssl["keyfile"] == "/path/to/key.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "rediss://redis:6380//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_KEY"): "/path/to/key.pem",
+ ("celery", "SSL_CERT"): "/path/to/cert.pem",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_redis_mutual_tls_builds_ssl_config(self):
+ """Test mutual TLS: all three SSL keys produce correct broker_use_ssl
for Redis."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["ssl_keyfile"] == "/path/to/key.pem"
+ assert broker_ssl["ssl_certfile"] == "/path/to/cert.pem"
+ assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
+ assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_amqps_mutual_tls_missing_key_cert_raises(self):
+ """Test that mutual TLS (default) raises error when SSL_KEY/SSL_CERT
are missing."""
+ import importlib
+
+ with pytest.raises(ValueError, match="SSL_MUTUAL_TLS is True.*but
SSL_KEY and/or SSL_CERT"):
+ importlib.reload(default_celery)
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_KEY"): "/path/to/key",
+ ("celery", "SSL_CERT"): "/path/to/cert",
+ ("celery", "SSL_CACERT"): "",
+ }
+ )
+ def test_ssl_active_without_cacert_uses_system_cas(self):
+ """Test that empty SSL_CACERT falls back to system CAs (ca_certs
omitted from config)."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+ broker_ssl = default_celery.DEFAULT_CELERY_CONFIG["broker_use_ssl"]
+
+ assert "ca_certs" not in broker_ssl
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+
@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
@@ -887,6 +945,77 @@ class TestAmqpsSslConfig:
config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" not in config
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_MUTUAL_TLS"): "False",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_amqps_one_way_tls(self):
+ """Test one-way TLS for AMQP: only ca_certs, no keyfile/certfile."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+ assert "keyfile" not in broker_ssl
+ assert "certfile" not in broker_ssl
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "rediss://redis:6380//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_MUTUAL_TLS"): "False",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_redis_one_way_tls(self):
+ """Test one-way TLS for Redis: only ssl_ca_certs, no
ssl_keyfile/ssl_certfile."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
+ assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED
+ assert "ssl_keyfile" not in broker_ssl
+ assert "ssl_certfile" not in broker_ssl
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_MUTUAL_TLS"): "False",
+ ("celery", "SSL_KEY"): "/path/to/key.pem",
+ ("celery", "SSL_CERT"): "/path/to/cert.pem",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_one_way_tls_ignores_key_cert(self):
+ """Test that SSL_KEY/SSL_CERT are ignored when SSL_MUTUAL_TLS is
False."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+ assert "keyfile" not in broker_ssl
+ assert "certfile" not in broker_ssl
+
class TestCreateCeleryAppTeamIsolation:
"""Tests for create_celery_app() multi-team config isolation."""
diff --git a/scripts/ci/prek/known_airflow_exceptions.txt
b/scripts/ci/prek/known_airflow_exceptions.txt
index 9cd3d52cd1f..1a870c169d6 100644
--- a/scripts/ci/prek/known_airflow_exceptions.txt
+++ b/scripts/ci/prek/known_airflow_exceptions.txt
@@ -150,7 +150,7 @@
providers/arangodb/src/airflow/providers/arangodb/hooks/arangodb.py::9
providers/arangodb/src/airflow/providers/arangodb/operators/arangodb.py::1
providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py::1
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py::2
-providers/celery/src/airflow/providers/celery/executors/default_celery.py::5
+providers/celery/src/airflow/providers/celery/executors/default_celery.py::2
providers/celery/tests/integration/celery/test_celery_executor.py::2
providers/cloudant/src/airflow/providers/cloudant/hooks/cloudant.py::2
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py::3