This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 714a9a79a5 Add data_interval_start and data_interval_end in dagrun create API endpoint (#36630)
714a9a79a5 is described below

commit 714a9a79a59c317f58d6bd621acba9dd4e2a4622
Author: Burak Karakan <[email protected]>
AuthorDate: Fri Jan 26 19:27:55 2024 +0000

    Add data_interval_start and data_interval_end in dagrun create API endpoint (#36630)
---
 .../api_connexion/endpoints/dag_run_endpoint.py    | 14 +++-
 airflow/api_connexion/openapi/v1.yaml              |  6 +-
 airflow/api_connexion/schemas/dag_run_schema.py    | 16 +++-
 airflow/www/static/js/types/api-generated.ts       | 10 ++-
 .../endpoints/test_dag_run_endpoint.py             | 89 ++++++++++++++++++++--
 5 files changed, 120 insertions(+), 15 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 149af771e4..3826acb75f 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -61,6 +61,7 @@ from airflow.api_connexion.schemas.task_instance_schema import (
 from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.models import DagModel, DagRun
 from airflow.security import permissions
+from airflow.timetables.base import DataInterval
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.db import get_query_count
 from airflow.utils.log.action_logger import action_event_from_permission
@@ -336,11 +337,22 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
     if not dagrun_instance:
         try:
             dag = get_airflow_app().dag_bag.get_dag(dag_id)
+
+            data_interval_start = post_body.get("data_interval_start")
+            data_interval_end = post_body.get("data_interval_end")
+            if data_interval_start and data_interval_end:
+                data_interval = DataInterval(
+                    start=pendulum.instance(data_interval_start),
+                    end=pendulum.instance(data_interval_end),
+                )
+            else:
+                data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
+
             dag_run = dag.create_dagrun(
                 run_type=DagRunType.MANUAL,
                 run_id=run_id,
                 execution_date=logical_date,
-                data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
+                data_interval=data_interval,
                 state=DagRunState.QUEUED,
                 conf=post_body.get("conf"),
                 external_trigger=True,
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 03dabfb759..a94f303a5a 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2900,12 +2900,14 @@ components:
         data_interval_start:
           type: string
           format: date-time
-          readOnly: true
+          description: |
+            The beginning of the interval the DAG run covers.
           nullable: true
         data_interval_end:
           type: string
           format: date-time
-          readOnly: true
+          description: |
+            The end of the interval the DAG run covers.
           nullable: true
         last_scheduling_decision:
           type: string
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py
index da01751f59..6a1fc334b5 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 import json
 from typing import NamedTuple
 
-from marshmallow import fields, post_dump, pre_load, validate
+from marshmallow import ValidationError, fields, post_dump, pre_load, validate, validates_schema
 from marshmallow.schema import Schema
 from marshmallow.validate import Range
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
@@ -69,8 +69,8 @@ class DAGRunSchema(SQLAlchemySchema):
     state = DagStateField(dump_only=True)
     external_trigger = auto_field(dump_default=True, dump_only=True)
     conf = ConfObject()
-    data_interval_start = auto_field(dump_only=True)
-    data_interval_end = auto_field(dump_only=True)
+    data_interval_start = auto_field(validate=validate_istimezone)
+    data_interval_end = auto_field(validate=validate_istimezone)
     last_scheduling_decision = auto_field(dump_only=True)
     run_type = auto_field(dump_only=True)
     note = auto_field(dump_only=False)
@@ -121,6 +121,16 @@ class DAGRunSchema(SQLAlchemySchema):
 
         return ret_data
 
+    @validates_schema
+    def validate_data_interval_dates(self, data, **kwargs):
+        data_interval_start_exists = data.get("data_interval_start") is not 
None
+        data_interval_end_exists = data.get("data_interval_end") is not None
+
+        if data_interval_start_exists != data_interval_end_exists:
+            raise ValidationError(
+                "Both 'data_interval_start' and 'data_interval_end' must be specified together"
+            )
+
 
 class SetDagRunStateFormSchema(Schema):
     """Schema for handling the request of setting state of DAG run."""
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index bd205c8e69..d9e1816bbd 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1077,9 +1077,15 @@ export interface components {
       start_date?: string | null;
       /** Format: date-time */
       end_date?: string | null;
-      /** Format: date-time */
+      /**
+       * Format: date-time
+       * @description The beginning of the interval the DAG run covers.
+       */
       data_interval_start?: string | null;
-      /** Format: date-time */
+      /**
+       * Format: date-time
+       * @description The end of the interval the DAG run covers.
+       */
       data_interval_end?: string | null;
       /** Format: date-time */
       last_scheduling_decision?: string | null;
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 0ce3f222db..045b5392f5 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1159,25 +1159,51 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
 class TestPostDagRun(TestDagRunEndpoint):
     @pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
     @pytest.mark.parametrize(
-        "dag_run_id, logical_date, note",
+        "dag_run_id, logical_date, note, data_interval_start, data_interval_end",
         [
-            pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"),
-            pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"),
-            pytest.param(None, None, None, id="all-missing"),
+            pytest.param(
+                "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", None, None, id="all-present"
+            ),
+            pytest.param(
+                "TEST_DAG_RUN",
+                "2024-06-11T18:00:00+00:00",
+                "test-note",
+                "2024-01-03T00:00:00+00:00",
+                "2024-01-04T05:00:00+00:00",
+                id="all-present-with-dates",
+            ),
+            pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None, id="only-date"),
+            pytest.param(None, None, None, None, None, id="all-missing"),
         ],
     )
-    def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
+    def test_should_respond_200(
+        self,
+        session,
+        logical_date_field_name,
+        dag_run_id,
+        logical_date,
+        note,
+        data_interval_start,
+        data_interval_end,
+    ):
         self._create_dag("TEST_DAG_ID")
 
         # We'll patch airflow.utils.timezone.utcnow to always return this so we
         # can check the returned dates.
         fixed_now = timezone.utcnow()
 
+        # raise NotImplementedError("TODO: Add tests for data_interval_start and data_interval_end")
+
         request_json = {}
         if logical_date is not None:
             request_json[logical_date_field_name] = logical_date
         if dag_run_id is not None:
             request_json["dag_run_id"] = dag_run_id
+        if data_interval_start is not None:
+            request_json["data_interval_start"] = data_interval_start
+        if data_interval_end is not None:
+            request_json["data_interval_end"] = data_interval_end
+
         request_json["note"] = note
         with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
             response = self.client.post(
@@ -1185,6 +1211,7 @@ class TestPostDagRun(TestDagRunEndpoint):
                 json=request_json,
                 environ_overrides={"REMOTE_USER": "test"},
             )
+
         assert response.status_code == 200
 
         if logical_date is None:
@@ -1195,6 +1222,13 @@ class TestPostDagRun(TestDagRunEndpoint):
             expected_dag_run_id = f"manual__{expected_logical_date}"
         else:
             expected_dag_run_id = dag_run_id
+
+        expected_data_interval_start = expected_logical_date
+        expected_data_interval_end = expected_logical_date
+        if data_interval_start is not None and data_interval_end is not None:
+            expected_data_interval_start = data_interval_start
+            expected_data_interval_end = data_interval_end
+
         assert response.json == {
             "conf": {},
             "dag_id": "TEST_DAG_ID",
@@ -1205,8 +1239,8 @@ class TestPostDagRun(TestDagRunEndpoint):
             "external_trigger": True,
             "start_date": None,
             "state": "queued",
-            "data_interval_end": expected_logical_date,
-            "data_interval_start": expected_logical_date,
+            "data_interval_end": expected_data_interval_end,
+            "data_interval_start": expected_data_interval_start,
             "last_scheduling_decision": None,
             "run_type": "manual",
             "note": note,
@@ -1323,6 +1357,47 @@ class TestPostDagRun(TestDagRunEndpoint):
         assert response.json["title"] == "logical_date conflicts with execution_date"
         assert response.json["detail"] == (f"'{logical_date}' != '{execution_date}'")
 
+    @pytest.mark.parametrize(
+        "data_interval_start, data_interval_end, expected",
+        [
+            (
+                "2020-11-10T08:25:56.939143",
+                None,
+                "'2020-11-10T08:25:56.939143' is not a 'date-time' - 'data_interval_start'",
+            ),
+            (
+                None,
+                "2020-11-10T08:25:56.939143",
+                "'2020-11-10T08:25:56.939143' is not a 'date-time' - 'data_interval_end'",
+            ),
+            (
+                "2020-11-10T08:25:56.939143+00:00",
+                None,
+                "{'_schema': [\"Both 'data_interval_start' and 'data_interval_end' must be specified together\"]}",
+            ),
+            (
+                None,
+                "2020-11-10T08:25:56.939143+00:00",
+                "{'_schema': [\"Both 'data_interval_start' and 'data_interval_end' must be specified together\"]}",
+            ),
+        ],
+    )
+    def test_should_response_400_for_missing_start_date_or_end_date(
+        self, data_interval_start, data_interval_end, expected
+    ):
+        self._create_dag("TEST_DAG_ID")
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns",
+            json={
+                "execution_date": "2020-11-10T08:25:56.939143+00:00",
+                "data_interval_start": data_interval_start,
+                "data_interval_end": data_interval_end,
+            },
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 400
+        assert response.json["detail"] == expected
+
     @pytest.mark.parametrize(
         "data, expected",
         [

Reply via email to