ferruzzi commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r3075447265
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -206,43 +204,23 @@ def on_celery_worker_ready(*args, **kwargs):
# and deserialization for us.
@app.task(name="execute_workload")
def execute_workload(input: str) -> None:
+ if not AIRFLOW_V_3_2_PLUS:
+ raise RuntimeError("BaseExecutor.run_workload() requires Airflow
3.2+.")
+
from celery.exceptions import Ignore
from pydantic import TypeAdapter
- from airflow.executors import workloads
- from airflow.providers.common.compat.sdk import conf
- from airflow.sdk.execution_time.supervisor import supervise
+ from airflow.executors.workloads import ExecutorWorkload
- decoder = TypeAdapter[workloads.All](workloads.All)
+ decoder = TypeAdapter[ExecutorWorkload](ExecutorWorkload)
workload = decoder.validate_json(input)
celery_task_id = app.current_task.request.id
log.info("[%s] Executing workload in Celery: %s", celery_task_id, workload)
- base_url = conf.get("api", "base_url", fallback="/")
- # If it's a relative URL, use localhost:8080 as the default.
- if base_url.startswith("/"):
- base_url = f"http://localhost:8080{base_url}"
- default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
-
try:
- if isinstance(workload, workloads.ExecuteTask):
- supervise(
- # This is the "wrong" ti type, but it duck types the same.
TODO: Create a protocol for this.
- ti=workload.ti, # type: ignore[arg-type]
- dag_rel_path=workload.dag_rel_path,
- bundle_info=workload.bundle_info,
- token=workload.token,
- server=conf.get("core", "execution_api_server_url",
fallback=default_execution_api_server),
- log_path=workload.log_path,
- )
- elif isinstance(workload, workloads.ExecuteCallback):
- success, error_msg = execute_callback_workload(workload.callback,
log)
- if not success:
- raise RuntimeError(error_msg or "Callback execution failed")
- else:
- raise ValueError(f"CeleryExecutor does not know how to handle
{type(workload)}")
+ BaseExecutor.run_workload(workload)
Review Comment:
I think I fixed it. In place of the exception, I copy/pasted the old
workflow into a helper so it's now "if not 3.2: run old code via a helper;
else: run new code" which should make it easy to replace whenever the min
version gets bumped instead of trying to extricate the two later.. It
actually surfaced another issue that would already be breaking on 3.1.x: the
old code referenced workloads.ExecuteCallback in an isinstance check, but that
class doesn't exist until 3.2 — so I dropped that branch from the fallback
since it was dead code anyway. Other than that tweak and removing the local
import for conf in favor of the top-level import of the same object, the helper
is copypasta from the code currently in main.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]