
How To Cache Asyncio Coroutines

I am using aiohttp to make a simple HTTP request in python 3.4 like this: response = yield from aiohttp.get(url) The application requests the same URL over and over again so natur

Solution 1:

Maybe a bit late, but I've started a new package that may help: https://github.com/argaen/aiocache. Contributions/comments are always welcome.

An example:

import asyncio

from collections import namedtuple

from aiocache import cached
from aiocache.serializers import PickleSerializer

Result = namedtuple('Result', "content, status")


@cached(ttl=10, serializer=PickleSerializer())
async def async_main():
    print("First ASYNC non cached call...")
    await asyncio.sleep(1)
    return Result("content", 200)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))

Note that, as an extra, it can cache any Python object in Redis using Pickle serialization. If you only want to work in memory, you can use the SimpleMemoryCache backend.


Solution 2:

To use functools.lru_cache with coroutines, the following code works.

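import asyncio
import functools

import aiohttp


class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        # "with (yield from lock)" was removed in Python 3.9, so acquire
        # and release the lock explicitly.
        yield from self.lock.acquire().__await__()
        try:
            if self.done:
                return self.result
            self.result = yield from self.co.__await__()
            self.done = True
            return self.result
        finally:
            self.lock.release()

def cacheable(f):
    def wrapped(*args, **kwargs):
        r = f(*args, **kwargs)
        return Cacheable(r)
    return wrapped


# lru_cache stores one Cacheable per distinct url; awaiting it again
# returns the stored result instead of re-running the request.
@functools.lru_cache()
@cacheable
async def foo(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()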

The following variant is thread safe:

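import asyncio
import threading


class ThreadSafeCacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = threading.Lock()

    def __await__(self):
        while True:
            if self.done:
                return self.result
            if self.lock.acquire(blocking=False):
                # The lock is intentionally never released: once done is
                # set, every later awaiter returns the result above.
                self.result = yield from self.co.__await__()
                self.done = True
                return self.result
            else:
                # Another awaiter is running the coroutine; poll until done.
                # asyncio.sleep() returns a coroutine, so delegate through
                # its __await__() inside this plain generator.
                yield from asyncio.sleep(0.005).__await__()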

Solution 3:

I wrote a simple cache decorator myself:

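import functools


def async_cache(maxsize=128):
    cache = {}

    def decorator(fn):
        # async/await replaces @asyncio.coroutine and "yield from",
        # which were removed in Python 3.11.
        @functools.wraps(fn)
        async def wrapper(*args):
            # Use the argument tuple as the key; arguments must be hashable.
            key = args

            if key not in cache:
                if len(cache) >= maxsize:
                    # Evict the oldest entry (dicts keep insertion order).
                    del cache[next(iter(cache))]

                cache[key] = await fn(*args)

            return cache[key]

        return wrapper

    return decorator


@async_cache()
async def expensive_io():
    ...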

This kind of works, but many aspects could be improved. For example, if the cached function is called a second time before the first call returns, it executes a second time.
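One way to avoid that duplicate execution is to cache the in-flight task instead of the finished value, so that concurrent callers share a single run. The following is only a minimal sketch and not part of the original answer: `dedup_cache`, `slow_square` and `calls` are hypothetical names, and eviction is left out for brevity.

```python
import asyncio
import functools


def dedup_cache(fn):
    """Cache one task per argument tuple so concurrent callers share it."""
    tasks = {}

    @functools.wraps(fn)
    async def wrapper(*args):
        if args not in tasks:
            # Store the task before awaiting it, so a second call that
            # arrives while the first is still running finds it here.
            tasks[args] = asyncio.ensure_future(fn(*args))
        try:
            return await tasks[args]
        except Exception:
            # Do not keep failed calls in the cache.
            tasks.pop(args, None)
            raise

    return wrapper


calls = 0  # counts how often the wrapped coroutine really runs


@dedup_cache
async def slow_square(x):
    global calls
    calls += 1
    await asyncio.sleep(0.01)  # stand-in for slow I/O
    return x * x


async def main():
    # Both calls start before either finishes; they share one task.
    return await asyncio.gather(slow_square(3), slow_square(3))


results = asyncio.run(main())
```

Because the task is stored before it is awaited, the second caller never reaches the wrapped function. Awaiting a finished task again simply returns its result, so the same dictionary also serves later calls.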


Solution 4:

A popular async version of lru_cache exists: async_lru


Solution 5:

I'm not that familiar with aiohttp, so I'm not sure exactly what would cause None values to be returned, but the lru_cache decorator will not work with async functions.
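The problem can be reproduced without aiohttp. In this small sketch for `async def` coroutines (the names `fetch` and `main` are illustrative only), `lru_cache` stores the coroutine object returned by the first call. A coroutine object can be awaited only once, so the second await raises `RuntimeError`.

```python
import asyncio
import functools


@functools.lru_cache()
async def fetch(url):
    await asyncio.sleep(0)  # stand-in for a real HTTP request
    return "body of " + url


async def main():
    first = await fetch("http://example.com")
    try:
        # lru_cache hands back the same, already-awaited coroutine object.
        await fetch("http://example.com")
    except RuntimeError as exc:
        return first, str(exc)
    return first, None


first, error = asyncio.run(main())
```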

I use a decorator which does essentially the same thing; note that it is different to tobib's decorator above in that it will always return a future or a task, rather than the value:

import asyncio
from collections import OrderedDict
from functools import _make_key, wraps

def future_lru_cache(maxsize=128):
    # support use as decorator without calling, for this case maxsize will
    # not be an int
    try:
        real_max_size = int(maxsize)
    except (TypeError, ValueError):
        # int() raises TypeError when maxsize is the decorated function
        real_max_size = 128

    cache = OrderedDict()

    async def run_and_cache(func, args, kwargs):
        """Run func with the specified arguments and store the result
        in cache."""
        result = await func(*args, **kwargs)
        cache[_make_key(args, kwargs, False)] = result
        if len(cache) > real_max_size:
            cache.popitem(False)
        return result

    def wrapper(func):
        @wraps(func)
        def decorator(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                # Some protection against duplicating calls already in
                # progress: when starting the call cache the future, and if
                # the same thing is requested again return that future.
                if isinstance(cache[key], asyncio.Future):
                    return cache[key]
                else:
                    f = asyncio.Future()
                    f.set_result(cache[key])
                    return f
            else:
                task = asyncio.Task(run_and_cache(func, args, kwargs))
                cache[key] = task
                return task
        return decorator

    if callable(maxsize):
        return wrapper(maxsize)
    else:
        return wrapper

I used _make_key from functools, as lru_cache does. It is presumably meant to be private, so it is probably better to copy it into your own code.
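A replacement does not need to be elaborate. As a rough sketch, assuming all arguments are hashable, a key can be built from the positional arguments plus the sorted keyword arguments. `make_key` and `_KWD_MARK` are hypothetical names, and this simplified version is not a copy of the functools helper.

```python
_KWD_MARK = object()  # separates positional from keyword arguments


def make_key(args, kwargs):
    """Build a hashable cache key from call arguments."""
    key = tuple(args)
    if kwargs:
        # Sort so that f(a=1, b=2) and f(b=2, a=1) share one key.
        key += (_KWD_MARK,) + tuple(sorted(kwargs.items()))
    return key
```

In the decorator above, `make_key(args, kwargs)` would then stand in for `_make_key(args, kwargs, False)`.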

