from stream_framework.storage.base import (BaseTimelineStorage, BaseActivityStorage)
from collections import defaultdict
from contextlib import contextmanager
import six
timeline_store = defaultdict(list)
activity_store = defaultdict(dict)
[docs]def reverse_bisect_left(a, x, lo=0, hi=None):
'''
same as python bisect.bisect_left but for
lists with reversed order
'''
if lo < 0:
raise ValueError('lo must be non-negative')
if hi is None:
hi = len(a)
while lo < hi:
mid = (lo + hi) // 2
if x > a[mid]:
hi = mid
else:
lo = mid + 1
return lo
[docs]class InMemoryActivityStorage(BaseActivityStorage):
[docs] def get_from_storage(self, activity_ids, *args, **kwargs):
return {_id: activity_store.get(_id) for _id in activity_ids}
[docs] def add_to_storage(self, activities, *args, **kwargs):
insert_count = 0
for activity_id, activity_data in six.iteritems(activities):
if activity_id not in activity_store:
insert_count += 1
activity_store[activity_id] = activity_data
return insert_count
[docs] def remove_from_storage(self, activity_ids, *args, **kwargs):
removed = 0
for activity_id in activity_ids:
exists = activity_store.pop(activity_id, None)
if exists:
removed += 1
return removed
[docs] def flush(self):
activity_store.clear()
[docs]class InMemoryTimelineStorage(BaseTimelineStorage):
[docs] def contains(self, key, activity_id):
return activity_id in timeline_store[key]
[docs] def get_index_of(self, key, activity_id):
return timeline_store[key].index(activity_id)
[docs] def get_slice_from_storage(self, key, start, stop, filter_kwargs=None, ordering_args=None):
results = list(timeline_store[key][start:stop])
score_value_pairs = list(zip(results, results))
return score_value_pairs
[docs] def add_to_storage(self, key, activities, *args, **kwargs):
timeline = timeline_store[key]
initial_count = len(timeline)
for activity_id, activity_data in six.iteritems(activities):
if self.contains(key, activity_id):
continue
timeline.insert(reverse_bisect_left(
timeline, activity_id), activity_data)
return len(timeline) - initial_count
[docs] def remove_from_storage(self, key, activities, *args, **kwargs):
timeline = timeline_store[key]
initial_count = len(timeline)
for activity_id in activities.keys():
if self.contains(key, activity_id):
timeline.remove(activity_id)
return initial_count - len(timeline)
@classmethod
[docs] def get_batch_interface(cls):
@contextmanager
def meandmyself():
yield cls
return meandmyself
[docs] def count(self, key, *args, **kwargs):
return len(timeline_store[key])
[docs] def delete(self, key, *args, **kwargs):
timeline_store.pop(key, None)
[docs] def trim(self, key, length):
timeline_store[key] = timeline_store[key][:length]