Copilot commented on code in PR #10791: URL: https://github.com/apache/gravitino/pull/10791#discussion_r3135933173
########## clients/client-python/tests/unittests/test_gvfs_credential_cache.py: ########## @@ -0,0 +1,319 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=protected-access,import-outside-toplevel,no-value-for-parameter,broad-exception-caught,unsubscriptable-object +import sys +import threading +import time +import unittest +from unittest.mock import MagicMock + +from gravitino.api.credential.s3_token_credential import S3TokenCredential +from gravitino.filesystem.gvfs_config import GVFSConfig +from gravitino.name_identifier import NameIdentifier + + +def _create_mock_credential(expire_time_in_ms): + """Create a mock S3TokenCredential with the given expire time.""" + return S3TokenCredential( + credential_info={ + S3TokenCredential._SESSION_ACCESS_KEY_ID: "access_id", + S3TokenCredential._SESSION_SECRET_ACCESS_KEY: "secret_key", + S3TokenCredential._SESSION_TOKEN: "session_token", + }, + expire_time_in_ms=expire_time_in_ms, + ) + + +def _create_mock_fileset(credentials_list): + """Create a mock fileset that returns the given credentials.""" + from gravitino.client.generic_fileset import GenericFileset + + mock_support = MagicMock() + mock_support.get_credentials.return_value = credentials_list + mock_fileset = MagicMock(spec=GenericFileset) + mock_fileset.support_credentials.return_value = mock_support + return mock_fileset + + +class _ConcreteGVFSOperations: + """Minimal concrete subclass to test BaseGVFSOperations credential cache logic.""" + + pass + + +class TestGVFSCredentialCache(unittest.TestCase): + """Tests for credential-level lazy caching in BaseGVFSOperations.""" + + def _create_operations(self, options=None): + """Create a BaseGVFSOperations-like object with credential cache fields.""" + # We directly construct the relevant fields from BaseGVFSOperations.__init__ + # without going through the full constructor to avoid needing all dependencies. + from readerwriterlock import rwlock + + ops = MagicMock() + ops._options = options or {} + ops._enable_credential_vending = (options or {}).get( + GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING, False + ) + ops._credential_cache = {} + ops._credential_cache_lock = rwlock.RWLockFair() + Review Comment: These tests set ops._credential_cache to a plain dict, but production uses cachetools.LRUCache. Using the real LRUCache here would better match production semantics (e.g., eviction hooks/order and get/set behavior) and reduce the chance of tests passing while the real cache behaves differently. ########## clients/client-python/tests/unittests/test_gvfs_credential_cache.py: ########## @@ -0,0 +1,319 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=protected-access,import-outside-toplevel,no-value-for-parameter,broad-exception-caught,unsubscriptable-object +import sys +import threading +import time +import unittest +from unittest.mock import MagicMock + +from gravitino.api.credential.s3_token_credential import S3TokenCredential +from gravitino.filesystem.gvfs_config import GVFSConfig +from gravitino.name_identifier import NameIdentifier + + +def _create_mock_credential(expire_time_in_ms): + """Create a mock S3TokenCredential with the given expire time.""" + return S3TokenCredential( + credential_info={ + S3TokenCredential._SESSION_ACCESS_KEY_ID: "access_id", + S3TokenCredential._SESSION_SECRET_ACCESS_KEY: "secret_key", + S3TokenCredential._SESSION_TOKEN: "session_token", + }, + expire_time_in_ms=expire_time_in_ms, + ) + + +def _create_mock_fileset(credentials_list): + """Create a mock fileset that returns the given credentials.""" + from gravitino.client.generic_fileset import GenericFileset + + mock_support = MagicMock() + mock_support.get_credentials.return_value = credentials_list + mock_fileset = MagicMock(spec=GenericFileset) + mock_fileset.support_credentials.return_value = mock_support + return mock_fileset + + +class _ConcreteGVFSOperations: + """Minimal concrete subclass to test BaseGVFSOperations credential cache logic.""" + + pass + + +class TestGVFSCredentialCache(unittest.TestCase): + """Tests for credential-level lazy caching in BaseGVFSOperations.""" + + def _create_operations(self, options=None): + """Create a BaseGVFSOperations-like object with credential cache fields.""" + # We directly construct the relevant fields from BaseGVFSOperations.__init__ + # without going through the full constructor to avoid needing all dependencies. + from readerwriterlock import rwlock + + ops = MagicMock() + ops._options = options or {} + ops._enable_credential_vending = (options or {}).get( + GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING, False + ) + ops._credential_cache = {} + ops._credential_cache_lock = rwlock.RWLockFair() + + # Bind the real methods from BaseGVFSOperations + from gravitino.filesystem.gvfs_base_operations import BaseGVFSOperations + + ops._file_system_expired = BaseGVFSOperations._file_system_expired.__get__(ops) + ops._calculate_credential_expire_time = ( + BaseGVFSOperations._calculate_credential_expire_time.__get__(ops) + ) + ops._get_credentials_with_cache = ( + BaseGVFSOperations._get_credentials_with_cache.__get__(ops) + ) + return ops + + def test_credential_vending_disabled_returns_none(self): + """When credential vending is disabled, should return None without touching cache.""" + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: False} + ) + fileset = _create_mock_fileset([]) + + result = ops._get_credentials_with_cache( + NameIdentifier.of("metalake", "catalog", "schema", "fileset"), + fileset, + "default", + ) + + self.assertIsNone(result) + fileset.support_credentials().get_credentials.assert_not_called() + + def test_first_call_fetches_from_server(self): + """First call should fetch credentials from server and cache them.""" + credential = _create_mock_credential( + int(time.time() * 1000) + 3600_000 + ) # 1 hour from now + fileset = _create_mock_fileset([credential]) + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True} + ) + fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset") + + result = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + + self.assertEqual(result, [credential]) + fileset.support_credentials().get_credentials.assert_called_once() + + def test_cached_credentials_returned_without_server_call(self): + """Second call with valid cache should not hit the server.""" + credential = _create_mock_credential( + int(time.time() * 1000) + 3600_000 + ) # 1 hour from now + fileset = _create_mock_fileset([credential]) + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True} + ) + fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset") + + # First call - fetches from server + result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + # Second call - should use cache + result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + + self.assertEqual(result1, result2) + # Server should only be called once + fileset.support_credentials().get_credentials.assert_called_once() + + def test_expired_credentials_trigger_refresh(self): + """When cached credentials are expired, should fetch new ones from server.""" + from gravitino.client.generic_fileset import GenericFileset + + # Create credential that's already expired (expire time in the past) + expired_credential = _create_mock_credential( + int(time.time() * 1000) - 1000 + ) # 1 second ago + + # New credential with future expiry + fresh_credential = _create_mock_credential(int(time.time() * 1000) + 3600_000) + + # Use a single mock fileset with side_effect to return different values + mock_support = MagicMock() + mock_support.get_credentials.side_effect = [ + [expired_credential], + [fresh_credential], + ] + fileset = MagicMock(spec=GenericFileset) + fileset.support_credentials.return_value = mock_support + + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True} + ) + fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset") + + # First call - fetches expired credential from server and caches it + result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + # Second call - cache is expired, calls get_credentials() again + result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + + self.assertEqual(result1, [expired_credential]) + self.assertEqual(result2, [fresh_credential]) + # get_credentials() should have been called twice (first: no cache, second: expired) + self.assertEqual( + mock_support.get_credentials.call_count, + 2, + ) + + def test_different_locations_cached_separately(self): + """Different location names should have separate cache entries.""" + credential_s3 = _create_mock_credential(int(time.time() * 1000) + 3600_000) + credential_oss = _create_mock_credential(int(time.time() * 1000) + 7200_000) + + fileset_s3 = _create_mock_fileset([credential_s3]) + fileset_oss = _create_mock_fileset([credential_oss]) + + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True} + ) + fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset") + + result_s3 = ops._get_credentials_with_cache( + fileset_ident, fileset_s3, "s3-location" + ) + result_oss = ops._get_credentials_with_cache( + fileset_ident, fileset_oss, "oss-location" + ) + + self.assertEqual(result_s3, [credential_s3]) + self.assertEqual(result_oss, [credential_oss]) + fileset_s3.support_credentials().get_credentials.assert_called_once() + fileset_oss.support_credentials().get_credentials.assert_called_once() + + def test_different_filesets_cached_separately(self): + """Different fileset identifiers should have separate cache entries.""" + credential1 = _create_mock_credential(int(time.time() * 1000) + 3600_000) + credential2 = _create_mock_credential(int(time.time() * 1000) + 7200_000) + + fileset1 = _create_mock_fileset([credential1]) + fileset2 = _create_mock_fileset([credential2]) + + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True} + ) + ident1 = NameIdentifier.of("metalake", "catalog", "schema", "fileset1") + ident2 = NameIdentifier.of("metalake", "catalog", "schema", "fileset2") + + result1 = ops._get_credentials_with_cache(ident1, fileset1, "default") + result2 = ops._get_credentials_with_cache(ident2, fileset2, "default") + + self.assertEqual(result1, [credential1]) + self.assertEqual(result2, [credential2]) + fileset1.support_credentials().get_credentials.assert_called_once() + fileset2.support_credentials().get_credentials.assert_called_once() + + def test_empty_credentials_cached(self): + """Empty credential list should be cached to avoid repeated calls.""" + fileset = _create_mock_fileset([]) + ops = self._create_operations( + {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True} + ) + fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset") + + result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default") + + self.assertEqual(result1, []) + self.assertEqual(result2, []) + # Only called once - second call hits cache + fileset.support_credentials().get_credentials.assert_called_once() + + def test_credential_expire_time_ratio(self): + """Verify that the credential expiration ratio is applied correctly.""" + ops = self._create_operations( + { + GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True, + GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO: "0.5", + } + ) + + # Credential expires in 1000ms + expire_ms = int(time.time() * 1000) + 1000 + calculated = ops._calculate_credential_expire_time(expire_ms) + + # With ratio 0.5, the cache should expire at roughly current_time + 500ms + expected_approx = int(time.time() * 1000) + 500 + # Allow 50ms tolerance for test execution time + self.assertAlmostEqual(calculated, expected_approx, delta=50) Review Comment: test_credential_expire_time_ratio is time-sensitive and uses a very tight 50ms delta, which can be flaky on loaded CI hosts. Consider patching time.time() to a fixed value (unittest.mock.patch) so the expected value is deterministic, or loosening the tolerance to something that won't spuriously fail. ########## clients/client-python/gravitino/filesystem/gvfs_base_operations.py: ########## @@ -644,6 +645,97 @@ def _get_actual_file_path( def _file_system_expired(self, expire_time: int): return expire_time <= time.time() * 1000 + def _calculate_credential_expire_time(self, credential_expire_time_ms: int) -> int: + """Calculate the local cache expiration time for a credential. + + Uses the same ratio-based calculation as the filesystem cache: + current_time + (credential_remaining_time * ratio) + + :param credential_expire_time_ms: The credential's expiry time in epoch ms + :return: The local cache expiration time in epoch ms + """ + if credential_expire_time_ms <= 0: + return TIME_WITHOUT_EXPIRATION + + ratio = float( + self._options.get( + GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO, + GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO, + ) + if self._options + else GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO + ) + now_ms = time.time() * 1000 + return int(now_ms + (credential_expire_time_ms - now_ms) * ratio) Review Comment: The credential-expiry calculation here largely duplicates StorageHandler._get_expire_time_by_ratio (same config key and formula). Having two implementations risks drift (e.g., validation, time source, rounding). Consider factoring the shared ratio parsing/validation into a single helper used by both paths. ########## clients/client-python/gravitino/filesystem/gvfs_base_operations.py: ########## @@ -644,6 +645,97 @@ def _get_actual_file_path( def _file_system_expired(self, expire_time: int): return expire_time <= time.time() * 1000 + def _calculate_credential_expire_time(self, credential_expire_time_ms: int) -> int: + """Calculate the local cache expiration time for a credential. + + Uses the same ratio-based calculation as the filesystem cache: + current_time + (credential_remaining_time * ratio) + + :param credential_expire_time_ms: The credential's expiry time in epoch ms + :return: The local cache expiration time in epoch ms + """ + if credential_expire_time_ms <= 0: + return TIME_WITHOUT_EXPIRATION + + ratio = float( + self._options.get( + GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO, + GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO, + ) + if self._options + else GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO + ) + now_ms = time.time() * 1000 + return int(now_ms + (credential_expire_time_ms - now_ms) * ratio) Review Comment: _calculate_credential_expire_time parses credential_expiration_ratio but doesn't validate its range. If a user sets ratio > 1, the computed local expiry can be after the real credential expiry, causing the client to keep using expired credentials; if ratio <= 0 it can force constant refresh. Consider validating/clamping (e.g., 0 < ratio <= 1) and raising a clear GravitinoRuntimeException when invalid. ########## clients/client-python/tests/unittests/test_gvfs_credential_cache.py: ########## @@ -0,0 +1,319 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=protected-access,import-outside-toplevel,no-value-for-parameter,broad-exception-caught,unsubscriptable-object +import sys +import threading +import time +import unittest +from unittest.mock import MagicMock + +from gravitino.api.credential.s3_token_credential import S3TokenCredential +from gravitino.filesystem.gvfs_config import GVFSConfig +from gravitino.name_identifier import NameIdentifier + + +def _create_mock_credential(expire_time_in_ms): + """Create a mock S3TokenCredential with the given expire time.""" + return S3TokenCredential( + credential_info={ + S3TokenCredential._SESSION_ACCESS_KEY_ID: "access_id", + S3TokenCredential._SESSION_SECRET_ACCESS_KEY: "secret_key", + S3TokenCredential._SESSION_TOKEN: "session_token", + }, + expire_time_in_ms=expire_time_in_ms, + ) + + +def _create_mock_fileset(credentials_list): + """Create a mock fileset that returns the given credentials.""" + from gravitino.client.generic_fileset import GenericFileset + + mock_support = MagicMock() + mock_support.get_credentials.return_value = credentials_list + mock_fileset = MagicMock(spec=GenericFileset) + mock_fileset.support_credentials.return_value = mock_support + return mock_fileset + + +class _ConcreteGVFSOperations: + """Minimal concrete subclass to test BaseGVFSOperations credential cache logic.""" + + pass + + Review Comment: _ConcreteGVFSOperations is declared but never used in this test module. Removing it would reduce noise and avoid implying there's a concrete subclass requirement for these tests. ```suggestion ``` ########## clients/client-python/gravitino/filesystem/gvfs_base_operations.py: ########## @@ -644,6 +645,97 @@ def _get_actual_file_path( def _file_system_expired(self, expire_time: int): return expire_time <= time.time() * 1000 + def _calculate_credential_expire_time(self, credential_expire_time_ms: int) -> int: + """Calculate the local cache expiration time for a credential. + + Uses the same ratio-based calculation as the filesystem cache: + current_time + (credential_remaining_time * ratio) + + :param credential_expire_time_ms: The credential's expiry time in epoch ms + :return: The local cache expiration time in epoch ms + """ + if credential_expire_time_ms <= 0: + return TIME_WITHOUT_EXPIRATION + + ratio = float( + self._options.get( + GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO, + GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO, + ) + if self._options + else GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO + ) + now_ms = time.time() * 1000 + return int(now_ms + (credential_expire_time_ms - now_ms) * ratio) + + def _get_credentials_with_cache( + self, + fileset_ident: NameIdentifier, + fileset: Fileset, + target_location_name: str, + ) -> Optional[List[Credential]]: + """Get credentials with client-side caching to avoid redundant REST calls. + + Implements lazy refresh: returns cached credentials if not expired, + otherwise fetches new credentials from the server. + + :param fileset_ident: The fileset identifier + :param fileset: The fileset object + :param target_location_name: The resolved location name + :return: List of credentials, or None if credential vending is disabled + """ + if not self._enable_credential_vending: + return None + + cache_key = (fileset_ident, target_location_name) + + # Fast path: read lock, check cache + read_lock = self._credential_cache_lock.gen_rlock() + try: + read_lock.acquire() + cache_value = self._credential_cache.get(cache_key) + if cache_value is not None: + expire_time, cached_credentials = cache_value + if not self._file_system_expired(expire_time): + return cached_credentials + finally: + read_lock.release() + + # Slow path: write lock, double-check, then fetch + write_lock = self._credential_cache_lock.gen_wlock() + try: + write_lock.acquire() + # Double-check after acquiring write lock + cache_value = self._credential_cache.get(cache_key) + if cache_value is not None: + expire_time, cached_credentials = cache_value + if not self._file_system_expired(expire_time): + return cached_credentials + + # Fetch fresh credentials from server + fresh_credentials = fileset.support_credentials().get_credentials() + + if fresh_credentials: + expirable = [ + c.expire_time_in_ms() + for c in fresh_credentials + if c.expire_time_in_ms() > 0 + ] Review Comment: Credential-cache expiry logic explicitly ignores never-expiring credentials (expire_time_in_ms() == 0) when mixed with expiring ones. Please add a unit test that uses a mixed credential list (one expire_time_in_ms()==0, one expiring) and asserts the cache refresh happens based on the expiring credential; this protects the new expirable-filtering behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
