aggregated_feed Package

base Module

class stream_framework.feeds.aggregated_feed.base.AggregatedFeed(user_id)[source]

Bases: stream_framework.feeds.base.BaseFeed

Aggregated feeds are an extension of the basic feed. They turn activities into aggregated activities by using an aggregator class.

See BaseAggregator

You can use aggregated feeds to build smart feeds, such as Facebook’s newsfeed. You can also use them to build complex notification systems.

Have a look at fashiolista.com for the possibilities.

Note

Aggregated feeds do more work in the fanout phase. Remember that for every user activity, the number of fanouts equals that user's number of followers. So with 1,000 user activities and an average of 500 followers per user, you already end up running 500,000 fanout operations.

Because the fanout operation happens so often, make sure not to run any queries or other resource-intensive operations in the fanout phase.
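The arithmetic in the note above can be checked with a few lines of Python. This is only a back-of-the-envelope sketch; the function name fanout_operations is hypothetical and not part of stream_framework.

```python
def fanout_operations(activities: int, avg_followers: int) -> int:
    """Estimate fanout writes: each activity is written once per follower."""
    return activities * avg_followers


# The figures from the note: 1,000 activities, 500 followers on average.
total = fanout_operations(activities=1000, avg_followers=500)
print(total)  # 500000
```

Any per-fanout cost (a query, a network round trip) is multiplied by this total, which is why the fanout phase should stay cheap.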

Aggregated feeds differ from feeds in a few ways:

  • Aggregator classes aggregate activities into aggregated activities
  • We need to update aggregated activities instead of only appending
  • Serialization is different
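
To make the first point concrete, here is a minimal, self-contained sketch of what an aggregator does: it groups activities under a shared key. It does not use stream_framework; the Activity and AggregatedGroup classes and the "same verb on the same day" grouping rule are assumptions chosen for illustration, not the library's actual implementation.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass(frozen=True)
class Activity:
    # Hypothetical, simplified activity; not the library's Activity class.
    actor: str
    verb: str
    object_id: int
    time: datetime


@dataclass
class AggregatedGroup:
    # Hypothetical stand-in for an aggregated activity.
    group: str
    activities: list = field(default_factory=list)


def group_key(activity):
    # Assumed grouping rule: same verb on the same calendar day.
    return "%s-%s" % (activity.verb, activity.time.date().isoformat())


def aggregate(activities):
    """Group activities by key, preserving first-seen order of the groups."""
    groups = {}
    for activity in activities:
        key = group_key(activity)
        if key not in groups:
            groups[key] = AggregatedGroup(group=key)
        groups[key].activities.append(activity)
    return list(groups.values())


activities = [
    Activity("alice", "like", 1, datetime(2024, 1, 1, 9, 0)),
    Activity("bob", "like", 2, datetime(2024, 1, 1, 10, 0)),
    Activity("carol", "follow", 3, datetime(2024, 1, 1, 11, 0)),
]
for aggregated in aggregate(activities):
    print(aggregated.group, len(aggregated.activities))
# like-2024-01-01 2
# follow-2024-01-01 1
```
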
add_many(activities, trim=True, current_activities=None, *args, **kwargs)[source]

Adds many activities to the feed

Unfortunately we can’t support the batch interface. The writes depend on the reads.

Also subsequent writes will depend on these writes. So no batching is possible at all.

Parameters: activities – the list of activities
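
The reason batching does not work can be seen in a simplified read-merge-write cycle. The sketch below is an assumption-laden toy: the timeline is a plain dict standing in for the timeline storage, activities are (verb, actor) tuples, and the verb alone serves as the group key. The real method works with aggregator and storage classes instead.

```python
def add_many(timeline, activities):
    """Merge activities into a dict-based timeline (group key -> actors).

    Returns the keys of newly created and of changed groups.
    """
    # 1. Read: the merge needs the current aggregated state.
    current = {key: list(actors) for key, actors in timeline.items()}

    # 2. Merge: each activity either extends an existing group or opens one.
    new, changed = [], []
    for verb, actor in activities:
        if verb in current:
            current[verb].append(actor)
            if verb not in new and verb not in changed:
                changed.append(verb)
        else:
            current[verb] = [actor]
            new.append(verb)

    # 3. Write: only now do we know what to store, so the write cannot be
    #    queued up front together with writes for other feeds.
    for key in new + changed:
        timeline[key] = current[key]
    return new, changed


timeline = {"like": ["alice"]}
new, changed = add_many(
    timeline, [("like", "bob"), ("follow", "carol"), ("like", "dave")]
)
print(new, changed)  # ['follow'] ['like']
```

A second call to add_many has to read what the first call wrote, which is the dependency between subsequent writes described above.
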
add_many_aggregated(aggregated, *args, **kwargs)[source]

Adds the list of aggregated activities

Parameters: aggregated – the list of aggregated activities to add
aggregated_activity_class

alias of AggregatedActivity

aggregator_class

alias of RecentVerbAggregator

contains(activity)[source]

Checks if the activity is present in any of the aggregated activities

Parameters: activity – the activity to search for
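
As a rough illustration of the check, the sketch below searches a list of aggregated activities for one activity id. The dict layout with an "activity_ids" entry is a hypothetical simplification, not the library's data model.

```python
def contains(aggregated_activities, activity_id):
    """Return True if any aggregated activity holds the given activity id."""
    return any(
        activity_id in aggregated["activity_ids"]
        for aggregated in aggregated_activities
    )


feed = [
    {"group": "like-2024-01-01", "activity_ids": [1, 2]},
    {"group": "follow-2024-01-01", "activity_ids": [3]},
]
print(contains(feed, 3))   # True
print(contains(feed, 42))  # False
```
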
get_aggregator()[source]

Returns the class used for aggregation

classmethod get_timeline_storage_options()[source]

Returns the options for the timeline storage

merge_max_length = 20
remove_many(activities, batch_interface=None, trim=True, *args, **kwargs)[source]

Removes many activities from the feed

Parameters: activities – the list of activities to remove
remove_many_aggregated(aggregated, *args, **kwargs)[source]

Removes the list of aggregated activities

Parameters: aggregated – the list of aggregated activities to remove
timeline_serializer

alias of AggregatedActivitySerializer

cassandra Module

redis Module

class stream_framework.feeds.aggregated_feed.redis.RedisAggregatedFeed(user_id)[source]

Bases: stream_framework.feeds.aggregated_feed.base.AggregatedFeed

activity_serializer

alias of ActivitySerializer

activity_storage_class

alias of RedisActivityStorage

timeline_serializer

alias of AggregatedActivitySerializer

timeline_storage_class

alias of RedisTimelineStorage

notification_feed Module

class stream_framework.feeds.aggregated_feed.notification_feed.NotificationFeed(user_id, **kwargs)[source]

Bases: stream_framework.feeds.aggregated_feed.base.AggregatedFeed

Similar to an aggregated feed, but:

  • doesn’t use the activity storage (serializes everything into the timeline storage)
  • features denormalized counts
  • provides pubsub signals which you can subscribe to

For now this is entirely tied to Redis.

activity_serializer = None
activity_storage_class = None
add_many(activities, **kwargs)[source]

Similar to AggregatedFeed.add_many. The only difference is that it denormalizes a count of unseen activities.

count_format = 'notification_feed:1:user:%(user_id)s:count'

the format we use to denormalize the count

count_unseen(aggregated_activities=None)[source]

Counts the number of aggregated activities which are unseen

Parameters: aggregated_activities – allows you to specify the aggregated activities for improved performance
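
A minimal sketch of the counting logic, assuming each aggregated activity carries a seen_at value that is None until it has been seen. The dict-based records are hypothetical; the library uses its own aggregated activity objects.

```python
from datetime import datetime


def count_unseen(aggregated_activities):
    """Count aggregated activities whose seen_at has not been set."""
    return sum(
        1 for aggregated in aggregated_activities
        if aggregated["seen_at"] is None
    )


notifications = [
    {"group": "like-2024-01-01", "seen_at": datetime(2024, 1, 1, 12, 0)},
    {"group": "follow-2024-01-01", "seen_at": None},
    {"group": "comment-2024-01-02", "seen_at": None},
]
print(count_unseen(notifications))  # 2
```

Passing in a list that is already loaded, as the parameter allows, avoids reading the feed a second time just to count.
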
denormalize_count()[source]

Denormalize the number of unseen aggregated activities to the key defined in self.count_key

get_denormalized_count()[source]

Returns the denormalized count stored in self.count_key

key_format = 'notification_feed:1:user:%(user_id)s'
lock_format = 'notification_feed:1:user:%s:lock'

the key used for locking
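
The three format strings use Python's printf-style formatting. Note that key_format and count_format take a mapping with a user_id entry, while lock_format takes a single positional value. The example below shows the resulting keys for a made-up user id of 13; how the library itself fills in these strings is not shown here.

```python
key_format = 'notification_feed:1:user:%(user_id)s'
count_format = 'notification_feed:1:user:%(user_id)s:count'
lock_format = 'notification_feed:1:user:%s:lock'

user_id = 13  # example value
format_dict = {'user_id': user_id}

feed_key = key_format % format_dict
count_key = count_format % format_dict
lock_key = lock_format % user_id

print(feed_key)   # notification_feed:1:user:13
print(count_key)  # notification_feed:1:user:13:count
print(lock_key)   # notification_feed:1:user:13:lock
```
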

mark_all(seen=True, read=None)[source]

Mark all the entries as seen or read

Parameters:
  • seen – set seen_at
  • read – set read_at
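
One way to picture the seen and read flags is the sketch below, which stamps seen_at and read_at on dict-based records. The record layout, the now argument, and the choice to leave already-set timestamps untouched are assumptions made for illustration; they are not taken from the library source.

```python
from datetime import datetime, timezone


def mark_all(aggregated_activities, seen=True, read=None, now=None):
    """Set seen_at and/or read_at on every entry that lacks them.

    Returns the entries that were actually modified.
    """
    now = now or datetime.now(timezone.utc)
    changed = []
    for aggregated in aggregated_activities:
        updated = False
        if seen and aggregated.get("seen_at") is None:
            aggregated["seen_at"] = now
            updated = True
        if read and aggregated.get("read_at") is None:
            aggregated["read_at"] = now
            updated = True
        if updated:
            changed.append(aggregated)
    return changed


entries = [
    {"group": "like", "seen_at": None, "read_at": None},
    {"group": "follow", "seen_at": datetime(2024, 1, 1), "read_at": None},
]
modified = mark_all(entries, seen=True, now=datetime(2024, 1, 2))
print(len(modified))  # 1
```

With the defaults (seen=True, read=None) only seen_at is touched, so read_at stays unset in the example above.
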
max_length = 99

notification feeds only need a small max length

publish_count(count)[source]

Publishes the count via pubsub

Parameters: count – the count to publish
pubsub_main_channel = 'juggernaut'

the main channel to publish to

set_denormalized_count(count)[source]

Updates the denormalized count to count

Parameters: count – the count to update to
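
The idea behind the denormalized count is to store the number under its own key so that readers never have to load the feed. The sketch below mimics that with an in-memory stand-in for Redis. InMemoryStore and CountSketch are hypothetical names, and returning 0 for a missing key is an assumption rather than documented behavior.

```python
class InMemoryStore:
    """Hypothetical stand-in for a Redis client: string values by key."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = str(value)

    def get(self, key):
        return self._data.get(key)


class CountSketch:
    count_format = 'notification_feed:1:user:%(user_id)s:count'

    def __init__(self, user_id, store):
        self.count_key = self.count_format % {'user_id': user_id}
        self.store = store

    def set_denormalized_count(self, count):
        # Overwrite the stored count with the new value.
        self.store.set(self.count_key, count)

    def get_denormalized_count(self):
        # Assumption: treat a missing key as a count of zero.
        value = self.store.get(self.count_key)
        return int(value) if value is not None else 0


feed = CountSketch(user_id=13, store=InMemoryStore())
print(feed.get_denormalized_count())  # 0
feed.set_denormalized_count(3)
print(feed.get_denormalized_count())  # 3
```
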
timeline_serializer

alias of NotificationSerializer

class stream_framework.feeds.aggregated_feed.notification_feed.RedisNotificationFeed(user_id, **kwargs)[source]

Bases: stream_framework.feeds.aggregated_feed.notification_feed.NotificationFeed

timeline_storage_class

alias of RedisTimelineStorage