This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 56eae9c9a4 Deprecate `runtime_parameters` in favor of options in 
`hook_params` (#32345)
56eae9c9a4 is described below

commit 56eae9c9a4b784e7b239335560c31ed30f57e0a1
Author: JDarDagran <[email protected]>
AuthorDate: Tue Jul 4 22:52:58 2023 +0200

    Deprecate `runtime_parameters` in favor of options in `hook_params` (#32345)
    
    for PostgresOperator.
    
    Signed-off-by: Jakub Dardzinski <[email protected]>
---
 airflow/providers/postgres/hooks/postgres.py       |  9 +++++-
 airflow/providers/postgres/operators/postgres.py   | 32 +++++++---------------
 .../operators/postgres_operator_howto_guide.rst    |  9 +++---
 tests/providers/postgres/hooks/test_postgres.py    | 14 ++++++++++
 .../providers/postgres/operators/test_postgres.py  |  1 +
 .../system/providers/postgres/example_postgres.py  |  2 +-
 6 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/postgres/hooks/postgres.py 
b/airflow/providers/postgres/hooks/postgres.py
index 0a7aa0732a..2e76029028 100644
--- a/airflow/providers/postgres/hooks/postgres.py
+++ b/airflow/providers/postgres/hooks/postgres.py
@@ -59,6 +59,9 @@ class PostgresHook(DbApiHook):
 
     :param postgres_conn_id: The :ref:`postgres conn id 
<howto/connection:postgres>`
         reference to a specific postgres database.
+    :param options: Optional. Specifies command-line options to send to the 
server
+        at connection start. For example, setting this to ``-c 
search_path=myschema``
+        sets the session's value of the ``search_path`` to ``myschema``.
     """
 
     conn_name_attr = "postgres_conn_id"
@@ -67,7 +70,7 @@ class PostgresHook(DbApiHook):
     hook_name = "Postgres"
     supports_autocommit = True
 
-    def __init__(self, *args, **kwargs) -> None:
+    def __init__(self, *args, options: str | None = None, **kwargs) -> None:
         if "schema" in kwargs:
             warnings.warn(
                 'The "schema" arg has been renamed to "database" as it 
contained the database name.'
@@ -80,6 +83,7 @@ class PostgresHook(DbApiHook):
         self.connection: Connection | None = kwargs.pop("connection", None)
         self.conn: connection = None
         self.database: str | None = kwargs.pop("database", None)
+        self.options = options
 
     @property
     def schema(self):
@@ -131,6 +135,9 @@ class PostgresHook(DbApiHook):
         if raw_cursor:
             conn_args["cursor_factory"] = self._get_cursor(raw_cursor)
 
+        if self.options:
+            conn_args["options"] = self.options
+
         for arg_name, arg_val in conn.extra_dejson.items():
             if arg_name not in [
                 "iam",
diff --git a/airflow/providers/postgres/operators/postgres.py 
b/airflow/providers/postgres/operators/postgres.py
index e70ac72027..ef92b51dda 100644
--- a/airflow/providers/postgres/operators/postgres.py
+++ b/airflow/providers/postgres/operators/postgres.py
@@ -20,8 +20,6 @@ from __future__ import annotations
 import warnings
 from typing import Mapping, Sequence
 
-from psycopg2.sql import SQL, Identifier
-
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
@@ -41,6 +39,7 @@ class PostgresOperator(SQLExecuteQueryOperator):
     :param database: name of database which overwrite defined one in connection
     :param runtime_parameters: a mapping of runtime params added to the final 
sql being executed.
         For example, you could set the schema via `{"search_path": 
"CUSTOM_SCHEMA"}`.
+        Deprecated - use `hook_params={'options': '-c <connection_options>'}` 
instead.
     """
 
     template_fields: Sequence[str] = ("sql",)
@@ -61,26 +60,15 @@ class PostgresOperator(SQLExecuteQueryOperator):
             kwargs["hook_params"] = {"schema": database, **hook_params}
 
         if runtime_parameters:
-            sql = kwargs.pop("sql")
-            parameters = kwargs.pop("parameters", {})
-
-            final_sql = []
-            sql_param = {}
-            for param in runtime_parameters:
-                set_param_sql = f"SET {{}} TO %({param})s;"
-                dynamic_sql = SQL(set_param_sql).format(Identifier(f"{param}"))
-                final_sql.append(dynamic_sql)
-            for param, val in runtime_parameters.items():
-                sql_param.update({f"{param}": f"{val}"})
-            if parameters:
-                sql_param.update(parameters)
-            if isinstance(sql, str):
-                final_sql.append(SQL(sql))
-            else:
-                final_sql.extend(list(map(SQL, sql)))
-
-            kwargs["sql"] = final_sql
-            kwargs["parameters"] = sql_param
+            warnings.warn(
+                """`runtime_parameters` is deprecated.
+                Please use `hook_params={'options': '-c 
<connection_options>}`.""",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+            hook_params = kwargs.pop("hook_params", {})
+            options = " ".join(f"-c {param}={val}" for param, val in 
runtime_parameters.items())
+            kwargs["hook_params"] = {"options": options, **hook_params}
 
         super().__init__(conn_id=postgres_conn_id, **kwargs)
         warnings.warn(
diff --git 
a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
 
b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
index 7fb4150d8a..962bbf87de 100644
--- 
a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
+++ 
b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
@@ -159,8 +159,9 @@ class.
 Passing Server Configuration Parameters into PostgresOperator
 -------------------------------------------------------------
 
-PostgresOperator provides the optional ``runtime_parameters`` attribute which 
makes it possible to set
-the `server configuration parameter values 
<https://www.postgresql.org/docs/current/runtime-config-client.html>`_ for the 
SQL request during runtime.
+PostgresOperator provides the ``hook_params`` attribute, which allows you to pass 
additional parameters to PostgresHook.
+You can pass the ``options`` argument this way to specify the `command-line 
options 
<https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-OPTIONS>`_
+sent to the server at connection start.
 
 .. exampleinclude:: /../../tests/system/providers/postgres/example_postgres.py
     :language: python
@@ -186,5 +187,5 @@ In this how-to guide we explored the Apache Airflow 
PostgreOperator. Let's quick
 It is best practice to create subdirectory called ``sql`` in your ``dags`` 
directory where you can store your sql files.
 This will make your code more elegant and more maintainable.
 And finally, we looked at the different ways you can dynamically pass 
parameters into our PostgresOperator
-tasks using ``parameters`` or ``params`` attribute and how you can control the 
server configuration parameters by passing
-the ``runtime_parameters`` attribute.
+tasks using ``parameters`` or ``params`` attribute and how you can control the 
session parameters by passing
+options in the ``hook_params`` attribute.
diff --git a/tests/providers/postgres/hooks/test_postgres.py 
b/tests/providers/postgres/hooks/test_postgres.py
index 804ee9da9a..a6fa619634 100644
--- a/tests/providers/postgres/hooks/test_postgres.py
+++ b/tests/providers/postgres/hooks/test_postgres.py
@@ -101,6 +101,20 @@ class TestPostgresHookConn:
             user="login-conn", password="password-conn", host="host", 
dbname="database-override", port=None
         )
 
+    @mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
+    def test_get_conn_from_connection_with_options(self, mock_connect):
+        conn = Connection(login="login-conn", password="password-conn", 
host="host", schema="database")
+        hook = PostgresHook(connection=conn, options="-c 
statement_timeout=3000ms")
+        hook.get_conn()
+        mock_connect.assert_called_once_with(
+            user="login-conn",
+            password="password-conn",
+            host="host",
+            dbname="database",
+            port=None,
+            options="-c statement_timeout=3000ms",
+        )
+
     @mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
     @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook")
     @pytest.mark.parametrize("aws_conn_id", [NOTSET, None, "mock_aws_conn"])
diff --git a/tests/providers/postgres/operators/test_postgres.py 
b/tests/providers/postgres/operators/test_postgres.py
index 4b615917ea..b58abf3497 100644
--- a/tests/providers/postgres/operators/test_postgres.py
+++ b/tests/providers/postgres/operators/test_postgres.py
@@ -112,3 +112,4 @@ class TestPostgres:
             runtime_parameters={"statement_timeout": "3000ms"},
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+        assert op.get_db_hook().get_first("SHOW statement_timeout;")[0] == "3s"
diff --git a/tests/system/providers/postgres/example_postgres.py 
b/tests/system/providers/postgres/example_postgres.py
index ceab527195..fd0a80783c 100644
--- a/tests/system/providers/postgres/example_postgres.py
+++ b/tests/system/providers/postgres/example_postgres.py
@@ -73,7 +73,7 @@ with DAG(
         task_id="get_birth_date",
         sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC 
%(begin_date)s AND %(end_date)s",
         parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
-        runtime_parameters={"statement_timeout": "3000ms"},
+        hook_params={"options": "-c statement_timeout=3000ms"},
     )
     # [END postgres_operator_howto_guide_get_birth_date]
 

Reply via email to