pushpendu1991 commented on code in PR #64900:
URL: https://github.com/apache/airflow/pull/64900#discussion_r3068985616


##########
providers/google/tests/unit/google/cloud/operators/test_dataflow.py:
##########
@@ -794,3 +818,310 @@ def test_invalid_response(self, sdk_connection_not_found):
             
DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock()).return_value = {
                 "error": {"message": "example error"}
             }
+
[email protected]
+def sync_operator():
+    """Create a synchronous DataflowGetMetricsOperator instance (no 
destination)."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+    )
+
+
[email protected]
+def deferrable_operator():
+    """Create a deferrable DataflowGetMetricsOperator instance."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=True,
+    )
+
+
[email protected]
+def pubsub_operator():
+    """Create an operator with PubSub topic destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+    )
+
+
[email protected]
+def bq_operator():
+    """Create an operator with BigQuery destination."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
[email protected]
+def both_destinations_operator():
+    """Create an operator with both PubSub and BigQuery destinations."""
+    return DataflowGetMetricsOperator(
+        task_id=TASK_ID,
+        job_id=JOB_ID,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        gcp_conn_id=GCP_CONN_ID,
+        deferrable=False,
+        pubsub_topic=PUBSUB_TOPIC,
+        bq_dataset=BQ_DATASET,
+        bq_table=BQ_TABLE,
+        bq_dataset_location=BQ_DATASET_LOCATION,
+        bq_project=BQ_PROJECT,
+    )
+
+
+class TestDataflowGetMetricsOperatorInit:
+    """Test operator initialization with default attributes."""
+
+    def test_default_attributes(self, sync_operator):
+        """Test that all default attributes are set correctly."""
+        assert sync_operator.job_id == JOB_ID
+        assert sync_operator.project_id == PROJECT_ID
+        assert sync_operator.location == LOCATION
+        assert sync_operator.gcp_conn_id == GCP_CONN_ID
+        assert sync_operator.deferrable is False
+        assert sync_operator.fail_on_terminal_state is False
+        assert sync_operator.poll_sleep == 10
+        assert sync_operator.pubsub_topic is None
+        assert sync_operator.bq_dataset is None
+        assert sync_operator.bq_table is None
+        assert sync_operator.bq_dataset_location is None
+
+    def test_template_fields(self):
+        """Test that template_fields are correctly defined."""
+        expected = (
+            "job_id", 
+            "project_id", 
+            "location", 
+            "pubsub_topic", 
+            "bq_dataset", 
+            "bq_table", 
+            "bq_dataset_location", 
+            "bq_project"

Review Comment:
   This is fixed.



##########
providers/google/src/airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -1176,3 +1181,280 @@ def execute(self, context: Context):
             raise AirflowException(self.response)
 
         return None
+
+class DataflowGetMetricsOperator(GoogleCloudBaseOperator):
+    """
+    Fetches metrics for a Dataflow job and routes the result to one or more
+    destinations based on what the caller provides.
+    Currently it only supports pre-defined bigquery table schema.
+    CREATE TABLE IF NOT EXISTS `my-project.monitoring.dataflow_metrics` (
+    job_id        STRING    NOT NULL,
+    metric_name   STRING,
+    origin        STRING,
+    scalar        FLOAT64,
+    collected_at  TIMESTAMP NOT NULL
+    )
+    PARTITION BY DATE(collected_at);
+    Routing logic
+    ─────────────
+    • pubsub_topic provided              → publish full metrics to Pub/Sub
+    • bq_dataset + bq_table provided     → stream full metrics rows into BigQuery
+    • both provided                      → publish to both (fan-out)
+    • neither provided                   → log a warning; metrics are NOT stored
+    • XCom always                        → a lean summary dict only (never the
+                                           full payload), keeping the metadata DB lean
+    Execution modes
+    ───────────────
+    • deferrable=True  (default) — defers to DataflowJobMetricsTrigger.
+    • deferrable=False           — blocks the worker (small/test jobs only).
+    :param job_id: Dataflow job ID. Jinja-templated.
+    :param project_id: GCP project that owns the Dataflow job. Jinja-templated.
+    :param location: Dataflow job region. Jinja-templated.
+    :param pubsub_topic: Full Pub/Sub topic path:
+                         ``projects/<project>/topics/<topic>``. Jinja-templated.
+    :param bq_dataset: BigQuery dataset ID to write metrics into. Jinja-templated.
+    :param bq_dataset_location: BigQuery dataset location used by the hook. Jinja-templated.
+    :param bq_table: BigQuery table ID to write metrics into. Jinja-templated.
+    :param bq_project: BigQuery project (defaults to ``project_id``). Jinja-templated.
+    :param gcp_conn_id: Airflow GCP connection.
+    :param impersonation_chain: Optional SA to impersonate.
+    :param deferrable: When True, defer to Triggerer.
+    :param poll_sleep: Triggerer poll interval in seconds.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_id",
+        "project_id",
+        "location",
+        "pubsub_topic",
+        "bq_dataset",
+        "bq_dataset_location",
+        "bq_table",
+        "bq_project",
+    )
+    ui_color = "#4285F4"
+
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        location: str | None = DEFAULT_DATAFLOW_LOCATION,
+        pubsub_topic: str | None = None,
+        bq_dataset: str | None = None,
+        bq_dataset_location: str | None = None,
+        bq_table: str | None = None,
+        bq_project: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),

Review Comment:
   This is fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to