"""Classes Module."""
from __future__ import annotations
__all__ = (
"Chain",
"ColorLogger",
"ConfigParser",
"dd",
"dictsort",
"getter",
"LetterCounter",
"NamedtupleMeta",
"Noset",
"NOSET",
)
import abc
import collections
import configparser
import logging
import string
import types
from collections.abc import Callable, Iterable, MutableMapping
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, Union
from .functions import toiter
if TYPE_CHECKING:
from collections.abc import Hashable
from nodeps.modules.typings import ChainLiteral
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
[docs]
class Chain(collections.ChainMap):
# noinspection PyUnresolvedReferences
"""Variant of chain that allows direct updates to inner scopes and returns more than one value, not the first one.
Examples:
>>> from nodeps import Chain
>>>
>>> class Test3:
... a = 2
>>>
>>> class Test4:
... a = 2
>>>
>>> Test1 = collections.namedtuple('Test1', 'a b')
>>> Test2 = collections.namedtuple('Test2', 'a d')
>>> test1 = Test1(1, 2)
>>> test2 = Test2(3, 5)
>>> test3 = Test3()
>>> test4 = Test4()
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2))]
>>> chain = Chain(*maps)
>>> assert chain['a'] == [1, 2, 3, {'z': 1}, {'z': 2}]
>>> chain = Chain(*maps, rv="first")
>>> assert chain['a'] == 1
>>> chain = Chain(*maps, rv="all")
>>> assert chain['a'] == [1, 2, 3, {'z': 1}, {'z': 1}, {'z': 2}]
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)),\
dict(a=dict(z=2)), test1, test2]
>>> chain = Chain(*maps)
>>> assert chain['a'] == [1, 2, 3, {'z': 1}, {'z': 2}]
>>> chain = Chain(*maps, rv="first")
>>> assert chain['a'] == 1
>>> chain = Chain(*maps, rv="all")
>>> assert chain['a'] == [1, 2, 3, {'z': 1}, {'z': 1}, {'z': 2}, 1, 3]
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test2]
>>> chain = Chain(*maps)
>>> del chain['a']
>>> assert chain == Chain({'b': 2}, {'c': 3}, {'d': 4}, test1, test2)
>>> assert chain['a'] == [1, 3]
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test2]
>>> chain = Chain(*maps)
>>> assert chain.delete('a') == Chain({'b': 2}, {'c': 3}, {'d': 4}, test1, test2)
>>> assert chain.delete('a')['a'] == [1, 3]
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test2]
>>> chain = Chain(*maps, rv="first")
>>> del chain['a']
>>> del maps[0]['a'] # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
KeyError:
>>>
>>> assert chain['a'] == 2
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test2]
>>> chain = Chain(*maps, rv="first")
>>> new = chain.delete('a')
>>> del maps[0]['a'] # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
KeyError:
>>> assert new.delete('a')
>>> del maps[1]['a'] # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
KeyError:
>>>
>>> assert new['a'] == 3
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test3]
>>> chain = Chain(*maps)
>>> del chain['a']
>>> assert chain[4] == []
>>> assert not hasattr(test3, 'a')
>>> assert chain.set('a', 9)
>>> assert chain['a'] == [9, 1]
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test4]
>>> chain = Chain(*maps)
>>> chain.set('j', 9) # doctest: +ELLIPSIS
Chain({'a': 1, 'b': 2, 'j': 9}, {'a': 2, 'c': 3}, {'a': 3, 'd': 4}, {'a': {'z': 1}}, {'a': {'z': 1}}, \
{'a': {'z': 2}}, Test1(a=1, b=2), <....Test4 object at 0x...>)
>>> assert [maps[0]['j']] == chain['j'] == [9]
>>> chain.set('a', 10) # doctest: +ELLIPSIS
Chain({'a': 10, 'b': 2, 'j': 9}, {'a': 10, 'c': 3}, {'a': 10, 'd': 4}, {'a': 10}, {'a': 10}, {'a': 10}, \
Test1(a=1, b=2), <....Test4 object at 0x...>)
>>> # noinspection PyUnresolvedReferences
>>> assert [maps[0]['a'], 1] == chain['a'] == [maps[7].a, 1] == [10, 1] # 1 from namedtuple
>>>
>>> maps = [dict(a=1, b=2), dict(a=2, c=3), dict(a=3, d=4), dict(a=dict(z=1)), dict(a=dict(z=1)), \
dict(a=dict(z=2)), test1, test4]
>>> chain = Chain(*maps, rv="first")
>>> chain.set('a', 9) # doctest: +ELLIPSIS
Chain({'a': 9, 'b': 2}, {'a': 2, 'c': 3}, {'a': 3, 'd': 4}, {'a': {'z': 1}}, {'a': {'z': 1}}, \
{'a': {'z': 2}}, Test1(a=1, b=2), <....Test4 object at 0x...>)
>>> assert maps[0]['a'] == chain['a'] == 9
>>> assert maps[1]['a'] == 2
"""
rv: ChainLiteral = "unique"
default: Any = None
maps: list[Iterable | NamedtupleMeta | MutableMapping] = [] # noqa: RUF012
def __init__(self, *maps, rv: ChainLiteral = "unique", default: Any = None) -> None:
"""Init."""
super().__init__(*maps)
self.rv = rv
self.default = default
def __getitem__(self, key: Hashable) -> Any: # noqa: PLR0912
"""Get item."""
rv = []
for mapping in self.maps:
if hasattr(mapping, "_field_defaults"):
mapping = mapping._asdict() # noqa: PLW2901
elif hasattr(mapping, "asdict"):
to_dict = mapping.__class__.asdict
if isinstance(to_dict, property):
mapping = mapping.asdict # noqa: PLW2901
elif callable(to_dict):
mapping = mapping.asdict() # noqa: PLW2901
if hasattr(mapping, "__getitem__"):
try:
value = mapping[key]
if self.rv == "first":
return value
if (self.rv == "unique" and value not in rv) or self.rv == "all":
rv.append(value)
except KeyError:
pass
elif (
hasattr(mapping, "__getattribute__")
and isinstance(key, str)
and not isinstance(mapping, (tuple | bool | int | str | bytes))
):
try:
value = getattr(mapping, key)
if self.rv == "first":
return value
if (self.rv == "unique" and value not in rv) or self.rv == "all":
rv.append(value)
except AttributeError:
pass
return self.default if self.rv == "first" else rv
def __delitem__(self, key: Hashable) -> Chain:
"""Delete item."""
index = 0
deleted = []
found = False
for mapping in self.maps:
if mapping:
if not isinstance(mapping, (tuple | bool | int | str | bytes)):
if hasattr(mapping, "__delitem__"):
if key in mapping:
del mapping[key]
if self.rv == "first":
found = True
elif hasattr(mapping, "__delattr__") and hasattr(mapping, key) and isinstance(key, str):
delattr(mapping.__class__, key) if key in dir(mapping.__class__) else delattr(mapping, key)
if self.rv == "first":
found = True
if not mapping:
deleted.append(index)
if found:
break
index += 1
for index in reversed(deleted):
del self.maps[index]
return self
[docs]
def delete(self, key: Hashable) -> Chain:
"""Delete item."""
del self[key]
return self
def __setitem__(self, key: Hashable, value: Any) -> Chain: # noq: C901
"""Set item."""
found = False
for mapping in self.maps:
if mapping:
if not isinstance(mapping, (tuple | bool | int | str | bytes)):
if hasattr(mapping, "__setitem__"):
if key in mapping:
mapping[key] = value
if self.rv == "first":
found = True
elif hasattr(mapping, "__setattr__") and hasattr(mapping, key) and isinstance(key, str):
setattr(mapping, key, value)
if self.rv == "first":
found = True
if found:
break
if not found and not isinstance(self.maps[0], (tuple | bool | int | str | bytes)):
if hasattr(self.maps[0], "__setitem__"):
self.maps[0][key] = value
elif hasattr(self.maps[0], "__setattr__") and isinstance(key, str):
setattr(self.maps[0], key, value)
return self
[docs]
def set(self, key: Hashable, value: Any) -> Chain: # noqa: A003
"""Set item."""
return self.__setitem__(key, value)
[docs]
class ColorLogger(logging.Formatter):
"""Color logger class."""
black = "\x1b[30m"
blue = "\x1b[34m"
cyan = "\x1b[36m"
gr = "\x1b[32m"
grey = "\x1b[38;21m"
mg = "\x1b[35m"
red = "\x1b[31;21m"
red_bold = "\x1b[31;1m"
reset = "\x1b[0m"
white = "\x1b[37m"
yellow = "\x1b[33;21m"
fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
vertical = f"{red}|{reset} "
FORMATS: ClassVar[dict[int, str]] = {
logging.DEBUG: grey + fmt + reset,
logging.INFO: f"{cyan}%(levelname)8s{reset} {vertical}"
f"{cyan}%(name)s{reset} {vertical}"
f"{cyan}%(filename)s{reset}:{cyan}%(lineno)d{reset} {vertical}"
f"{gr}%(extra)s{reset} {vertical}"
f"{cyan}%(message)s{reset}",
logging.WARNING: f"{yellow}%(levelname)8s{reset} {vertical}"
f"{yellow}%(name)s{reset} {vertical}"
f"{yellow}%(filename)s{reset}:{yellow}%(lineno)d{reset} {vertical}"
f"{gr}%(repo)s{reset} {vertical}"
f"{yellow}%(message)s{reset}",
logging.ERROR: red + fmt + reset,
logging.CRITICAL: red_bold + fmt + reset,
}
[docs]
@classmethod
def logger(cls, name: str = __name__) -> logging.Logger:
"""Get logger.
Examples:
>>> from nodeps import ColorLogger
>>> from nodeps import NODEPS_PROJECT_NAME
>>>
>>> lo = ColorLogger.logger(NODEPS_PROJECT_NAME)
>>> lo.info("hola", extra=dict(extra="bapy"))
>>> lo.info("hola")
Args:
name: logger name
Returns:
logging.Logger
"""
l = logging.getLogger(name)
l.propagate = False
l.setLevel(logging.DEBUG)
if l.handlers:
l.handlers[0].setLevel(logging.DEBUG)
l.handlers[0].setFormatter(cls())
else:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(cls())
l.addHandler(handler)
return l
[docs]
class ConfigParser(configparser.ConfigParser):
"""Config parser to get list from setup.cfg."""
[docs]
def getlist(self, section: str = "options", option: str = "scripts") -> list:
"""Get list."""
value = None
if self.has_section(section):
value = self.get(section, option) if self.has_option(section, option) else None
return list(filter(None, (x.strip() for x in value.splitlines()))) if value else []
[docs]
def setlist(self, section: str = "options", option: str = "scripts", value: list | None = None):
"""Set list."""
if value:
newline = "\n"
value = f"\n{newline.join(value)}"
if not self.has_section(section):
self.add_section(section)
self.set(section=section, option=option, value=value)
else:
self.remove_option(section=section, option=option)
# noinspection PyPep8Naming
[docs]
class dd(collections.defaultdict): # noqa: N801
"""Default Dict Helper Class.
Examples:
>>> from nodeps import dd
>>>
>>> d = dd()
>>> d
dd(None, {})
>>> d[1]
>>> d.get(1)
>>>
>>> d = dd({})
>>> d
dd(None, {})
>>> d[1]
>>> d.get(1)
>>>
>>> d = dd({}, a=1)
>>> d
dd(None, {'a': 1})
>>> d[1]
>>> d.get(1)
>>>
>>> d = dd(dict)
>>> d
dd(<class 'dict'>, {})
>>> d.get(1)
>>> d
dd(<class 'dict'>, {})
>>> d[1]
{}
>>> d
dd(<class 'dict'>, {1: {}})
>>> d = dd(tuple)
>>> d
dd(<class 'tuple'>, {})
>>> d[1]
()
>>> d.get(1)
()
>>>
>>> d = dd(True)
>>> d
dd(True, {})
>>> d[1]
True
>>> d.get(1)
True
>>>
>>> d = dd({1: 1}, a=1)
>>> d
dd(None, {1: 1, 'a': 1})
>>> d[1]
1
>>> d.get(1)
1
>>>
>>> d = dd(list, {1: 1}, a=1)
>>> d
dd(<class 'list'>, {1: 1, 'a': 1})
>>> d[2]
[]
>>> d
dd(<class 'list'>, {1: 1, 'a': 1, 2: []})
>>>
>>> d = dd(True, {1: 1}, a=1)
>>> d
dd(True, {1: 1, 'a': 1})
>>> d.get('c')
>>> d['c']
True
"""
__slots__ = ("__factory__",)
def __init__(self, factory: Union[Callable, Any] = None, *args: Any, **kwargs: Any): # noqa: UP007
"""Init."""
def dd_factory(value):
return lambda: value() if callable(value) else value
iterable = isinstance(factory, Iterable)
self.__factory__ = None if iterable else factory
super().__init__(dd_factory(self.__factory__), *((*args, factory) if iterable else args), **kwargs)
def __repr__(self) -> str:
"""Representation."""
return f"{self.__class__.__name__}({self.__factory__}, {dict(self)})"
__class_getitem__ = classmethod(types.GenericAlias)
# noinspection PyPep8Naming
[docs]
class dictsort(dict, MutableMapping[_KT, _VT]): # noqa: N801
"""Dict Sort Class.
Examples:
>>> from nodeps import dictsort
>>>
>>> d = dictsort(b=1, c=2, a=3)
>>> assert d.sort() == dictsort({'a': 3, 'b': 1, 'c': 2})
"""
__slots__ = ()
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Init."""
super().__init__(*args, **kwargs)
[docs]
def sort(self) -> dictsort[_KT, _VT]:
"""Sort."""
return self.__class__({item: self[item] for item in sorted(self)})
# noinspection PyPep8Naming
[docs]
class getter(Callable[[Any], Any | tuple[Any, ...]]): # noqa: N801
"""Return a callable object that fetches the given attribute(s)/item(s) from its operand.
Examples:
>>> from types import SimpleNamespace
>>> from pickle import dumps, loads
>>> from copy import deepcopy
>>> from nodeps import getter
>>>
>>> test = SimpleNamespace(a='a', b='b')
>>> assert getter('a b')(test) == (test.a, test.b)
>>> assert getter('a c')(test) == (test.a, None)
>>> dicts = getter('a c d', default={})(test)
>>> assert dicts == (test.a, {}, {})
>>> assert id(dicts[1]) != id(dicts[2])
>>> assert getter('a')(test) == test.a
>>> assert getter('a b', 'c')(test) == (test.a, test.b, None)
>>> assert getter(['a', 'b'], 'c')(test) == (test.a, test.b, None)
>>> assert getter(['a', 'b'])(test) == (test.a, test.b)
>>>
>>> test = dict(a='a', b='b')
>>> assert getter('a b')(test) == (test['a'], test['b'])
>>> assert getter('a c')(test) == (test['a'], None)
>>> dicts = getter('a c d', default={})(test)
>>> assert dicts == (test['a'], {}, {})
>>> assert id(dicts[1]) != id(dicts[2])
>>> assert getter('a')(test) == test['a']
>>> assert getter('a b', 'c')(test) == (test['a'], test['b'], None)
>>> assert getter(['a', 'b'], 'c')(test) == (test['a'], test['b'], None)
>>> assert getter(['a', 'b'])(test) == (test['a'], test['b'])
>>>
>>> test = SimpleNamespace(a='a', b='b')
>>> test1 = SimpleNamespace(d='d', test=test)
>>> assert getter('d test.a test.a.c test.c test.m.j.k')(test1) == (test1.d, test1.test.a, None, None, None)
>>> assert getter('a c')(test1) == (None, None)
>>> dicts = getter('a c d test.a', 'test.b', default={})(test1)
>>> assert dicts == ({}, {}, test1.d, test1.test.a, test1.test.b)
>>> assert id(dicts[1]) != id(dicts[2])
>>> assert getter('a')(test1) is None
>>> assert getter('test.b')(test1) == test1.test.b
>>> assert getter(['a', 'test.b'], 'c')(test1) == (None, test1.test.b, None)
>>> assert getter(['a', 'a.b.c'])(test1) == (None, None)
>>>
>>> test = dict(a='a', b='b')
>>> test1_dict = dict(d='d', test=test)
>>> assert getter('d test.a test.a.c test.c test.m.j.k')(test1_dict) == \
getter('d test.a test.a.c test.c test.m.j.k')(test1)
>>> assert getter('d test.a test.a.c test.c test.m.j.k')(test1_dict) == \
(test1_dict['d'], test1_dict['test']['a'], None, None, None)
>>> assert getter('a c')(test1_dict) == (None, None)
>>> dicts = getter('a c d test.a', 'test.b', default={})(test1_dict)
>>> assert dicts == ({}, {}, test1_dict['d'], test1_dict['test']['a'], test1_dict['test']['b'])
>>> assert id(dicts[1]) != id(dicts[2])
>>> assert getter('a')(test1_dict) is None
>>> assert getter('test.b')(test1_dict) == test1_dict['test']['b']
>>> assert getter(['a', 'test.b'], 'c')(test1_dict) == (None, test1_dict['test']['b'], None)
>>> assert getter(['a', 'a.b.c'])(test1_dict) == (None, None)
>>>
>>> encode = dumps(test1_dict)
>>> test1_dict_decode = loads(encode)
>>> assert id(test1_dict) != id(test1_dict_decode)
>>> test1_dict_copy = deepcopy(test1_dict)
>>> assert id(test1_dict) != id(test1_dict_copy)
>>>
>>> assert getter('d test.a test.a.c test.c test.m.j.k')(test1_dict_decode) == \
(test1_dict_decode['d'], test1_dict_decode['test']['a'], None, None, None)
>>> assert getter('a c')(test1_dict_decode) == (None, None)
>>> dicts = getter('a c d test.a', 'test.b', default={})(test1_dict_decode)
>>> assert dicts == ({}, {}, test1_dict_decode['d'], test1_dict['test']['a'], test1_dict_decode['test']['b'])
>>> assert id(dicts[1]) != id(dicts[2])
>>> assert getter('a')(test1_dict_decode) is None
>>> assert getter('test.b')(test1_dict_decode) == test1_dict_decode['test']['b']
>>> assert getter(['a', 'test.b'], 'c')(test1_dict_decode) == (None, test1_dict_decode['test']['b'], None)
>>> assert getter(['a', 'a.b.c'])(test1_dict_decode) == (None, None)
The call returns:
- getter('name')(r): r.name/r['name'].
- getter('name', 'date')(r): (r.name, r.date)/(r['name'], r['date']).
- getter('name.first', 'name.last')(r):(r.name.first, r.name.last)/(r['name.first'], r['name.last']).
"""
__slots__ = ("_attrs", "_call", "_copy", "_default", "_mm")
def __init__(self, attr: str | Iterable[str], *attrs: str, default: bool | Any = None):
"""Init."""
self._copy: bool = "copy" in dir(type(default))
self._default: bool | Any = default
_attrs = toiter(attr)
attr = _attrs[0]
attrs = (tuple(_attrs[1:]) if len(_attrs) > 1 else ()) + attrs
if not attrs:
if not isinstance(attr, str):
msg = "attribute name must be a string"
raise TypeError(msg)
self._attrs = (attr,)
names = attr.split(".")
def func(obj):
mm = isinstance(obj, MutableMapping)
count = 0
total = len(names)
for name in names:
count += 1
_default = self._default.copy() if self._copy else self._default
if mm:
try:
obj = obj[name]
if not isinstance(obj, MutableMapping) and count < total:
obj = None
break
except KeyError:
obj = _default
break
else:
obj = getattr(obj, name, _default)
return obj
self._call: Callable[[Any], Any | tuple[Any, ...]] = func
else:
self._attrs = (attr, *attrs)
callers = tuple(self.__class__(item, default=self._default) for item in self._attrs)
def func(obj):
return tuple(call(obj) for call in callers)
self._call = func
def __call__(self, obj: Any) -> Any | tuple[Any, ...]:
"""Call."""
return self._call(obj)
def __reduce__(self) -> tuple[type[getter], type[str, ...]]:
"""Reduce."""
return self.__class__, self._attrs
def __repr__(self) -> str:
"""Representation."""
return self.__class__.__name__ + "(" + ",".join(f"{i}={getattr(self, i)!r}" for i in self._attrs) + ")"
[docs]
class LetterCounter:
"""Letter Counter generator function. This way, each time you call next() on the generator.
It will yield the next counter value. We will also remove the maximum counter check
Examples:
>>> from nodeps import LetterCounter
>>>
>>> c = LetterCounter("Z")
>>> assert c.increment() == 'AA'
"""
def __init__(self, start: str = "A") -> None:
"""Init."""
self.current_value = [string.ascii_uppercase.index(v) for v in start[::-1]]
[docs]
def increment(self) -> str:
"""Increments 1.
Exaamples:
>>> from nodeps import LetterCounter
>>>
>>> c = LetterCounter('BWDLQZZ')
>>> assert c.increment() == 'BWDLRAA'
>>> assert c.increment() == 'BWDLRAB'
Returns:
str
"""
for i in range(len(self.current_value)):
# If digit is less than Z, increment and finish
if self.current_value[i] < 25: # noqa: PLR2004
self.current_value[i] += 1
break
# Otherwise, set digit to A (0) and continue to next digit
self.current_value[i] = 0
# If we've just set the most significant digit to A,
# we need to add another 'A' at the most significant end
if i == len(self.current_value) - 1:
self.current_value.append(0)
break
# Form the string and return
return "".join(reversed([string.ascii_uppercase[i] for i in self.current_value]))
[docs]
class Noset:
"""Marker object for globals not initialized or other objects.
Examples:
>>> from nodeps import NOSET
>>>
>>> name = Noset.__name__.lower()
>>> assert str(NOSET) == f'<{name}>'
>>> assert repr(NOSET) == f'<{name}>'
>>> assert repr(Noset("test")) == f'<test>'
"""
name: str
__slots__ = ("name",)
def __init__(self, name: str = ""):
"""Init."""
self.name = name if name else self.__class__.__name__.lower()
def __hash__(self) -> int:
"""Hash."""
return hash(
(
self.__class__,
self.name,
)
)
def __reduce__(self) -> tuple[type[Noset], tuple[str]]:
"""Reduce."""
return self.__class__, (self.name,)
def __repr__(self):
"""Repr."""
return self.__str__()
def __str__(self):
"""Str."""
return f"<{self.name}>"
NOSET = Noset()