Copilot commented on code in PR #64767:
URL: https://github.com/apache/airflow/pull/64767#discussion_r3066476663


##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -141,31 +141,57 @@ def get_default_celery_config(team_conf) -> dict[str, Any]:
 
     try:
         if celery_ssl_active:
+            ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS", fallback=True)
+            ssl_key = team_conf.get("celery", "SSL_KEY")
+            ssl_cert = team_conf.get("celery", "SSL_CERT")
+            ssl_cacert = team_conf.get("celery", "SSL_CACERT")
+
+            if ssl_mutual_tls and (not ssl_key or not ssl_cert):
+                raise AirflowException(
+                    "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or SSL_CERT are not set. "
+                    "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for one-way TLS."
+                )

Review Comment:
   The new explicit `AirflowException` raised for missing `SSL_KEY`/`SSL_CERT` 
will be caught by the broad `except Exception as e` and re-wrapped as an 
"unknown Celery SSL Error", which defeats the purpose of the clearer error 
and will likely break the new test that matches the specific message. Add an 
`except AirflowException: raise` before the generic handler, or make the 
generic handler re-raise `AirflowException` unchanged.



##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -141,31 +141,57 @@ def get_default_celery_config(team_conf) -> dict[str, Any]:
 
     try:
         if celery_ssl_active:
+            ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS", fallback=True)
+            ssl_key = team_conf.get("celery", "SSL_KEY")
+            ssl_cert = team_conf.get("celery", "SSL_CERT")
+            ssl_cacert = team_conf.get("celery", "SSL_CACERT")
+
+            if ssl_mutual_tls and (not ssl_key or not ssl_cert):
+                raise AirflowException(
+                    "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or SSL_CERT are not set. "
+                    "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for one-way TLS."
+                )
+
+            if not ssl_mutual_tls and (ssl_key or ssl_cert):
+                log.warning(
+                    "SSL_MUTUAL_TLS is False but SSL_KEY/SSL_CERT are configured. "
+                    "Client certificates will not be used. "
+                    "Set SSL_MUTUAL_TLS=True if you intend to use mutual TLS."
+                )
+
             if broker_url and re.search(r"amqps?://", broker_url):
-                broker_use_ssl = {
-                    "keyfile": team_conf.get("celery", "SSL_KEY"),
-                    "certfile": team_conf.get("celery", "SSL_CERT"),
-                    "ca_certs": team_conf.get("celery", "SSL_CACERT"),
-                    "cert_reqs": ssl.CERT_REQUIRED,
-                }
+                if ssl_mutual_tls:
+                    broker_use_ssl = {
+                        "keyfile": ssl_key,
+                        "certfile": ssl_cert,
+                        "ca_certs": ssl_cacert,
+                        "cert_reqs": ssl.CERT_REQUIRED,
+                    }
+                else:
+                    broker_use_ssl = {
+                        "ca_certs": ssl_cacert,
+                        "cert_reqs": ssl.CERT_REQUIRED,
+                    }
             elif broker_url and re.search("rediss?://|sentinel://", broker_url):
-                broker_use_ssl = {
-                    "ssl_keyfile": team_conf.get("celery", "SSL_KEY"),
-                    "ssl_certfile": team_conf.get("celery", "SSL_CERT"),
-                    "ssl_ca_certs": team_conf.get("celery", "SSL_CACERT"),
-                    "ssl_cert_reqs": ssl.CERT_REQUIRED,
-                }
+                if ssl_mutual_tls:
+                    broker_use_ssl = {
+                        "ssl_keyfile": ssl_key,
+                        "ssl_certfile": ssl_cert,
+                        "ssl_ca_certs": ssl_cacert,
+                        "ssl_cert_reqs": ssl.CERT_REQUIRED,
+                    }
+                else:
+                    broker_use_ssl = {
+                        "ssl_ca_certs": ssl_cacert,
+                        "ssl_cert_reqs": ssl.CERT_REQUIRED,
+                    }
             else:
                 raise AirflowException(
                     "The broker you configured does not support SSL_ACTIVE to be True. "
                     "Please use RabbitMQ or Redis if you would like to use SSL for broker."
                 )
 
             config["broker_use_ssl"] = broker_use_ssl

Review Comment:
   The new explicit `AirflowException` raised for missing `SSL_KEY`/`SSL_CERT` 
will be caught by the broad `except Exception as e` and re-wrapped as an 
"unknown Celery SSL Error", which defeats the purpose of the clearer error 
and will likely break the new test that matches the specific message. Add an 
`except AirflowException: raise` before the generic handler, or make the 
generic handler re-raise `AirflowException` unchanged.
   ```suggestion
               config["broker_use_ssl"] = broker_use_ssl
       except AirflowException:
           raise
   ```
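
The re-wrapping problem described in this comment can be shown with a minimal, self-contained sketch. `AirflowException` here is a local stand-in so the snippet runs without Airflow installed. The helper names, the simplified check, and the wording of the generic message are assumptions for illustration, not the code in `default_celery.py`.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


MISSING_MSG = "SSL_KEY and/or SSL_CERT are not set."
GENERIC_MSG = "There was an unknown Celery SSL Error."  # paraphrased wording


def _check(ssl_key, ssl_cert):
    # Simplified version of the validation added in the PR.
    if not ssl_key or not ssl_cert:
        raise AirflowException(MISSING_MSG)
    return {"keyfile": ssl_key, "certfile": ssl_cert}


def build_wrapping(ssl_key, ssl_cert):
    """Broad handler only: the targeted error is replaced by the generic one."""
    try:
        return _check(ssl_key, ssl_cert)
    except Exception as e:
        raise AirflowException(GENERIC_MSG) from e


def build_passthrough(ssl_key, ssl_cert):
    """Targeted errors pass through; only unexpected errors are wrapped."""
    try:
        return _check(ssl_key, ssl_cert)
    except AirflowException:
        raise
    except Exception as e:
        raise AirflowException(GENERIC_MSG) from e


def message_of(func, *args):
    """Return the AirflowException message raised by func, or None."""
    try:
        func(*args)
    except AirflowException as exc:
        return str(exc)
    return None
```

With empty key and cert values, `message_of(build_wrapping, "", "")` yields the generic text, while `message_of(build_passthrough, "", "")` yields the targeted message, which is what a test matching on that message would need.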



##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -141,31 +141,57 @@ def get_default_celery_config(team_conf) -> dict[str, 
Any]:
 
     try:
         if celery_ssl_active:
+            ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS", 
fallback=True)
+            ssl_key = team_conf.get("celery", "SSL_KEY")
+            ssl_cert = team_conf.get("celery", "SSL_CERT")
+            ssl_cacert = team_conf.get("celery", "SSL_CACERT")

Review Comment:
   `SSL_CACERT` is central to the stated goal ("server verification only via 
SSL_CACERT"), but it is never validated. If it is empty/missing while 
`SSL_ACTIVE=True`, the connection may fail later with less actionable SSL 
errors, or it may fall back to system CA behavior unexpectedly. Consider 
explicitly requiring a non-empty `SSL_CACERT` whenever `SSL_ACTIVE=True` (both 
mutual and one-way) and raising a targeted `AirflowException` that tells users 
to set `SSL_CACERT` (or, if you intend to allow system CAs, document that and 
adjust `cert_reqs`/options accordingly).
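
One possible shape for such a check is sketched below, assuming the stricter option (always require `SSL_CACERT`) is the one chosen. `AirflowException` is again a local stand-in, and both the helper name `require_ssl_cacert` and the message text are hypothetical, not part of the PR.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


def require_ssl_cacert(ssl_cacert):
    """Return ssl_cacert unchanged, or raise if it is missing, empty, or blank.

    Hypothetical helper: intended to run once SSL_ACTIVE is known to be True,
    for both mutual and one-way TLS.
    """
    if not ssl_cacert or not ssl_cacert.strip():
        raise AirflowException(
            "SSL_ACTIVE is True but SSL_CACERT is not set. "
            "Set SSL_CACERT to the CA bundle used to verify the broker certificate."
        )
    return ssl_cacert
```

Calling it right after reading the three settings would surface the misconfiguration at config-build time rather than at connection time.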


