This is an automated email from the ASF dual-hosted git repository.
bugraoz pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 98b3e0db39f sync airflowctl from main to v3-2-test (#65069)
98b3e0db39f is described below
commit 98b3e0db39fd62c8767d66f1a10c1a2fdb76de8f
Author: Bugra Ozturk <[email protected]>
AuthorDate: Sun Apr 12 17:31:55 2026 +0200
sync airflowctl from main to v3-2-test (#65069)
---
.../docs/installation/installing-from-pypi.rst | 34 +++++++-
airflow-ctl/pyproject.toml | 2 +
airflow-ctl/src/airflowctl/api/client.py | 31 ++++++--
airflow-ctl/src/airflowctl/api/operations.py | 24 +++---
.../src/airflowctl/ctl/commands/auth_command.py | 6 +-
airflow-ctl/tests/airflow_ctl/api/test_client.py | 18 ++++-
.../tests/airflow_ctl/api/test_operations.py | 91 ++++++++++++++--------
.../airflow_ctl/ctl/commands/test_auth_command.py | 18 +++++
8 files changed, 171 insertions(+), 53 deletions(-)
diff --git a/airflow-ctl/docs/installation/installing-from-pypi.rst
b/airflow-ctl/docs/installation/installing-from-pypi.rst
index 9ec3a5e73ab..826cbee7e95 100644
--- a/airflow-ctl/docs/installation/installing-from-pypi.rst
+++ b/airflow-ctl/docs/installation/installing-from-pypi.rst
@@ -21,8 +21,38 @@ Installation from PyPI
This page describes installations using the ``apache-airflow-ctl`` package
`published in
PyPI <https://pypi.org/project/apache-airflow-ctl/>`__.
-Installation tools
-''''''''''''''''''
+Installation via ``pipx`` or ``uv`` as tool
+'''''''''''''''''''''''''''''''''''''''''''
+
+You can locally deploy or run airflowctl without installing it in your
environment using tools like `pipx <https://pypi.org/project/pipx/>`_ or `uv
<https://astral.sh/uv/>`_.
+
+Via ``pipx`` it is possible to install airflowctl directly from PyPI using the
command below:
+
+.. code-block:: bash
+
+ pipx install "apache-airflow-ctl==|version|"
+
+You can also run it directly without installing it first:
+
+.. code-block:: bash
+
+ pipx run apache-airflow-ctl --help
+
+Similarly, you can install airflowctl from PyPI with Astral ``uv`` using the command below:
+
+.. code-block:: bash
+
+ uv tool install "apache-airflow-ctl==|version|"
+
+Additionally, to jump-start using it, you can use the ``uvx`` shortcut
command to run it directly without installing it first:
+
+.. code-block:: bash
+
+ uvx apache-airflow-ctl --help
+
+
+Installation in your environment
+''''''''''''''''''''''''''''''''
Only ``pip`` and ``uv`` installation is currently officially supported.
diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml
index a14d42ecb15..06278d01cd6 100644
--- a/airflow-ctl/pyproject.toml
+++ b/airflow-ctl/pyproject.toml
@@ -67,6 +67,8 @@ YouTube =
"https://www.youtube.com/channel/UCSXwxpWZQ7XZ1WL3wqevChA/"
[project.scripts]
airflowctl = "airflowctl.__main__:main"
+# Redundant definition allowing direct execution via `uvx apache-airflow-ctl
...`
+apache-airflow-ctl = "airflowctl.__main__:main"
[build-system]
requires = [
diff --git a/airflow-ctl/src/airflowctl/api/client.py
b/airflow-ctl/src/airflowctl/api/client.py
index 06e78a6a32a..51786bfcfcb 100644
--- a/airflow-ctl/src/airflowctl/api/client.py
+++ b/airflow-ctl/src/airflowctl/api/client.py
@@ -169,6 +169,11 @@ class Credentials:
"""Generate path for the CLI config file."""
return f"{self.api_environment}.json"
+ @staticmethod
+ def token_key_for_environment(api_environment: str) -> str:
+ """Build the keyring/debug token key for a given environment name."""
+ return f"api_token_{api_environment}"
+
def save(self, skip_keyring: bool = False):
"""
Save the credentials to keyring and URL to disk as a file.
@@ -186,7 +191,7 @@ class Credentials:
with open(
os.path.join(default_config_dir,
f"debug_creds_{self.input_cli_config_file}"), "w"
) as f:
- json.dump({f"api_token_{self.api_environment}":
self.api_token}, f)
+
json.dump({self.token_key_for_environment(self.api_environment):
self.api_token}, f)
else:
if skip_keyring:
return
@@ -199,7 +204,11 @@ class Credentials:
for candidate in candidates:
if hasattr(candidate, "_get_new_password"):
candidate._get_new_password = _bounded_get_new_password
- keyring.set_password("airflowctl",
f"api_token_{self.api_environment}", self.api_token) # type: ignore[arg-type]
+ keyring.set_password(
+ "airflowctl",
+ self.token_key_for_environment(self.api_environment),
+ self.api_token, # type: ignore[arg-type]
+ )
except (NoKeyringError, NotImplementedError) as e:
log.error(e)
raise AirflowCtlKeyringException(
@@ -228,13 +237,23 @@ class Credentials:
debug_creds_path = os.path.join(
default_config_dir,
f"debug_creds_{self.input_cli_config_file}"
)
- with open(debug_creds_path) as df:
- debug_credentials = json.load(df)
- self.api_token =
debug_credentials.get(f"api_token_{self.api_environment}")
+ try:
+ with open(debug_creds_path) as df:
+ debug_credentials = json.load(df)
+ self.api_token = debug_credentials.get(
+
self.token_key_for_environment(self.api_environment)
+ )
+ except FileNotFoundError as e:
+ if self.client_kind == ClientKind.CLI:
+ raise AirflowCtlCredentialNotFoundException(
+ f"Debug credentials file not found:
{debug_creds_path}. "
+ "Set AIRFLOW_CLI_DEBUG_MODE=false or log in
with debug mode enabled first."
+ ) from e
+ self.api_token = None
else:
try:
self.api_token = keyring.get_password(
- "airflowctl", f"api_token_{self.api_environment}"
+ "airflowctl",
self.token_key_for_environment(self.api_environment)
)
except ValueError as e:
# Incorrect keyring password
diff --git a/airflow-ctl/src/airflowctl/api/operations.py
b/airflow-ctl/src/airflowctl/api/operations.py
index 2890304743b..3ce196c10cb 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -195,6 +195,9 @@ class BaseOperations:
setattr(cls, attr,
_check_flag_and_exit_if_server_response_error(value))
def execute_list(self, *, path, data_model, offset=0, limit=50,
params=None):
+ if limit <= 0:
+ raise ValueError(f"limit must be a positive integer, got {limit}")
+
shared_params = {"limit": limit, **(params or {})}
def safe_validate(content: bytes) -> BaseModel:
@@ -610,32 +613,33 @@ class DagRunOperations(BaseOperations):
dag_id: str | None = None,
) -> DAGRunCollectionResponse | ServerResponseError:
"""
- List all dag runs.
+ List dag runs (at most `limit` results).
Args:
state: Filter dag runs by state
start_date: Filter dag runs by start date (optional)
end_date: Filter dag runs by end date (optional)
- state: Filter dag runs by state
- limit: Limit the number of results
+ limit: Limit the number of results returned
dag_id: The DAG ID to filter by. If None, retrieves dag runs for
all DAGs (using "~").
"""
# Use "~" for all DAGs if dag_id is not specified
if not dag_id:
dag_id = "~"
- params: dict[str, object] = {
- "state": state,
+ params: dict[str, Any] = {
+ "state": str(state),
"limit": limit,
}
if start_date is not None:
- params["start_date"] = start_date
+ params["start_date"] = start_date.isoformat()
if end_date is not None:
- params["end_date"] = end_date
+ params["end_date"] = end_date.isoformat()
- return super().execute_list(
- path=f"/dags/{dag_id}/dagRuns",
data_model=DAGRunCollectionResponse, params=params
- )
+ try:
+ self.response = self.client.get(f"/dags/{dag_id}/dagRuns",
params=params)
+ return
DAGRunCollectionResponse.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
class JobsOperations(BaseOperations):
diff --git a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
index 236b8d5c6b8..cf521cbe7ee 100644
--- a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
+++ b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
@@ -144,7 +144,7 @@ def list_envs(args) -> None:
if filename.startswith("debug_creds_") or
filename.endswith("_generated.json"):
continue
- env_name = filename.replace(".json", "")
+ env_name, _ = os.path.splitext(filename)
# Try to read config file
api_url = None
@@ -168,11 +168,11 @@ def list_envs(args) -> None:
if os.path.exists(debug_path):
with open(debug_path) as f:
debug_creds = json.load(f)
- if f"api_token_{env_name}" in debug_creds:
+ if Credentials.token_key_for_environment(env_name) in
debug_creds:
token_status = "authenticated"
else:
# Check keyring
- token = keyring.get_password("airflowctl",
f"api_token_{env_name}")
+ token = keyring.get_password("airflowctl",
Credentials.token_key_for_environment(env_name))
if token:
token_status = "authenticated"
except NoKeyringError:
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_client.py
b/airflow-ctl/tests/airflow_ctl/api/test_client.py
index 0617d62276a..f495b357d8d 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_client.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_client.py
@@ -30,7 +30,10 @@ from httpx import URL
from airflowctl.api.client import Client, ClientKind, Credentials,
_bounded_get_new_password
from airflowctl.api.operations import ServerResponseError
-from airflowctl.exceptions import AirflowCtlCredentialNotFoundException,
AirflowCtlKeyringException
+from airflowctl.exceptions import (
+ AirflowCtlCredentialNotFoundException,
+ AirflowCtlKeyringException,
+)
def make_client_w_responses(responses: list[httpx.Response]) -> Client:
@@ -376,3 +379,16 @@ class TestSaveKeyringPatching:
response = client.get("http://error")
assert response.status_code == 200
assert len(responses) == 1
+
+ def test_debug_mode_missing_debug_creds_reports_correct_error(self,
monkeypatch, tmp_path):
+ monkeypatch.setenv("AIRFLOW_HOME", str(tmp_path))
+ monkeypatch.setenv("AIRFLOW_CLI_DEBUG_MODE", "true")
+ monkeypatch.setenv("AIRFLOW_CLI_ENVIRONMENT", "TEST_DEBUG")
+
+ config_path = tmp_path / "TEST_DEBUG.json"
+ config_path.write_text(json.dumps({"api_url":
"http://localhost:8080"}), encoding="utf-8")
+ # Intentionally do not create debug_creds_TEST_DEBUG.json to simulate
a missing file
+
+ creds = Credentials(client_kind=ClientKind.CLI,
api_environment="TEST_DEBUG")
+ with pytest.raises(AirflowCtlCredentialNotFoundException, match="Debug
credentials file not found"):
+ creds.load()
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index 2f2e0b0f547..aa559f17421 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -230,6 +230,16 @@ class TestBaseOperations:
for call in mock_client.get.call_args_list:
assert call.kwargs["params"]["limit"] == 2
+ @pytest.mark.parametrize("limit", [0, -1])
+ def test_execute_list_rejects_non_positive_limit(self, limit):
+ mock_client = Mock()
+ base_operation = BaseOperations(client=mock_client)
+
+ with pytest.raises(ValueError, match="limit must be a positive
integer"):
+ base_operation.execute_list(path="hello",
data_model=HelloCollectionResponse, limit=limit)
+
+ mock_client.get.assert_not_called()
+
class TestAssetsOperations:
asset_id: int = 1
@@ -1068,44 +1078,63 @@ class TestDagRunOperations:
)
assert response == self.dag_run_collection_response
- def test_list_all_dags(self):
- """Test listing dag runs for all DAGs using default dag_id='~'."""
-
- def handle_request(request: httpx.Request) -> httpx.Response:
- # When dag_id is "~", it should query all DAGs
- assert request.url.path == "/api/v2/dags/~/dagRuns"
+ @pytest.mark.parametrize(
+ (
+ "dag_id_input",
+ "state",
+ "limit",
+ "start_date",
+ "end_date",
+ "expected_path_suffix",
+ "expected_params_subset",
+ ),
+ [
+ # Test --limit with various values and configurations (covers CLI
--limit flag)
+ ("dag1", "queued", 5, None, None, "dag1", {"state": "queued",
"limit": "5"}),
+ (None, "running", 1, None, None, "~", {"state": "running",
"limit": "1"}),
+ (
+ "example_dag",
+ "success",
+ 10,
+ None,
+ None,
+ "example_dag",
+ {"state": "success", "limit": "10"},
+ ),
+ ("dag2", "failed", 0, None, None, "dag2", {"state": "failed",
"limit": "0"}),
+ ],
+ ids=["limit-5", "all-dags-limit-1", "string-state-limit-10",
"limit-zero"],
+ )
+ def test_list_with_various_limits(
+ self,
+ dag_id_input: str | None,
+ state: str | DagRunState,
+ limit: int,
+ start_date: datetime.datetime | None,
+ end_date: datetime.datetime | None,
+ expected_path_suffix: str,
+ expected_params_subset: dict,
+ ) -> None:
+ """Test listing dag runs with various limit values (especially --limit
flag)."""
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ assert
request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns")
+ params = dict(request.url.params)
+ for key, value in expected_params_subset.items():
+ assert key in params
+ assert str(params[key]) == str(value)
return httpx.Response(200,
json=json.loads(self.dag_run_collection_response.model_dump_json()))
client = make_api_client(transport=httpx.MockTransport(handle_request))
- # Call without specifying dag_id - should use default "~"
response = client.dag_runs.list(
- start_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
- end_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
- state="running",
- limit=1,
+ state=state,
+ limit=limit,
+ start_date=start_date,
+ end_date=end_date,
+ dag_id=dag_id_input,
)
assert response == self.dag_run_collection_response
- def test_list_with_optional_parameters(self):
- """Test listing dag runs with only some optional parameters."""
-
- def handle_request(request: httpx.Request) -> httpx.Response:
- assert request.url.path == "/api/v2/dags/dag1/dagRuns"
- # Verify that only state and limit are in query params
- params = dict(request.url.params)
- assert "state" in params
- assert params["state"] == "queued"
- assert "limit" in params
- assert params["limit"] == "5"
- # start_date and end_date should not be present
- assert "start_date" not in params
- assert "end_date" not in params
- return httpx.Response(200,
json=json.loads(self.dag_run_collection_response.model_dump_json()))
-
- client = make_api_client(transport=httpx.MockTransport(handle_request))
- response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1")
- assert response == self.dag_run_collection_response
-
class TestJobsOperations:
job_response = JobResponse(
diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
index e76fafc28ad..2bda56b0fdc 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
@@ -477,3 +477,21 @@ class TestListEnvs:
# Only production environment should be checked, not the special
files
mock_get_password.assert_called_once_with("airflowctl",
"api_token_production")
+
+ def test_list_envs_environment_name_with_json_substring(self, monkeypatch):
+ """Test list-envs keeps '.json' substrings in environment name for key
lookup."""
+ with (
+ tempfile.TemporaryDirectory() as temp_airflow_home,
+ patch("keyring.get_password") as mock_get_password,
+ ):
+ monkeypatch.setenv("AIRFLOW_HOME", temp_airflow_home)
+
+ with open(os.path.join(temp_airflow_home,
"prod.json.region.json"), "w") as f:
+ json.dump({"api_url": "http://localhost:8080"}, f)
+
+ mock_get_password.return_value = "test_token"
+
+ args = self.parser.parse_args(["auth", "list-envs"])
+ auth_command.list_envs(args)
+
+ mock_get_password.assert_called_once_with("airflowctl",
"api_token_prod.json.region")