forked from Yelp/paasta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_utils.py
59 lines (50 loc) · 1.64 KB
/
async_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
import functools
import time
from typing import AsyncIterable
from typing import Awaitable
from typing import Callable
from typing import Dict # noqa: imported for typing
from typing import List
from typing import TypeVar
T = TypeVar('T')
def async_ttl_cache(
ttl: float=300,
) -> Callable[
[Callable[..., Awaitable[T]]], # wrapped
Callable[..., Awaitable[T]], # inner
]:
_cache: Dict = {} # Should be Dict[Any, T] but that doesn't work.
def outer(wrapped):
@functools.wraps(wrapped)
async def inner(*args, **kwargs):
key = functools._make_key(args, kwargs, typed=False)
try:
future, last_update = _cache[key]
if ttl > 0 and time.time() - last_update > ttl:
raise KeyError
except KeyError:
future = asyncio.ensure_future(wrapped(*args, **kwargs))
# set the timestamp to +infinity so that we always wait on the in-flight request.
_cache[key] = (future, float('Inf'))
value = await future
_cache[key] = (future, time.time())
return value
return inner
return outer
async def aiter_to_list(
aiter: AsyncIterable[T],
) -> List[T]:
return [x async for x in aiter]
def async_timeout(
seconds: int=10,
) -> Callable[
[Callable[..., Awaitable[T]]], # wrapped
Callable[..., Awaitable[T]], # inner
]:
def outer(wrapped):
@functools.wraps(wrapped)
async def inner(*args, **kwargs):
return await asyncio.wait_for(wrapped(*args, **kwargs), timeout=seconds)
return inner
return outer