This is an automated email from the ASF dual-hosted git repository.
yuqi1129 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 2646f8da7a [#10790] improvement(clients/python): Add client-side
credential caching to avoid redundant REST calls (#10791)
2646f8da7a is described below
commit 2646f8da7a2e08165042856ce2495bdb06c7a161
Author: Sun Yuhan <[email protected]>
AuthorDate: Fri Apr 24 20:59:08 2026 +0800
[#10790] improvement(clients/python): Add client-side credential caching to
avoid redundant REST calls (#10791)
### What changes were proposed in this pull request?
Add client-side credential caching in `BaseGVFSOperations` to avoid
redundant HTTP calls to the Gravitino server's `/credentials` endpoint
when credential vending is enabled.
**Changes:**
- Added `_credential_cache` (LRUCache) and `_credential_cache_lock`
(RWLock) to `BaseGVFSOperations`
- Added `_calculate_credential_expire_time()` method using the existing
`credential_expiration_ratio` config (default 0.5)
- Added `_get_credentials_with_cache()` method with read-write lock +
double-checked locking pattern
- Replaced the unconditional `get_credentials()` call in
`_get_actual_filesystem_by_location_name()` with the cached version
- Filtered static credentials (`expire_time_in_ms() == 0`) when
calculating cache expiry to handle mixed credential types correctly
- Added 10 unit tests covering: cache hit/miss, expiry refresh,
per-fileset/per-location isolation, empty credentials, thread safety,
and config correctness
### Why are the changes needed?
When `enable_credential_vending=True`, the Python SDK makes an HTTP
request to `/credentials` on **every** file operation, even if the
previously fetched credentials have not expired. With a typical 1-hour
credential lifetime, this results in hundreds of unnecessary HTTP
round-trips.
The Java SDK avoids this by embedding lazy-refresh `CredentialsProvider`
instances (e.g., `OSSCredentialsProvider`, `S3CredentialsProvider`) that
cache credentials locally and only fetch new ones at 50% of their
lifetime. This PR brings the Python SDK to parity with the Java SDK's
approach.
Fix: #10790
### Does this PR introduce _any_ user-facing change?
No. All changes are internal to `BaseGVFSOperations`. No public API or
configuration changes required.
### How was this patch tested?
- **Unit tests** (10 new cases in `test_gvfs_credential_cache.py`):
cache hit/miss, expiry refresh, per-fileset/per-location isolation,
empty credentials, thread safety, expiration ratio, never-expire
credentials
- **Integration tests**:
- Verified 5 file operations trigger only 1 `/credentials` request (vs.
5 without caching)
- Verified expired cache triggers a fresh credential fetch
- **Existing tests**: All existing gvfs/credential unit tests + 12
integration tests pass with no regression
- **Code quality**: `ruff check` and `ruff format` pass
---
.../gravitino/filesystem/gvfs_base_operations.py | 102 +++++-
.../tests/unittests/test_gvfs_credential_cache.py | 349 +++++++++++++++++++++
2 files changed, 446 insertions(+), 5 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
index 72c37bba0a..37c091caf3 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
@@ -215,6 +215,9 @@ class BaseGVFSOperations(ABC):
self._filesystem_cache = TTLCache(maxsize=cache_size,
ttl=cache_expired_time)
self._cache_lock = rwlock.RWLockFair()
+ self._credential_cache: LRUCache = LRUCache(maxsize=cache_size)
+ self._credential_cache_lock = rwlock.RWLockFair()
+
self._enable_fileset_metadata_cache = (
self.ENABLE_FILESET_METADATA_CACHE_DEFAULT
if options is None
@@ -563,11 +566,9 @@ class BaseGVFSOperations(ABC):
CallerContextHolder.set(caller_context)
try:
- # Get credentials if credential vending is enabled
- credentials = (
- fileset.support_credentials().get_credentials()
- if self._enable_credential_vending
- else None
+ # Get credentials with client-side caching to avoid redundant REST
calls
+ credentials = self._get_credentials_with_cache(
+ fileset_ident, fileset, target_location_name
)
# Get the filesystem using the new path-based caching approach
@@ -644,6 +645,97 @@ class BaseGVFSOperations(ABC):
def _file_system_expired(self, expire_time: int):
return expire_time <= time.time() * 1000
def _calculate_credential_expire_time(self, credential_expire_time_ms: int) -> int:
    """Compute when the locally cached credential should be considered stale.

    Mirrors the filesystem cache's ratio-based policy:
    current_time + (credential_remaining_time * ratio).

    :param credential_expire_time_ms: The credential's expiry time in epoch ms
    :return: The local cache expiration time in epoch ms
    """
    # A non-positive expiry marks a credential that never expires,
    # so the cache entry never needs a refresh.
    if credential_expire_time_ms <= 0:
        return TIME_WITHOUT_EXPIRATION

    if self._options:
        raw_ratio = self._options.get(
            GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO,
            GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO,
        )
    else:
        raw_ratio = GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO
    # Options may carry the ratio as a string; normalize to float.
    refresh_ratio = float(raw_ratio)

    current_ms = time.time() * 1000
    remaining_ms = credential_expire_time_ms - current_ms
    return int(current_ms + remaining_ms * refresh_ratio)
+
def _get_credentials_with_cache(
    self,
    fileset_ident: NameIdentifier,
    fileset: Fileset,
    target_location_name: str,
) -> Optional[List[Credential]]:
    """Get credentials with client-side caching to avoid redundant REST calls.

    Implements lazy refresh: returns cached credentials if not expired,
    otherwise fetches new credentials from the server.

    :param fileset_ident: The fileset identifier
    :param fileset: The fileset object
    :param target_location_name: The resolved location name
    :return: List of credentials, or None if credential vending is disabled
    """
    if not self._enable_credential_vending:
        return None

    cache_key = (fileset_ident, target_location_name)

    # Fast path: read lock, check cache.
    # Acquire OUTSIDE the try block: if acquire() itself raises, the
    # finally clause must not call release() on a lock we never held
    # (that would raise and mask the original exception).
    read_lock = self._credential_cache_lock.gen_rlock()
    read_lock.acquire()
    try:
        cache_value = self._credential_cache.get(cache_key)
        if cache_value is not None:
            expire_time, cached_credentials = cache_value
            if not self._file_system_expired(expire_time):
                return cached_credentials
    finally:
        read_lock.release()

    # Slow path: write lock, double-check, then fetch
    write_lock = self._credential_cache_lock.gen_wlock()
    write_lock.acquire()
    try:
        # Double-check after acquiring write lock: another writer may have
        # refreshed this entry while we waited for the lock.
        cache_value = self._credential_cache.get(cache_key)
        if cache_value is not None:
            expire_time, cached_credentials = cache_value
            if not self._file_system_expired(expire_time):
                return cached_credentials

        # Fetch fresh credentials from the server. Done under the write lock
        # so concurrent callers for the same key make only one REST call.
        fresh_credentials = fileset.support_credentials().get_credentials()

        if fresh_credentials:
            # Static credentials report expire_time_in_ms() <= 0; only the
            # expirable ones should drive the cache entry's TTL.
            expirable = [
                c.expire_time_in_ms()
                for c in fresh_credentials
                if c.expire_time_in_ms() > 0
            ]
            if expirable:
                earliest_expire_ms = min(expirable)
                expire_time = self._calculate_credential_expire_time(
                    earliest_expire_ms
                )
            else:
                expire_time = TIME_WITHOUT_EXPIRATION
        else:
            # Cache an empty result too, so a fileset without credentials
            # does not trigger a REST call on every file operation.
            expire_time = TIME_WITHOUT_EXPIRATION

        self._credential_cache[cache_key] = (expire_time, fresh_credentials)
        return fresh_credentials
    finally:
        write_lock.release()
+
def _get_filesystem(
self,
credentials: Optional[List[Credential]],
diff --git
a/clients/client-python/tests/unittests/test_gvfs_credential_cache.py
b/clients/client-python/tests/unittests/test_gvfs_credential_cache.py
new file mode 100644
index 0000000000..d5184b32c9
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_gvfs_credential_cache.py
@@ -0,0 +1,349 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:
disable=protected-access,import-outside-toplevel,no-value-for-parameter,broad-exception-caught,unsubscriptable-object
+import sys
+import threading
+import time
+import unittest
+from unittest.mock import MagicMock
+
+from gravitino.api.credential.s3_token_credential import S3TokenCredential
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.name_identifier import NameIdentifier
+
+
def _create_mock_credential(expire_time_in_ms):
    """Build an S3TokenCredential expiring at the given epoch-ms time."""
    session_info = {
        S3TokenCredential._SESSION_ACCESS_KEY_ID: "access_id",
        S3TokenCredential._SESSION_SECRET_ACCESS_KEY: "secret_key",
        S3TokenCredential._SESSION_TOKEN: "session_token",
    }
    return S3TokenCredential(
        credential_info=session_info,
        expire_time_in_ms=expire_time_in_ms,
    )
+
+
def _create_mock_fileset(credentials_list):
    """Build a GenericFileset mock whose support_credentials() vends the given list."""
    from gravitino.client.generic_fileset import GenericFileset

    credential_vendor = MagicMock()
    credential_vendor.get_credentials.return_value = credentials_list
    fileset_mock = MagicMock(spec=GenericFileset)
    fileset_mock.support_credentials.return_value = credential_vendor
    return fileset_mock
+
+
class TestGVFSCredentialCache(unittest.TestCase):
    """Tests for credential-level lazy caching in BaseGVFSOperations."""

    def _create_operations(self, options=None):
        """Create a BaseGVFSOperations-like object with credential cache fields."""
        # We directly construct the relevant fields from BaseGVFSOperations.__init__
        # without going through the full constructor to avoid needing all dependencies.
        from readerwriterlock import rwlock

        ops = MagicMock()
        ops._options = options or {}
        ops._enable_credential_vending = (options or {}).get(
            GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING, False
        )
        # A plain dict stands in for the LRUCache: the methods under test only
        # use .get() and item assignment, which both types share.
        ops._credential_cache = {}
        ops._credential_cache_lock = rwlock.RWLockFair()

        # Bind the real methods from BaseGVFSOperations onto the mock so the
        # production logic runs against the mock's attributes.
        from gravitino.filesystem.gvfs_base_operations import BaseGVFSOperations

        ops._file_system_expired = BaseGVFSOperations._file_system_expired.__get__(ops)
        ops._calculate_credential_expire_time = (
            BaseGVFSOperations._calculate_credential_expire_time.__get__(ops)
        )
        ops._get_credentials_with_cache = (
            BaseGVFSOperations._get_credentials_with_cache.__get__(ops)
        )
        return ops

    def test_credential_vending_disabled_returns_none(self):
        """When credential vending is disabled, should return None without touching cache."""
        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: False}
        )
        fileset = _create_mock_fileset([])

        result = ops._get_credentials_with_cache(
            NameIdentifier.of("metalake", "catalog", "schema", "fileset"),
            fileset,
            "default",
        )

        self.assertIsNone(result)
        fileset.support_credentials().get_credentials.assert_not_called()

    def test_first_call_fetches_from_server(self):
        """First call should fetch credentials from server and cache them."""
        credential = _create_mock_credential(
            int(time.time() * 1000) + 3600_000
        )  # 1 hour from now
        fileset = _create_mock_fileset([credential])
        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        result = ops._get_credentials_with_cache(fileset_ident, fileset, "default")

        self.assertEqual(result, [credential])
        fileset.support_credentials().get_credentials.assert_called_once()

    def test_cached_credentials_returned_without_server_call(self):
        """Second call with valid cache should not hit the server."""
        credential = _create_mock_credential(
            int(time.time() * 1000) + 3600_000
        )  # 1 hour from now
        fileset = _create_mock_fileset([credential])
        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        # First call - fetches from server
        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")
        # Second call - should use cache
        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")

        self.assertEqual(result1, result2)
        # Server should only be called once
        fileset.support_credentials().get_credentials.assert_called_once()

    def test_expired_credentials_trigger_refresh(self):
        """When cached credentials are expired, should fetch new ones from server."""
        from gravitino.client.generic_fileset import GenericFileset

        # Create credential that's already expired (expire time in the past)
        expired_credential = _create_mock_credential(
            int(time.time() * 1000) - 1000
        )  # 1 second ago

        # New credential with future expiry
        fresh_credential = _create_mock_credential(int(time.time() * 1000) + 3600_000)

        # Use a single mock fileset with side_effect to return different values
        mock_support = MagicMock()
        mock_support.get_credentials.side_effect = [
            [expired_credential],
            [fresh_credential],
        ]
        fileset = MagicMock(spec=GenericFileset)
        fileset.support_credentials.return_value = mock_support

        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        # First call - fetches expired credential from server and caches it
        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")
        # Second call - cache is expired, calls get_credentials() again
        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")

        self.assertEqual(result1, [expired_credential])
        self.assertEqual(result2, [fresh_credential])
        # get_credentials() should have been called twice (first: no cache, second: expired)
        self.assertEqual(
            mock_support.get_credentials.call_count,
            2,
        )

    def test_different_locations_cached_separately(self):
        """Different location names should have separate cache entries."""
        credential_s3 = _create_mock_credential(int(time.time() * 1000) + 3600_000)
        credential_oss = _create_mock_credential(int(time.time() * 1000) + 7200_000)

        fileset_s3 = _create_mock_fileset([credential_s3])
        fileset_oss = _create_mock_fileset([credential_oss])

        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        result_s3 = ops._get_credentials_with_cache(
            fileset_ident, fileset_s3, "s3-location"
        )
        result_oss = ops._get_credentials_with_cache(
            fileset_ident, fileset_oss, "oss-location"
        )

        self.assertEqual(result_s3, [credential_s3])
        self.assertEqual(result_oss, [credential_oss])
        # Each fileset's server endpoint is hit exactly once despite the
        # shared fileset identifier: the location name is part of the key.
        fileset_s3.support_credentials().get_credentials.assert_called_once()
        fileset_oss.support_credentials().get_credentials.assert_called_once()

    def test_different_filesets_cached_separately(self):
        """Different fileset identifiers should have separate cache entries."""
        credential1 = _create_mock_credential(int(time.time() * 1000) + 3600_000)
        credential2 = _create_mock_credential(int(time.time() * 1000) + 7200_000)

        fileset1 = _create_mock_fileset([credential1])
        fileset2 = _create_mock_fileset([credential2])

        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        ident1 = NameIdentifier.of("metalake", "catalog", "schema", "fileset1")
        ident2 = NameIdentifier.of("metalake", "catalog", "schema", "fileset2")

        result1 = ops._get_credentials_with_cache(ident1, fileset1, "default")
        result2 = ops._get_credentials_with_cache(ident2, fileset2, "default")

        self.assertEqual(result1, [credential1])
        self.assertEqual(result2, [credential2])
        fileset1.support_credentials().get_credentials.assert_called_once()
        fileset2.support_credentials().get_credentials.assert_called_once()

    def test_empty_credentials_cached(self):
        """Empty credential list should be cached to avoid repeated calls."""
        fileset = _create_mock_fileset([])
        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")
        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")

        self.assertEqual(result1, [])
        self.assertEqual(result2, [])
        # Only called once - second call hits cache
        fileset.support_credentials().get_credentials.assert_called_once()

    def test_credential_expire_time_ratio(self):
        """Verify that the credential expiration ratio is applied correctly."""
        ops = self._create_operations(
            {
                GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
                GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO: "0.5",
            }
        )

        # Credential expires in 1000ms
        expire_ms = int(time.time() * 1000) + 1000
        calculated = ops._calculate_credential_expire_time(expire_ms)

        # With ratio 0.5, the cache should expire at roughly current_time + 500ms
        expected_approx = int(time.time() * 1000) + 500
        # Allow 50ms tolerance for test execution time
        self.assertAlmostEqual(calculated, expected_approx, delta=50)

    def test_credential_expire_time_never_expire(self):
        """When credential expire_time_in_ms <= 0, cache should never expire."""
        # NOTE(review): asserts the concrete value of TIME_WITHOUT_EXPIRATION
        # (sys.maxsize) rather than the constant itself — will need updating
        # if that sentinel ever changes.
        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )

        result = ops._calculate_credential_expire_time(0)
        self.assertEqual(result, sys.maxsize)

        result = ops._calculate_credential_expire_time(-1)
        self.assertEqual(result, sys.maxsize)

    def test_mixed_credentials_cache_expiry_based_on_expiring_one(self):
        """Mixed credentials (never-expire + expiring) should use the expiring one for cache TTL."""
        from gravitino.client.generic_fileset import GenericFileset

        # S3TokenCredential requires expire_time_in_ms > 0, so use MagicMock for never-expire
        never_expire_credential = MagicMock()
        never_expire_credential.expire_time_in_ms.return_value = 0
        expired_credential = _create_mock_credential(
            int(time.time() * 1000) - 1000
        )  # already expired
        fresh_credential = _create_mock_credential(int(time.time() * 1000) + 3600_000)

        # First call returns mixed list with expired credential, second returns fresh
        mock_support = MagicMock()
        mock_support.get_credentials.side_effect = [
            [never_expire_credential, expired_credential],
            [never_expire_credential, fresh_credential],
        ]
        fileset = MagicMock(spec=GenericFileset)
        fileset.support_credentials.return_value = mock_support

        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        # First call - caches mixed credentials
        result1 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")
        self.assertEqual(result1, [never_expire_credential, expired_credential])

        # Second call - cache expired due to expiring credential, triggers refresh
        result2 = ops._get_credentials_with_cache(fileset_ident, fileset, "default")
        self.assertEqual(result2, [never_expire_credential, fresh_credential])

        self.assertEqual(mock_support.get_credentials.call_count, 2)

    def test_thread_safety_concurrent_access(self):
        """Multiple threads accessing cache simultaneously should be safe and deduplicated."""
        credential = _create_mock_credential(int(time.time() * 1000) + 3600_000)

        # Use a shared mock fileset so call count is meaningful
        shared_fileset = _create_mock_fileset([credential])

        ops = self._create_operations(
            {GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True}
        )
        fileset_ident = NameIdentifier.of("metalake", "catalog", "schema", "fileset")

        num_threads = 10
        results = [None] * num_threads
        errors = [None] * num_threads

        def worker(idx):
            # Each thread writes only its own slot, so the lists need no lock.
            try:
                results[idx] = ops._get_credentials_with_cache(
                    fileset_ident, shared_fileset, "default"
                )
            except Exception as e:
                errors[idx] = e

        threads = [
            threading.Thread(target=worker, args=(i,)) for i in range(num_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # No errors should have occurred
        for e in errors:
            self.assertIsNone(e, f"Thread encountered error: {e}")

        # All results should be the same credential list
        for result in results:
            self.assertIsNotNone(result)
            self.assertEqual(len(result), 1)
            assert result is not None  # type: ignore[unreachable]
            self.assertEqual(result[0].access_key_id(), "access_id")

        # Double-check locking should ensure only 1 server call
        shared_fileset.support_credentials().get_credentials.assert_called_once()
+
+
if __name__ == "__main__":
    # Allow running this test module directly, outside the pytest runner.
    unittest.main()