ashb commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r3072208467
##########
airflow-core/src/airflow/executors/workloads/__init__.py:
##########
@@ -34,13 +34,20 @@
TaskInstance = TaskInstanceDTO
+ExecutorWorkload = Annotated[
Review Comment:
Nit: since this covers more than a single type, the plural name reads better:
```suggestion
ExecutorWorkloads = Annotated[
```
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -206,43 +204,23 @@ def on_celery_worker_ready(*args, **kwargs):
# and deserialization for us.
@app.task(name="execute_workload")
def execute_workload(input: str) -> None:
+ if not AIRFLOW_V_3_2_PLUS:
+ raise RuntimeError("BaseExecutor.run_workload() requires Airflow
3.2+.")
+
from celery.exceptions import Ignore
from pydantic import TypeAdapter
- from airflow.executors import workloads
- from airflow.providers.common.compat.sdk import conf
- from airflow.sdk.execution_time.supervisor import supervise
+ from airflow.executors.workloads import ExecutorWorkload
- decoder = TypeAdapter[workloads.All](workloads.All)
+ decoder = TypeAdapter[ExecutorWorkload](ExecutorWorkload)
workload = decoder.validate_json(input)
celery_task_id = app.current_task.request.id
log.info("[%s] Executing workload in Celery: %s", celery_task_id, workload)
- base_url = conf.get("api", "base_url", fallback="/")
- # If it's a relative URL, use localhost:8080 as the default.
- if base_url.startswith("/"):
- base_url = f"http://localhost:8080{base_url}"
- default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
-
try:
- if isinstance(workload, workloads.ExecuteTask):
- supervise(
- # This is the "wrong" ti type, but it duck types the same.
TODO: Create a protocol for this.
- ti=workload.ti, # type: ignore[arg-type]
- dag_rel_path=workload.dag_rel_path,
- bundle_info=workload.bundle_info,
- token=workload.token,
- server=conf.get("core", "execution_api_server_url",
fallback=default_execution_api_server),
- log_path=workload.log_path,
- )
- elif isinstance(workload, workloads.ExecuteCallback):
- success, error_msg = execute_callback_workload(workload.callback,
log)
- if not success:
- raise RuntimeError(error_msg or "Callback execution failed")
- else:
- raise ValueError(f"CeleryExecutor does not know how to handle
{type(workload)}")
+ BaseExecutor.run_workload(workload)
Review Comment:
I think this is going to cause compat issues.
This will mean that the latest Celery provider will no longer work with Airflow
3.1.x, as it doesn't have the `BaseExecutor.run_workload` function.
##########
airflow-core/tests/unit/executors/test_local_executor.py:
##########
@@ -54,6 +55,17 @@
)
+def _make_mock_task_workload():
+ """Create a MagicMock that passes isinstance checks for ExecuteTask and
has required attributes."""
+ task_workload = mock.MagicMock(spec=workloads.ExecuteTask)
Review Comment:
`workloads.ExecuteTask` is essentially a dataclass, isn't it? Is there a
reason we shouldn't create the actual object, even if the `ti` and `bundle_info`
are mock objects?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]