This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 2646f8da7a [#10790] improvement(clients/python): Add client-side credential caching to avoid redundant REST calls (#10791)
2646f8da7a is described below

commit 2646f8da7a2e08165042856ce2495bdb06c7a161
Author: Sun Yuhan <[email protected]>
AuthorDate: Fri Apr 24 20:59:08 2026 +0800

    [#10790] improvement(clients/python): Add client-side credential caching to avoid redundant REST calls (#10791)
    
    ### What changes were proposed in this pull request?
    
    Add client-side credential caching in `BaseGVFSOperations` to avoid
    redundant HTTP calls to the Gravitino server's `/credentials` endpoint
    when credential vending is enabled.
    
    **Changes:**
    
    - Added `_credential_cache` (LRUCache) and `_credential_cache_lock`
    (RWLock) to `BaseGVFSOperations`
    - Added `_calculate_credential_expire_time()` method using the existing
    `credential_expiration_ratio` config (default 0.5)
    - Added `_get_credentials_with_cache()` method with read-write lock +
    double-checked locking pattern
    - Replaced the unconditional `get_credentials()` call in
    `_get_actual_filesystem_by_location_name()` with the cached version
    - Ignored never-expiring credentials (`expire_time_in_ms() <= 0`) when
    calculating the cache expiry, so that lists mixing static and expiring
    credentials are handled correctly
    - Added 11 unit tests covering: credential vending disabled, cache
    hit/miss, expiry refresh, per-fileset/per-location isolation, empty
    credentials, mixed static/expiring credentials, thread safety, and the
    expiration-ratio calculation
    
    ### Why are the changes needed?
    
    When `enable_credential_vending=True`, the Python SDK makes an HTTP
    request to `/credentials` on **every** file operation, even if the
    previously fetched credentials have not expired. With a typical 1-hour
    credential lifetime, a workload that performs hundreds of file operations
    within that window makes hundreds of unnecessary HTTP round-trips.
    
    The Java SDK avoids this by embedding lazy-refresh `CredentialsProvider`
    instances (e.g., `OSSCredentialsProvider`, `S3CredentialsProvider`) that
    cache credentials locally and only fetch new ones at 50% of their
    lifetime. This PR brings the Python SDK to parity with the Java SDK's
    approach.
    
    Fix: #10790
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. All changes are internal to `BaseGVFSOperations`. No public API or
    configuration changes required.
    
    ### How was this patch tested?
    
    - **Unit tests** (11 new cases in `test_gvfs_credential_cache.py`):
    credential vending disabled, cache hit/miss, expiry refresh,
    per-fileset/per-location isolation, empty credentials, mixed
    static/expiring credentials, thread safety, expiration ratio, and
    never-expire credentials
    - **Integration tests**:
      - Verified that 5 file operations trigger only 1 `/credentials` request
        (vs. 5 without caching)
      - Verified that an expired cache entry triggers a fresh credential fetch
    - **Existing tests**: All existing gvfs/credential unit tests + 12
    integration tests pass with no regression
    - **Code quality**: `ruff check` and `ruff format` pass
---
 .../gravitino/filesystem/gvfs_base_operations.py   | 102 +++++-
 .../tests/unittests/test_gvfs_credential_cache.py  | 349 +++++++++++++++++++++
 2 files changed, 446 insertions(+), 5 deletions(-)

diff --git a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py 
b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
index 72c37bba0a..37c091caf3 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
@@ -215,6 +215,9 @@ class BaseGVFSOperations(ABC):
         self._filesystem_cache = TTLCache(maxsize=cache_size, 
ttl=cache_expired_time)
         self._cache_lock = rwlock.RWLockFair()
 
+        self._credential_cache: LRUCache = LRUCache(maxsize=cache_size)
+        self._credential_cache_lock = rwlock.RWLockFair()
+
         self._enable_fileset_metadata_cache = (
             self.ENABLE_FILESET_METADATA_CACHE_DEFAULT
             if options is None
@@ -563,11 +566,9 @@ class BaseGVFSOperations(ABC):
             CallerContextHolder.set(caller_context)
 
         try:
-            # Get credentials if credential vending is enabled
-            credentials = (
-                fileset.support_credentials().get_credentials()
-                if self._enable_credential_vending
-                else None
+            # Get credentials with client-side caching to avoid redundant REST 
calls
+            credentials = self._get_credentials_with_cache(
+                fileset_ident, fileset, target_location_name
             )
 
             # Get the filesystem using the new path-based caching approach
@@ -644,6 +645,97 @@ class BaseGVFSOperations(ABC):
     def _file_system_expired(self, expire_time: int):
         """Return True if ``expire_time`` (epoch ms) is at or before the current time."""
         return expire_time <= time.time() * 1000
 
+    def _calculate_credential_expire_time(self, credential_expire_time_ms: 
int) -> int:
+        """Calculate the local cache expiration time for a credential.
+
+        Uses the same ratio-based calculation as the filesystem cache:
+        current_time + (credential_remaining_time * ratio)
+
+        The ratio is read from ``self._options`` on every call. It falls back
+        to ``GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO`` when the option
+        is unset or ``self._options`` is empty/None. If the credential has
+        already expired, the result lies in the past, so the cache entry is
+        treated as stale immediately.
+
+        :param credential_expire_time_ms: The credential's expiry time in epoch
+            ms; a value <= 0 means the credential never expires
+        :return: The local cache expiration time in epoch ms, or
+            ``TIME_WITHOUT_EXPIRATION`` for a never-expiring credential
+        :raises ValueError: if the configured ratio cannot be parsed as a float
+        """
+        if credential_expire_time_ms <= 0:
+            return TIME_WITHOUT_EXPIRATION
+
+        # NOTE(review): the ratio is not range-checked here. A value > 1 would
+        # put the cache expiry *after* the credential's own expiry, so expired
+        # credentials could be served from the cache. Confirm the config is
+        # validated elsewhere.
+        ratio = float(
+            self._options.get(
+                GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO,
+                GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO,
+            )
+            if self._options
+            else GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO
+        )
+        now_ms = time.time() * 1000
+        return int(now_ms + (credential_expire_time_ms - now_ms) * ratio)
+
+    def _get_credentials_with_cache(
+        self,
+        fileset_ident: NameIdentifier,
+        fileset: Fileset,
+        target_location_name: str,
+    ) -> Optional[List[Credential]]:
+        """Get credentials with client-side caching to avoid redundant REST 
calls.
+
+        Implements lazy refresh: returns cached credentials if not expired,
+        otherwise fetches new credentials from the server.
+
+        :param fileset_ident: The fileset identifier
+        :param fileset: The fileset object
+        :param target_location_name: The resolved location name
+        :return: List of credentials, or None if credential vending is disabled
+        """
+        if not self._enable_credential_vending:
+            return None
+
+        cache_key = (fileset_ident, target_location_name)
+
+        # Fast path: read lock, check cache
+        read_lock = self._credential_cache_lock.gen_rlock()
+        try:
+            read_lock.acquire()
+            cache_value = self._credential_cache.get(cache_key)
+            if cache_value is not None:
+                expire_time, cached_credentials = cache_value
+                if not self._file_system_expired(expire_time):
+                    return cached_credentials
+        finally:
+            read_lock.release()
+
+        # Slow path: write lock, double-check, then fetch
+        write_lock = self._credential_cache_lock.gen_wlock()
+        try:
+            write_lock.acquire()
+            # Double-check after acquiring write lock
+            cache_value = self._credential_cache.get(cache_key)
+            if cache_value is not None:
+                expire_time, cached_credentials = cache_value
+                if not self._file_system_expired(expire_time):
+                    return cached_credentials
+
+            # Fetch fresh credentials from server
+            fresh_credentials = fileset.support_credentials().get_credentials()
+
+            if fresh_credentials:
+                expirable = [
+                    c.expire_time_in_ms()
+                    for c in fresh_credentials
+                    if c.expire_time_in_ms() > 0
+                ]
+                if expirable:
+                    earliest_expire_ms = min(expirable)
+                    expire_time = self._calculate_credential_expire_time(
+                        earliest_expire_ms
+                    )
+                else:
+                    expire_time = TIME_WITHOUT_EXPIRATION
+            else:
+                expire_time = TIME_WITHOUT_EXPIRATION
+
+            self._credential_cache[cache_key] = (expire_time, 
fresh_credentials)
+            return fresh_credentials
+        finally:
+            write_lock.release()
+
     def _get_filesystem(
         self,
         credentials: Optional[List[Credential]],
diff --git 
a/clients/client-python/tests/unittests/test_gvfs_credential_cache.py 
b/clients/client-python/tests/unittests/test_gvfs_credential_cache.py
new file mode 100644
index 0000000000..d5184b32c9
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_gvfs_credential_cache.py
@@ -0,0 +1,349 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: 
disable=protected-access,import-outside-toplevel,no-value-for-parameter,broad-exception-caught,unsubscriptable-object
+import sys
+import threading
+import time
+import unittest
+from unittest.mock import MagicMock
+
+from gravitino.api.credential.s3_token_credential import S3TokenCredential
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.name_identifier import NameIdentifier
+
+
+def _create_mock_credential(expire_time_in_ms):
+    """Create a mock S3TokenCredential with the given expire time."""
+    return S3TokenCredential(
+        credential_info={
+            S3TokenCredential._SESSION_ACCESS_KEY_ID: "access_id",
+            S3TokenCredential._SESSION_SECRET_ACCESS_KEY: "secret_key",
+            S3TokenCredential._SESSION_TOKEN: "session_token",
+        },
+        expire_time_in_ms=expire_time_in_ms,
+    )
+
+
+def _create_mock_fileset(credentials_list):
+    """Create a mock fileset that returns the given credentials."""
+    from gravitino.client.generic_fileset import GenericFileset
+
+    mock_support = MagicMock()
+    mock_support.get_credentials.return_value = credentials_list
+    mock_fileset = MagicMock(spec=GenericFileset)
+    mock_fileset.support_credentials.return_value = mock_support
+    return mock_fileset
+
+
+class TestGVFSCredentialCache(unittest.TestCase):
+    """Tests for credential-level lazy caching in BaseGVFSOperations."""
+
+    def _create_operations(self, options=None):
+        """Create a BaseGVFSOperations-like object with credential cache fields.
+
+        Returns a MagicMock that carries only the attributes read by the
+        credential-cache methods, with the real BaseGVFSOperations methods
+        bound onto it.
+        """
+        # We directly construct the relevant fields from
+        # BaseGVFSOperations.__init__ without going through the full
+        # constructor, to avoid needing all dependencies.
+        from readerwriterlock import rwlock
+
+        ops = MagicMock()
+        ops._options = options or {}
+        ops._enable_credential_vending = (options or {}).get(
+            GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING, False
+        )
+        # NOTE(review): production uses an LRUCache here; a plain dict means
+        # LRU eviction is not exercised by these tests.
+        ops._credential_cache = {}
+        ops._credential_cache_lock = rwlock.RWLockFair()
+
+        # Bind the real methods from BaseGVFSOperations; __get__(ops) makes
+        # `self` inside each method refer to this mock.
+        from gravitino.filesystem.gvfs_base_operations import 
BaseGVFSOperations
+
+        ops._file_system_expired = 
BaseGVFSOperations._file_system_expired.__get__(ops)
+        ops._calculate_credential_expire_time = (
+            BaseGVFSOperations._calculate_credential_expire_time.__get__(ops)
+        )
+        ops._get_credentials_with_cache = (
+            BaseGVFSOperations._get_credentials_with_cache.__get__(ops)
+        )
+        return ops
+
+    def test_credential_vending_disabled_returns_none(self):
+        """When credential vending is disabled, should return None without
+        touching the cache."""
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: False}
+        )
+        fileset = _create_mock_fileset([])
+
+        result = ops._get_credentials_with_cache(
+            NameIdentifier.of("metalake", "catalog", "schema", "fileset"),
+            fileset,
+            "default",
+        )
+
+        self.assertIsNone(result)
+        fileset.support_credentials().get_credentials.assert_not_called()
+
+    def test_first_call_fetches_from_server(self):
+        """First call should fetch credentials from server and cache them."""
+        credential = _create_mock_credential(
+            int(time.time() * 1000) + 3600_000
+        )  # 1 hour from now
+        fileset = _create_mock_fileset([credential])
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        result = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+
+        self.assertEqual(result, [credential])
+        fileset.support_credentials().get_credentials.assert_called_once()
+
+    def test_cached_credentials_returned_without_server_call(self):
+        """Second call with valid cache should not hit the server."""
+        credential = _create_mock_credential(
+            int(time.time() * 1000) + 3600_000
+        )  # 1 hour from now
+        fileset = _create_mock_fileset([credential])
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        # First call - fetches from server
+        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+        # Second call - should use cache
+        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+
+        self.assertEqual(result1, result2)
+        # Server should only be called once
+        fileset.support_credentials().get_credentials.assert_called_once()
+
+    def test_expired_credentials_trigger_refresh(self):
+        """When cached credentials are expired, should fetch new ones from
+        the server."""
+        from gravitino.client.generic_fileset import GenericFileset
+
+        # Create credential that's already expired (expire time in the past)
+        expired_credential = _create_mock_credential(
+            int(time.time() * 1000) - 1000
+        )  # 1 second ago
+
+        # New credential with future expiry
+        fresh_credential = _create_mock_credential(int(time.time() * 1000) + 
3600_000)
+
+        # Use a single mock fileset with side_effect to return different values
+        mock_support = MagicMock()
+        mock_support.get_credentials.side_effect = [
+            [expired_credential],
+            [fresh_credential],
+        ]
+        fileset = MagicMock(spec=GenericFileset)
+        fileset.support_credentials.return_value = mock_support
+
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        # First call - fetches expired credential from server and caches it
+        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+        # Second call - cache is expired, calls get_credentials() again
+        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+
+        self.assertEqual(result1, [expired_credential])
+        self.assertEqual(result2, [fresh_credential])
+        # get_credentials() should have been called twice (first: no cache,
+        # second: expired)
+        self.assertEqual(
+            mock_support.get_credentials.call_count,
+            2,
+        )
+
+    def test_different_locations_cached_separately(self):
+        """Different location names should have separate cache entries."""
+        credential_s3 = _create_mock_credential(int(time.time() * 1000) + 
3600_000)
+        credential_oss = _create_mock_credential(int(time.time() * 1000) + 
7200_000)
+
+        fileset_s3 = _create_mock_fileset([credential_s3])
+        fileset_oss = _create_mock_fileset([credential_oss])
+
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        result_s3 = ops._get_credentials_with_cache(
+            fileset_ident, fileset_s3, "s3-location"
+        )
+        result_oss = ops._get_credentials_with_cache(
+            fileset_ident, fileset_oss, "oss-location"
+        )
+
+        self.assertEqual(result_s3, [credential_s3])
+        self.assertEqual(result_oss, [credential_oss])
+        fileset_s3.support_credentials().get_credentials.assert_called_once()
+        fileset_oss.support_credentials().get_credentials.assert_called_once()
+
+    def test_different_filesets_cached_separately(self):
+        """Different fileset identifiers should have separate cache entries."""
+        credential1 = _create_mock_credential(int(time.time() * 1000) + 
3600_000)
+        credential2 = _create_mock_credential(int(time.time() * 1000) + 
7200_000)
+
+        fileset1 = _create_mock_fileset([credential1])
+        fileset2 = _create_mock_fileset([credential2])
+
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        ident1 = NameIdentifier.of("metalake", "catalog", "schema", "fileset1")
+        ident2 = NameIdentifier.of("metalake", "catalog", "schema", "fileset2")
+
+        result1 = ops._get_credentials_with_cache(ident1, fileset1, "default")
+        result2 = ops._get_credentials_with_cache(ident2, fileset2, "default")
+
+        self.assertEqual(result1, [credential1])
+        self.assertEqual(result2, [credential2])
+        fileset1.support_credentials().get_credentials.assert_called_once()
+        fileset2.support_credentials().get_credentials.assert_called_once()
+
+    def test_empty_credentials_cached(self):
+        """Empty credential list should be cached to avoid repeated calls."""
+        fileset = _create_mock_fileset([])
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+
+        self.assertEqual(result1, [])
+        self.assertEqual(result2, [])
+        # Only called once - second call hits cache
+        fileset.support_credentials().get_credentials.assert_called_once()
+
+    def test_credential_expire_time_ratio(self):
+        """Verify that the credential expiration ratio is applied correctly."""
+        ops = self._create_operations(
+            {
+                GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
+                GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO: 
"0.5",
+            }
+        )
+
+        # Credential expires in 1000ms
+        expire_ms = int(time.time() * 1000) + 1000
+        calculated = ops._calculate_credential_expire_time(expire_ms)
+
+        # With ratio 0.5, the cache should expire at roughly
+        # current_time + 500ms
+        expected_approx = int(time.time() * 1000) + 500
+        # Allow 50ms tolerance for test execution time
+        self.assertAlmostEqual(calculated, expected_approx, delta=50)
+
+    def test_credential_expire_time_never_expire(self):
+        """When credential expire_time_in_ms <= 0, the cache should never
+        expire."""
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+
+        # NOTE(review): these asserts assume TIME_WITHOUT_EXPIRATION equals
+        # sys.maxsize; confirm against its definition.
+        result = ops._calculate_credential_expire_time(0)
+        self.assertEqual(result, sys.maxsize)
+
+        result = ops._calculate_credential_expire_time(-1)
+        self.assertEqual(result, sys.maxsize)
+
+    def test_mixed_credentials_cache_expiry_based_on_expiring_one(self):
+        """Mixed credentials (never-expire + expiring) should use the expiring
+        one for the cache TTL."""
+        from gravitino.client.generic_fileset import GenericFileset
+
+        # S3TokenCredential requires expire_time_in_ms > 0, so use a MagicMock
+        # for the never-expiring credential.
+        never_expire_credential = MagicMock()
+        never_expire_credential.expire_time_in_ms.return_value = 0
+        expired_credential = _create_mock_credential(
+            int(time.time() * 1000) - 1000
+        )  # already expired
+        fresh_credential = _create_mock_credential(int(time.time() * 1000) + 
3600_000)
+
+        # First call returns a mixed list with an expired credential; the
+        # second call returns a fresh one.
+        mock_support = MagicMock()
+        mock_support.get_credentials.side_effect = [
+            [never_expire_credential, expired_credential],
+            [never_expire_credential, fresh_credential],
+        ]
+        fileset = MagicMock(spec=GenericFileset)
+        fileset.support_credentials.return_value = mock_support
+
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        # First call - caches mixed credentials
+        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+        self.assertEqual(result1, [never_expire_credential, 
expired_credential])
+
+        # Second call - the cache entry expired because of the expiring
+        # credential, which triggers a refresh
+        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, 
"default")
+        self.assertEqual(result2, [never_expire_credential, fresh_credential])
+
+        self.assertEqual(mock_support.get_credentials.call_count, 2)
+
+    def test_thread_safety_concurrent_access(self):
+        """Multiple threads accessing the cache simultaneously should be safe
+        and deduplicated."""
+        credential = _create_mock_credential(int(time.time() * 1000) + 
3600_000)
+
+        # Use a shared mock fileset so call count is meaningful
+        shared_fileset = _create_mock_fileset([credential])
+
+        ops = self._create_operations(
+            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
+        )
+        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", 
"fileset")
+
+        num_threads = 10
+        results = [None] * num_threads
+        errors = [None] * num_threads
+
+        # Each worker records its result or exception at its own index, so
+        # the threads never write to the same slot.
+        def worker(idx):
+            try:
+                results[idx] = ops._get_credentials_with_cache(
+                    fileset_ident, shared_fileset, "default"
+                )
+            except Exception as e:
+                errors[idx] = e
+
+        # NOTE(review): threads are started one after another without a
+        # barrier, so real contention on the lock is likely but not guaranteed.
+        threads = [
+            threading.Thread(target=worker, args=(i,)) for i in 
range(num_threads)
+        ]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        # No errors should have occurred
+        for e in errors:
+            self.assertIsNone(e, f"Thread encountered error: {e}")
+
+        # All results should be the same credential list
+        for result in results:
+            self.assertIsNotNone(result)
+            self.assertEqual(len(result), 1)
+            assert result is not None  # type: ignore[unreachable]
+            self.assertEqual(result[0].access_key_id(), "access_id")
+
+        # Double-check locking should ensure only 1 server call
+        
shared_fileset.support_credentials().get_credentials.assert_called_once()
+
+
+if __name__ == "__main__":
+    # Allow running this test module directly as a script.
+    unittest.main()

Reply via email to