pushpendu1991 commented on code in PR #64900:
URL: https://github.com/apache/airflow/pull/64900#discussion_r3068986465


##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
             raise AirflowException(self.response)
 
         return None
+
+class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
+    """
+    Fetches metrics for a Dataflow job and routes the result to one or more
+    destinations based on what the caller provides.
+    Currently it only supports pre-defined bigquery table schema.
+    CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
+    job_id        STRING    NOT NULL,
+    metric_name   STRING,
+    origin        STRING,
+    scalar        FLOAT64,
+    collected_at  TIMESTAMP NOT NULL
+    )
+    PARTITION BY DATE(collected_at);
+    Routing logic
+    ─────────────
+    • pubsub_topic provided              → publish full metrics to Pub/Sub
+    • bq_dataset + bq_table provided     → stream full metrics rows into BigQuery
+    • both provided                      → publish to both (fan-out)
+    • neither provided                   → log a warning; metrics are NOT stored
+    • XCom always                        → a lean summary dict only (never the
+                                           full payload), keeping the metadata DB lean
+    Execution modes
+    ───────────────
+    • deferrable=True  (default) — defers to DataflowJobMetricsTrigger.
+    • deferrable=False           — blocks the worker (small/test jobs only).

Review Comment:
   This is fixed.



##########
providers/google/tests/unit/google/cloud/operators/test_dataflow.py:
##########
@@ -794,3 +818,310 @@ def test_invalid_response(self, sdk_connection_not_found):
             DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock()).return_value = {
                 "error": {"message": "example error"}
             }
+
[email protected]
+def sync_operator():
+    """Create a synchronous DataflowGetMetricsOperator instance (no destination)."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+    )
+
+
[email protected]
+def deferrable_operator():
+    """Create a deferrable DataflowGetMetricsOperator instance."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=True,
+    )
+
+
[email protected]
+def pubsub_operator():
+    """Create an operator with PubSub topic destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+    )
+
+
[email protected]
+def bq_operator():
+    """Create an operator with BigQuery destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
[email protected]
+def both_destinations_operator():
+    """Create an operator with both PubSub and BigQuery destinations."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
+class TestDataflowGetMetricsOperatorInit:
+    """Test operator initialization with default attributes."""
+
+    def test_default_attributes(self, sync_operator):
+        """Test that all default attributes are set correctly."""
+        assert sync_operator.job_id == JOB_ID
+        assert sync_operator.project_id == PROJECT_ID
+        assert sync_operator.location == LOCATION
+        assert sync_operator.gcp_conn_id == GCP_CONN_ID
+        assert sync_operator.deferrable is False
+        assert sync_operator.fail_on_terminal_state is False
+        assert sync_operator.poll_sleep == 10
+        assert sync_operator.pubsub_topic is None
+        assert sync_operator.bq_dataset is None
+        assert sync_operator.bq_table is None
+        assert sync_operator.bq_dataset_location is None
+
+    def test_template_fields(self):
+        """Test that template_fields are correctly defined."""
+        expected = (
+            "job_id", 
+            "project_id", 
+            "location", 
+            "pubsub_topic", 
+            "bq_dataset", 
+            "bq_table", 
+            "bq_dataset_location", 
+            "bq_project"
+        )
+        assert DataflowGetMetricsOperator.template_fields == expected
+
+
+class TestDataflowGetMetricsOperatorLocationValidation:
+    """Test location validation during execution."""
+
+    def test_execute_raises_when_location_is_none(self):
+        """Test that execute raises AirflowException when location is None."""
+        op = DataflowGetMetricsOperator(
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            project_id=PROJECT_ID,
+            location=None,
+            deferrable=False,
+        )
+        with pytest.raises(AirflowException, match="requires 'location' to be set"):
+            op.execute(mock.MagicMock())
+
+    def test_execute_raises_when_location_is_empty_string(self):
+        """Test that execute raises AirflowException when location is empty string."""
+        op = DataflowGetMetricsOperator(
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            project_id=PROJECT_ID,
+            location="",
+            deferrable=False,
+        )
+        with pytest.raises(AirflowException, match="requires 'location' to be set"):
+            op.execute(mock.MagicMock())
+
+
+class TestDataflowGetMetricsOperatorExecuteSync:
+    """Test synchronous execution with 5 scenarios."""

Review Comment:
   This is fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to