
How To Cache Asyncio Coroutines

I am using aiohttp to make a simple HTTP request in Python 3.4 like this: response = yield from aiohttp.get(url). The application requests the same URL over and over again, so naturally I would like to cache the responses.

Solution 1:

Maybe a bit late, but I've started a new package that may help: https://github.com/argaen/aiocache. Contributions/comments are always welcome.

An example:

import asyncio

from collections import namedtuple

from aiocache import cached
from aiocache.serializers import PickleSerializer

Result = namedtuple('Result', "content, status")


@cached(ttl=10, serializer=PickleSerializer())
async def async_main():
    print("First ASYNC non cached call...")
    await asyncio.sleep(1)
    return Result("content", 200)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))

Note that, as an extra, it can cache any Python object in Redis using pickle serialization. If you just want to work with memory, you can use the SimpleMemoryCache backend :).
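To make the idea concrete without pulling in aiocache or Redis, here is a hypothetical stdlib-only sketch of the same TTL-plus-pickle caching scheme; the names memory_cached and work are my own, not part of aiocache:

```python
import asyncio
import pickle
import time

calls = 0

def memory_cached(ttl):
    # Sketch of an in-memory backend: results are pickled into a dict
    # together with an expiry time, like aiocache's SimpleMemoryCache idea.
    store = {}

    def decorator(fn):
        async def wrapper(*args):
            key = (fn.__name__, args)
            hit = store.get(key)
            if hit is not None and time.monotonic() < hit[1]:
                return pickle.loads(hit[0])       # cache hit: unpickle stored result
            result = await fn(*args)
            store[key] = (pickle.dumps(result), time.monotonic() + ttl)
            return result
        return wrapper
    return decorator

@memory_cached(ttl=10)
async def work(x):
    global calls
    calls += 1
    await asyncio.sleep(0)    # stands in for the real I/O
    return x * 2

async def main():
    return await work(3), await work(3)

res = asyncio.run(main())
print(res, calls)  # → (6, 6) 1 — second call served from the cache
```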

Solution 2:

To use functools.lru_cache with coroutines, the following code works.

class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        with (yield from self.lock):
            if self.done:
                return self.result
            self.result = yield from self.co.__await__()
            self.done = True
            return self.result

def cacheable(f):
    def wrapped(*args, **kwargs):
        r = f(*args, **kwargs)
        return Cacheable(r)
    return wrapped


@functools.lru_cache()
@cacheable
async def foo():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
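To show the pattern end to end without aiohttp, here is a runnable sketch of the same idea on current Python: lru_cache memoizes the Cacheable wrapper per argument, and the wrapper runs the coroutine once and replays the result afterwards. (The `with (yield from lock)` form above was removed in Python 3.10, so this variant delegates to an inner async function instead; fetch and the call counter are my own illustrative names.)

```python
import asyncio
import functools

calls = 0

class Cacheable:
    # Awaitable wrapper: runs the wrapped coroutine once, then replays the result.
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        return self._run().__await__()

    async def _run(self):
        async with self.lock:
            if not self.done:
                self.result = await self.co
                self.done = True
            return self.result

def cacheable(f):
    def wrapped(*args, **kwargs):
        return Cacheable(f(*args, **kwargs))
    return wrapped

@functools.lru_cache()
@cacheable
async def fetch(x):
    global calls
    calls += 1
    await asyncio.sleep(0)    # stands in for the real network call
    return x * 2

async def main():
    first = await fetch(21)
    second = await fetch(21)  # lru_cache returns the same Cacheable; no re-run
    return first, second

res = asyncio.run(main())
print(res, calls)  # → (42, 42) 1
```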

The following version is thread-safe:

class ThreadSafeCacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = threading.Lock()

    def __await__(self):
        while True:
            if self.done:
                return self.result
            if self.lock.acquire(blocking=False):
                self.result = yield from self.co.__await__()
                self.done = True
                return self.result
            else:
                yield from asyncio.sleep(0.005)

Solution 3:

I wrote a simple cache decorator myself:

def async_cache(maxsize=128):
    cache = {}

    def decorator(fn):
        def wrapper(*args):
            key = ':'.join(args)

            if key not in cache:
                if len(cache) >= maxsize:
                    # evict the oldest entry (dict.keys().next() was Python 2)
                    del cache[next(iter(cache))]

                cache[key] = yield from fn(*args)

            return cache[key]

        return wrapper

    return decorator


@async_cache()
@asyncio.coroutine
def expensive_io():
    ....

This kind of works, but many aspects can probably be improved. For example: if the cached function is called a second time before the first call returns, it will execute a second time.
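One way to close that gap is to cache the Task itself rather than the finished result, so a second caller arriving while the first call is still in flight awaits the same Task. A hedged sketch of that variant (async_task_cache and expensive_io are illustrative names, not part of any library):

```python
import asyncio

calls = 0

def async_task_cache(maxsize=128):
    # Like the decorator above, but the Task is stored immediately,
    # so overlapping calls with the same arguments share one execution.
    cache = {}

    def decorator(fn):
        def wrapper(*args):
            if args not in cache:
                if len(cache) >= maxsize:
                    cache.pop(next(iter(cache)))   # evict oldest entry
                cache[args] = asyncio.ensure_future(fn(*args))
            return cache[args]
        return wrapper
    return decorator

@async_task_cache()
async def expensive_io(x):
    global calls
    calls += 1
    await asyncio.sleep(0.01)
    return x + 1

async def main():
    # Two overlapping calls: both await the same Task, one real execution.
    return await asyncio.gather(expensive_io(1), expensive_io(1))

res = asyncio.run(main())
print(res, calls)  # → [2, 2] 1
```

Note this must be called from inside a running event loop, since asyncio.ensure_future needs one.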

Solution 4:

A popular async version of lru_cache exists here: async_lru

Solution 5:

I'm not that familiar with aiohttp so I'm not sure of exactly what is happening that would cause Nones to be returned, but the lru_cache decorator will not work with async functions.

I use a decorator which does essentially the same thing; note that it is different from tobib's decorator above in that it will always return a future or a task, rather than the value:

import asyncio
from collections import OrderedDict
from functools import _make_key, wraps

def future_lru_cache(maxsize=128):
    # Support use as a decorator without calling; in that case maxsize
    # will not be an int.
    try:
        real_max_size = int(maxsize)
    except (TypeError, ValueError):
        real_max_size = 128

    cache = OrderedDict()

    async def run_and_cache(func, args, kwargs):
        """Run func with the specified arguments and store the result
        in cache."""
        result = await func(*args, **kwargs)
        cache[_make_key(args, kwargs, False)] = result
        if len(cache) > real_max_size:
            cache.popitem(False)
        return result

    def wrapper(func):
        @wraps(func)
        def decorator(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                # Some protection against duplicating calls already in
                # progress: when starting the call cache the future, and
                # if the same thing is requested again return that future.
                if isinstance(cache[key], asyncio.Future):
                    return cache[key]
                else:
                    f = asyncio.Future()
                    f.set_result(cache[key])
                    return f
            else:
                task = asyncio.Task(run_and_cache(func, args, kwargs))
                cache[key] = task
                return task
        return decorator

    if callable(maxsize):
        return wrapper(maxsize)
    else:
        return wrapper

I used _make_key from functools, as lru_cache does. I guess it's supposed to be private, so it's probably better to copy it over.
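If you'd rather not depend on the private helper at all, a minimal public-API stand-in might look like this (make_key is my own name; it only needs to produce equal, hashable keys for equal call signatures, which is what the cache lookup above relies on):

```python
def make_key(args, kwargs):
    # Fold kwargs into a sorted tuple of pairs so the key is hashable
    # and insensitive to keyword-argument order.
    return (args, tuple(sorted(kwargs.items()))) if kwargs else args

k1 = make_key((1, 2), {"a": 3})
k2 = make_key((1, 2), {"a": 3})
print(k1 == k2, hash(k1) == hash(k2))  # → True True
```

Unlike functools._make_key this makes no attempt at the fast paths for single int/str arguments, but it is stable across Python versions.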
