pushpendu1991 commented on code in PR #64900:
URL: https://github.com/apache/airflow/pull/64900#discussion_r3068986307
##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
raise AirflowException(self.response)
return None
+
class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
    """
    Fetch metrics for a Dataflow job and route the result to one or more
    destinations based on what the caller provides.

    Currently only the following pre-defined BigQuery table schema is
    supported::

        CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
            job_id STRING NOT NULL,
            metric_name STRING,
            origin STRING,
            scalar FLOAT64,
            collected_at TIMESTAMP NOT NULL
        )
        PARTITION BY DATE(collected_at);

    Routing logic:
        * ``pubsub_topic`` provided -> publish full metrics to Pub/Sub.
        * ``bq_dataset`` + ``bq_table`` provided -> stream full metrics rows
          into BigQuery.
        * both provided -> publish to both (fan-out).
        * neither provided -> log a warning; metrics are NOT stored.
        * XCom always receives a lean summary dict only (never the full
          payload), keeping the metadata DB lean.

    Execution modes:
        * ``deferrable=True`` -- defers to ``DataflowJobMetricsTrigger``.
          The default comes from the ``operators.default_deferrable`` Airflow
          config option (falls back to ``False``).
        * ``deferrable=False`` -- blocks the worker (small/test jobs only).

    :param job_id: Dataflow job ID. Jinja-templated.
    :param project_id: GCP project that owns the Dataflow job. Jinja-templated.
    :param location: Dataflow job region. Jinja-templated.
    :param pubsub_topic: Full Pub/Sub topic path:
        ``projects/<project>/topics/<topic>``. Jinja-templated.
    :param bq_dataset: BigQuery dataset ID to write metrics into.
        Jinja-templated.
    :param bq_dataset_location: BigQuery dataset location used by the hook.
        Jinja-templated.
    :param bq_table: BigQuery table ID to write metrics into. Jinja-templated.
    :param bq_project: BigQuery project (defaults to ``project_id``).
        Jinja-templated.
    :param gcp_conn_id: Airflow GCP connection.
    :param impersonation_chain: Optional SA to impersonate.
    :param deferrable: When True, defer to Triggerer.
    :param poll_sleep: Triggerer poll interval in seconds.
    """

    # Fields rendered through Jinja before execution.
    template_fields: Sequence[str] = (
        "job_id",
        "project_id",
        "location",
        "pubsub_topic",
        "bq_dataset",
        "bq_dataset_location",
        "bq_table",
        "bq_project",
    )
    # Google-blue tile in the Airflow UI graph/grid views.
    ui_color = "#4285F4"
+
+ def __init__(
+ self,
+ *,
+ job_id: str,
+ project_id: str = PROVIDE_PROJECT_ID,
+ location: str | None = DEFAULT_DATAFLOW_LOCATION,
+ pubsub_topic: str | None = None,
+ bq_dataset: str | None = None,
+ bq_dataset_location: str | None = None,
+ bq_table: str | None = None,
+ bq_project: str | None = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ poll_sleep: int = 10,
+ fail_on_terminal_state: bool = False,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.job_id = job_id
+ self.project_id = project_id
+ self.location = location
+ self.pubsub_topic = pubsub_topic
+ self.bq_dataset = bq_dataset
+ self.bq_dataset_location = bq_dataset_location
+ self.bq_table = bq_table
+ self.bq_project = bq_project or project_id
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.deferrable = deferrable
+ self.poll_sleep = poll_sleep
+ self.fail_on_terminal_state = fail_on_terminal_state
+
+ @staticmethod
+ def _normalise(raw: Any) -> dict[str, Any]:
+ if isinstance(raw, dict):
+ return raw
+ if isinstance(raw, list):
+ return {"metrics": raw}
+ try:
+ return type(raw).to_dict(raw)
+ except AttributeError:
+ import proto
+ return proto.Message.to_dict(raw)
+
+ @cached_property
+ def hook(self) -> DataflowHook:
+ return DataflowHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ def _fetch_metrics(self) -> dict[str, Any]:
+ raw = self.hook.fetch_job_metrics_by_id(
+ job_id=self.job_id,
+ project_id=self.project_id,
+ location=self.location,
+ )
+ return self._normalise(raw)
+
    def _publish_to_pubsub(self, metrics: dict[str, Any]) -> int:
        """Publish the job's metrics as a single JSON Pub/Sub message.

        NOTE(review): annotated to return ``int`` but no return statement is
        visible in this hunk -- presumably a message/entry count; confirm
        against the full file.

        :param metrics: Metrics dict; entries are read from its optional
            ``"metrics"`` key.
        :raises AirflowException: If ``pubsub_topic`` does not have the form
            ``projects/<project>/topics/<topic>``.
        """
        try:
            # Expect exactly four components: projects/<project>/topics/<topic>.
            _, topic_project, _, topic_id = self.pubsub_topic.split("/")
        except ValueError:
            raise AirflowException(
                f"pubsub_topic must be 'projects/<project>/topics/<topic>', got: {self.pubsub_topic}"
            )

        metric_entries = metrics.get("metrics", [])
        # default=str stringifies values json can't serialize natively
        # (timestamps etc.) instead of raising.
        payload = json.dumps(
            {"job_id": self.job_id, "metrics": metric_entries},
            default=str,
        ).encode("utf-8")

        hook = PubSubHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        # One message carrying the full payload; Pub/Sub expects raw bytes
        # under the "data" key.
        hook.publish(
            project_id=topic_project,
            topic=topic_id,
            messages=[{"data": payload}],
        )
Review Comment:
This is tested in production google cloud environment. Not changing now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]