Source code for ezycore.manager.segment

from __future__ import annotations
from abc import ABC, abstractmethod

from ezycore.models import Model, M
from ezycore.exceptions import Full, SegmentError
from typing import Any, Callable, Iterable, Optional, Union
from re import _compile


class BaseSegment(ABC):
    """
    Base class for creating segments

    Parameters
    ----------
    name: :class:`str`
        Name of segment
    max_size: :class:`int`
        Maximum size of segment, if < 0 then size of segment is infinite
    model: :class:`Model`
        Model being used to store data
    make_space: :class:`bool`
        Whether to start removing content once segment is full,
        starting from the least recently accessed piece of data

    Raises
    ------
    SegmentError
        If any argument fails validation. The underlying
        ``AssertionError``/``TypeError`` is chained as the cause.
    """

    def __init__(
        self,
        name: str,
        model: Model,
        *,
        max_size: int = 1000,
        make_space: bool = True
    ) -> None:
        try:
            self.__require(type(name) is str, 'Name of segment must be a string')
            # issubclass raises TypeError when ``model`` is not a class at all
            self.__require(issubclass(model, Model), 'modal provided must inherit the Modal class')
            self.__require(type(max_size) is int, 'Max size must be an integer')
            self.__require(type(make_space) is bool, 'Value for make space must be a boolean')
        except (AssertionError, TypeError) as err:
            raise SegmentError('Invalid args provided') from err

        self.__name = name
        self.__max_size = max_size
        self.__model = model
        self.__ms = make_space
        self.__manager = None

    @staticmethod
    def __require(condition: bool, message: str) -> None:
        # Raised explicitly (instead of using ``assert``) so validation still
        # runs under ``python -O`` while callers keep seeing AssertionError.
        if not condition:
            raise AssertionError(message)

    def update_segment(
        self,
        *,
        name: str = ...,
        max_size: int = ...,
        model: Model = ...,
        make_space: bool = ...
    ) -> None:
        """
        Used to update enclosed variables in a segment.
        Arguments left as ``...`` are not modified.

        All supplied values are validated before anything is changed, so a
        failed call leaves both the segment and its manager untouched.

        Parameters
        ----------
        name: :class:`str`
            Name of segment
        max_size: :class:`int`
            Maximum size of segment, if < 0 then size of segment is infinite
        model: :class:`Model`
            Model being used to store data
        make_space: :class:`bool`
            Whether to start removing content starting from the
            least recently accessed piece of data

        Raises
        ------
        AssertionError
            If a supplied value has an invalid type
        TypeError
            If ``model`` is not a class
        """
        # --- validate first, mutate afterwards ---
        if model is not ...:
            self.__require(issubclass(model, Model), 'modal provided must inherit the Modal class')
        if name is not ...:
            self.__require(type(name) is str, 'Name of segment must be a string')
        if max_size is not ...:
            self.__require(type(max_size) is int, 'Max size must be an integer')
        if make_space is not ...:
            self.__require(type(make_space) is bool, 'Value for make space must be a boolean')

        manager = self.__manager

        if model is not ...:
            if manager is not None:
                models = manager._modify_mod()
                models.pop(self.__name, None)
                models[self.__name] = model
            self.__model = model

        if name is not ...:
            if manager is not None:
                # Re-key this segment in both manager registries
                locations = manager._modify_loc()
                locations.pop(self.__name, None)
                locations[name] = self

                models = manager._modify_mod()
                models[name] = models.pop(self.__name, None)
            self.__name = name

        if max_size is not ...:
            self.__max_size = max_size

        if make_space is not ...:
            self.__ms = make_space

    def _set_manager(self, manager: Any) -> None:
        # Compared against None rather than by truthiness: a manager object
        # may legitimately evaluate as falsy (e.g. if it defines __len__).
        if self.__manager is not None:
            raise ValueError('Segment already set to manager')
        self.__manager = manager

    def _del_manager(self) -> None:
        if self.__manager is None:
            raise ValueError('No manager set')
        self.__manager = None

    def _get_manager(self) -> Optional[Any]:
        return self.__manager

    ###########################################################################################
    ##
    ##                                      Properties
    ##
    ###########################################################################################

    @property
    def name(self) -> str:
        """ Returns name of segment """
        return self.__name

    @property
    def max_size(self) -> int:
        """ Returns maximum-size of segment """
        return self.__max_size

    @property
    def model(self) -> Model:
        """ Returns model of segment """
        return self.__model

    @property
    def make_space(self) -> bool:
        """ Whether the segment should remove the least recently accessed data once full """
        return self.__ms

    ###########################################################################################
    ##
    ##                                      Methods
    ##
    ###########################################################################################

    @abstractmethod
    def size(self) -> int:
        """ Returns total number of elements in segment """

    @abstractmethod
    def keys(self) -> Iterable[Any]:
        """ Returns an iterator of all keys in segment """

    @abstractmethod
    def values(self) -> Iterable[Model]:
        """ Returns an iterator of all values in segment """

    @abstractmethod
    def get(self, obj_key: Any, *flags, default: Any = ..., **export_kwds) -> Optional[Model]:
        """
        Retrieves an element from cache

        Parameters
        ----------
        obj_key: Any
            value to search for, set in `Model._config.search_by`
        *flags
            elements to include in cache, read more in the :class:`Model`'s section
        default: Any
            default value if element not found
        **export_kwds:
            export kwargs, read more `here <https://docs.pydantic.dev/usage/exporting_models/>`_
        """

    @abstractmethod
    def search(self, func: Callable[[Model], bool], *fields, limit: int = -1, **export_kwds) -> Iterable[M]:
        """
        Searches for elements matching query in cache

        Parameters
        ----------
        func: Callable[[:class:`Model`], :class:`bool`]
            Function which returns whether element is needed
        *fields
            List of fields to return from model
        limit: :class:`int`
            Number of results to restrict search to, if < 0 no limit is set.
        **export_kwds:
            export kwargs, read more `here <https://docs.pydantic.dev/usage/exporting_models/>`_
        """

    @abstractmethod
    def search_using_re(self, expr: str, *fields, flags: int = 0, key: str = ..., limit: int = -1, **export_kwds) -> Iterable[M]:
        """Searches for elements using regular expressions

        .. warning::
            Converts all values to :class:`str` using the ``str()`` func

        Parameters
        ----------
        expr: :class:`str`
            Regular expression to use
        flags: :class:`int`
            Flags for expression
        *fields
            List of fields to return from model
        key: :class:`str`
            Name of field to use, defaults to :attr:`Config.search_by`
        limit: :class:`int`
            Number of results to restrict search to, if < 0 no limit is set.
        **export_kwds:
            export kwargs, read more `here <https://docs.pydantic.dev/usage/exporting_models/>`_
        """

    @abstractmethod
    def add(self, obj: Union[dict, Model], *, overwrite: bool = False) -> None:
        """
        Adds an element within the segment,
        raises `ValueError` if object already exists unless overwrite set to `True`.

        Parameters
        ----------
        obj: Union[:class:`dict`, :class:`Model`]
            Object to add
        overwrite: :class:`bool`
            Whether to overwrite an existing element
        """

    @abstractmethod
    def remove(self, obj_key: Any, *default: Any) -> Optional[Model]:
        """
        Removes an element from segment,
        raises an error if key doesn't exist unless default is provided.

        .. note::
            The bundled :class:`Segment` raises `ValueError` for a missing key.

        Parameters
        ----------
        obj_key: Any
            Value of stored key to remove
        *default:
            If value not found, returns this instead of raising an error
        """

    @abstractmethod
    def invalidate_all(self, func: Callable[[Model], bool], *, limit: int = -1) -> Iterable[Model]:
        """
        Invalidates all entries which match check function

        Parameters
        ----------
        func: Callable[[:class:`Model`], :class:`bool`]
            Function which indicates whether entry should be removed
        limit: :class:`int`
            Limit how many entries should be removed, if < 0 no limit is set
        """

    @abstractmethod
    def update(self, obj_key: Any, **kwds) -> None:
        """
        Updates an element in the segment

        Parameters
        ----------
        obj_key: Any
            Object key to update
        **kwds:
            Fields to update
        """

    @abstractmethod
    def last(self) -> Optional[Model]:
        """ Retrieves the item which was least recently accessed """

    @abstractmethod
    def first(self) -> Optional[Model]:
        """ Retrieves the item which was most recently accessed """

    @abstractmethod
    def clear(self) -> None:
        """ Removes all elements from segment """

    @abstractmethod
    def pretty_print(self, *, limit: int = -1) -> None:
        """
        Prints out all data in segment, or until specified limit

        Parameters
        ----------
        limit: :class:`int`
            How many rows to print out, starting from the most recently accessed data
        """

    ###########################################################################################
    ##
    ##                                      Others
    ##
    ###########################################################################################

    def __iter__(self):
        # The segment is its own iterator; subclasses supply __next__
        return self

    @abstractmethod
    def __next__(self) -> Any:
        ...

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(name={self.name}, size={self.size()}, max_size={self.max_size}, model={self.model})"

    def __len__(self) -> int:
        return self.size()
class Segment(BaseSegment):
    """
    Default segment class

    Entries live in a dict keyed by the model's ``search_by`` field. A
    separate list tracks access order: index ``0`` is the least recently
    accessed key and the last index is the most recently accessed one.

    Parameters
    ----------
    name: :class:`str`
        Name of segment
    max_size: :class:`int`
        Maximum size of segment
    model: :class:`Model`
        Model being used to store data
    make_space: :class:`bool`
        Whether to start removing content starting from the
        least recently accessed piece of data
    """

    def __init__(
        self,
        name: str,
        model: Model,
        *,
        max_size: int = 1000,
        make_space: bool = True
    ) -> None:
        super().__init__(name, model, max_size=max_size, make_space=make_space)

        self.__queue = list()   # keys, least -> most recently accessed
        self.__data = dict()    # key -> stored model instance
        self.__position = 0     # cursor used by __iter__/__next__
        self._invalidated_last = False  # whether the latest get() evicted its entry

    def size(self) -> int:
        return len(self.__data)

    def keys(self) -> Iterable[Any]:
        return iter(self.__data.keys())

    def values(self) -> Iterable[Model]:
        return iter(self.__data.values())

    @staticmethod
    def __merge_excludes(requested: Any, configured: Any) -> dict:
        # Combine the caller's ``exclude`` (set, dict or None) with the model's
        # configured exclude into a fresh dict, so neither input is mutated.
        if isinstance(requested, dict):
            merged = dict(requested)
        else:
            merged = {field: True for field in requested or ()}
        for field in configured or ():
            merged[field] = True
        return merged

    def _get(self, obj_key: Any, *include, default: Any = None, ignore: bool = False, original: bool = False, **export_kwds) -> Optional[Model]:
        """
        Retrieves a value without touching the access queue or fetch counters.

        Parameters
        ----------
        obj_key: Any
            Key of the stored entry
        *include
            Field names, or ``(field, spec)`` pairs, to export. ``'*'`` exports every field.
        default: Any
            Returned when the key is missing. ``None`` and ``...`` mean "no default".
        ignore: :class:`bool`
            Return the stored model itself, skipping any export
        original: :class:`bool`
            Return a ``(result, stored_model)`` tuple instead of just the result
        **export_kwds:
            Forwarded to the model's ``dict()`` call

        Raises
        ------
        KeyError
            If the key is missing and no default was supplied
        """
        try:
            data: Model = self.__data[obj_key]
        except KeyError as err:
            if default is None or default is ...:
                raise KeyError('object not found') from err
            return default

        manager = self._get_manager()
        if manager is not None:
            for partial in data.__ezycore_partials__:
                prim_key = getattr(data, partial)
                try:
                    setattr(data, partial, manager[data._config.partials[partial]].get(prim_key))
                except ValueError:
                    # Best effort: the referenced entry could not be fetched,
                    # so the attribute keeps its current value.
                    pass

        if ignore or not (include or export_kwds or self.model._config.exclude):
            return (data, data) if original else data

        if '*' in include:
            exported = data.dict()
            return (exported, data) if original else exported

        requested = dict()
        for field in include:
            if isinstance(field, str):
                requested[field] = True
            else:
                requested[field[0]] = field[1]

        include_spec = export_kwds.pop('include', None) or requested
        exclude_spec = self.__merge_excludes(export_kwds.pop('exclude', None), data._config.exclude)

        # Empty specs are left out so dict() falls back to its own defaults
        if include_spec:
            export_kwds['include'] = include_spec
        if exclude_spec:
            export_kwds['exclude'] = exclude_spec

        exported = data.dict(**export_kwds)
        return (exported, data) if original else exported

    def get(self, obj_key: Any, *flags, default: Any = ..., **export_kwds) -> Optional[Model]:
        """
        Retrieves an element, marks it as the most recently accessed entry and
        counts the fetch towards the model's ``invalidate_after`` limit.

        Passing ``ignore_queue=True`` skips the access-order update.

        Raises
        ------
        ValueError
            If the key is missing and no default was supplied
        """
        ignore_queue = export_kwds.pop('ignore_queue', False)

        if ignore_queue:
            if obj_key not in self.__data:
                if default is ...:
                    raise ValueError('Object not found')
                return default
        else:
            try:
                self.__queue.remove(obj_key)
            except ValueError:
                if default is ...:
                    raise ValueError('Object not found')
                return default
            self.__queue.append(obj_key)

        value, result = self._get(obj_key, *flags, original=True, **export_kwds)

        max_fetches = result._config.invalidate_after
        if max_fetches < 0:
            # Negative limit: fetches are never counted
            self._invalidated_last = False
            return value

        fetches = result._config.__ezycore_internal__['n_fetch'] + 1
        if fetches >= max_fetches:
            self._invalidated_last = True
            self.remove(obj_key)
        else:
            self._invalidated_last = False
        result._config.__ezycore_internal__['n_fetch'] = fetches
        return value

    def __collect(self, matches: Callable[[Model], Any], fields: tuple, limit: int, export_kwds: dict) -> list:
        # Shared driver for search()/search_using_re(). Walks a snapshot of the
        # queue because get() may remove entries that reach their fetch limit.
        export_kwds.update(ignore_queue=True)
        results = list()

        for key in tuple(self.__queue):
            if limit > 0 and len(results) >= limit:
                break
            if key not in self.__data:
                continue    # removed while this search was running
            if matches(self._get(key, ignore=True)):
                results.append(self.get(key, *fields, **export_kwds))
        return results

    def search(self, func: Callable[[Model], bool], *fields, limit: int = -1, **export_kwds) -> Iterable[M]:
        """
        Returns exported entries for which ``func`` is truthy, scanning from
        the least recently accessed entry. The scan does not reorder the queue,
        but each returned entry counts as a fetch.
        """
        return self.__collect(func, fields, limit, export_kwds)

    def search_using_re(self, expr: str, *fields, flags: int = 0, key: str = None, limit: int = -1, **export_kwds) -> Iterable[M]:
        """
        Returns exported entries whose ``key`` field, converted with ``str()``,
        matches ``expr`` from the start of the string (``Pattern.match``).
        ``key`` falls back to the model's ``search_by`` field when it is falsy or ``...``.
        """
        import re

        if not key or key is ...:
            search_key = self.model._config.search_by
        else:
            search_key = key
        pattern = re.compile(expr, flags)

        def matches(item: Model) -> Any:
            return pattern.match(str(getattr(item, search_key)))

        return self.__collect(matches, fields, limit, export_kwds)

    def add(self, obj: M, *, overwrite: bool = False) -> None:
        """
        Stores ``obj`` as the most recently accessed entry.

        When the segment is full, the least recently accessed entry is evicted
        first. Overwriting an existing key never evicts another entry.

        Raises
        ------
        AssertionError
            If ``obj`` is neither a dict nor an instance of the segment's model
        ValueError
            If the key already exists and ``overwrite`` is false
        Full
            If the segment is full and ``make_space`` is disabled
        """
        # Raised explicitly so the check survives ``python -O``
        if not isinstance(obj, (dict, self.model)):
            raise AssertionError('Invalid object passed')

        if isinstance(obj, self.model):
            obj._config.__ezycore_internal__['n_fetch'] = 0

        fields = dict(obj)
        key = fields[self.model._config.search_by]

        exists = key in self.__data
        if exists and not overwrite:
            raise ValueError('Item already exists')

        must_evict = (not exists) and (0 < self.max_size <= len(self.__queue))
        if must_evict and not self.make_space:
            raise Full('Segment full')

        # Build the entry before mutating anything, so a validation failure
        # cannot cost an unrelated entry its place.
        entry = self.model(**fields)

        if must_evict:
            self.__data.pop(self.__queue.pop(0))
        if exists:
            # Drop the old queue slot so the key is never tracked twice
            self.__queue.remove(key)

        self.__data[key] = entry
        self.__queue.append(key)

    def remove(self, obj_key: Any, *default: Any) -> Optional[Model]:
        """
        Removes and returns an entry.

        If the key is missing, returns the default: the single value when one
        was given, otherwise the tuple of all supplied values.

        Raises
        ------
        ValueError
            If the key is missing and no default was supplied
        """
        try:
            self.__queue.remove(obj_key)
        except ValueError:
            if default:
                return default[0] if len(default) == 1 else default
            raise
        return self.__data.pop(obj_key)

    def invalidate_all(self, func: Callable[[Model], bool], *, limit: int = -1) -> Iterable[Model]:
        """
        Removes and returns every entry for which ``func`` is truthy, scanning
        from the least recently accessed entry and stopping after ``limit`` matches.
        """
        doomed = list()
        for key in tuple(self.__queue):
            if limit > 0 and len(doomed) >= limit:
                break
            if func(self._get(key, ignore=True)):
                doomed.append(key)
        return [self.remove(key) for key in doomed]

    def update(self, obj_key: Any, **kwds) -> None:
        """
        Rebuilds the stored entry with ``kwds`` applied and marks it as the
        most recently accessed entry.

        The stored model is read directly, so an update is not counted as a
        fetch and is not affected by export excludes.

        Raises
        ------
        ValueError
            If the key does not exist
        """
        if obj_key not in self.__queue:
            raise ValueError('Object not found')

        merged = dict(self.__data[obj_key])
        merged.update(kwds)
        replacement = self.model(**merged)

        self.__queue.remove(obj_key)
        self.__queue.append(obj_key)
        self.__data[obj_key] = replacement

    def first(self) -> Optional[Model]:
        """ Retrieves the most recently accessed item, or ``None`` if empty """
        if self.size() == 0:
            return None
        return self.__data[self.__queue[-1]]

    def last(self) -> Optional[Model]:
        """ Retrieves the least recently accessed item, or ``None`` if empty """
        if self.size() == 0:
            return None
        return self.__data[self.__queue[0]]

    def oldest(self, limit: int = -1) -> Iterable[Model]:
        """
        Retrieves elements starting from the least recently accessed values

        Parameters
        ----------
        limit: :class:`int`
            How many elements to retrieve, if <= 0 then all elements are retrieved.
            Values larger than the segment are capped to its size.
        """
        count = min(limit, self.size()) if limit > 0 else self.size()
        for key in self.__queue[:count]:
            yield self.__data[key]

    def newest(self, limit: int = -1) -> Iterable[Model]:
        """
        Retrieves elements starting from the most recently accessed values

        Parameters
        ----------
        limit: :class:`int`
            How many elements to retrieve, if <= 0 then all elements are retrieved.
            Values larger than the segment are capped to its size.
        """
        count = min(limit, self.size()) if limit > 0 else self.size()
        for key in self.__queue[::-1][:count]:
            yield self.__data[key]

    def clear(self) -> None:
        self.__position = 0
        self.__data.clear()
        self.__queue.clear()

    def pretty_print(self, *, limit: int = -1) -> None:
        """
        Prints a tab-separated header row followed by up to ``limit`` entries,
        most recently accessed first.
        """
        size = self.size()
        if (limit < 0) or (limit > size):
            limit = size

        headers = list(self.model.__fields__)
        print('\t'.join(headers))

        for key in self.__queue[::-1][:limit]:
            row = self.__data[key]
            for header in headers:
                print(getattr(row, header), end='\t')
            print()
        print()

    def __iter__(self, *, position: int = 0):
        # NOTE: the cursor lives on the segment itself, so two loops over the
        # same segment at the same time share (and disturb) one position.
        self.__position = position
        return super().__iter__()

    def __next__(self) -> Model:
        if self.__position >= self.size():
            self.__position = 0
            raise StopIteration

        self.__position += 1
        # Walk backwards through the queue: most recently accessed first
        return self.__data[self.__queue[-self.__position]]