This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 542a30fe8ae [v3-2-test] add check for xcom permission when result is 
specified in query parameter (#64415) (#64488)
542a30fe8ae is described below

commit 542a30fe8ae424e534feff1d1d9f1cb8d0e9496d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 17:15:27 2026 +0200

    [v3-2-test] add check for xcom permission when result is specified in query 
parameter (#64415) (#64488)
    
    (cherry picked from commit 35ca494ace6ec7d4962c27f5fd7137097934c0b1)
    
    Co-authored-by: Kevin Yang <[email protected]>
---
 .../api_fastapi/core_api/routes/public/dag_run.py  | 15 +++++++++-
 .../core_api/routes/public/test_dag_run.py         | 32 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 67f76307124..ff42238806b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -33,7 +33,8 @@ from airflow.api.common.mark_tasks import (
     set_dag_run_state_to_queued,
     set_dag_run_state_to_success,
 )
-from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.app import get_auth_manager
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity, DagDetails
 from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, 
get_latest_version_of_dag
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.db.dag_runs import (
@@ -536,6 +537,7 @@ def wait_dag_run_until_finished(
     dag_id: str,
     dag_run_id: str,
     session: SessionDep,
+    user: GetUserDep,
     interval: Annotated[float, Query(gt=0.0, description="Seconds to wait 
between dag run state checks")],
     result_task_ids: Annotated[
         list[str] | None,
@@ -543,6 +545,17 @@ def wait_dag_run_until_finished(
     ] = None,
 ):
     "Wait for a dag run until it finishes, and return its result(s)."
+    if result_task_ids:
+        if not get_auth_manager().is_authorized_dag(
+            method="GET",
+            access_entity=DagAccessEntity.XCOM,
+            details=DagDetails(id=dag_id),
+            user=user,
+        ):
+            raise HTTPException(
+                status.HTTP_403_FORBIDDEN,
+                "User is not authorized to read XCom data for this DAG",
+            )
     if not session.scalar(select(1).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id)):
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 3a55532e3d7..2e80a3501fe 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -2079,3 +2079,35 @@ class TestWaitDagRun:
         assert response.status_code == 200
         data = response.json()
         assert data == {"state": DagRunState.SUCCESS, "results": {"task_1": 
'"result_1"'}}
+
+    def test_should_respond_403_when_user_lacks_xcom_permission(self, 
test_client):
+        from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity, DagDetails
+
+        with mock.patch(
+            
"airflow.api_fastapi.core_api.routes.public.dag_run.get_auth_manager",
+            autospec=True,
+        ) as mock_get_auth_manager:
+            mock_get_auth_manager.return_value.is_authorized_dag.return_value 
= False
+
+            response = test_client.get(
+                f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+                params={"interval": "1", "result": "task_1"},
+            )
+
+            assert response.status_code == 403
+            
mock_get_auth_manager.return_value.is_authorized_dag.assert_called_once_with(
+                method="GET",
+                access_entity=DagAccessEntity.XCOM,
+                details=DagDetails(id=DAG1_ID),
+                user=mock.ANY,
+            )
+
+    def 
test_should_respond_200_without_result_when_user_lacks_xcom_permission(self, 
test_client):
+        """Waiting without result parameter should not require XCom 
permissions."""
+        response = test_client.get(
+            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+            params={"interval": "1"},
+        )
+        assert response.status_code == 200
+        data = response.json()
+        assert data == {"state": DagRunState.SUCCESS}

Reply via email to