Source code for stream_framework.feeds.aggregated_feed.notification_feed

from stream_framework.feeds.aggregated_feed.base import AggregatedFeed
from stream_framework.serializers.aggregated_activity_serializer import \
from import RedisTimelineStorage
import copy
import json
import logging

logger = logging.getLogger(__name__)

[docs]class NotificationFeed(AggregatedFeed): ''' Similar to an aggregated feed, but: - doesnt use the activity storage (serializes everything into the timeline storage) - features denormalized counts - pubsub signals which you can subscribe to For now this is entirely tied to Redis ''' #: notification feeds only need a small max length max_length = 99 key_format = 'notification_feed:1:user:%(user_id)s' #: the format we use to denormalize the count count_format = 'notification_feed:1:user:%(user_id)s:count' #: the key used for locking lock_format = 'notification_feed:1:user:%s:lock' #: the main channel to publish pubsub_main_channel = 'juggernaut' timeline_serializer = NotificationSerializer activity_storage_class = None activity_serializer = None def __init__(self, user_id, **kwargs): ''' User id (the user for which we want to read/write notifications) ''' AggregatedFeed.__init__(self, user_id, **kwargs) # location to which we denormalize the count self.format_dict = dict(user_id=user_id) self.count_key = self.count_format % self.format_dict # set the pubsub key if we're using it self.pubsub_key = user_id self.lock_key = self.lock_format % self.format_dict from import get_redis_connection self.redis = get_redis_connection()
[docs] def add_many(self, activities, **kwargs): ''' Similar to the AggregatedActivity.add_many The only difference is that it denormalizes a count of unseen activities ''' with self.redis.lock(self.lock_key, timeout=2): current_activities = AggregatedFeed.add_many( self, activities, **kwargs) # denormalize the count self.denormalize_count() # return the current state of the notification feed return current_activities
[docs] def get_denormalized_count(self): ''' Returns the denormalized count stored in self.count_key ''' result = self.redis.get(self.count_key) or 0 result = int(result) return result
[docs] def set_denormalized_count(self, count): ''' Updates the denormalized count to count :param count: the count to update to ''' self.redis.set(self.count_key, count) self.publish_count(count)
[docs] def publish_count(self, count): ''' Published the count via pubsub :param count: the count to publish ''' count_dict = dict(unread_count=count, unseen_count=count) count_data = json.dumps(count_dict) data = {'channel': self.pubsub_key, 'data': count_data} encoded_data = json.dumps(data) self.redis.publish(self.pubsub_main_channel, encoded_data)
[docs] def denormalize_count(self): ''' Denormalize the number of unseen aggregated activities to the key defined in self.count_key ''' # now count the number of unseen count = self.count_unseen() # and update the count if it changed stored_count = self.get_denormalized_count() if stored_count != count: self.set_denormalized_count(count) return count
[docs] def count_unseen(self, aggregated_activities=None): ''' Counts the number of aggregated activities which are unseen :param aggregated_activities: allows you to specify the aggregated activities for improved performance ''' count = 0 if aggregated_activities is None: aggregated_activities = self[:self.max_length] for aggregated in aggregated_activities: if not aggregated.is_seen(): count += 1 return count
[docs] def mark_all(self, seen=True, read=None): ''' Mark all the entries as seen or read :param seen: set seen_at :param read: set read_at ''' with self.redis.lock(self.lock_key, timeout=10): # get the current aggregated activities aggregated_activities = self[:self.max_length] # create the update dict update_dict = {} for aggregated_activity in aggregated_activities: changed = False old_activity = copy.deepcopy(aggregated_activity) if seen is True and not aggregated_activity.is_seen(): aggregated_activity.update_seen_at() changed = True if read is True and not aggregated_activity.is_read(): aggregated_activity.update_read_at() changed = True if changed: update_dict[old_activity] = aggregated_activity # send the diff to the storage layer new, deleted = [], [] changed = update_dict.items() self._update_from_diff(new, changed, deleted) # denormalize the count self.denormalize_count() # return the new activities return aggregated_activities
[docs]class RedisNotificationFeed(NotificationFeed): timeline_storage_class = RedisTimelineStorage