This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c05dae7107 Add heartbeat metric for DAG processor (#42398)
c05dae7107 is described below
commit c05dae7107cd4b49c8a2bb58c29bfc3ef8ce2c79
Author: Kalyan <[email protected]>
AuthorDate: Wed Oct 2 03:29:54 2024 +0530
Add heartbeat metric for DAG processor (#42398)
---------
Signed-off-by: kalyanr <[email protected]>
---
airflow/jobs/dag_processor_job_runner.py | 16 ++++++++++------
chart/files/statsd-mappings.yml | 6 ++++++
.../logging-monitoring/metrics.rst | 1 +
3 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/airflow/jobs/dag_processor_job_runner.py b/airflow/jobs/dag_processor_job_runner.py
index 76b2ab5925..28128efba4 100644
--- a/airflow/jobs/dag_processor_job_runner.py
+++ b/airflow/jobs/dag_processor_job_runner.py
@@ -17,18 +17,18 @@
from __future__ import annotations
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
+from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import NEW_SESSION, provide_session
if TYPE_CHECKING:
- from airflow.dag_processing.manager import DagFileProcessorManager
-
+ from sqlalchemy.orm import Session
-def empty_callback(_: Any) -> None:
- pass
+ from airflow.dag_processing.manager import DagFileProcessorManager
class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
@@ -52,7 +52,7 @@ class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
         self.processor = processor
         self.processor.heartbeat = lambda: perform_heartbeat(
             job=self.job,
-            heartbeat_callback=empty_callback,
+            heartbeat_callback=self.heartbeat_callback,
             only_if_necessary=True,
         )
@@ -67,3 +67,7 @@ class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
             self.processor.terminate()
             self.processor.end()
         return None
+
+    @provide_session
+    def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
+        Stats.incr("dag_processor_heartbeat", 1, 1)
diff --git a/chart/files/statsd-mappings.yml b/chart/files/statsd-mappings.yml
index 86d773fd20..cef9593dd1 100644
--- a/chart/files/statsd-mappings.yml
+++ b/chart/files/statsd-mappings.yml
@@ -46,6 +46,12 @@ mappings:
     labels:
       type: counter
 
+  - match: airflow.dag_processor_heartbeat
+    match_type: regex
+    name: "airflow_dag_processor_heartbeat"
+    labels:
+      type: counter
+
   - match: airflow.dag.*.*.duration
     name: "airflow_task_duration"
     labels:
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index ac44d1acba..079aa5d397 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -159,6 +159,7 @@ Name                                                                   Descripti
 ``previously_succeeded``                                               Number of previously succeeded task instances. Metric with dag_id and task_id tagging.
 ``zombies_killed``                                                     Zombie tasks killed. Metric with dag_id and task_id tagging.
 ``scheduler_heartbeat``                                                Scheduler heartbeats
+``dag_processor_heartbeat``                                            Standalone DAG processor heartbeats
 ``dag_processing.processes``                                           Relative number of currently running DAG parsing processes (ie this delta
                                                                        is negative when, since the last metric was sent, processes have completed).
                                                                        Metric with file_path and action tagging.