This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 58fb2fb0938 fix/celery-ssl-skip-empty-key-cert (#64767)
58fb2fb0938 is described below

commit 58fb2fb0938225aa928553071b393e382de34fdc
Author: Daniel Seo <[email protected]>
AuthorDate: Sun Apr 12 20:37:20 2026 -0400

    fix/celery-ssl-skip-empty-key-cert (#64767)
    
    Co-authored-by: hseo36 <[email protected]>
---
 providers/celery/provider.yaml                     |  13 ++-
 .../providers/celery/executors/default_celery.py   |  60 ++++++----
 .../airflow/providers/celery/get_provider_info.py  |  11 +-
 .../unit/celery/executors/test_celery_executor.py  | 129 +++++++++++++++++++++
 scripts/ci/prek/known_airflow_exceptions.txt       |   2 +-
 5 files changed, 189 insertions(+), 26 deletions(-)

diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml
index b89ea4e16ae..6b6c1007ad3 100644
--- a/providers/celery/provider.yaml
+++ b/providers/celery/provider.yaml
@@ -265,16 +265,25 @@ config:
         type: string
         example: ~
         default: "False"
+      ssl_mutual_tls:
+        description: |
+          Whether to require mutual TLS (client certificate authentication).
+          When True (default), SSL_KEY and SSL_CERT must be set.
+          Set to False for one-way TLS (server verification only).
+        version_added: ~
+        type: boolean
+        example: ~
+        default: "True"
       ssl_key:
         description: |
-          Path to the client key.
+          Path to the client key. Required when SSL_MUTUAL_TLS is True.
         version_added: ~
         type: string
         example: ~
         default: ""
       ssl_cert:
         description: |
-          Path to the client certificate.
+          Path to the client certificate. Required when SSL_MUTUAL_TLS is True.
         version_added: ~
         type: string
         example: ~
diff --git a/providers/celery/src/airflow/providers/celery/executors/default_celery.py b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
index f0ef8185d1e..9dd94036734 100644
--- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
+++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
@@ -141,36 +141,54 @@ def get_default_celery_config(team_conf) -> dict[str, Any]:
 
     try:
         if celery_ssl_active:
+            ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS", fallback=True)
+            ssl_key = team_conf.get("celery", "SSL_KEY")
+            ssl_cert = team_conf.get("celery", "SSL_CERT")
+            ssl_cacert = team_conf.get("celery", "SSL_CACERT")
+
+            if ssl_mutual_tls and (not ssl_key or not ssl_cert):
+                raise ValueError(
+                    "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or SSL_CERT are not set. "
+                    "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for one-way TLS."
+                )
+
+            if not ssl_cacert:
+                log.info("SSL_CACERT is not set. Using system CA certificates for server verification.")
+
+            if not ssl_mutual_tls and (ssl_key or ssl_cert):
+                log.warning(
+                    "SSL_MUTUAL_TLS is False but SSL_KEY/SSL_CERT are configured. "
+                    "Client certificates will not be used. "
+                    "Set SSL_MUTUAL_TLS=True if you intend to use mutual TLS."
+                )
+
             if broker_url and re.search(r"amqps?://", broker_url):
-                broker_use_ssl = {
-                    "keyfile": team_conf.get("celery", "SSL_KEY"),
-                    "certfile": team_conf.get("celery", "SSL_CERT"),
-                    "ca_certs": team_conf.get("celery", "SSL_CACERT"),
-                    "cert_reqs": ssl.CERT_REQUIRED,
-                }
+                broker_use_ssl = {"cert_reqs": ssl.CERT_REQUIRED}
+                if ssl_cacert:
+                    broker_use_ssl["ca_certs"] = ssl_cacert
+                if ssl_mutual_tls:
+                    broker_use_ssl["keyfile"] = ssl_key
+                    broker_use_ssl["certfile"] = ssl_cert
             elif broker_url and re.search("rediss?://|sentinel://", broker_url):
-                broker_use_ssl = {
-                    "ssl_keyfile": team_conf.get("celery", "SSL_KEY"),
-                    "ssl_certfile": team_conf.get("celery", "SSL_CERT"),
-                    "ssl_ca_certs": team_conf.get("celery", "SSL_CACERT"),
-                    "ssl_cert_reqs": ssl.CERT_REQUIRED,
-                }
+                broker_use_ssl = {"ssl_cert_reqs": ssl.CERT_REQUIRED}
+                if ssl_cacert:
+                    broker_use_ssl["ssl_ca_certs"] = ssl_cacert
+                if ssl_mutual_tls:
+                    broker_use_ssl["ssl_keyfile"] = ssl_key
+                    broker_use_ssl["ssl_certfile"] = ssl_cert
             else:
-                raise AirflowException(
+                raise ValueError(
                     "The broker you configured does not support SSL_ACTIVE to be True. "
                     "Please use RabbitMQ or Redis if you would like to use SSL for broker."
                 )
 
             config["broker_use_ssl"] = broker_use_ssl
-    except AirflowConfigException:
-        raise AirflowException(
-            "AirflowConfigException: SSL_ACTIVE is True, please ensure SSL_KEY, SSL_CERT and SSL_CACERT are set"
-        )
+    except ValueError:
+        raise
     except Exception as e:
-        raise AirflowException(
-            f"Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have "
-            f"all necessary certs and key ({e})."
-        )
+        raise RuntimeError(
+            f"Unknown Celery SSL error. Please ensure you want to use SSL and have all necessary certs and key ({e})."
+        ) from e
 
     # Warning for not recommended backends
     match_not_recommended_backend = re.search("rediss?://|amqp://|rpc://", result_backend)
diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py
index 071071133bd..ce59a55918f 100644
--- a/providers/celery/src/airflow/providers/celery/get_provider_info.py
+++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py
@@ -169,15 +169,22 @@ def get_provider_info():
                         "example": None,
                         "default": "False",
                     },
+                    "ssl_mutual_tls": {
+                        "description": "Whether to require mutual TLS (client certificate authentication).\nWhen True (default), SSL_KEY and SSL_CERT must be set.\nSet to False for one-way TLS (server verification only).\n",
+                        "version_added": None,
+                        "type": "boolean",
+                        "example": None,
+                        "default": "True",
+                    },
                     "ssl_key": {
-                        "description": "Path to the client key.\n",
+                        "description": "Path to the client key. Required when SSL_MUTUAL_TLS is True.\n",
                         "version_added": None,
                         "type": "string",
                         "example": None,
                         "default": "",
                     },
                     "ssl_cert": {
-                        "description": "Path to the client certificate.\n",
+                        "description": "Path to the client certificate. Required when SSL_MUTUAL_TLS is True.\n",
                         "version_added": None,
                         "type": "string",
                         "example": None,
diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index d69df9ec5e7..b1fee33cb75 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -872,6 +872,64 @@ class TestAmqpsSslConfig:
         assert broker_ssl["keyfile"] == "/path/to/key.pem"
         assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
 
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "rediss://redis:6380//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_KEY"): "/path/to/key.pem",
+            ("celery", "SSL_CERT"): "/path/to/cert.pem",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_redis_mutual_tls_builds_ssl_config(self):
+        """Test mutual TLS: all three SSL keys produce correct broker_use_ssl for Redis."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["ssl_keyfile"] == "/path/to/key.pem"
+        assert broker_ssl["ssl_certfile"] == "/path/to/cert.pem"
+        assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
+        assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_amqps_mutual_tls_missing_key_cert_raises(self):
+        """Test that mutual TLS (default) raises error when SSL_KEY/SSL_CERT are missing."""
+        import importlib
+
+        with pytest.raises(ValueError, match="SSL_MUTUAL_TLS is True.*but SSL_KEY and/or SSL_CERT"):
+            importlib.reload(default_celery)
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_KEY"): "/path/to/key",
+            ("celery", "SSL_CERT"): "/path/to/cert",
+            ("celery", "SSL_CACERT"): "",
+        }
+    )
+    def test_ssl_active_without_cacert_uses_system_cas(self):
+        """Test that empty SSL_CACERT falls back to system CAs (ca_certs omitted from config)."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+        broker_ssl = default_celery.DEFAULT_CELERY_CONFIG["broker_use_ssl"]
+
+        assert "ca_certs" not in broker_ssl
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+
     @conf_vars(
         {
             ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
@@ -887,6 +945,77 @@ class TestAmqpsSslConfig:
         config = default_celery.DEFAULT_CELERY_CONFIG
         assert "broker_use_ssl" not in config
 
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_MUTUAL_TLS"): "False",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_amqps_one_way_tls(self):
+        """Test one-way TLS for AMQP: only ca_certs, no keyfile/certfile."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+        assert "keyfile" not in broker_ssl
+        assert "certfile" not in broker_ssl
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "rediss://redis:6380//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_MUTUAL_TLS"): "False",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_redis_one_way_tls(self):
+        """Test one-way TLS for Redis: only ssl_ca_certs, no ssl_keyfile/ssl_certfile."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
+        assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED
+        assert "ssl_keyfile" not in broker_ssl
+        assert "ssl_certfile" not in broker_ssl
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_MUTUAL_TLS"): "False",
+            ("celery", "SSL_KEY"): "/path/to/key.pem",
+            ("celery", "SSL_CERT"): "/path/to/cert.pem",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_one_way_tls_ignores_key_cert(self):
+        """Test that SSL_KEY/SSL_CERT are ignored when SSL_MUTUAL_TLS is False."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+        assert "keyfile" not in broker_ssl
+        assert "certfile" not in broker_ssl
+
 
 class TestCreateCeleryAppTeamIsolation:
     """Tests for create_celery_app() multi-team config isolation."""
diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt
index 9cd3d52cd1f..1a870c169d6 100644
--- a/scripts/ci/prek/known_airflow_exceptions.txt
+++ b/scripts/ci/prek/known_airflow_exceptions.txt
@@ -150,7 +150,7 @@ providers/arangodb/src/airflow/providers/arangodb/hooks/arangodb.py::9
 providers/arangodb/src/airflow/providers/arangodb/operators/arangodb.py::1
 providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py::1
 providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py::2
-providers/celery/src/airflow/providers/celery/executors/default_celery.py::5
+providers/celery/src/airflow/providers/celery/executors/default_celery.py::2
 providers/celery/tests/integration/celery/test_celery_executor.py::2
 providers/cloudant/src/airflow/providers/cloudant/hooks/cloudant.py::2
 providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py::3

Reply via email to