Source code for stream_framework.storage.redis.activity_storage

from stream_framework.storage.base import BaseActivityStorage
from stream_framework.storage.redis.structures.hash import ShardedHashCache
from stream_framework.serializers.activity_serializer import ActivitySerializer
import six


[docs]class ActivityCache(ShardedHashCache): key_format = 'activity:cache:%s'
[docs]class RedisActivityStorage(BaseActivityStorage): default_serializer_class = ActivitySerializer
[docs] def get_key(self): return self.options.get('key', 'global')
[docs] def get_cache(self): key = self.get_key() return ActivityCache(key)
[docs] def get_from_storage(self, activity_ids, *args, **kwargs): cache = self.get_cache() activities = cache.get_many(activity_ids) activities = dict((k, six.text_type(v)) for k, v in activities.items() if v) return activities
[docs] def add_to_storage(self, serialized_activities, *args, **kwargs): cache = self.get_cache() key_value_pairs = serialized_activities.items() result = cache.set_many(key_value_pairs) insert_count = 0 if result: insert_count = len(key_value_pairs) return insert_count
[docs] def remove_from_storage(self, activity_ids, *args, **kwargs): # we never explicitly remove things from storage cache = self.get_cache() result = cache.delete_many(activity_ids) return result
[docs] def flush(self): cache = self.get_cache() cache.delete()