New submission from Mykola Mokhnach:
Currently the asyncio.Condition class does not provide any public property
that would make it possible to determine how many consumers, if any, are
waiting on the condition. My scenario is the following:
```
...
FILE_STATS_CONDITIONS: Dict[str, asyncio.Condition] = {}
FILE_STATS_CACHE: Dict[str, Union[FileStat, Exception]] = {}
...
async def _fetch_file_stat(self: T, file_id: str) -> FileStat:
    """
    The idea behind this caching is to avoid sending multiple info requests
    for the same file id to the server and thus to avoid throttling. This
    is safe, because stored files are immutable.
    """
    try:
        file_stat: FileStat
        if file_id in FILE_STATS_CONDITIONS:
            self.log.debug(f'Reusing the previously cached stat request for the file {file_id}')
            async with FILE_STATS_CONDITIONS[file_id]:
                await FILE_STATS_CONDITIONS[file_id].wait()
                cached_result = FILE_STATS_CACHE[file_id]
                if isinstance(cached_result, FileStat):
                    file_stat = cached_result
                else:
                    raise cached_result
        else:
            FILE_STATS_CONDITIONS[file_id] = asyncio.Condition()
            cancellation_exception: Optional[asyncio.CancelledError] = None
            async with FILE_STATS_CONDITIONS[file_id]:
                file_stat_task = asyncio.create_task(self.storage_client.get_file_stat(file_id))
                try:
                    try:
                        file_stat = await asyncio.shield(file_stat_task)
                    except asyncio.CancelledError as e:
                        if len(getattr(FILE_STATS_CONDITIONS[file_id], '_waiters', (None,))) == 0:
                            # it is safe to cancel now if there are no
                            # consumers in the waiting queue
                            file_stat_task.cancel()
                            raise
                        # we don't want currently waiting consumers to fail
                        # because of the task cancellation
                        file_stat = await file_stat_task
                        cancellation_exception = e
                    FILE_STATS_CACHE[file_id] = file_stat
                except Exception as e:
                    FILE_STATS_CACHE[file_id] = e
                    raise
                finally:
                    FILE_STATS_CONDITIONS[file_id].notify_all()
            if cancellation_exception is not None:
                raise cancellation_exception
        # noinspection PyUnboundLocalVariable
        self.log.info(f'File stat: {file_stat}')
        return file_stat
    except ObjectNotFoundError:
        self.log.info(f'The file identified by "{file_id}" either does not exist or has expired')
        raise file_not_found_error(file_id)
    finally:
        if file_id in FILE_STATS_CONDITIONS and not FILE_STATS_CONDITIONS[file_id].locked():
            del FILE_STATS_CONDITIONS[file_id]
            if file_id in FILE_STATS_CACHE:
                del FILE_STATS_CACHE[file_id]
```
Basically, I have to use `getattr(FILE_STATS_CONDITIONS[file_id], '_waiters',
(None,))` to work around this limitation and figure out whether to cancel my
producer now or later.
It would be nice to have a public property on the Condition class that returns
the value of `len(condition._waiters)`.
--
components: asyncio
messages: 401434
nosy: asvetlov, mykola-mokhnach, yselivanov
priority: normal
severity: normal
status: open
title: Add a possibility for asyncio.Condition to determine the count of
currently waiting consumers
type: enhancement
versions: Python 3.10, Python 3.11, Python 3.6, Python 3.7, Python 3.8, Python
3.9
___
Python tracker
<https://bugs.python.org/issue45146>
___