# Source code for stream_framework.storage.redis.structures.list

from stream_framework.storage.redis.structures.base import RedisCache
import six
import logging
logger = logging.getLogger(__name__)


class BaseRedisListCache(RedisCache):

    '''
    Generic list functionality used for both the sorted set and list
    implementations

    Retrieve the sorted list/sorted set by using python slicing
    '''
    key_format = 'redis:base_list_cache:%s'
    max_length = 100

    def __getitem__(self, k):
        """
        Retrieves an item or slice from the set of results.
        This is the complicated stuff which allows us to slice

        Both plain integer indices and slices are normalized to a
        (start, stop) range and handed to :meth:`get_results`, so note
        that an integer index ``k`` returns whatever
        ``get_results(k, k + 1)`` returns (a list of results), not a
        single unwrapped item.

        :param k: a non-negative int or a slice with non-negative bounds
        :raises TypeError: when ``k`` is neither an int nor a slice
        :raises AssertionError: when negative indexing is attempted
        """
        if not isinstance(k, (slice, six.integer_types)):
            raise TypeError(
                'list indices must be non-negative ints or slices, not %r' % (k,))
        assert ((not isinstance(k, slice) and (k >= 0))
                or (isinstance(k, slice) and (k.start is None or k.start >= 0)
                    and (k.stop is None or k.stop >= 0))), \
            "Negative indexing is not supported."

        # Remember if it's a slice or not. We're going to treat everything as
        # a slice to simplify the logic; an int k becomes the range
        # [k, k + 1) and the underlying result list is returned as-is.
        if isinstance(k, slice):
            start = k.start
            if k.stop is not None:
                bound = int(k.stop)
            else:
                bound = None
        else:
            start = k
            bound = k + 1

        start = start or 0

        # We need check to see if we need to populate more of the cache.
        try:
            results = self.get_results(start, bound)
        except StopIteration:
            # There's nothing left, even though the bound is higher.
            results = None
        return results

    def get_results(self, start, stop):
        raise NotImplementedError('please define this function in subclasses')
class RedisListCache(BaseRedisListCache):

    '''
    Plain redis list backed implementation of the list cache.
    '''
    key_format = 'redis:list_cache:%s'
    #: the maximum number of items the list stores
    max_items = 1000

    def get_results(self, start, stop):
        '''
        Read a range of values from the redis list.

        ``None`` bounds are widened to the whole list: start falls back
        to 0 and stop to -1 (redis' inclusive last-element index).
        '''
        range_start = 0 if start is None else start
        range_stop = -1 if stop is None else stop
        return self.redis.lrange(self.get_key(), range_start, range_stop)

    def append(self, value):
        '''
        Append a single value and return its rpush result.
        '''
        return self.append_many([value])[0]

    def append_many(self, values):
        '''
        Append every value in ``values`` to the redis list, optionally
        through an active pipeline, and return the rpush results.
        '''
        key = self.get_key()
        push_results = []

        def _append_many(redis, values):
            for value in values:
                logger.debug('adding to %s with value %s', key, value)
                push_results.append(redis.rpush(key, value))
            return push_results

        # start a new map redis or go with the given one
        return self._pipeline_if_needed(_append_many, values)

    def remove(self, value):
        '''
        Remove a single value and return its lrem result.
        '''
        return self.remove_many([value])[0]

    def remove_many(self, values):
        '''
        Remove the given values from the redis list, optionally through
        an active pipeline, and return the lrem results.

        NOTE(review): lrem is called with count=10, so at most 10
        occurrences of each value are removed per call — confirm this
        bound is intentional.
        '''
        key = self.get_key()
        removal_results = []

        def _remove_many(redis, values):
            for value in values:
                logger.debug('removing from %s with value %s', key, value)
                removal_results.append(redis.lrem(key, 10, value))
            return removal_results

        # start a new map redis or go with the given one
        return self._pipeline_if_needed(_remove_many, values)

    def count(self):
        '''
        Return the number of items stored in the redis list.
        '''
        return self.redis.llen(self.get_key())

    def trim(self):
        '''
        Removes the old items in the list
        '''
        # clean up everything with a rank lower than max items up to the end of
        # the list
        key = self.get_key()
        trim_result = self.redis.ltrim(key, 0, self.max_items - 1)
        msg_format = 'cleaning up the list %s to a max of %s items'
        logger.info(msg_format, self.get_key(), self.max_items)
        return trim_result
class FallbackRedisListCache(RedisListCache):

    '''
    Redis list cache which after retrieving all items from redis falls back
    to a main data source (like the database)
    '''
    key_format = 'redis:db_list_cache:%s'

    def get_fallback_results(self, start, stop):
        raise NotImplementedError('please define this function in subclasses')

    def get_results(self, start, stop):
        '''
        Retrieves results from redis and the fallback datasource

        When a bounded range is requested and redis cannot satisfy it
        completely, the fallback datasource is hit; for unfiltered reads
        starting at 0 the cache is also (over)written with the fallback
        results.

        :param start: start of the range (inclusive), may be None
        :param stop: end of the range (exclusive), may be None
        :returns: the results from redis, or from the fallback datasource
        '''
        if stop is not None:
            # redis ranges are inclusive, hence stop - 1
            redis_results = self.get_redis_results(start, stop - 1)
            required_items = stop - start
            enough_results = len(redis_results) == required_items
            assert len(redis_results) <= required_items, \
                'we should never have more than we ask for, start %s, stop %s' % (
                    start, stop)
        else:
            # [start:] slicing does not know what's enough so
            # does not hit the db unless the cache is empty
            redis_results = self.get_redis_results(start, stop)
            enough_results = True
        if not redis_results or not enough_results:
            self.source = 'fallback'
            filtered = getattr(self, "_filtered", False)
            db_results = self.get_fallback_results(start, stop)

            if start == 0 and not redis_results and not filtered:
                logger.info('setting cache for type %s with len %s',
                            self.get_key(), len(db_results))
                # only cache when we have no results, to prevent duplicates
                self.cache(db_results)
            elif start == 0 and redis_results and not filtered:
                logger.info('overwriting cache for type %s with len %s',
                            self.get_key(), len(db_results))
                # clear the cache and add these values
                self.overwrite(db_results)
            results = db_results
            # lazy %-args (not eager string interpolation) so formatting
            # only happens when the record is emitted, consistent with
            # the rest of this module
            logger.info('retrieved %s to %s from db and not from cache with key %s',
                        start, stop, self.get_key())
        else:
            results = redis_results
            logger.info('retrieved %s to %s from cache on key %s',
                        start, stop, self.get_key())
        return results

    def get_redis_results(self, start, stop):
        '''
        Returns the results from redis

        :param start: the beginning
        :param stop: the end
        '''
        results = RedisListCache.get_results(self, start, stop)
        return results

    def cache(self, fallback_results):
        '''
        Hook to write the results from the fallback to redis
        '''
        self.append_many(fallback_results)

    def overwrite(self, fallback_results):
        '''
        Clear the cache and write the results from the fallback
        '''
        self.delete()
        self.cache(fallback_results)