Copilot commented on code in PR #61148:
URL: https://github.com/apache/airflow/pull/61148#discussion_r3066497835


##########
providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py:
##########
@@ -169,6 +232,11 @@
             check_table_partition_exists_async,
             check_table_partition_exists_def,
         ]
+        >> [stream_insert, stream_update, stream_delete]
+        >> [
+            check_streaming_buffer_empty,
+            check_streaming_buffer_empty_def,
+        ]

Review Comment:
   The example describes ‘Streaming’ operations and implies it populates the 
streaming buffer, but `BigQueryInsertJobOperator` runs query jobs (including 
`INSERT ... VALUES`), which do not use the streaming buffer mechanism created 
by `tabledata.insertAll`. This makes the example misleading and may not 
demonstrate the real failure mode this sensor addresses. Consider updating the 
example to (a) perform an actual streaming insert (e.g., via the provider’s 
streaming insert operator/hook), then (b) run DML only after 
`BigQueryStreamingBufferEmptySensor` succeeds (i.e., `streaming_insert >> 
check_streaming_buffer_empty >> dml_update/delete`).
   ```suggestion
           >> stream_insert
           >> [
               check_streaming_buffer_empty,
               check_streaming_buffer_empty_def,
           ]
           >> [stream_update, stream_delete]
   ```



##########
providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py:
##########
@@ -256,3 +255,143 @@ def execute_complete(self, context: dict[str, Any], 
event: dict[str, str] | None
 
         message = "No event received in trigger callback"
         raise AirflowException(message)
+
+
+class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
+    """
+    Sensor for checking whether the streaming buffer in a BigQuery table is 
empty.
+
+        The BigQueryStreamingBufferEmptySensor waits for the streaming buffer 
in a specified
+        BigQuery table to be empty before proceeding. It can be used in ETL 
pipelines to ensure
+        that recent streamed data has been processed before continuing 
downstream tasks.
+
+        :ivar template_fields: Fields that can be templated in this operator.
+        :type template_fields: Sequence[str]
+        :ivar ui_color: Color of the operator in the Airflow UI.
+        :type ui_color: Str
+        :ivar project_id: The Google Cloud project ID where the BigQuery table 
resides.
+        :type project_id: Str
+        :ivar dataset_id: The ID of the dataset containing the BigQuery table.
+        :type dataset_id: Str
+        :ivar table_id: The ID of the BigQuery table to monitor.
+        :type table_id: Str
+        :ivar gcp_conn_id: The Airflow connection ID for GCP. Defaults to 
"google_cloud_default".
+        :type gcp_conn_id: Str
+        :ivar impersonation_chain: Optional array or string of service 
accounts to impersonate using short-term
+                                   credentials. If multiple accounts are 
provided, the service account must
+                                   grant the role 
`roles/iam.serviceAccountTokenCreator` on the next account
+                                   in the chain.
+        :type impersonation_chain: Str | Sequence[str] | None
+        :ivar deferrable: Indicates whether the operator supports deferrable 
execution. If True, the sensor
+                          can defer instead of polling, leading to reduced 
resource use.
+        :type deferrable: Bool
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "table_id",
+        "impersonation_chain",
+    )
+
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ) -> None:
+        if deferrable and "poke_interval" not in kwargs:
+            kwargs["poke_interval"] = 30
+
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.deferrable = deferrable
+
+    def execute(self, context: Context) -> None:
+        """
+        Executes the operator logic taking into account the `deferrable` 
attribute.
+        If not deferrable, it uses the base class `execute` method; otherwise, 
it
+        sets up deferral with a specific trigger to wait until the BigQuery 
Streaming
+        Buffer is empty.
+
+        :param context: The execution context provided by Airflow. It allows
+            access to metadata and runtime information of the task instance.
+        :type context: Context
+        :return: None. The method does not return anything but performs actions
+            relevant to the operator's execution.
+        """
+        if not self.deferrable:
+            super().execute(context)
+        else:
+            if not self.poke(context=context):
+                self.defer(
+                    timeout=timedelta(seconds=self.timeout),
+                    trigger=BigQueryStreamingBufferEmptyTrigger(
+                        project_id=self.project_id,
+                        dataset_id=self.dataset_id,
+                        table_id=self.table_id,
+                        poll_interval=self.poke_interval,
+                        gcp_conn_id=self.gcp_conn_id,
+                        hook_params={
+                            "impersonation_chain": self.impersonation_chain,
+                        },
+                    ),
+                    method_name="execute_complete",
+                )

Review Comment:
   The sensor passes `impersonation_chain` only inside `hook_params`, but the 
trigger’s async hook construction uses the trigger’s `impersonation_chain` 
attribute (not `hook_params`). As a result, deferrable executions will ignore 
impersonation. Fix by passing `impersonation_chain=self.impersonation_chain` to 
`BigQueryStreamingBufferEmptyTrigger(...)` (and/or update the trigger to 
actually consume `hook_params`).



##########
providers/google/tests/unit/google/cloud/triggers/test_bigquery.py:
##########
@@ -888,3 +888,168 @@ def test_serialization_successfully(self):
             "poll_interval": POLLING_PERIOD_SECONDS,
             "hook_params": TEST_HOOK_PARAMS,
         }
+
+
[email protected]
+def streaming_buffer_trigger():
+    return BigQueryStreamingBufferEmptyTrigger(
+        project_id=TEST_GCP_PROJECT_ID,
+        dataset_id=TEST_DATASET_ID,
+        table_id=TEST_TABLE_ID,
+        gcp_conn_id=TEST_GCP_CONN_ID,
+        hook_params=TEST_HOOK_PARAMS,
+        poll_interval=POLLING_PERIOD_SECONDS,
+        impersonation_chain=TEST_IMPERSONATION_CHAIN,
+    )
+
+
+class TestBigQueryStreamingBufferEmptyTrigger:
+    def test_serialization(self, streaming_buffer_trigger):
+        """Asserts that the trigger correctly serializes its arguments and 
classpath."""
+        classpath, kwargs = streaming_buffer_trigger.serialize()
+        assert (
+            classpath
+            == 
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger"
+        )
+        assert kwargs == {
+            "project_id": TEST_GCP_PROJECT_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "table_id": TEST_TABLE_ID,
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+            "hook_params": TEST_HOOK_PARAMS,
+            "impersonation_chain": TEST_IMPERSONATION_CHAIN,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery."
+        "BigQueryStreamingBufferEmptyTrigger._is_streaming_buffer_empty"
+    )
+    @mock.patch(
+        
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger._get_sync_hook"
+    )
+    async def test_run_buffer_empty_yields_success(
+        self, mock_get_hook, mock_is_empty, streaming_buffer_trigger
+    ):
+        """When buffer is empty, trigger yields a success TriggerEvent."""
+        mock_is_empty.return_value = True
+
+        generator = streaming_buffer_trigger.run()
+        actual = await generator.asend(None)
+
+        table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
+        assert actual == TriggerEvent(
+            {"status": "success", "message": f"Streaming buffer is empty for 
table: {table_uri}"}
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery."
+        "BigQueryStreamingBufferEmptyTrigger._is_streaming_buffer_empty"
+    )
+    @mock.patch(
+        
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger._get_sync_hook"
+    )
+    async def test_run_buffer_not_empty_keeps_polling(
+        self, mock_get_hook, mock_is_empty, streaming_buffer_trigger
+    ):
+        """When buffer is not empty, trigger keeps polling (does not yield 
immediately)."""
+        mock_is_empty.return_value = False
+
+        task = asyncio.create_task(streaming_buffer_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+
+        # TriggerEvent was not returned yet
+        assert task.done() is False
+        asyncio.get_event_loop().stop()

Review Comment:
   These tests don’t match the trigger implementation: (1) the trigger exposes 
`_get_async_hook`, not `_get_sync_hook`, so the patch target is invalid; (2) 
`_is_streaming_buffer_empty` currently uses 
`BigQueryTableAsyncHook.get_table_client()` + `await client.get()`, but the 
tests are written around `BigQueryHook.get_client().get_table()` and SDK 
`BQTable`. Update the tests to mock `BigQueryTableAsyncHook.get_table_client` 
and the returned client’s async `get()` response (including `streamingBuffer` 
presence/absence). Also avoid `asyncio.get_event_loop().stop()` in tests—cancel 
the task and assert via a timeout (e.g., `asyncio.wait_for(..., timeout=...)`) 
to prevent leaking tasks and interfering with the pytest-asyncio loop.



##########
providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py:
##########
@@ -806,3 +806,158 @@ async def _partition_exists(self, hook: 
BigQueryAsyncHook, job_id: str | None, p
         if records:
             records = [row[0] for row in records]
             return self.partition_id in records
+
+
+class BigQueryStreamingBufferEmptyTrigger(BaseTrigger):
+    """
+    Trigger that periodically checks if a BigQuery table's streaming buffer is 
empty.
+
+    This trigger continuously polls a BigQuery table to determine if its 
streaming buffer
+    has been fully processed and is now empty. It's particularly useful before 
running
+    DML operations (UPDATE, DELETE, MERGE) on tables populated via streaming 
inserts.
+
+    :param project_id: The Google Cloud Project in which to look for the table.
+    :param dataset_id: The dataset ID of the table to check.
+    :param table_id: The table ID of the table to check.
+    :param gcp_conn_id: Reference to Google Cloud connection ID.
+    :param hook_params: Additional parameters for hook initialization.
+    :param poll_interval: Polling period in seconds to check the streaming 
buffer status.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        gcp_conn_id: str,
+        hook_params: dict[str, Any],
+        poll_interval: float = 30.0,
+        impersonation_chain: str | Sequence[str] | None = None,
+    ):
+        super().__init__()
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.gcp_conn_id = gcp_conn_id
+        self.poll_interval = poll_interval
+        self.hook_params = hook_params
+        self.impersonation_chain = impersonation_chain
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize BigQueryStreamingBufferEmptyTrigger arguments and 
classpath."""
+        return (
+            
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger",
+            {
+                "project_id": self.project_id,
+                "dataset_id": self.dataset_id,
+                "table_id": self.table_id,
+                "gcp_conn_id": self.gcp_conn_id,
+                "poll_interval": self.poll_interval,
+                "hook_params": self.hook_params,
+                "impersonation_chain": self.impersonation_chain,
+            },
+        )
+
+    def _get_async_hook(self) -> BigQueryTableAsyncHook:
+        """Get the async hook for BigQuery table operations."""
+        return BigQueryTableAsyncHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """
+        Continuously check if the streaming buffer is empty.
+
+        Yields a TriggerEvent when the streaming buffer becomes empty or if an 
error occurs.
+        """
+        try:
+            hook = self._get_async_hook()
+            while True:
+                self.log.info(
+                    "Checking streaming buffer for table %s.%s.%s",
+                    self.project_id,
+                    self.dataset_id,
+                    self.table_id,
+                )
+
+                is_buffer_empty = await self._is_streaming_buffer_empty(
+                    hook=hook,
+                    project_id=self.project_id,
+                    dataset_id=self.dataset_id,
+                    table_id=self.table_id,
+                )
+
+                if is_buffer_empty:
+                    table_uri = 
f"{self.project_id}:{self.dataset_id}.{self.table_id}"
+                    message = f"Streaming buffer is empty for table: 
{table_uri}"
+                    self.log.info(message)
+                    yield TriggerEvent(
+                        {
+                            "status": "success",
+                            "message": message,
+                        }
+                    )
+                    return
+                else:
+                    self.log.info(
+                        "Streaming buffer still has data. Sleeping for %s 
seconds.",
+                        self.poll_interval,
+                    )
+                    await asyncio.sleep(self.poll_interval)
+
+        except Exception as e:
+            self.log.exception(
+                "Exception occurred while checking streaming buffer for table 
%s.%s.%s",
+                self.project_id,
+                self.dataset_id,
+                self.table_id,
+            )
+            yield TriggerEvent({"status": "error", "message": str(e)})
+
+    async def _is_streaming_buffer_empty(
+        self,
+        hook: BigQueryTableAsyncHook,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+    ) -> bool:
+        """
+        Check if the streaming buffer is empty for the specified table.
+
+        :param hook: BigQueryTableAsyncHook instance for async operations.
+        :param project_id: The Google Cloud Project ID.
+        :param dataset_id: The dataset ID containing the table.
+        :param table_id: The table ID to check.
+        :return: True if streaming buffer is empty or doesn't exist, False 
otherwise.
+        """
+        async with ClientSession() as session:
+            try:
+                client = await hook.get_table_client(
+                    dataset=dataset_id,
+                    table_id=table_id,
+                    project_id=project_id,
+                    session=session,
+                )

Review Comment:
   `ClientSession()` is created and torn down on every poll. Since `run()` 
loops indefinitely, this adds unnecessary overhead and connection churn. Create 
a single `ClientSession` in `run()` (wrapping the whole polling loop) and pass 
it into `_is_streaming_buffer_empty(...)`, or refactor 
`_is_streaming_buffer_empty` to accept a pre-created session.



##########
providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py:
##########
@@ -256,3 +255,143 @@ def execute_complete(self, context: dict[str, Any], 
event: dict[str, str] | None
 
         message = "No event received in trigger callback"
         raise AirflowException(message)
+
+
+class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
+    """
+    Sensor for checking whether the streaming buffer in a BigQuery table is 
empty.
+
+        The BigQueryStreamingBufferEmptySensor waits for the streaming buffer 
in a specified
+        BigQuery table to be empty before proceeding. It can be used in ETL 
pipelines to ensure
+        that recent streamed data has been processed before continuing 
downstream tasks.
+
+        :ivar template_fields: Fields that can be templated in this operator.
+        :type template_fields: Sequence[str]
+        :ivar ui_color: Color of the operator in the Airflow UI.
+        :type ui_color: Str
+        :ivar project_id: The Google Cloud project ID where the BigQuery table 
resides.
+        :type project_id: Str
+        :ivar dataset_id: The ID of the dataset containing the BigQuery table.
+        :type dataset_id: Str
+        :ivar table_id: The ID of the BigQuery table to monitor.
+        :type table_id: Str
+        :ivar gcp_conn_id: The Airflow connection ID for GCP. Defaults to 
"google_cloud_default".
+        :type gcp_conn_id: Str
+        :ivar impersonation_chain: Optional array or string of service 
accounts to impersonate using short-term
+                                   credentials. If multiple accounts are 
provided, the service account must
+                                   grant the role 
`roles/iam.serviceAccountTokenCreator` on the next account
+                                   in the chain.
+        :type impersonation_chain: Str | Sequence[str] | None
+        :ivar deferrable: Indicates whether the operator supports deferrable 
execution. If True, the sensor
+                          can defer instead of polling, leading to reduced 
resource use.
+        :type deferrable: Bool
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "table_id",
+        "impersonation_chain",
+    )
+
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ) -> None:
+        if deferrable and "poke_interval" not in kwargs:
+            kwargs["poke_interval"] = 30
+
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.deferrable = deferrable
+
+    def execute(self, context: Context) -> None:
+        """
+        Executes the operator logic taking into account the `deferrable` 
attribute.
+        If not deferrable, it uses the base class `execute` method; otherwise, 
it
+        sets up deferral with a specific trigger to wait until the BigQuery 
Streaming
+        Buffer is empty.
+
+        :param context: The execution context provided by Airflow. It allows
+            access to metadata and runtime information of the task instance.
+        :type context: Context
+        :return: None. The method does not return anything but performs actions
+            relevant to the operator's execution.
+        """
+        if not self.deferrable:
+            super().execute(context)
+        else:
+            if not self.poke(context=context):
+                self.defer(
+                    timeout=timedelta(seconds=self.timeout),
+                    trigger=BigQueryStreamingBufferEmptyTrigger(
+                        project_id=self.project_id,
+                        dataset_id=self.dataset_id,
+                        table_id=self.table_id,
+                        poll_interval=self.poke_interval,
+                        gcp_conn_id=self.gcp_conn_id,
+                        hook_params={
+                            "impersonation_chain": self.impersonation_chain,
+                        },
+                    ),
+                    method_name="execute_complete",
+                )
+
+    def execute_complete(self, context: dict[str, Any], event: dict[str, str] 
| None = None) -> str:
+        """
+        Act as a callback for when the trigger fires - returns immediately.
+
+        Relies on trigger to throw an exception, otherwise it assumes 
execution was successful.
+        """
+        table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
+        self.log.info("Checking streaming buffer state for table: %s", 
table_uri)
+        if event:
+            if event["status"] == "success":
+                return event["message"]
+            raise AirflowException(event["message"])
+
+        message = "No event received in trigger callback"
+        raise AirflowException(message)
+
+    def poke(self, context: Context) -> bool:
+        """
+        Check if the BigQuery streaming buffer is empty for the specified 
table.
+
+        This method periodically checks the status of the BigQuery table's 
streaming buffer
+        to determine if it is empty. It is useful for ensuring that recent 
streamed data
+        has been fully processed before continuing with downstream tasks.
+        """
+        table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
+        self.log.info("Checking streaming buffer state for table: %s", 
table_uri)
+
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            table = hook.get_table(
+                project_id=self.project_id,
+                dataset_id=self.dataset_id,
+                table_id=self.table_id,
+            )
+            return table.get("streamingBuffer") is None
+        except Exception as err:
+            if "not found" in str(err):
+                raise AirflowException(
+                    f"Table 
{self.project_id}.{self.dataset_id}.{self.table_id} not found"
+                ) from err
+            raise err

Review Comment:
   This assumes `hook.get_table(...)` returns a dict-like object (using 
`.get(...)`). In many BigQuery client paths, `get_table` returns a 
`google.cloud.bigquery.table.Table` where the correct check is via 
`table.streaming_buffer` (or checking the API repr). Align the sync sensor with 
the trigger/tests by using the SDK Table object property (and catch 
`google.api_core.exceptions.NotFound` explicitly instead of string-matching 
`'not found'`, which is brittle and can misclassify errors).



##########
providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py:
##########
@@ -806,3 +806,158 @@ async def _partition_exists(self, hook: 
BigQueryAsyncHook, job_id: str | None, p
         if records:
             records = [row[0] for row in records]
             return self.partition_id in records
+
+
+class BigQueryStreamingBufferEmptyTrigger(BaseTrigger):
+    """
+    Trigger that periodically checks if a BigQuery table's streaming buffer is 
empty.
+
+    This trigger continuously polls a BigQuery table to determine if its 
streaming buffer
+    has been fully processed and is now empty. It's particularly useful before 
running
+    DML operations (UPDATE, DELETE, MERGE) on tables populated via streaming 
inserts.
+
+    :param project_id: The Google Cloud Project in which to look for the table.
+    :param dataset_id: The dataset ID of the table to check.
+    :param table_id: The table ID of the table to check.
+    :param gcp_conn_id: Reference to Google Cloud connection ID.
+    :param hook_params: Additional parameters for hook initialization.
+    :param poll_interval: Polling period in seconds to check the streaming 
buffer status.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        gcp_conn_id: str,
+        hook_params: dict[str, Any],
+        poll_interval: float = 30.0,
+        impersonation_chain: str | Sequence[str] | None = None,
+    ):
+        super().__init__()
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.gcp_conn_id = gcp_conn_id
+        self.poll_interval = poll_interval
+        self.hook_params = hook_params
+        self.impersonation_chain = impersonation_chain
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize BigQueryStreamingBufferEmptyTrigger arguments and 
classpath."""
+        return (
+            
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger",
+            {
+                "project_id": self.project_id,
+                "dataset_id": self.dataset_id,
+                "table_id": self.table_id,
+                "gcp_conn_id": self.gcp_conn_id,
+                "poll_interval": self.poll_interval,
+                "hook_params": self.hook_params,
+                "impersonation_chain": self.impersonation_chain,
+            },
+        )
+
+    def _get_async_hook(self) -> BigQueryTableAsyncHook:
+        """Get the async hook for BigQuery table operations."""
+        return BigQueryTableAsyncHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,

Review Comment:
   `hook_params` is serialized/stored but never used by the trigger. This is 
confusing for API consumers and contributed to the impersonation bug in the 
sensor callsite. Either remove `hook_params` from the trigger API (and 
serialization), or apply it when building the hook (e.g., derive 
`impersonation_chain` from it, and/or pass supported hook args through 
consistently).
   ```suggestion
           impersonation_chain = self.impersonation_chain
           if impersonation_chain is None:
               impersonation_chain = self.hook_params.get("impersonation_chain")
   
           return BigQueryTableAsyncHook(
               gcp_conn_id=self.gcp_conn_id,
               impersonation_chain=impersonation_chain,
   ```



##########
providers/google/tests/unit/google/cloud/triggers/test_bigquery.py:
##########
@@ -888,3 +888,168 @@ def test_serialization_successfully(self):
             "poll_interval": POLLING_PERIOD_SECONDS,
             "hook_params": TEST_HOOK_PARAMS,
         }
+
+
[email protected]
+def streaming_buffer_trigger():
+    return BigQueryStreamingBufferEmptyTrigger(
+        project_id=TEST_GCP_PROJECT_ID,
+        dataset_id=TEST_DATASET_ID,
+        table_id=TEST_TABLE_ID,
+        gcp_conn_id=TEST_GCP_CONN_ID,
+        hook_params=TEST_HOOK_PARAMS,
+        poll_interval=POLLING_PERIOD_SECONDS,
+        impersonation_chain=TEST_IMPERSONATION_CHAIN,
+    )
+
+
+class TestBigQueryStreamingBufferEmptyTrigger:
+    def test_serialization(self, streaming_buffer_trigger):
+        """Asserts that the trigger correctly serializes its arguments and 
classpath."""
+        classpath, kwargs = streaming_buffer_trigger.serialize()
+        assert (
+            classpath
+            == 
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger"
+        )
+        assert kwargs == {
+            "project_id": TEST_GCP_PROJECT_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "table_id": TEST_TABLE_ID,
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+            "hook_params": TEST_HOOK_PARAMS,
+            "impersonation_chain": TEST_IMPERSONATION_CHAIN,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery."
+        "BigQueryStreamingBufferEmptyTrigger._is_streaming_buffer_empty"
+    )
+    @mock.patch(
+        
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger._get_sync_hook"
+    )
+    async def test_run_buffer_empty_yields_success(
+        self, mock_get_hook, mock_is_empty, streaming_buffer_trigger
+    ):

Review Comment:
   These tests don’t match the trigger implementation: (1) the trigger exposes 
`_get_async_hook`, not `_get_sync_hook`, so the patch target is invalid; (2) 
`_is_streaming_buffer_empty` currently uses 
`BigQueryTableAsyncHook.get_table_client()` + `await client.get()`, but the 
tests are written around `BigQueryHook.get_client().get_table()` and SDK 
`BQTable`. Update the tests to mock `BigQueryTableAsyncHook.get_table_client` 
and the returned client’s async `get()` response (including `streamingBuffer` 
presence/absence). Also avoid `asyncio.get_event_loop().stop()` in tests—cancel 
the task and assert via a timeout (e.g., `asyncio.wait_for(..., timeout=...)`) 
to prevent leaking tasks and interfering with the pytest-asyncio loop.



##########
providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py:
##########
@@ -149,9 +156,65 @@
         project_id=PROJECT_ID,
         dataset_id=DATASET_NAME,
         table_id=TABLE_NAME,
+        deferrable=True,
     )
     # [END howto_sensor_bigquery_table_partition_async]
 
+    # [START howto_sensor_bigquery_streaming_buffer_empty]
+    check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
+        task_id="check_streaming_buffer_empty",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME,
+        poke_interval=30,
+        timeout=5400,  # 90 minutes - Google Cloud flushes streaming buffer 
within 90 minutes
+    )
+    # [END howto_sensor_bigquery_streaming_buffer_empty]
+
+    # Streaming operations: INSERT, UPDATE, DELETE
+    # These operations write data to the streaming buffer before being flushed 
to persistent storage
+    stream_insert = BigQueryInsertJobOperator(
+        task_id="stream_insert",
+        configuration={
+            "query": {
+                "query": STREAMING_INSERT_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )

Review Comment:
   The example describes ‘Streaming’ operations and implies it populates the 
streaming buffer, but `BigQueryInsertJobOperator` runs query jobs (including 
`INSERT ... VALUES`), which do not use the streaming buffer mechanism created 
by `tabledata.insertAll`. This makes the example misleading and may not 
demonstrate the real failure mode this sensor addresses. Consider updating the 
example to (a) perform an actual streaming insert (e.g., via the provider’s 
streaming insert operator/hook), then (b) run DML only after 
`BigQueryStreamingBufferEmptySensor` succeeds (i.e., `streaming_insert >> 
check_streaming_buffer_empty >> dml_update/delete`).



##########
providers/google/docs/operators/cloud/bigquery.rst:
##########
@@ -518,6 +518,28 @@ Also you can use deferrable mode in this operator if you 
would like to free up t
     :start-after: [START howto_sensor_bigquery_table_partition_async]
     :end-before: [END howto_sensor_bigquery_table_partition_async]
 
+Check that the BigQuery Table Streaming Buffer is empty
+""""""""""""""""""""""""""""""""""""""""""""""""""""""""
+
+To check that the BigQuery streaming buffer of a table is empty you can use
+:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryStreamingBufferEmptySensor`.
+This sensor is useful in ETL pipelines to ensure that recent streamed data has 
been fully
+processed before continuing downstream tasks.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_bigquery_streaming_buffer_empty]
+    :end-before: [END howto_sensor_bigquery_streaming_buffer_empty]
+
+Also you can use deferrable mode in this operator if you would like to free up 
the worker slots while the sensor is running.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_bigquery_streaming_buffer_empty_defered]
+    :end-before: [END howto_sensor_bigquery_streaming_buffer_empty_defered]

Review Comment:
   Correct the tag spelling from 'defered' to 'deferred' to avoid propagating a 
typo into documentation references.



##########
providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py:
##########
@@ -806,3 +806,158 @@ async def _partition_exists(self, hook: 
BigQueryAsyncHook, job_id: str | None, p
         if records:
             records = [row[0] for row in records]
             return self.partition_id in records
+
+
+class BigQueryStreamingBufferEmptyTrigger(BaseTrigger):
+    """
+    Trigger that periodically checks if a BigQuery table's streaming buffer is 
empty.
+
+    This trigger continuously polls a BigQuery table to determine if its 
streaming buffer
+    has been fully processed and is now empty. It's particularly useful before 
running
+    DML operations (UPDATE, DELETE, MERGE) on tables populated via streaming 
inserts.
+
+    :param project_id: The Google Cloud Project in which to look for the table.
+    :param dataset_id: The dataset ID of the table to check.
+    :param table_id: The table ID of the table to check.
+    :param gcp_conn_id: Reference to Google Cloud connection ID.
+    :param hook_params: Additional parameters for hook initialization.
+    :param poll_interval: Polling period in seconds to check the streaming 
buffer status.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        gcp_conn_id: str,
+        hook_params: dict[str, Any],
+        poll_interval: float = 30.0,
+        impersonation_chain: str | Sequence[str] | None = None,
+    ):

Review Comment:
   `hook_params` is serialized/stored but never used by the trigger. This is 
confusing for API consumers and contributed to the impersonation bug in the 
sensor callsite. Either remove `hook_params` from the trigger API (and 
serialization), or apply it when building the hook (e.g., derive 
`impersonation_chain` from it, and/or pass supported hook args through 
consistently).



##########
providers/google/tests/unit/google/cloud/triggers/test_bigquery.py:
##########
@@ -888,3 +888,168 @@ def test_serialization_successfully(self):
             "poll_interval": POLLING_PERIOD_SECONDS,
             "hook_params": TEST_HOOK_PARAMS,
         }
+
+
[email protected]
+def streaming_buffer_trigger():
+    return BigQueryStreamingBufferEmptyTrigger(
+        project_id=TEST_GCP_PROJECT_ID,
+        dataset_id=TEST_DATASET_ID,
+        table_id=TEST_TABLE_ID,
+        gcp_conn_id=TEST_GCP_CONN_ID,
+        hook_params=TEST_HOOK_PARAMS,
+        poll_interval=POLLING_PERIOD_SECONDS,
+        impersonation_chain=TEST_IMPERSONATION_CHAIN,
+    )
+
+
+class TestBigQueryStreamingBufferEmptyTrigger:
+    def test_serialization(self, streaming_buffer_trigger):
+        """Asserts that the trigger correctly serializes its arguments and 
classpath."""
+        classpath, kwargs = streaming_buffer_trigger.serialize()
+        assert (
+            classpath
+            == 
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger"
+        )
+        assert kwargs == {
+            "project_id": TEST_GCP_PROJECT_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "table_id": TEST_TABLE_ID,
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+            "hook_params": TEST_HOOK_PARAMS,
+            "impersonation_chain": TEST_IMPERSONATION_CHAIN,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery."
+        "BigQueryStreamingBufferEmptyTrigger._is_streaming_buffer_empty"
+    )
+    @mock.patch(
+        
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger._get_sync_hook"
+    )
+    async def test_run_buffer_empty_yields_success(
+        self, mock_get_hook, mock_is_empty, streaming_buffer_trigger
+    ):
+        """When buffer is empty, trigger yields a success TriggerEvent."""
+        mock_is_empty.return_value = True
+
+        generator = streaming_buffer_trigger.run()
+        actual = await generator.asend(None)
+
+        table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
+        assert actual == TriggerEvent(
+            {"status": "success", "message": f"Streaming buffer is empty for 
table: {table_uri}"}
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery."
+        "BigQueryStreamingBufferEmptyTrigger._is_streaming_buffer_empty"
+    )
+    @mock.patch(
+        
"airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger._get_sync_hook"
+    )
+    async def test_run_buffer_not_empty_keeps_polling(
+        self, mock_get_hook, mock_is_empty, streaming_buffer_trigger
+    ):
+        """When buffer is not empty, trigger keeps polling (does not yield 
immediately)."""
+        mock_is_empty.return_value = False
+
+        task = asyncio.create_task(streaming_buffer_trigger.run().__anext__())
+        await asyncio.sleep(0.5)

Review Comment:
   These tests don’t match the trigger implementation: (1) the trigger exposes 
`_get_async_hook`, not `_get_sync_hook`, so the patch target is invalid; (2) 
`_is_streaming_buffer_empty` currently uses 
`BigQueryTableAsyncHook.get_table_client()` + `await client.get()`, but the 
tests are written around `BigQueryHook.get_client().get_table()` and SDK 
`BQTable`. Update the tests to mock `BigQueryTableAsyncHook.get_table_client` 
and the returned client’s async `get()` response (including `streamingBuffer` 
presence/absence). Also avoid `asyncio.get_event_loop().stop()` in tests—cancel 
the task and assert via a timeout (e.g., `asyncio.wait_for(..., timeout=...)`) 
to prevent leaking tasks and interfering with the pytest-asyncio loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to