This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f0a1baf5fc9b35e4303e55f9255c7eede0a09e80
Author: boushphong <[email protected]>
AuthorDate: Wed Mar 22 21:22:39 2023 +0700

    Fix inconsistent returned value of `airflow dags next-execution` cli 
command (#30117)
    
    * Fix inconsistent returned value of `airflow dags next-execution` cli 
command
    
    This commit is to fix the inconsistent returned value of `airflow dags 
next-execution` cli command when the dag is paused and catchup is False
    
    ---------
    
    Co-authored-by: Bui Duc Phong[ Bui Duc Phong ] <[email protected]>
    (cherry picked from commit c63836ccb763fd078e0665c7ef3353146b1afe96)
---
 airflow/cli/commands/dag_command.py    |  44 +++++--------
 tests/cli/commands/test_dag_command.py | 116 ++++++++++++++++++---------------
 2 files changed, 80 insertions(+), 80 deletions(-)

diff --git a/airflow/cli/commands/dag_command.py 
b/airflow/cli/commands/dag_command.py
index 0465c474ed..4a934cb8d3 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -27,7 +27,6 @@ import sys
 
 from graphviz.dot import Dot
 from sqlalchemy.orm import Session
-from sqlalchemy.sql.functions import func
 
 from airflow import settings
 from airflow.api.client import get_current_api_client
@@ -38,6 +37,7 @@ from airflow.jobs.base_job import BaseJob
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.base import DataInterval
 from airflow.utils import cli as cli_utils, timezone
 from airflow.utils.cli import get_dag, get_dags, process_subdir, 
sigint_handler, suppress_logs_and_warning
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
@@ -288,36 +288,26 @@ def dag_next_execution(args):
         print("[INFO] Please be reminded this DAG is PAUSED now.", 
file=sys.stderr)
 
     with create_session() as session:
-        max_date_subq = (
-            session.query(func.max(DagRun.execution_date).label("max_date"))
-            .filter(DagRun.dag_id == dag.dag_id)
-            .subquery()
-        )
-        max_date_run: DagRun | None = (
-            session.query(DagRun)
-            .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == 
max_date_subq.c.max_date)
-            .one_or_none()
-        )
-
-        if max_date_run is None:
-            print("[WARN] Only applicable when there is execution record found 
for the DAG.", file=sys.stderr)
+        last_parsed_dag: DagModel = 
session.query(DagModel).filter(DagModel.dag_id == dag.dag_id).one()
+
+    def print_execution_interval(interval: DataInterval | None):
+        if interval is None:
+            print(
+                "[WARN] No following schedule can be found. "
+                "This DAG may have schedule interval '@once' or `None`.",
+                file=sys.stderr,
+            )
             print(None)
             return
+        print(interval.start.isoformat())
 
-    next_info = dag.next_dagrun_info(dag.get_run_data_interval(max_date_run), 
restricted=False)
-    if next_info is None:
-        print(
-            "[WARN] No following schedule can be found. "
-            "This DAG may have schedule interval '@once' or `None`.",
-            file=sys.stderr,
-        )
-        print(None)
-        return
+    next_interval = dag.get_next_data_interval(last_parsed_dag)
+    print_execution_interval(next_interval)
 
-    print(next_info.logical_date.isoformat())
-    for _ in range(1, args.num_executions):
-        next_info = dag.next_dagrun_info(next_info.data_interval, 
restricted=False)
-        print(next_info.logical_date.isoformat())
+    for i in range(1, args.num_executions):
+        next_info = dag.next_dagrun_info(next_interval, restricted=False)
+        next_interval = None if next_info is None else next_info.data_interval
+        print_execution_interval(next_interval)
 
 
 @cli_utils.action_cli
diff --git a/tests/cli/commands/test_dag_command.py 
b/tests/cli/commands/test_dag_command.py
index 15f4c19348..f75539c65d 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -27,6 +27,7 @@ from unittest.mock import MagicMock
 
 import pendulum
 import pytest
+import time_machine
 
 from airflow import settings
 from airflow.cli import cli_parser
@@ -36,7 +37,6 @@ from airflow.models import DagBag, DagModel, DagRun
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.config import conf_vars
@@ -236,7 +236,6 @@ class TestCliDags:
 
     @mock.patch("airflow.cli.commands.dag_command.get_dag")
     def test_backfill_fails_without_loading_dags(self, mock_get_dag):
-
         cli_args = self.parser.parse_args(["dags", "backfill", 
"example_bash_operator"])
 
         with pytest.raises(AirflowException):
@@ -383,71 +382,82 @@ class TestCliDags:
             disable_retry=False,
         )
 
-    def test_next_execution(self):
-        dag_ids = [
-            "example_bash_operator",  # schedule='0 0 * * *'
-            "latest_only",  # schedule=timedelta(hours=4)
-            "example_python_operator",  # schedule=None
-            "example_xcom",  # schedule="@once"
+    def test_next_execution(self, tmp_path):
+        dag_test_list = [
+            ("future_schedule_daily", "timedelta(days=5)", "'0 0 * * *'", 
"True"),
+            ("future_schedule_every_4_hours", "timedelta(days=5)", 
"timedelta(hours=4)", "True"),
+            ("future_schedule_once", "timedelta(days=5)", "'@once'", "True"),
+            ("future_schedule_none", "timedelta(days=5)", "None", "True"),
+            ("past_schedule_once", "timedelta(days=-5)", "'@once'", "True"),
+            ("past_schedule_daily", "timedelta(days=-5)", "'0 0 * * *'", 
"True"),
+            ("past_schedule_daily_catchup_false", "timedelta(days=-5)", "'0 0 
* * *'", "False"),
         ]
 
-        # Delete DagRuns
-        with create_session() as session:
-            dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
-            dr.delete(synchronize_session=False)
-
-        # Test None output
-        args = self.parser.parse_args(["dags", "next-execution", dag_ids[0]])
-        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
-            dag_command.dag_next_execution(args)
-            out = temp_stdout.getvalue()
-            # `next_execution` function is inapplicable if no execution record 
found
-            # It prints `None` in such cases
-            assert "None" in out
-
-        # The details below is determined by the schedule_interval of example 
DAGs
-        now = DEFAULT_DATE
-        expected_output = [
-            (now + timedelta(days=1)).isoformat(),
-            (now + timedelta(hours=4)).isoformat(),
-            "None",
-            "None",
-        ]
-        expected_output_2 = [
-            (now + timedelta(days=1)).isoformat() + os.linesep + (now + 
timedelta(days=2)).isoformat(),
-            (now + timedelta(hours=4)).isoformat() + os.linesep + (now + 
timedelta(hours=8)).isoformat(),
-            "None",
-            "None",
-        ]
-
-        for i, dag_id in enumerate(dag_ids):
-            dag = self.dagbag.dags[dag_id]
-            # Create a DagRun for each DAG, to prepare for next step
-            dag.create_dagrun(
-                run_type=DagRunType.SCHEDULED,
-                execution_date=now,
-                start_date=now,
-                state=State.FAILED,
+        for f in dag_test_list:
+            file_content = os.linesep.join(
+                [
+                    "from airflow import DAG",
+                    "from airflow.operators.empty import EmptyOperator",
+                    "from datetime import timedelta; from pendulum import 
today",
+                    f"dag = DAG('{f[0]}', start_date=today() + {f[1]}, 
schedule={f[2]}, catchup={f[3]})",
+                    "task = EmptyOperator(task_id='empty_task',dag=dag)",
+                ]
             )
+            dag_file = tmp_path / f"{f[0]}.py"
+            dag_file.write_text(file_content)
+
+        with time_machine.travel(DEFAULT_DATE):
+            clear_db_dags()
+            self.dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
+            self.dagbag.sync_to_db()
+
+        default_run = DEFAULT_DATE
+        future_run = default_run + timedelta(days=5)
+        past_run = default_run + timedelta(days=-5)
+
+        expected_output = {
+            "future_schedule_daily": (
+                future_run.isoformat(),
+                future_run.isoformat() + os.linesep + (future_run + 
timedelta(days=1)).isoformat(),
+            ),
+            "future_schedule_every_4_hours": (
+                future_run.isoformat(),
+                future_run.isoformat() + os.linesep + (future_run + 
timedelta(hours=4)).isoformat(),
+            ),
+            "future_schedule_once": (future_run.isoformat(), 
future_run.isoformat() + os.linesep + "None"),
+            "future_schedule_none": ("None", "None"),
+            "past_schedule_once": (past_run.isoformat(), "None"),
+            "past_schedule_daily": (
+                past_run.isoformat(),
+                past_run.isoformat() + os.linesep + (past_run + 
timedelta(days=1)).isoformat(),
+            ),
+            "past_schedule_daily_catchup_false": (
+                (default_run - timedelta(days=1)).isoformat(),
+                (default_run - timedelta(days=1)).isoformat() + os.linesep + 
default_run.isoformat(),
+            ),
+        }
 
+        for dag_id in expected_output:
             # Test num-executions = 1 (default)
-            args = self.parser.parse_args(["dags", "next-execution", dag_id])
+            args = self.parser.parse_args(["dags", "next-execution", dag_id, 
"-S", str(tmp_path)])
             with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
                 dag_command.dag_next_execution(args)
                 out = temp_stdout.getvalue()
-            assert expected_output[i] in out
+            assert expected_output[dag_id][0] in out
 
             # Test num-executions = 2
-            args = self.parser.parse_args(["dags", "next-execution", dag_id, 
"--num-executions", "2"])
+            args = self.parser.parse_args(
+                ["dags", "next-execution", dag_id, "--num-executions", "2", 
"-S", str(tmp_path)]
+            )
             with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
                 dag_command.dag_next_execution(args)
                 out = temp_stdout.getvalue()
-            assert expected_output_2[i] in out
+            assert expected_output[dag_id][1] in out
 
-        # Clean up before leaving
-        with create_session() as session:
-            dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
-            dr.delete(synchronize_session=False)
+        # Rebuild Test DB for other tests
+        clear_db_dags()
+        TestCliDags.dagbag = DagBag(include_examples=True)
+        TestCliDags.dagbag.sync_to_db()
 
     @conf_vars({("core", "load_examples"): "true"})
     def test_cli_report(self):

Reply via email to