This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d6e7c45  Adding Variable.update method and improving detection of variable key collisions (#18159)
d6e7c45 is described below

commit d6e7c4500bd7e180a62c86a20d0f6d6a761c3817
Author: Sam Wheating <[email protected]>
AuthorDate: Sun Sep 12 13:30:18 2021 -0700

    Adding Variable.update method and improving detection of variable key collisions (#18159)
    
    * Update Variable.set() method to take a description argument.
    * Update Variable.setdefault() method to take a description argument.
    * Adding a Variable.update() method which will throw a KeyError if the Variable doesn't exist, and an AttributeError if it
      doesn't exist in the Database (since a non-metastore Variable can't be modified)
    * Improved logging around key collisions between different variable backends.
    * Updated documentation to warn users about key collisions between variable backends.
    * If a user has a duplicated key in the metastore and an extra secrets backend, then updates to the Variable will update the
      value in the metastore, but reads will read the value in the additional backend.
---
 airflow/models/variable.py                         | 89 ++++++++++++++++++----
 .../security/secrets/secrets-backend/index.rst     |  6 ++
 tests/models/test_variable.py                      | 59 +++++++++++++-
 3 files changed, 137 insertions(+), 17 deletions(-)

diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index b11194c..b724686 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -18,7 +18,6 @@
 
 import json
 import logging
-import os
 from typing import Any, Optional
 
 from cryptography.fernet import InvalidToken as InvalidFernetToken
@@ -29,6 +28,7 @@ from sqlalchemy.orm import Session, reconstructor, synonym
 from airflow.configuration import ensure_secrets_loaded
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
+from airflow.secrets.metastore import MetastoreBackend
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.secrets_masker import mask_secret
 from airflow.utils.session import provide_session
@@ -94,7 +94,7 @@ class Variable(Base, LoggingMixin):
         return synonym('_val', descriptor=property(cls.get_val, cls.set_val))
 
     @classmethod
-    def setdefault(cls, key, default, deserialize_json=False):
+    def setdefault(cls, key, default, description=None, 
deserialize_json=False):
         """
         Like a Python builtin dict object, setdefault returns the current value
         for a key, and if it isn't there, stores the default value and returns 
it.
@@ -111,7 +111,7 @@ class Variable(Base, LoggingMixin):
         obj = Variable.get(key, default_var=None, 
deserialize_json=deserialize_json)
         if obj is None:
             if default is not None:
-                Variable.set(key, default, serialize_json=deserialize_json)
+                Variable.set(key, default, description=description, 
serialize_json=deserialize_json)
                 return default
             else:
                 raise ValueError('Default Value must be set')
@@ -149,34 +149,65 @@ class Variable(Base, LoggingMixin):
 
     @classmethod
     @provide_session
-    def set(cls, key: str, value: Any, serialize_json: bool = False, session: 
Session = None):
+    def set(
+        cls,
+        key: str,
+        value: Any,
+        description: str = None,
+        serialize_json: bool = False,
+        session: Session = None,
+    ):
         """
-        Sets a value for an Airflow Variable with a given Key
+        Sets a value for an Airflow Variable with a given Key.
+        This operation will overwrite an existing variable.
 
         :param key: Variable Key
         :param value: Value to set for the Variable
+        :param description: Description to set for the Variable
         :param serialize_json: Serialize the value to a JSON string
         :param session: SQL Alchemy Sessions
         """
-        env_var_name = "AIRFLOW_VAR_" + key.upper()
-        if env_var_name in os.environ:
-            log.warning(
-                "You have the environment variable %s defined, which takes 
precedence over reading "
-                "from the database. The value will be saved, but to read it 
you have to delete "
-                "the environment variable.",
-                env_var_name,
-            )
+        # check if the secret exists in the custom secrets backend.
+        cls.check_for_write_conflict(key)
         if serialize_json:
             stored_value = json.dumps(value, indent=2)
         else:
             stored_value = str(value)
 
         Variable.delete(key, session=session)
-        session.add(Variable(key=key, val=stored_value))
+        session.add(Variable(key=key, val=stored_value, 
description=description))
         session.flush()
 
     @classmethod
     @provide_session
+    def update(
+        cls,
+        key: str,
+        value: Any,
+        serialize_json: bool = False,
+        session: Session = None,
+    ):
+        """
+        Updates a given Airflow Variable with the Provided value
+
+        :param key: Variable Key
+        :param value: Value to set for the Variable
+        :param serialize_json: Serialize the value to a JSON string
+        :param session: SQL Alchemy Session
+        """
+        cls.check_for_write_conflict(key)
+
+        if cls.get_variable_from_secrets(key) is None:
+            raise KeyError(f'Variable {key} does not exist')
+
+        obj = session.query(cls).filter(cls.key == key).first()
+        if obj is None:
+            raise AttributeError(f'Variable {key} does not exist in the 
Database and cannot be updated.')
+
+        cls.set(key, value, description=obj.description, 
serialize_json=serialize_json)
+
+    @classmethod
+    @provide_session
     def delete(cls, key: str, session: Session = None) -> int:
         """
         Delete an Airflow Variable for a given key
@@ -192,6 +223,36 @@ class Variable(Base, LoggingMixin):
         if self._val and self.is_encrypted:
             self._val = fernet.rotate(self._val.encode('utf-8')).decode()
 
+    def check_for_write_conflict(key: str) -> None:
+        """
+        Logs a warning if a variable exists outside of the metastore.
+
+        If we try to write a variable to the metastore while the same key
+        exists in an environment variable or custom secrets backend, then
+        subsequent reads will not read the set value.
+
+        :param key: Variable Key
+        """
+        for secrets_backend in ensure_secrets_loaded():
+            if not isinstance(secrets_backend, MetastoreBackend):
+                try:
+                    var_val = secrets_backend.get_variable(key=key)
+                    if var_val is not None:
+                        log.warning(
+                            "The variable {key} is defined in the {cls} 
secrets backend, which takes "
+                            "precedence over reading from the database. The 
value in the database will be "
+                            "updated, but to read it you have to delete the 
conflicting variable "
+                            "from {cls}".format(key=key, 
cls=secrets_backend.__class__.__name__)
+                        )
+                        return
+                except Exception:  # pylint: disable=broad-except
+                    log.exception(
+                        'Unable to retrieve variable from secrets backend 
(%s). '
+                        'Checking subsequent secrets backend.',
+                        type(secrets_backend).__name__,
+                    )
+            return None
+
     @staticmethod
     def get_variable_from_secrets(key: str) -> Optional[str]:
         """
diff --git a/docs/apache-airflow/security/secrets/secrets-backend/index.rst b/docs/apache-airflow/security/secrets/secrets-backend/index.rst
index a70556d..befa84b 100644
--- a/docs/apache-airflow/security/secrets/secrets-backend/index.rst
+++ b/docs/apache-airflow/security/secrets/secrets-backend/index.rst
@@ -41,6 +41,12 @@ database second.
 If you enable an alternative secrets backend, it will be searched first, 
followed by environment variables,
 then metastore.  This search ordering is not configurable.
 
+.. warning::
+
+    When using environment variables or an alternative secrets backend to store secrets or variables, it is possible to create key collisions.
+    In the event of a duplicated key between backends, all write operations will update the value in the metastore, but all read operations will
+    return the first match for the requested key starting with the custom backend, then the environment variables and finally the metastore.
+
 .. _secrets_backend_configuration:
 
 Configuration
diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py
index 28aa056..721e7a5 100644
--- a/tests/models/test_variable.py
+++ b/tests/models/test_variable.py
@@ -26,6 +26,7 @@ from parameterized import parameterized
 
 from airflow import settings
 from airflow.models import Variable, crypto, variable
+from airflow.secrets.metastore import MetastoreBackend
 from tests.test_utils import db
 from tests.test_utils.config import conf_vars
 
@@ -111,9 +112,30 @@ class TestVariable(unittest.TestCase):
             assert "new-db-value" == Variable.get("key")
 
         assert log_context.records[0].message == (
-            'You have the environment variable AIRFLOW_VAR_KEY defined, which 
takes precedence over '
-            'reading from the database. The value will be saved, but to read 
it you have to delete '
-            'the environment variable.'
+            "The variable key is defined in the EnvironmentVariablesBackend 
secrets backend, "
+            "which takes precedence over reading from the database. The value 
in the database "
+            "will be updated, but to read it you have to delete the 
conflicting variable from "
+            "EnvironmentVariablesBackend"
+        )
+
+    @mock.patch('airflow.models.variable.ensure_secrets_loaded')
+    def test_variable_set_with_extra_secret_backend(self, mock_ensure_secrets):
+
+        mock_backend = mock.Mock()
+        mock_backend.get_variable.return_value = "secret_val"
+        mock_backend.__class__.__name__ = 'MockSecretsBackend'
+        mock_ensure_secrets.return_value = [mock_backend, MetastoreBackend]
+
+        with self.assertLogs(variable.log) as log_context:
+            Variable.set("key", "new-db-value")
+
+        assert Variable.get("key") == "secret_val"
+
+        assert log_context.records[0].message == (
+            "The variable key is defined in the MockSecretsBackend secrets 
backend, "
+            "which takes precedence over reading from the database. The value 
in the database "
+            "will be updated, but to read it you have to delete the 
conflicting variable from "
+            "MockSecretsBackend"
         )
 
     def test_variable_set_get_round_trip_json(self):
@@ -121,6 +143,33 @@ class TestVariable(unittest.TestCase):
         Variable.set("tested_var_set_id", value, serialize_json=True)
         assert value == Variable.get("tested_var_set_id", 
deserialize_json=True)
 
+    def test_variable_update(self):
+        Variable.set("test_key", "value1")
+        assert "value1" == Variable.get("test_key")
+        Variable.update("test_key", "value2")
+        assert "value2" == Variable.get("test_key")
+
+    def test_variable_update_fails_on_non_metastore_variable(self):
+        with mock.patch.dict('os.environ', AIRFLOW_VAR_KEY="env-value"):
+            with pytest.raises(AttributeError):
+                Variable.update("key", "new-value")
+
+    def test_variable_update_preserves_description(self):
+        Variable.set("key", "value", description="a test variable")
+        assert Variable.get("key") == "value"
+        Variable.update("key", "value2")
+        session = settings.Session()
+        test_var = session.query(Variable).filter(Variable.key == 'key').one()
+        assert test_var.val == "value2"
+        assert test_var.description == "a test variable"
+
+    def test_set_variable_sets_description(self):
+        Variable.set('key', 'value', description="a test variable")
+        session = settings.Session()
+        test_var = session.query(Variable).filter(Variable.key == 'key').one()
+        assert test_var.description == "a test variable"
+        assert test_var.val == 'value'
+
     def test_variable_set_existing_value_to_blank(self):
         test_value = 'Some value'
         test_key = 'test_key'
@@ -136,6 +185,10 @@ class TestVariable(unittest.TestCase):
         with pytest.raises(KeyError):
             Variable.get("thisIdDoesNotExist")
 
+    def test_update_non_existing_var_should_raise_key_error(self):
+        with pytest.raises(KeyError):
+            Variable.update("thisIdDoesNotExist", "value")
+
     def test_get_non_existing_var_with_none_default_should_return_none(self):
         assert Variable.get("thisIdDoesNotExist", default_var=None) is None
 

Reply via email to