Skip to content

Cache

https://aiocache.readthedocs.io/en/latest/caches.html

question

if there are multiple servers and the next call might not be go to the same server- cache cannot be used!

Solution? use a distributed cache, like Redis or Memcached, to share data across the pods:

  • deploy a memory cache service as a Kubernetes service

  • install cache client library in app - use the cache client to store and retrieve data

Cache StreamingResponse

How to cache StreamReponse or make a cache middleware\ https://github.com/tiangolo/fastapi/issues/4751

Solution:

  • https://github.com/krukov/cashews/issues/107

  • cache each chunk of the StreamingResponse content with a separate key

  • cache other StreamingResponse properties into a bytes object

  • then recreate the StreamingResponse based on the cached content and properties

my solution using aiocache (borrowed code from the following link) https://github.com/Krukov/cashews/pull/123/files#diff-3df331569a7628a330e72e831d8f338342ef954c7cb3897c6195565a85c32b6fR1

Note that the code can also be updated to fallback to a default memory cache if redis server (ping) is not available.

class stream_cached(cached):
    async def set_in_cache(self, key, value):
        try:
            if isinstance(value, StreamingResponse):
                value = await stream_cached.encode_streaming_response(
                    value=value,
                    cache=self.cache,
                    key=key,
                    ttl=self.ttl,
                )
            await self.cache.set(key, value, ttl=self.ttl)
        except Exception:
            cached_logger.exception(
                f"Unexpected error. Couldn't set {value} in key {key}"
            )

    async def get_from_cache(self, key):
        try:
            value = await self.cache.get(key)
            is_streaming_response = (
                value is not None and 'streamingresponse' in key
            )
            if is_streaming_response:
                value = await stream_cached.decode_streaming_response(
                    value=value,
                    cache=self.cache,
                    key=key,
                )
            return value
        except Exception:
            cached_logger.exception(
                f"Unexpected error. Couldn't retrieve key {key}"
            )

    async def encode_streaming_response(
        value: StreamingResponse,
        cache: Cache,
        key: str,
        ttl: object,
    ) -> bytes:
        value.body_iterator = stream_cached.set_iterator(
            cache, key, value.body_iterator, ttl
        )
        serialized_value = b''
        serialized_value += bytes(value.media_type, 'utf-8') + b':'
        serialized_value += bytes(str(value.status_code), 'utf-8') + b':'
        for header_name, header_value in value.raw_headers:
            serialized_value += header_name + b'=' + header_value + b';'
        return serialized_value

    async def decode_streaming_response(
        value: bytes,
        cache: Cache,
        key: str,
    ) -> StreamingResponse:
        media_type, status_code, headers = value.split(b':')
        media_type = str(media_type)
        status_code = int(status_code)
        raw_headers = []
        for header in headers.split(b';'):
            if not header:
                continue
            header_name, header_value = header.split(b'=')
            raw_headers.append((header_name, header_value))
        content = stream_cached.get_iterator(cache, key)
        resp = StreamingResponse(
            content=content,
            media_type=media_type,
            status_code=status_code,
        )
        resp.raw_headers = raw_headers
        return resp

    async def set_iterator(cache: Cache, key: str, iterator, ttl):
        chunk_number = 0
        async for chunk in iterator:
            await cache.set(f'{key}:chunk:{chunk_number}', chunk, ttl=ttl)
            yield chunk
            chunk_number += 1

    async def get_iterator(cache: Cache, key: str):
        chunk_number = 0
        while True:
            chunk = await cache.get(f'{key}:chunk:{chunk_number}')
            if not chunk:
                return
            yield chunk
            chunk_number += 1

key_builder

def key_builder(f, **kwargs):
    return f.__name__ + json.dumps({
        k: v for k, v in kwargs.items() if k != 'request'
    })

Cache decorator with request header

https://aiocache.readthedocs.io/en/latest/decorators.html

When use cache in api, we should also consider the header. Otherwise different headers will get the same response.

Issue: StreamingResponse is not supported as data is sent chunk by chunk - only the last chunk (empty string) is cached. See StreamingResponse definition.

from typing import Optional
from aiocache import cached, Cache
from fastapi import status, Request, Header

@router.get(
    "/hello",
    status_code=status.HTTP_200_OK,
    description='Get hello'
)
@vary_on_headers('Accept')
@cached(ttl=3600, cache=Cache.MEMORY, namespace='dev', key_builder=key_builder)
async def get_hello(
    request: Request,
    accept: Optional[str] = Header(None), # header is `Accept`
):
    return 'hello, Accept header is `{accept}`'

Cache dataframe

https://github.com/aio-libs/aiocache/issues/493

import zlib
import pickle
import pandas as pd

import asyncio
from aiocache import Cache
from aiocache.serializers import BaseSerializer

class CompressionSerializer(BaseSerializer):
    DEFAULT_ENCODING = None #zlib works with bytes

    def dumps(self, value):
        compressed = zlib.compress(pickle.dumps(value))
        return compressed

    def loads(self, value):
        #if value is too large to read into memory
        #use zlib.decompressobj instead of zlib.decompress 
        decompressed = pickle.loads(zlib.decompress(value))
        return decompressed

cache = Cache(Cache.MEMORY, serializer=CompressionSerializer(), namespace='dev')

df = pd.DataFrame({'x':[1,2], 'y':[3,4]})
loop = asyncio.get_event_loop()
loop.run_until_complete(cache.set("key", df))
loop.run_until_complete(cache.get("key"))
loop.run_until_complete(cache.delete("key"))