aggregated_feed Package

base Module

class stream_framework.feeds.aggregated_feed.base.AggregatedFeed(user_id)[source]

Bases: stream_framework.feeds.base.BaseFeed

Aggregated feeds are an extension of the basic feed. They turn activities into aggregated activities by using an aggregator class.

See BaseAggregator

You can use aggregated feeds to build smart feeds, such as Facebook's newsfeed. Alternatively you can also use smart feeds for building complex notification systems.

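The aggregation idea can be sketched in a few lines. This is not stream_framework's implementation; the tuple-shaped activities and the group-by-(verb, day) rule are assumptions, loosely modeled on a RecentVerbAggregator-style grouping:

```python
from collections import defaultdict
from datetime import date

def aggregate(activities):
    """Group activities by (verb, day); each group becomes one aggregated activity."""
    groups = defaultdict(list)
    for actor, verb, occurred_on in activities:
        groups[(verb, occurred_on)].append(actor)
    return dict(groups)

# Hypothetical activities as (actor, verb, occurred_on) tuples
activities = [
    ("alice", "like", date(2013, 10, 1)),
    ("bob", "like", date(2013, 10, 1)),
    ("carol", "comment", date(2013, 10, 1)),
]

aggregated = aggregate(activities)
# two aggregated activities: one "like" group (alice, bob), one "comment" group (carol)
```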


Aggregated feeds do more work in the fanout phase. Remember that for every user activity the number of fanouts is equal to that user's number of followers. So with 1,000 user activities and an average of 500 followers per user, you already end up running 500,000 fanout operations.

Since the fanout operation happens so often, you should make sure not to do any queries in the fanout phase or any other resource intensive operations.
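Spelling out the arithmetic from the paragraph above:

```python
# Numbers taken from the text: 1,000 user activities, ~500 followers each
user_activities = 1000
avg_followers = 500

fanout_operations = user_activities * avg_followers
print(fanout_operations)  # 500000
```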

Aggregated feeds differ from feeds in a few ways:

  • Aggregator classes aggregate activities into aggregated activities
  • We need to update aggregated activities instead of only appending
  • Serialization is different
add_many(activities, trim=True, current_activities=None, *args, **kwargs)[source]

Adds many activities to the feed

Unfortunately we can't support the batch interface: the writes depend on the reads, and subsequent writes depend on those writes, so no batching is possible at all.

Parameters: activities – the list of activities
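The read-modify-write cycle that rules out batching can be illustrated with a toy in-memory feed. This is not stream_framework code; the dict storage and the (group_key, actor) activity shape are assumptions:

```python
class InMemoryAggregatedFeed:
    """Toy model of an aggregated feed's add_many read-modify-write cycle."""

    def __init__(self):
        self.storage = {}  # group_key -> list of actors

    def add_many(self, activities):
        # 1. read: fetch the current aggregated activities
        current = dict(self.storage)
        # 2. modify: merge the new activities into the existing groups
        for group_key, actor in activities:
            current.setdefault(group_key, []).append(actor)
        # 3. write: persist the updated groups; a later add_many must read
        #    this result first, which is why writes cannot be batched blindly
        self.storage = current

feed = InMemoryAggregatedFeed()
feed.add_many([("like:2013-10-01", "alice")])
feed.add_many([("like:2013-10-01", "bob")])  # updates the existing group
```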
add_many_aggregated(aggregated, *args, **kwargs)[source]

Adds the list of aggregated activities

Parameters: aggregated – the list of aggregated activities to add

aggregated_activity_class

alias of AggregatedActivity


aggregator_class

alias of RecentVerbAggregator


contains(activity)[source]

Checks if the activity is present in any of the aggregated activities

Parameters: activity – the activity to search for

get_aggregator()[source]

Returns the class used for aggregation

classmethod get_timeline_storage_options()[source]

Returns the options for the timeline storage

merge_max_length = 20
remove_many(activities, batch_interface=None, trim=True, *args, **kwargs)[source]

Removes many activities from the feed

Parameters: activities – the list of activities to remove
remove_many_aggregated(aggregated, *args, **kwargs)[source]

Removes the list of aggregated activities

Parameters: aggregated – the list of aggregated activities to remove

timeline_serializer

alias of AggregatedActivitySerializer

cassandra Module

redis Module

class stream_framework.feeds.aggregated_feed.redis.RedisAggregatedFeed(user_id)[source]

Bases: stream_framework.feeds.aggregated_feed.base.AggregatedFeed


activity_serializer

alias of ActivitySerializer


activity_storage_class

alias of RedisActivityStorage


timeline_serializer

alias of AggregatedActivitySerializer


timeline_storage_class

alias of RedisTimelineStorage

notification_feed Module

class stream_framework.feeds.aggregated_feed.notification_feed.NotificationFeed(user_id, **kwargs)[source]

Bases: stream_framework.feeds.aggregated_feed.base.AggregatedFeed

Similar to an aggregated feed, but:

  • doesn't use the activity storage (serializes everything into the timeline storage)
  • features denormalized counts
  • pubsub signals which you can subscribe to

For now this is entirely tied to Redis.

activity_serializer = None
activity_storage_class = None
add_many(activities, **kwargs)[source]

Similar to AggregatedFeed.add_many; the only difference is that it denormalizes a count of unseen activities

count_format = 'notification_feed:1:user:%(user_id)s:count'

the format we use to denormalize the count
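The count key is built from count_format with standard %-style string interpolation, for example:

```python
# Format string as documented above; user_id 13 is just an example value
count_format = 'notification_feed:1:user:%(user_id)s:count'
count_key = count_format % {'user_id': 13}
print(count_key)  # notification_feed:1:user:13:count
```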


count_unseen(aggregated_activities=None)[source]

Counts the number of aggregated activities which are unseen

Parameters: aggregated_activities – allows you to specify the aggregated activities for improved performance

denormalize_count()[source]

Denormalize the number of unseen aggregated activities to the key defined in self.count_key


get_denormalized_count()[source]

Returns the denormalized count stored in self.count_key

key_format = 'notification_feed:1:user:%(user_id)s'
lock_format = 'notification_feed:1:user:%s:lock'

the key used for locking

mark_all(seen=True, read=None)[source]

Mark all the entries as seen or read

Parameters:
  • seen – set seen_at
  • read – set read_at
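Conceptually, mark_all stamps the entries' timestamps. A toy in-memory sketch; the seen_at/read_at field names mirror the bullets above, everything else (the class, the default read=False) is an assumption:

```python
from datetime import datetime

class ToyAggregatedActivity:
    """Stand-in for an aggregated activity with seen/read timestamps."""
    def __init__(self):
        self.seen_at = None
        self.read_at = None

def mark_all(aggregated_activities, seen=True, read=False):
    """Toy version of mark_all: stamp seen_at (and optionally read_at)."""
    now = datetime.now()
    for aggregated in aggregated_activities:
        if seen and aggregated.seen_at is None:
            aggregated.seen_at = now
        if read and aggregated.read_at is None:
            aggregated.read_at = now
    return aggregated_activities

entries = [ToyAggregatedActivity(), ToyAggregatedActivity()]
mark_all(entries, seen=True)  # all entries now carry a seen_at timestamp
```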
max_length = 99

notification feeds only need a small max length


publish_count(count)[source]

Publishes the count via pubsub

Parameters: count – the count to publish
pubsub_main_channel = 'juggernaut'

the main channel to publish


set_denormalized_count(count)[source]

Updates the denormalized count to count

Parameters: count – the count to update to

timeline_serializer

alias of NotificationSerializer

class stream_framework.feeds.aggregated_feed.notification_feed.RedisNotificationFeed(user_id, **kwargs)[source]

Bases: stream_framework.feeds.aggregated_feed.notification_feed.NotificationFeed


timeline_storage_class

alias of RedisTimelineStorage