This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 56eae9c9a4 Deprecate `runtime_parameters` in favor of options in
`hook_params` (#32345)
56eae9c9a4 is described below
commit 56eae9c9a4b784e7b239335560c31ed30f57e0a1
Author: JDarDagran <[email protected]>
AuthorDate: Tue Jul 4 22:52:58 2023 +0200
Deprecate `runtime_parameters` in favor of options in `hook_params` (#32345)
for PostgresOperator.
Signed-off-by: Jakub Dardzinski <[email protected]>
---
airflow/providers/postgres/hooks/postgres.py | 9 +++++-
airflow/providers/postgres/operators/postgres.py | 32 +++++++---------------
.../operators/postgres_operator_howto_guide.rst | 9 +++---
tests/providers/postgres/hooks/test_postgres.py | 14 ++++++++++
.../providers/postgres/operators/test_postgres.py | 1 +
.../system/providers/postgres/example_postgres.py | 2 +-
6 files changed, 39 insertions(+), 28 deletions(-)
diff --git a/airflow/providers/postgres/hooks/postgres.py
b/airflow/providers/postgres/hooks/postgres.py
index 0a7aa0732a..2e76029028 100644
--- a/airflow/providers/postgres/hooks/postgres.py
+++ b/airflow/providers/postgres/hooks/postgres.py
@@ -59,6 +59,9 @@ class PostgresHook(DbApiHook):
:param postgres_conn_id: The :ref:`postgres conn id
<howto/connection:postgres>`
reference to a specific postgres database.
+ :param options: Optional. Specifies command-line options to send to the
server
+ at connection start. For example, setting this to ``-c
search_path=myschema``
+ sets the session's value of the ``search_path`` to ``myschema``.
"""
conn_name_attr = "postgres_conn_id"
@@ -67,7 +70,7 @@ class PostgresHook(DbApiHook):
hook_name = "Postgres"
supports_autocommit = True
- def __init__(self, *args, **kwargs) -> None:
+ def __init__(self, *args, options: str | None = None, **kwargs) -> None:
if "schema" in kwargs:
warnings.warn(
'The "schema" arg has been renamed to "database" as it
contained the database name.'
@@ -80,6 +83,7 @@ class PostgresHook(DbApiHook):
self.connection: Connection | None = kwargs.pop("connection", None)
self.conn: connection = None
self.database: str | None = kwargs.pop("database", None)
+ self.options = options
@property
def schema(self):
@@ -131,6 +135,9 @@ class PostgresHook(DbApiHook):
if raw_cursor:
conn_args["cursor_factory"] = self._get_cursor(raw_cursor)
+ if self.options:
+ conn_args["options"] = self.options
+
for arg_name, arg_val in conn.extra_dejson.items():
if arg_name not in [
"iam",
diff --git a/airflow/providers/postgres/operators/postgres.py
b/airflow/providers/postgres/operators/postgres.py
index e70ac72027..ef92b51dda 100644
--- a/airflow/providers/postgres/operators/postgres.py
+++ b/airflow/providers/postgres/operators/postgres.py
@@ -20,8 +20,6 @@ from __future__ import annotations
import warnings
from typing import Mapping, Sequence
-from psycopg2.sql import SQL, Identifier
-
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
@@ -41,6 +39,7 @@ class PostgresOperator(SQLExecuteQueryOperator):
:param database: name of database which overwrite defined one in connection
:param runtime_parameters: a mapping of runtime params added to the final
sql being executed.
For example, you could set the schema via `{"search_path":
"CUSTOM_SCHEMA"}`.
+ Deprecated - use `hook_params={'options': '-c <connection_options>'}`
instead.
"""
template_fields: Sequence[str] = ("sql",)
@@ -61,26 +60,15 @@ class PostgresOperator(SQLExecuteQueryOperator):
kwargs["hook_params"] = {"schema": database, **hook_params}
if runtime_parameters:
- sql = kwargs.pop("sql")
- parameters = kwargs.pop("parameters", {})
-
- final_sql = []
- sql_param = {}
- for param in runtime_parameters:
- set_param_sql = f"SET {{}} TO %({param})s;"
- dynamic_sql = SQL(set_param_sql).format(Identifier(f"{param}"))
- final_sql.append(dynamic_sql)
- for param, val in runtime_parameters.items():
- sql_param.update({f"{param}": f"{val}"})
- if parameters:
- sql_param.update(parameters)
- if isinstance(sql, str):
- final_sql.append(SQL(sql))
- else:
- final_sql.extend(list(map(SQL, sql)))
-
- kwargs["sql"] = final_sql
- kwargs["parameters"] = sql_param
+ warnings.warn(
+ """`runtime_parameters` is deprecated.
+            Please use `hook_params={'options': '-c
<connection_options>'}`.""",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ hook_params = kwargs.pop("hook_params", {})
+ options = " ".join(f"-c {param}={val}" for param, val in
runtime_parameters.items())
+ kwargs["hook_params"] = {"options": options, **hook_params}
super().__init__(conn_id=postgres_conn_id, **kwargs)
warnings.warn(
diff --git
a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
index 7fb4150d8a..962bbf87de 100644
---
a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
+++
b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
@@ -159,8 +159,9 @@ class.
Passing Server Configuration Parameters into PostgresOperator
-------------------------------------------------------------
-PostgresOperator provides the optional ``runtime_parameters`` attribute which
makes it possible to set
-the `server configuration parameter values
<https://www.postgresql.org/docs/current/runtime-config-client.html>`_ for the
SQL request during runtime.
+PostgresOperator provides the ``hook_params`` attribute that allows you to pass
additional parameters to PostgresHook.
+You can pass the ``options`` argument this way so that you specify `command-line
options
<https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-OPTIONS>`_
+sent to the server at connection start.
.. exampleinclude:: /../../tests/system/providers/postgres/example_postgres.py
:language: python
@@ -186,5 +187,5 @@ In this how-to guide we explored the Apache Airflow
PostgresOperator. Let's quick
It is best practice to create subdirectory called ``sql`` in your ``dags``
directory where you can store your sql files.
This will make your code more elegant and more maintainable.
And finally, we looked at the different ways you can dynamically pass
parameters into our PostgresOperator
-tasks using ``parameters`` or ``params`` attribute and how you can control the
server configuration parameters by passing
-the ``runtime_parameters`` attribute.
+tasks using ``parameters`` or ``params`` attribute and how you can control the
session parameters by passing
+options in the ``hook_params`` attribute.
diff --git a/tests/providers/postgres/hooks/test_postgres.py
b/tests/providers/postgres/hooks/test_postgres.py
index 804ee9da9a..a6fa619634 100644
--- a/tests/providers/postgres/hooks/test_postgres.py
+++ b/tests/providers/postgres/hooks/test_postgres.py
@@ -101,6 +101,20 @@ class TestPostgresHookConn:
user="login-conn", password="password-conn", host="host",
dbname="database-override", port=None
)
+ @mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
+ def test_get_conn_from_connection_with_options(self, mock_connect):
+ conn = Connection(login="login-conn", password="password-conn",
host="host", schema="database")
+ hook = PostgresHook(connection=conn, options="-c
statement_timeout=3000ms")
+ hook.get_conn()
+ mock_connect.assert_called_once_with(
+ user="login-conn",
+ password="password-conn",
+ host="host",
+ dbname="database",
+ port=None,
+ options="-c statement_timeout=3000ms",
+ )
+
@mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
@mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook")
@pytest.mark.parametrize("aws_conn_id", [NOTSET, None, "mock_aws_conn"])
diff --git a/tests/providers/postgres/operators/test_postgres.py
b/tests/providers/postgres/operators/test_postgres.py
index 4b615917ea..b58abf3497 100644
--- a/tests/providers/postgres/operators/test_postgres.py
+++ b/tests/providers/postgres/operators/test_postgres.py
@@ -112,3 +112,4 @@ class TestPostgres:
runtime_parameters={"statement_timeout": "3000ms"},
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+ assert op.get_db_hook().get_first("SHOW statement_timeout;")[0] == "3s"
diff --git a/tests/system/providers/postgres/example_postgres.py
b/tests/system/providers/postgres/example_postgres.py
index ceab527195..fd0a80783c 100644
--- a/tests/system/providers/postgres/example_postgres.py
+++ b/tests/system/providers/postgres/example_postgres.py
@@ -73,7 +73,7 @@ with DAG(
task_id="get_birth_date",
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC
%(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
- runtime_parameters={"statement_timeout": "3000ms"},
+ hook_params={"options": "-c statement_timeout=3000ms"},
)
# [END postgres_operator_howto_guide_get_birth_date]