Copilot commented on code in PR #64900:
URL: https://github.com/apache/airflow/pull/64900#discussion_r3066480603


##########
providers/google/tests/system/google/cloud/dataflow/example_dataflow_get_metrics.py:
##########
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow Get Metrics Operator.
+
+This DAG demonstrates how to use DataflowGetMetricsOperator to:
+1. Collect metrics from a running Dataflow job
+2. Route metrics to Pub/Sub for real-time streaming
+3. Stream metrics into BigQuery for historical analysis
+4. Use both destinations simultaneously (fan-out)
+5. Consume metrics summary from XCom in downstream tasks
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowGetMetricsOperator
+
+from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or 
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+DAG_ID = "dataflow_get_metrics"
+LOCATION = "us-central1"
+DATAFLOW_JOB_NAME = f"dataflow-example-job-{ENV_ID}"
+PUBSUB_TOPIC = f"projects/{PROJECT_ID}/topics/dataflow-metrics-{ENV_ID}"
+BQ_DATASET = "dataflow_metrics"
+BQ_TABLE = "job_metrics"
+BQ_PROJECT = PROJECT_ID
+BQ_DATASET_LOCATION = LOCATION
+
+def print_metrics_summary(**context):
+    """
+    Print the metrics summary pushed to XCom by DataflowGetMetricsOperator.
+    Demonstrates how to consume metrics data in downstream tasks.
+    """
+    task_instance = context["task_instance"]
+    
+    metrics_summary = 
task_instance.xcom_pull(task_ids="collect_metrics_bigquery", 
key="metrics_summary")
+    
+    if metrics_summary:
+        print("=" * 70)
+        print("DATAFLOW JOB METRICS SUMMARY")
+        print("=" * 70)
+        print(f"Job ID:                 {metrics_summary.get('job_id')}")
+        print(f"Metric Count:           {metrics_summary.get('metric_count')}")
+        print(f"BigQuery Destination:   
{metrics_summary.get('bq_destination')}")
+        print(f"BigQuery Rows Written:  
{metrics_summary.get('bq_rows_written')}")
+        print(f"Pub/Sub Topic:          {metrics_summary.get('pubsub_topic')}")
+        print(f"Pub/Sub Published:      
{metrics_summary.get('pubsub_metrics_published')}")
+        print("=" * 70)
+    else:
+        print("No metrics summary found in XCom")
+
+with DAG(
+    DAG_ID,
+    default_args=default_args,
+    schedule="@once",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["example", "dataflow", "metrics"],
+) as dag:

Review Comment:
   `default_args` is referenced but not defined anywhere in this file, which 
will raise a `NameError` when the DAG is parsed. Define `default_args` 
(consistent with other system tests) or remove the `default_args=...` argument 
if it’s not needed.



##########
providers/google/tests/system/google/cloud/dataflow/example_dataflow_get_metrics.py:
##########
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow Get Metrics Operator.
+
+This DAG demonstrates how to use DataflowGetMetricsOperator to:
+1. Collect metrics from a running Dataflow job
+2. Route metrics to Pub/Sub for real-time streaming
+3. Stream metrics into BigQuery for historical analysis
+4. Use both destinations simultaneously (fan-out)
+5. Consume metrics summary from XCom in downstream tasks
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowGetMetricsOperator
+
+from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or 
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+DAG_ID = "dataflow_get_metrics"
+LOCATION = "us-central1"
+DATAFLOW_JOB_NAME = f"dataflow-example-job-{ENV_ID}"
+PUBSUB_TOPIC = f"projects/{PROJECT_ID}/topics/dataflow-metrics-{ENV_ID}"
+BQ_DATASET = "dataflow_metrics"
+BQ_TABLE = "job_metrics"
+BQ_PROJECT = PROJECT_ID
+BQ_DATASET_LOCATION = LOCATION
+
+def print_metrics_summary(**context):
+    """
+    Print the metrics summary pushed to XCom by DataflowGetMetricsOperator.
+    Demonstrates how to consume metrics data in downstream tasks.
+    """
+    task_instance = context["task_instance"]
+    
+    metrics_summary = 
task_instance.xcom_pull(task_ids="collect_metrics_bigquery", 
key="metrics_summary")
+    
+    if metrics_summary:
+        print("=" * 70)
+        print("DATAFLOW JOB METRICS SUMMARY")
+        print("=" * 70)
+        print(f"Job ID:                 {metrics_summary.get('job_id')}")
+        print(f"Metric Count:           {metrics_summary.get('metric_count')}")
+        print(f"BigQuery Destination:   
{metrics_summary.get('bq_destination')}")
+        print(f"BigQuery Rows Written:  
{metrics_summary.get('bq_rows_written')}")
+        print(f"Pub/Sub Topic:          {metrics_summary.get('pubsub_topic')}")
+        print(f"Pub/Sub Published:      
{metrics_summary.get('pubsub_metrics_published')}")
+        print("=" * 70)
+    else:
+        print("No metrics summary found in XCom")
+
+with DAG(
+    DAG_ID,
+    default_args=default_args,
+    schedule="@once",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["example", "dataflow", "metrics"],
+) as dag:
+
+    start_task = EmptyOperator(task_id="start_task")
+
+    # [START howto_operator_dataflow_get_metrics_bigquery]
+    collect_metrics_bigquery = DataflowGetMetricsOperator(
+        task_id="collect_metrics_bigquery",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",

Review Comment:
   Using `dag_run.conf['dataflow_job_id']` will raise a Jinja error when the 
key is absent. Use a safe access pattern (e.g., 
`dag_run.conf.get('dataflow_job_id', 'test-job-id')`) so the example DAG can 
run without requiring conf.



##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
             raise AirflowException(self.response)
 
         return None
+
+class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
+    """
+    Fetches metrics for a Dataflow job and routes the result to one or more
+    destinations based on what the caller provides.
+    Currently it only supports pre-defined bigquery table schema.
+    CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
+    job_id        STRING    NOT NULL,
+    metric_name   STRING,
+    origin        STRING,
+    scalar        FLOAT64,
+    collected_at  TIMESTAMP NOT NULL
+    )
+    PARTITION BY DATE(collected_at);
+    Routing logic
+    ─────────────
+    • pubsub_topic provided              → publish full metrics to Pub/Sub
+    • bq_dataset + bq_table provided     → stream full metrics rows into 
BigQuery
+    • both provided                      → publish to both (fan-out)
+    • neither provided                   → log a warning; metrics are NOT 
stored
+    • XCom always                        → a lean summary dict only (never the
+                                           full payload), keeping the metadata 
DB lean
+    Execution modes
+    ───────────────
+    • deferrable=True  (default) — defers to DataflowJobMetricsTrigger.
+    • deferrable=False           — blocks the worker (small/test jobs only).

Review Comment:
   The docstring claims `deferrable=True` is the default, but the constructor 
default is configuration-driven with `fallback=False`. Update the docstring to 
reflect the actual default behavior (config-based, defaulting to non-deferrable 
if unset).
   ```suggestion
       • deferrable=True            — defers to DataflowJobMetricsTrigger.
       • deferrable=False           — blocks the worker (small/test jobs only).
       • default behavior           — configuration-driven; if unset, defaults 
to
                                      non-deferrable (``False``).
   ```



##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
             raise AirflowException(self.response)
 
         return None
+
+class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
+    """
+    Fetches metrics for a Dataflow job and routes the result to one or more
+    destinations based on what the caller provides.
+    Currently it only supports pre-defined bigquery table schema.
+    CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
+    job_id        STRING    NOT NULL,
+    metric_name   STRING,
+    origin        STRING,
+    scalar        FLOAT64,
+    collected_at  TIMESTAMP NOT NULL
+    )
+    PARTITION BY DATE(collected_at);
+    Routing logic
+    ─────────────
+    • pubsub_topic provided              → publish full metrics to Pub/Sub
+    • bq_dataset + bq_table provided     → stream full metrics rows into 
BigQuery
+    • both provided                      → publish to both (fan-out)
+    • neither provided                   → log a warning; metrics are NOT 
stored
+    • XCom always                        → a lean summary dict only (never the
+                                           full payload), keeping the metadata 
DB lean
+    Execution modes
+    ───────────────
+    • deferrable=True  (default) — defers to DataflowJobMetricsTrigger.
+    • deferrable=False           — blocks the worker (small/test jobs only).
+    :param job_id: Dataflow job ID. Jinja-templated.
+    :param project_id: GCP project that owns the Dataflow job. Jinja-templated.
+    :param location: Dataflow job region. Jinja-templated.
+    :param pubsub_topic: Full Pub/Sub topic path:
+                         ``projects/<project>/topics/<topic>``. 
Jinja-templated.
+    :param bq_dataset: BigQuery dataset ID to write metrics into. 
Jinja-templated.
+    :param bq_dataset_location: BigQuery dataset location used by the hook. 
Jinja-templated.
+    :param bq_table: BigQuery table ID to write metrics into. Jinja-templated.
+    :param bq_project: BigQuery project (defaults to ``project_id``). 
Jinja-templated.
+    :param gcp_conn_id: Airflow GCP connection.
+    :param impersonation_chain: Optional SA to impersonate.
+    :param deferrable: When True, defer to Triggerer.
+    :param poll_sleep: Triggerer poll interval in seconds.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_id",
+        "project_id",
+        "location",
+        "pubsub_topic",
+        "bq_dataset",
+        "bq_dataset_location",
+        "bq_table",
+        "bq_project",
+    )
+    ui_color = "#4285F4"
+
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        location: str | None = DEFAULT_DATAFLOW_LOCATION,
+        pubsub_topic: str | None = None,
+        bq_dataset: str | None = None,
+        bq_dataset_location: str | None = None,
+        bq_table: str | None = None,
+        bq_project: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        poll_sleep: int = 10,
+        fail_on_terminal_state: bool = False,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.project_id = project_id
+        self.location = location
+        self.pubsub_topic = pubsub_topic
+        self.bq_dataset = bq_dataset
+        self.bq_dataset_location = bq_dataset_location
+        self.bq_table = bq_table
+        self.bq_project = bq_project or project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.deferrable = deferrable
+        self.poll_sleep = poll_sleep
+        self.fail_on_terminal_state = fail_on_terminal_state
+
+    @staticmethod
+    def _normalise(raw: Any) -> dict[str, Any]:
+        if isinstance(raw, dict):
+            return raw
+        if isinstance(raw, list):
+            return {"metrics": raw}
+        try:
+            return type(raw).to_dict(raw)
+        except AttributeError:
+            import proto
+            return proto.Message.to_dict(raw)
+
+    @cached_property
+    def hook(self) -> DataflowHook:
+        return DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def _fetch_metrics(self) -> dict[str, Any]:
+        raw = self.hook.fetch_job_metrics_by_id(
+            job_id=self.job_id,
+            project_id=self.project_id,
+            location=self.location,
+        )
+        return self._normalise(raw)
+
+    def _publish_to_pubsub(self, metrics: dict[str, Any]) -> int:
+        try:
+            _, topic_project, _, topic_id = self.pubsub_topic.split("/")
+        except ValueError:
+            raise AirflowException(
+                f"pubsub_topic must be 'projects/<project>/topics/<topic>', 
got: {self.pubsub_topic}"
+            )
+
+        metric_entries = metrics.get("metrics", [])
+        payload = json.dumps(
+            {"job_id": self.job_id, "metrics": metric_entries},
+            default=str,
+        ).encode("utf-8")
+
+        hook = PubSubHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        hook.publish(
+            project_id=topic_project,
+            topic=topic_id,
+            messages=[{"data": payload}],
+        )
+        published_count = len(metric_entries)
+        self.log.info(
+            "Published %d metric entries for job %s → topic %s",
+            published_count, 
+            self.job_id, 
+            self.pubsub_topic,
+        )
+        return published_count
+
+    def _publish_to_bq(self, metrics: dict[str, Any]) -> int:
+        collected_at = datetime.now(timezone.utc).isoformat()
+        rows = []
+        for entry in metrics.get("metrics", []):
+            name_obj = entry.get("name", {})
+
+            scalar_obj = entry.get("scalar")
+            if scalar_obj is None:
+                scalar = None
+            elif isinstance(scalar_obj, (int, float)):
+                scalar = float(scalar_obj)
+            elif isinstance(scalar_obj, dict):
+                raw = scalar_obj.get("integer_value") or 
scalar_obj.get("floatValue")

Review Comment:
   Using `or` here will incorrectly drop valid zero values (e.g., `0` or `0.0`) 
because they are falsy, causing fallback to the next key and potentially 
resulting in `None`. Use explicit `is not None` checks (and consider consistent 
key naming) so zeros are preserved.
   ```suggestion
                   raw = scalar_obj.get("integer_value")
                   if raw is None:
                       raw = scalar_obj.get("floatValue")
   ```



##########
providers/google/tests/unit/google/cloud/operators/test_dataflow.py:
##########
@@ -794,3 +818,310 @@ def test_invalid_response(self, sdk_connection_not_found):
             
DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock()).return_value
 = {
                 "error": {"message": "example error"}
             }
+
[email protected]
+def sync_operator():
+    """Create a synchronous DataflowGetMetricsOperator instance (no 
destination)."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+    )
+
+
[email protected]
+def deferrable_operator():
+    """Create a deferrable DataflowGetMetricsOperator instance."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=True,
+    )
+
+
[email protected]
+def pubsub_operator():
+    """Create an operator with PubSub topic destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+    )
+
+
[email protected]
+def bq_operator():
+    """Create an operator with BigQuery destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
[email protected]
+def both_destinations_operator():
+    """Create an operator with both PubSub and BigQuery destinations."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
+class TestDataflowGetMetricsOperatorInit:
+    """Test operator initialization with default attributes."""
+
+    def test_default_attributes(self, sync_operator):
+        """Test that all default attributes are set correctly."""
+        assert sync_operator.job_id == JOB_ID
+        assert sync_operator.project_id == PROJECT_ID
+        assert sync_operator.location == LOCATION
+        assert sync_operator.gcp_conn_id == GCP_CONN_ID
+        assert sync_operator.deferrable is False
+        assert sync_operator.fail_on_terminal_state is False
+        assert sync_operator.poll_sleep == 10
+        assert sync_operator.pubsub_topic is None
+        assert sync_operator.bq_dataset is None
+        assert sync_operator.bq_table is None
+        assert sync_operator.bq_dataset_location is None
+
+    def test_template_fields(self):
+        """Test that template_fields are correctly defined."""
+        expected = (
+            "job_id", 
+            "project_id", 
+            "location", 
+            "pubsub_topic", 
+            "bq_dataset", 
+            "bq_table", 
+            "bq_dataset_location", 
+            "bq_project"
+        )
+        assert DataflowGetMetricsOperator.template_fields == expected
+
+
+class TestDataflowGetMetricsOperatorLocationValidation:
+    """Test location validation during execution."""
+
+    def test_execute_raises_when_location_is_none(self):
+        """Test that execute raises AirflowException when location is None."""
+        op = DataflowGetMetricsOperator(
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            project_id=PROJECT_ID,
+            location=None,
+            deferrable=False,
+        )
+        with pytest.raises(AirflowException, match="requires 'location' to be 
set"):
+            op.execute(mock.MagicMock())
+
+    def test_execute_raises_when_location_is_empty_string(self):
+        """Test that execute raises AirflowException when location is empty 
string."""
+        op = DataflowGetMetricsOperator(
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            project_id=PROJECT_ID,
+            location="",
+            deferrable=False,
+        )
+        with pytest.raises(AirflowException, match="requires 'location' to be 
set"):
+            op.execute(mock.MagicMock())
+
+
+class TestDataflowGetMetricsOperatorExecuteSync:
+    """Test synchronous execution with 5 scenarios."""

Review Comment:
   The docstring says '5 scenarios' but the class currently defines 4 sync 
tests below. Update the docstring to match the actual number of scenarios 
covered.
   ```suggestion
       """Test synchronous execution with 4 scenarios."""
   ```



##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
             raise AirflowException(self.response)
 
         return None
+
+class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
+    """
+    Fetches metrics for a Dataflow job and routes the result to one or more
+    destinations based on what the caller provides.
+    Currently it only supports pre-defined bigquery table schema.
+    CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
+    job_id        STRING    NOT NULL,
+    metric_name   STRING,
+    origin        STRING,
+    scalar        FLOAT64,
+    collected_at  TIMESTAMP NOT NULL
+    )
+    PARTITION BY DATE(collected_at);
+    Routing logic
+    ─────────────
+    • pubsub_topic provided              → publish full metrics to Pub/Sub
+    • bq_dataset + bq_table provided     → stream full metrics rows into 
BigQuery
+    • both provided                      → publish to both (fan-out)
+    • neither provided                   → log a warning; metrics are NOT 
stored
+    • XCom always                        → a lean summary dict only (never the
+                                           full payload), keeping the metadata 
DB lean
+    Execution modes
+    ───────────────
+    • deferrable=True  (default) — defers to DataflowJobMetricsTrigger.
+    • deferrable=False           — blocks the worker (small/test jobs only).
+    :param job_id: Dataflow job ID. Jinja-templated.
+    :param project_id: GCP project that owns the Dataflow job. Jinja-templated.
+    :param location: Dataflow job region. Jinja-templated.
+    :param pubsub_topic: Full Pub/Sub topic path:
+                         ``projects/<project>/topics/<topic>``. 
Jinja-templated.
+    :param bq_dataset: BigQuery dataset ID to write metrics into. 
Jinja-templated.
+    :param bq_dataset_location: BigQuery dataset location used by the hook. 
Jinja-templated.
+    :param bq_table: BigQuery table ID to write metrics into. Jinja-templated.
+    :param bq_project: BigQuery project (defaults to ``project_id``). 
Jinja-templated.
+    :param gcp_conn_id: Airflow GCP connection.
+    :param impersonation_chain: Optional SA to impersonate.
+    :param deferrable: When True, defer to Triggerer.
+    :param poll_sleep: Triggerer poll interval in seconds.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_id",
+        "project_id",
+        "location",
+        "pubsub_topic",
+        "bq_dataset",
+        "bq_dataset_location",
+        "bq_table",
+        "bq_project",
+    )
+    ui_color = "#4285F4"
+
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        location: str | None = DEFAULT_DATAFLOW_LOCATION,
+        pubsub_topic: str | None = None,
+        bq_dataset: str | None = None,
+        bq_dataset_location: str | None = None,
+        bq_table: str | None = None,
+        bq_project: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        poll_sleep: int = 10,
+        fail_on_terminal_state: bool = False,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.project_id = project_id
+        self.location = location
+        self.pubsub_topic = pubsub_topic
+        self.bq_dataset = bq_dataset
+        self.bq_dataset_location = bq_dataset_location
+        self.bq_table = bq_table
+        self.bq_project = bq_project or project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.deferrable = deferrable
+        self.poll_sleep = poll_sleep
+        self.fail_on_terminal_state = fail_on_terminal_state
+
+    @staticmethod
+    def _normalise(raw: Any) -> dict[str, Any]:
+        if isinstance(raw, dict):
+            return raw
+        if isinstance(raw, list):
+            return {"metrics": raw}
+        try:
+            return type(raw).to_dict(raw)
+        except AttributeError:
+            import proto
+            return proto.Message.to_dict(raw)
+
+    @cached_property
+    def hook(self) -> DataflowHook:
+        return DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def _fetch_metrics(self) -> dict[str, Any]:
+        raw = self.hook.fetch_job_metrics_by_id(
+            job_id=self.job_id,
+            project_id=self.project_id,
+            location=self.location,
+        )
+        return self._normalise(raw)
+
+    def _publish_to_pubsub(self, metrics: dict[str, Any]) -> int:
+        try:
+            _, topic_project, _, topic_id = self.pubsub_topic.split("/")
+        except ValueError:
+            raise AirflowException(
+                f"pubsub_topic must be 'projects/<project>/topics/<topic>', 
got: {self.pubsub_topic}"
+            )
+
+        metric_entries = metrics.get("metrics", [])
+        payload = json.dumps(
+            {"job_id": self.job_id, "metrics": metric_entries},
+            default=str,
+        ).encode("utf-8")
+
+        hook = PubSubHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        hook.publish(
+            project_id=topic_project,
+            topic=topic_id,
+            messages=[{"data": payload}],
+        )

Review Comment:
   Two concrete issues here: (1) `split('/')` only checks segment count and 
would accept invalid paths like `foo/<p>/bar/<t>` if there are 4 
segments—validate that segments 0 and 2 are exactly `projects` and `topics`. 
(2) Publishing all metric entries as one Pub/Sub message can exceed Pub/Sub 
message size limits for larger jobs; consider chunking the payload (e.g., 
multiple messages) or enforcing a size check with a clear exception.



##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
             raise AirflowException(self.response)
 
         return None
+
+class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
+    """
+    Fetches metrics for a Dataflow job and routes the result to one or more
+    destinations based on what the caller provides.
+    Currently it only supports pre-defined bigquery table schema.
+    CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
+    job_id        STRING    NOT NULL,
+    metric_name   STRING,
+    origin        STRING,
+    scalar        FLOAT64,
+    collected_at  TIMESTAMP NOT NULL
+    )
+    PARTITION BY DATE(collected_at);
+    Routing logic
+    ─────────────
+    • pubsub_topic provided              → publish full metrics to Pub/Sub
+    • bq_dataset + bq_table provided     → stream full metrics rows into 
BigQuery
+    • both provided                      → publish to both (fan-out)
+    • neither provided                   → log a warning; metrics are NOT 
stored
+    • XCom always                        → a lean summary dict only (never the
+                                           full payload), keeping the metadata 
DB lean
+    Execution modes
+    ───────────────
+    • deferrable=True  (default) — defers to DataflowJobMetricsTrigger.
+    • deferrable=False           — blocks the worker (small/test jobs only).
+    :param job_id: Dataflow job ID. Jinja-templated.
+    :param project_id: GCP project that owns the Dataflow job. Jinja-templated.
+    :param location: Dataflow job region. Jinja-templated.
+    :param pubsub_topic: Full Pub/Sub topic path:
+                         ``projects/<project>/topics/<topic>``. 
Jinja-templated.
+    :param bq_dataset: BigQuery dataset ID to write metrics into. 
Jinja-templated.
+    :param bq_dataset_location: BigQuery dataset location used by the hook. 
Jinja-templated.
+    :param bq_table: BigQuery table ID to write metrics into. Jinja-templated.
+    :param bq_project: BigQuery project (defaults to ``project_id``). 
Jinja-templated.
+    :param gcp_conn_id: Airflow GCP connection.
+    :param impersonation_chain: Optional SA to impersonate.
+    :param deferrable: When True, defer to Triggerer.
+    :param poll_sleep: Triggerer poll interval in seconds.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_id",
+        "project_id",
+        "location",
+        "pubsub_topic",
+        "bq_dataset",
+        "bq_dataset_location",
+        "bq_table",
+        "bq_project",
+    )
+    ui_color = "#4285F4"
+
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        location: str | None = DEFAULT_DATAFLOW_LOCATION,
+        pubsub_topic: str | None = None,
+        bq_dataset: str | None = None,
+        bq_dataset_location: str | None = None,
+        bq_table: str | None = None,
+        bq_project: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),

Review Comment:
   The docstring claims `deferrable=True` is the default, but the constructor 
default is configuration-driven with `fallback=False`. Update the docstring to 
reflect the actual default behavior (config-based, defaulting to non-deferrable 
if unset).



##########
providers/google/tests/system/google/cloud/dataflow/example_dataflow_get_metrics.py:
##########
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow Get Metrics Operator.
+
+This DAG demonstrates how to use DataflowGetMetricsOperator to:
+1. Collect metrics from a running Dataflow job
+2. Route metrics to Pub/Sub for real-time streaming
+3. Stream metrics into BigQuery for historical analysis
+4. Use both destinations simultaneously (fan-out)
+5. Consume metrics summary from XCom in downstream tasks
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowGetMetricsOperator
+
+from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or 
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+DAG_ID = "dataflow_get_metrics"
+LOCATION = "us-central1"
+DATAFLOW_JOB_NAME = f"dataflow-example-job-{ENV_ID}"
+PUBSUB_TOPIC = f"projects/{PROJECT_ID}/topics/dataflow-metrics-{ENV_ID}"
+BQ_DATASET = "dataflow_metrics"
+BQ_TABLE = "job_metrics"
+BQ_PROJECT = PROJECT_ID
+BQ_DATASET_LOCATION = LOCATION
+
+def print_metrics_summary(**context):
+    """
+    Print the metrics summary pushed to XCom by DataflowGetMetricsOperator.
+    Demonstrates how to consume metrics data in downstream tasks.
+    """
+    task_instance = context["task_instance"]
+    
+    metrics_summary = 
task_instance.xcom_pull(task_ids="collect_metrics_bigquery", 
key="metrics_summary")
+    
+    if metrics_summary:
+        print("=" * 70)
+        print("DATAFLOW JOB METRICS SUMMARY")
+        print("=" * 70)
+        print(f"Job ID:                 {metrics_summary.get('job_id')}")
+        print(f"Metric Count:           {metrics_summary.get('metric_count')}")
+        print(f"BigQuery Destination:   
{metrics_summary.get('bq_destination')}")
+        print(f"BigQuery Rows Written:  
{metrics_summary.get('bq_rows_written')}")
+        print(f"Pub/Sub Topic:          {metrics_summary.get('pubsub_topic')}")
+        print(f"Pub/Sub Published:      
{metrics_summary.get('pubsub_metrics_published')}")
+        print("=" * 70)
+    else:
+        print("No metrics summary found in XCom")
+
+with DAG(
+    DAG_ID,
+    default_args=default_args,
+    schedule="@once",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["example", "dataflow", "metrics"],
+) as dag:
+
+    start_task = EmptyOperator(task_id="start_task")
+
+    # [START howto_operator_dataflow_get_metrics_bigquery]
+    collect_metrics_bigquery = DataflowGetMetricsOperator(
+        task_id="collect_metrics_bigquery",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+        deferrable=False,
+        gcp_conn_id="google_cloud_default",
+    )
+    # [END howto_operator_dataflow_get_metrics_bigquery]
+
+    # [START howto_operator_dataflow_get_metrics_pubsub_deferrable]
+    collect_metrics_pubsub = DataflowGetMetricsOperator(
+        task_id="collect_metrics_pubsub",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        pubsub_topic=PUBSUB_TOPIC,
+        deferrable=True,
+        poll_sleep=10,
+        gcp_conn_id="google_cloud_default",
+    )
+    # [END howto_operator_dataflow_get_metrics_pubsub_deferrable]
+
+    # [START howto_operator_dataflow_get_metrics_multi_destination]
+    collect_metrics_multi_destination = DataflowGetMetricsOperator(
+        task_id="collect_metrics_fanout",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        pubsub_topic=PUBSUB_TOPIC,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+        deferrable=True,
+        poll_sleep=10,
+        gcp_conn_id="google_cloud_default",
+    )
+    # [END howto_operator_dataflow_get_metrics_multi_destination]
+
+    # [START howto_operator_dataflow_get_metrics_xcom_processing]
+    process_metrics_summary = PythonOperator(
+        task_id="process_metrics_summary",
+        python_callable=print_metrics_summary,
+        provide_context=True,
+    )
+    # [END howto_operator_dataflow_get_metrics_xcom_processing]
+
+    end_task = EmptyOperator(task_id="end_task")
+
+    start_task [
+        collect_metrics_bigquery,
+        collect_metrics_pubsub,
+        collect_metrics_multi_destination,
+    ]

Review Comment:
   This is invalid Python syntax for setting dependencies (missing `>>` / 
`<<`). Replace with a valid dependency expression, e.g., `start_task >> [ ... 
]`.



##########
providers/google/tests/system/google/cloud/dataflow/example_dataflow_get_metrics.py:
##########
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow Get Metrics Operator.
+
+This DAG demonstrates how to use DataflowGetMetricsOperator to:
+1. Collect metrics from a running Dataflow job
+2. Route metrics to Pub/Sub for real-time streaming
+3. Stream metrics into BigQuery for historical analysis
+4. Use both destinations simultaneously (fan-out)
+5. Consume metrics summary from XCom in downstream tasks
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowGetMetricsOperator
+
+from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or 
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+DAG_ID = "dataflow_get_metrics"
+LOCATION = "us-central1"
+DATAFLOW_JOB_NAME = f"dataflow-example-job-{ENV_ID}"
+PUBSUB_TOPIC = f"projects/{PROJECT_ID}/topics/dataflow-metrics-{ENV_ID}"
+BQ_DATASET = "dataflow_metrics"
+BQ_TABLE = "job_metrics"
+BQ_PROJECT = PROJECT_ID
+BQ_DATASET_LOCATION = LOCATION
+
+def print_metrics_summary(**context):
+    """
+    Print the metrics summary pushed to XCom by DataflowGetMetricsOperator.
+    Demonstrates how to consume metrics data in downstream tasks.
+    """
+    task_instance = context["task_instance"]
+    
+    metrics_summary = 
task_instance.xcom_pull(task_ids="collect_metrics_bigquery", 
key="metrics_summary")
+    
+    if metrics_summary:
+        print("=" * 70)
+        print("DATAFLOW JOB METRICS SUMMARY")
+        print("=" * 70)
+        print(f"Job ID:                 {metrics_summary.get('job_id')}")
+        print(f"Metric Count:           {metrics_summary.get('metric_count')}")
+        print(f"BigQuery Destination:   
{metrics_summary.get('bq_destination')}")
+        print(f"BigQuery Rows Written:  
{metrics_summary.get('bq_rows_written')}")
+        print(f"Pub/Sub Topic:          {metrics_summary.get('pubsub_topic')}")
+        print(f"Pub/Sub Published:      
{metrics_summary.get('pubsub_metrics_published')}")
+        print("=" * 70)
+    else:
+        print("No metrics summary found in XCom")
+
+with DAG(
+    DAG_ID,
+    default_args=default_args,
+    schedule="@once",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["example", "dataflow", "metrics"],
+) as dag:
+
+    start_task = EmptyOperator(task_id="start_task")
+
+    # [START howto_operator_dataflow_get_metrics_bigquery]
+    collect_metrics_bigquery = DataflowGetMetricsOperator(
+        task_id="collect_metrics_bigquery",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+        deferrable=False,
+        gcp_conn_id="google_cloud_default",
+    )
+    # [END howto_operator_dataflow_get_metrics_bigquery]
+
+    # [START howto_operator_dataflow_get_metrics_pubsub_deferrable]
+    collect_metrics_pubsub = DataflowGetMetricsOperator(
+        task_id="collect_metrics_pubsub",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        pubsub_topic=PUBSUB_TOPIC,
+        deferrable=True,
+        poll_sleep=10,
+        gcp_conn_id="google_cloud_default",
+    )
+    # [END howto_operator_dataflow_get_metrics_pubsub_deferrable]
+
+    # [START howto_operator_dataflow_get_metrics_multi_destination]
+    collect_metrics_multi_destination = DataflowGetMetricsOperator(
+        task_id="collect_metrics_fanout",
+        job_id="{{ dag_run.conf['dataflow_job_id'] or 'test-job-id' }}",
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        pubsub_topic=PUBSUB_TOPIC,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+        deferrable=True,
+        poll_sleep=10,
+        gcp_conn_id="google_cloud_default",
+    )
+    # [END howto_operator_dataflow_get_metrics_multi_destination]
+
+    # [START howto_operator_dataflow_get_metrics_xcom_processing]
+    process_metrics_summary = PythonOperator(
+        task_id="process_metrics_summary",
+        python_callable=print_metrics_summary,
+        provide_context=True,

Review Comment:
   `provide_context` is not a valid argument for `PythonOperator` in Airflow 
2.x and will raise `TypeError` during DAG parse. Remove `provide_context` (your 
callable already accepts `**context`, which is sufficient).
   ```suggestion
   
   ```



##########
providers/google/tests/unit/google/cloud/operators/test_dataflow.py:
##########
@@ -794,3 +818,310 @@ def test_invalid_response(self, sdk_connection_not_found):
             
DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock()).return_value
 = {
                 "error": {"message": "example error"}
             }
+
[email protected]
+def sync_operator():
+    """Create a synchronous DataflowGetMetricsOperator instance (no 
destination)."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+    )
+
+
[email protected]
+def deferrable_operator():
+    """Create a deferrable DataflowGetMetricsOperator instance."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=True,
+    )
+
+
[email protected]
+def pubsub_operator():
+    """Create an operator with PubSub topic destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+    )
+
+
[email protected]
+def bq_operator():
+    """Create an operator with BigQuery destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
[email protected]
+def both_destinations_operator():
+    """Create an operator with both PubSub and BigQuery destinations."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
+class TestDataflowGetMetricsOperatorInit:
+    """Test operator initialization with default attributes."""
+
+    def test_default_attributes(self, sync_operator):
+        """Test that all default attributes are set correctly."""
+        assert sync_operator.job_id == JOB_ID
+        assert sync_operator.project_id == PROJECT_ID
+        assert sync_operator.location == LOCATION
+        assert sync_operator.gcp_conn_id == GCP_CONN_ID
+        assert sync_operator.deferrable is False
+        assert sync_operator.fail_on_terminal_state is False
+        assert sync_operator.poll_sleep == 10
+        assert sync_operator.pubsub_topic is None
+        assert sync_operator.bq_dataset is None
+        assert sync_operator.bq_table is None
+        assert sync_operator.bq_dataset_location is None
+
+    def test_template_fields(self):
+        """Test that template_fields are correctly defined."""
+        expected = (
+            "job_id", 
+            "project_id", 
+            "location", 
+            "pubsub_topic", 
+            "bq_dataset", 
+            "bq_table", 
+            "bq_dataset_location", 
+            "bq_project"

Review Comment:
   This expected `template_fields` order does not match the operator 
implementation in `dataflow.py` (where `bq_dataset_location` comes before 
`bq_table`). As written, this test will fail—update either the operator’s 
`template_fields` tuple or this expected tuple to be consistent.
   ```suggestion
               "job_id",
               "project_id",
               "location",
               "pubsub_topic",
               "bq_dataset",
               "bq_dataset_location",
               "bq_table",
               "bq_project",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to