Source code for stream_framework.storage.redis.structures.base

from stream_framework.storage.redis.connection import get_redis_connection
from redis.client import BasePipeline


[docs]class RedisCache(object): ''' The base for all redis data structures ''' key_format = 'redis:cache:%s' def __init__(self, key, redis=None): # write the key self.key = key # handy when using fallback to other data sources self.source = 'redis' # the redis connection, self.redis is lazy loading the connection self._redis = redis
[docs] def get_redis(self): ''' Only load the redis connection if we use it ''' if self._redis is None: self._redis = get_redis_connection() return self._redis
[docs] def set_redis(self, value): ''' Sets the redis connection ''' self._redis = value
redis = property(get_redis, set_redis)
[docs] def get_key(self): return self.key
[docs] def delete(self): key = self.get_key() self.redis.delete(key)
def _pipeline_if_needed(self, operation, *args, **kwargs): ''' If the redis connection is already in distributed state use it Otherwise spawn a new distributed connection using .map ''' pipe_needed = not isinstance(self.redis, BasePipeline) if pipe_needed: pipe = self.redis.pipeline(transaction=False) operation(pipe, *args, **kwargs) results = pipe.execute() else: results = operation(self.redis, *args, **kwargs) return results