"""Functions Module."""
from __future__ import annotations
# Names exported by ``from <this module> import *``.
__all__ = (
    "aioclosed",
    "aiocmd",
    "aiocommand",
    "aiodmg",
    "aiogz",
    "aioloop",
    "aioloopid",
    "aiorunning",
    "allin",
    "ami",
    "anyin",
    "chdir",
    "cmd",
    "cmdrun",
    "cmdsudo",
    "command",
    "completions",
    "current_task_name",
    "dict_sort",
    "dmg",
    "effect",
    "elementadd",
    "elevate",
    "envsh",
    "exec_module_from_file",
    "filterm",
    "findfile",
    "findup",
    "firstfound",
    "flatten",
    "framesimple",
    "from_latin9",
    "fromiter",
    "getpths",
    "getsitedir",
    "group_user",
    "gz",
    "in_tox",
    "indict",
    "iscoro",
    "map_with_args",
    "mip",
    "noexc",
    "parent",
    "printe",
    "returncode",
    "sourcepath",
    "siteimported",
    "split_pairs",
    "stdout",
    "stdquiet",
    "suppress",
    "syssudo",
    "tardir",
    "tilde",
    "timestamp_now",
    "to_camel",
    "to_latin9",
    "tomodules",
    "urljson",
    "varname",
    "which",
    "yield_if",
    "yield_last",
)
import asyncio
import builtins
import collections
import contextlib
import fnmatch
import getpass
import grp
import importlib.util
import inspect
import io
import json
import os
import pickle
import pwd
import re
import shutil
import subprocess
import sys
import sysconfig
import tarfile
import tempfile
import textwrap
import time
import types
import urllib.request
from collections.abc import Callable, Generator, Iterable, Iterator, MutableMapping
from typing import TYPE_CHECKING, Any, AnyStr, Literal, ParamSpec, TextIO, TypeVar, Union, cast
from .constants import (
EXECUTABLE,
EXECUTABLE_SITE,
GITHUB_TOKEN,
MACOS,
NODEPS_PROJECT_NAME,
PW_ROOT,
PW_USER,
SUDO,
USER,
)
from .datas import GroupUser, IdName
from .errors import CalledProcessError, CmdError, CommandNotFoundError
from .path import AnyPath, FrameSimple, Path, toiter
if TYPE_CHECKING:
from .typings import ExcType, PathIsLiteral, RunningLoop
# Generic type parameters used by the annotations in this module.
_KT = TypeVar("_KT")  # mapping key type (see dict_sort / filterm)
_T = TypeVar("_T")  # return type of the callable wrapped by noexc
_VT = TypeVar("_VT")  # mapping value type (see dict_sort / filterm)
P = ParamSpec("P")
T = TypeVar("T")
[docs]
def aioclosed() -> bool:
    """Tell whether the current event loop has been closed.

    Returns:
        True if the loop returned by :func:`asyncio.get_event_loop` is closed.
    """
    loop = asyncio.get_event_loop()
    return loop.is_closed()
[docs]
async def aiocmd(*args, **kwargs) -> subprocess.CompletedProcess:
    """Run a command asynchronously, capturing its output.

    Examples:
        >>> import asyncio
        >>> from tempfile import TemporaryDirectory
        >>> from nodeps import Path, aiocmd
        >>> with TemporaryDirectory() as tmp:
        ...     tmp = Path(tmp)
        ...     rv = asyncio.run(aiocmd("git", "clone", "https://github.com/octocat/Hello-World.git", cwd=tmp))
        ...     assert rv.returncode == 0
        ...     assert (tmp / "Hello-World" / "README").exists()

    Args:
        *args: program and its arguments
        **kwargs: extra keyword arguments forwarded to :func:`asyncio.create_subprocess_exec`

    Raises:
        CmdError: if the process exits with a non-zero return code

    Returns:
        CompletedProcess whose stdout/stderr are decoded text, or None when the stream was empty
    """
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec(*args, stdout=pipe, stderr=pipe, **kwargs)
    raw_out, raw_err = await process.communicate()
    result = subprocess.CompletedProcess(
        args,
        returncode=process.returncode,
        stdout=raw_out.decode() if raw_out else None,
        stderr=raw_err.decode() if raw_err else None,
    )
    if result.returncode == 0:
        return result
    raise CmdError(result)
[docs]
async def aiocommand(
    data: str | list, decode: bool = True, utf8: bool = False, lines: bool = False
) -> subprocess.CompletedProcess:
    """Run a command through the shell asynchronously.

    No exception is raised for a non-zero return code; inspect ``returncode`` on the result.

    Args:
        data: command handed to :func:`asyncio.create_subprocess_shell`.
        decode: decode output and strip trailing dots and newlines (takes precedence over ``utf8``).
        utf8: when ``decode`` is False, decode as utf8 and strip surrounding whitespace.
        lines: split stdout in lines.

    Returns:
        CompletedProcess with the captured stdout and stderr.
    """
    # The ``loop`` argument was removed from the asyncio subprocess API in Python 3.10:
    # passing it raises TypeError. The running loop is always the one used.
    proc = await asyncio.create_subprocess_shell(
        data, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    out, err = await proc.communicate()
    if decode:
        out = out.decode().rstrip(".\n")
        err = err.decode().rstrip(".\n")
    elif utf8:
        out = out.decode("utf8").strip()
        err = err.decode("utf8").strip()
    out = out.splitlines() if lines else out
    return subprocess.CompletedProcess(data, proc.returncode, out, cast(Any, err))
[docs]
async def aiodmg(src: AnyPath, dest: AnyPath) -> None:
    """Async mount a dmg file, copy the first ``.app`` found in it to dest and detach it.

    Examples:
        >>> from nodeps import aiodmg
        >>> async def test():    # doctest: +SKIP
        ...     await aiodmg("/tmp/JetBrains.dmg", "/tmp/JetBrains")

    Args:
        src: dmg file
        dest: path to copy to

    Raises:
        CmdError: if any of the commands fails

    Returns:
        None
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        await aiocmd("hdiutil", "attach", "-mountpoint", tmpdir, "-nobrowse", "-quiet", src)
        try:
            # The image content is under the mount point: ``src`` is a file and can not be iterated.
            for item in Path(tmpdir).iterdir():
                if item.name.endswith(".app"):
                    await aiocmd("cp", "-r", Path(tmpdir) / item.name, dest)
                    await aiocmd("xattr", "-r", "-d", "com.apple.quarantine", dest)
                    break
        finally:
            # Always detach, otherwise the volume stays mounted on the temporary directory.
            await aiocmd("hdiutil", "detach", tmpdir, "-force")
[docs]
async def aiogz(src: AnyPath, dest: AnyPath = ".") -> Path:
    """Async uncompress .gz src to dest (default: current directory).

    Runs :func:`gz` in a separate thread with :func:`asyncio.to_thread`.

    Examples:
        >>> import asyncio
        >>> import os
        >>> import tempfile
        >>> from nodeps import Path, aiogz, tardir
        >>>
        >>> cwd = Path.cwd()
        >>> with tempfile.TemporaryDirectory() as workdir:
        ...     os.chdir(workdir)
        ...     with tempfile.TemporaryDirectory() as compress:
        ...         file = Path(compress) / "test.txt"
        ...         _ = file.touch()
        ...         compressed = tardir(compress)
        ...         with tempfile.TemporaryDirectory() as uncompress:
        ...             uncompressed = asyncio.run(aiogz(compressed, uncompress))
        ...             assert uncompressed.is_dir()
        ...             assert Path(uncompressed).joinpath(file.name).exists()
        >>> os.chdir(cwd)

    Args:
        src: file to uncompress
        dest: destination directory to where uncompress directory will be created (default: current directory)

    Returns:
        Value returned by :func:`gz` (absolute Path of the uncompressed directory)
    """
    uncompressed = await asyncio.to_thread(gz, src, dest)
    return uncompressed
[docs]
def aioloop() -> RunningLoop | None:
    """Get the running event loop.

    Returns:
        The running loop, or None when no loop is running in the current thread.
    """
    # ``asyncio.get_running_loop`` raises RuntimeError when there is no running loop.
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None
[docs]
def aioloopid() -> int | None:
    """Get the ``_selector`` attribute of the running loop.

    Returns:
        The private ``_selector`` of the running loop, or None when no loop is running.
    """
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        return None
    # noinspection PyUnresolvedReferences
    return running._selector
[docs]
def aiorunning() -> bool:
    """Tell whether the current event loop is running.

    Returns:
        True if the loop returned by :func:`asyncio.get_event_loop` is running.
    """
    loop = asyncio.get_event_loop()
    return loop.is_running()
[docs]
def allin(origin: Iterable, destination: Iterable) -> bool:
    """Check that every item of origin is contained in destination.

    Both arguments are first normalized with ``toiter``.

    Examples:
        >>> from nodeps import allin
        >>> from nodeps.variables.builtin import BUILTIN_CLASS
        >>>
        >>> class Int(int):
        ...     pass
        >>> allin(tuple.__mro__, BUILTIN_CLASS)
        True
        >>> allin(Int.__mro__, BUILTIN_CLASS)
        False
        >>> allin('tuple int', 'bool dict int')
        False
        >>> allin('bool int', ['bool', 'dict', 'int'])
        True
        >>> allin(['bool', 'int'], ['bool', 'dict', 'int'])
        True

    Args:
        origin: iterable with the items to look for.
        destination: iterable where the items are searched.

    Returns:
        True if all items in origin are in destination.
    """
    wanted = toiter(origin)
    available = toiter(destination)
    for element in wanted:
        if element not in available:
            return False
    return True
[docs]
def ami(user: str = "root") -> bool:
    """Check if the current user is the user given (default: root).

    Examples:
        >>> from nodeps import ami
        >>> from nodeps import USER
        >>> from nodeps import LOCAL
        >>> from nodeps import DOCKER
        >>> from nodeps import MACOS
        >>>
        >>> assert ami(USER) is True
        >>> if LOCAL and MACOS:
        ...     assert ami() is False
        >>> if DOCKER:
        ...     assert ami() is True

    Arguments:
        user: name to compare with the current user (Default: root); an empty
            value falls back to :func:`getpass.getuser`

    Returns:
        True if the uid of the process is the uid of ``user``, False otherwise
    """
    current_uid = os.getuid()
    name = user or getpass.getuser()
    return current_uid == pwd.getpwnam(name).pw_uid
[docs]
def anyin(origin: Iterable, destination: Iterable) -> Any | None:
    """Return the first item of origin that is contained in destination.

    Both arguments are first normalized with ``toiter``.

    Examples:
        >>> from nodeps import anyin
        >>> from nodeps.variables.builtin import BUILTIN_CLASS
        >>>
        >>> class Int(int):
        ...     pass
        >>> anyin(tuple.__mro__, BUILTIN_CLASS)
        <class 'tuple'>
        >>> assert anyin('tuple int', BUILTIN_CLASS) is None
        >>> anyin('tuple int', 'bool dict int')
        'int'
        >>> anyin('tuple int', ['bool', 'dict', 'int'])
        'int'
        >>> anyin(['tuple', 'int'], ['bool', 'dict', 'int'])
        'int'

    Args:
        origin: iterable with the items to look for.
        destination: iterable where the items are searched.

    Returns:
        First item of origin found in destination, None if there is no match.
    """
    origin = toiter(origin)
    destination = toiter(destination)
    matches = (candidate for candidate in toiter(origin) if candidate in destination)
    return next(matches, None)
[docs]
@contextlib.contextmanager
def chdir(data: AnyPath | bool = True) -> Iterable[tuple[Path, Path]]:
    """Context manager to change the working directory and restore the previous one on exit.

    Examples:
        >>> from nodeps import Path
        >>> from nodeps import chdir
        >>> from nodeps import MACOS
        >>>
        >>> previous = Path.cwd()
        >>> new = Path('/usr')
        >>> with chdir(new) as (pr, ne):
        ...     assert previous == pr
        ...     assert new == ne
        ...     assert ne == Path.cwd()
        >>>
        >>> if MACOS:
        ...     new = Path('/bin/ls')
        ...     with chdir(new) as (pr, ne):
        ...         assert previous == pr
        ...         assert new.parent == ne
        ...         assert ne == Path.cwd()
        >>>
        >>> if MACOS:
        ...     new = Path('/bin/foo')
        ...     with chdir(new) as (pr, ne):
        ...         assert previous == pr
        ...         assert new.parent == ne
        ...         assert ne == Path.cwd()
        >>>
        >>> with chdir() as (pr, ne):
        ...     assert previous == pr
        ...     if MACOS:
        ...         assert "var" in str(ne)
        ...     assert ne == Path.cwd() # doctest: +SKIP

    Args:
        data: directory, or file (its parent is used), or True to use a new temporary directory

    Returns:
        Tuple with the previous directory and the new directory
    """
    origin = Path.cwd()
    try:
        if data is not True:
            target = parent(data, none=False)
            os.chdir(target)
            yield origin, target
        else:
            # The temporary directory is removed when the context exits.
            with tempfile.TemporaryDirectory() as scratch:
                target = Path(scratch)
                os.chdir(target)
                yield origin, target
    finally:
        os.chdir(origin)
[docs]
def cmd(*args, **kwargs) -> subprocess.CompletedProcess:
    """Run a command capturing its output as text.

    Examples:
        >>> import tempfile
        >>> from nodeps import Path, cmd
        >>>
        >>> with tempfile.TemporaryDirectory() as tmp:
        ...     rv = cmd("git", "clone", "https://github.com/octocat/Hello-World.git", tmp)
        ...     assert rv.returncode == 0
        ...     assert (Path(tmp) / "README").exists()

    Args:
        *args: command and args
        **kwargs: subprocess.run kwargs (``capture_output`` and ``text`` are always set)

    Raises:
        CmdError: if the return code is not zero

    Returns:
        CompletedProcess
    """
    result = subprocess.run(args, **kwargs, capture_output=True, text=True)
    if result.returncode == 0:
        return result
    raise CmdError(result)
[docs]
def cmdrun(
    data: Iterable, exc: bool = False, lines: bool = True, shell: bool = True, py: bool = False, pysite: bool = True
) -> subprocess.CompletedProcess | int | list | str:
    r"""Run a command and return the completed process with the captured output.

    Examples:
        >>> from nodeps import CI
        >>> from nodeps import cmdrun
        >>> from nodeps import in_tox
        >>>
        >>> cmdrun('ls a') # doctest: +ELLIPSIS
        CompletedProcess(args='ls a', returncode=..., stdout=[], stderr=[...])
        >>> cmdrun('ls a', shell=False, lines=False) # doctest: +ELLIPSIS
        CompletedProcess(args=['ls', 'a'], returncode=..., stdout='', stderr=...)
        >>> cmdrun('echo a', lines=False)  # Extra '\' added to avoid docstring error.
        CompletedProcess(args='echo a', returncode=0, stdout='a\n', stderr='')
        >>> assert "venv" not in cmdrun("sysconfig", py=True, lines=False).stdout
        >>> if os.environ.get("VIRTUAL_ENV"):
        ...     assert "venv" in cmdrun("sysconfig", py=True, pysite=False, lines=False).stdout

    Args:
        data: command.
        exc: raise exception if the return code is not zero.
        lines: split stdout and stderr in lines so ``\\n`` is removed from all lines
            (extra '\' added to avoid docstring error).
        shell: run the command through the shell (shell True expands variables in shell).
        py: run with the python executable, as a module (``-m``) unless data starts with "/".
        pysite: with ``py``, use ``EXECUTABLE_SITE`` instead of ``EXECUTABLE``.

    Returns:
        Union[CompletedProcess, int, list, str]: Completed process output.

    Raises:
        CmdError: if ``exc`` and the return code is not zero.
    """
    if py:
        interpreter = EXECUTABLE_SITE if pysite else EXECUTABLE
        flag = "" if isinstance(data, str) and data.startswith("/") else "-m"
        data = f"{interpreter} {flag} {data}"
    elif not shell:
        data = toiter(data)

    # Bytes are captured when lines are requested, they are decoded when splitting.
    proc = subprocess.run(data, shell=shell, capture_output=True, text=not lines)

    def convert(stream):
        return stream.decode("utf-8").splitlines() if lines else stream

    rv = subprocess.CompletedProcess(proc.args, proc.returncode, convert(proc.stdout), convert(proc.stderr))
    if exc and rv.returncode != 0:
        raise CmdError(rv)
    return rv
[docs]
def cmdsudo(*args, user: str = "root", **kwargs) -> subprocess.CompletedProcess | None:
    """Run program with sudo if user is different from the current user.

    Arguments:
        *args: command and args to run
        user: run as user (Default: root)
        **kwargs: subprocess.run kwargs

    Raises:
        CmdError: if the command fails

    Returns:
        CompletedProcess if the current user is not the same as user, None otherwise
    """
    if ami(user):
        return None
    # ``cmd`` takes the command as separate positional arguments: a single list
    # would reach subprocess.run nested inside a tuple and fail.
    return cmd("sudo", "-u", user, *args, **kwargs)
[docs]
def command(*args, **kwargs) -> subprocess.CompletedProcess:
    """Run a command with the following differences compared to :func:`subprocess.run`.

        - capture_output=True
        - text=True
        - a non-zero return code raises ``CalledProcessError``

    Examples:
        >>> from nodeps import Path
        >>> import tempfile
        >>> with tempfile.TemporaryDirectory() as tmp:
        ...     rv = command("git", "clone", "https://github.com/octocat/Hello-World.git", tmp)
        ...     assert rv.returncode == 0
        ...     assert (Path(tmp) / ".git").exists()

    Args:
        *args: command and args
        **kwargs: `subprocess.run` kwargs

    Raises:
        CalledProcessError: if the return code is not zero

    Returns:
        CompletedProcess
    """
    result = subprocess.run(args, **kwargs, capture_output=True, text=True)
    if result.returncode == 0:
        return result
    raise CalledProcessError(completed=result)
[docs]
def completions(name: str, install: bool = True, uninstall: bool = False) -> str | None:
    """Generate bash completions for command.

    The script is written to ``{NODEPS_PROJECT_NAME}:{name}.bash`` under the completions directory.

    Args:
        name: command name
        install: install completions to /usr/local/etc/bash_completion.d/ or /etc/bash_completion.d
        uninstall: uninstall completions (checked before ``install``)

    Returns:
        Path to file if it has been written, None if it was already up to date, uninstalled or printed
    """
    # Doubled braces are literal braces in the generated script.
    completion = f"""# shellcheck shell=bash
#
# generated by {__file__}
#######################################
# {name} completion
# Globals:
# COMPREPLY
# COMP_CWORD
# COMP_WORDS
# Arguments:
# 1
# Returns:
# 0 ...
#######################################
_{name}_completion() {{
local IFS=$'
'
mapfile -t COMPREPLY < <(env COMP_WORDS="${{COMP_WORDS[*]}}" \\
COMP_CWORD="${{COMP_CWORD}}" \\
_{name.upper()}_COMPLETE=complete_bash "$1")
return 0
}}
complete -o default -F _{name}_completion {name}
"""
    # NOTE(review): relies on the project ``Path.mkdir`` returning the path - confirm in ``.path``.
    path = Path("/usr/local/etc/bash_completion.d" if MACOS else "/etc/bash_completion.d").mkdir()
    # if not MACOS and not os.access(path, os.W_OK, effective_ids=True):
    #     elevate()
    path.chown()
    file = Path(path, f"{NODEPS_PROJECT_NAME}:{name}.bash")
    if uninstall:
        file.unlink(missing_ok=True)
        return None
    if install:
        if not file.is_file():
            file.write_text(completion)
            return str(file)
        # File exists: it is replaced only when the generated content compares different.
        with Path.tempfile() as tmp:
            tmp.write_text(completion)
            if not tmp.cmp(file):
                shutil.move(tmp, file)
                return str(file)
        return None
    print(completion)
    return None
[docs]
def current_task_name() -> str:
    """Current asyncio task name.

    Returns:
        Name of the running task, or an empty string when there is no running
        loop or the code is not being run inside a task.
    """
    # ``asyncio.current_task`` raises RuntimeError without a running loop and
    # returns None when called from a loop callback outside of any task.
    try:
        task = asyncio.current_task()
    except RuntimeError:
        return ""
    return task.get_name() if task is not None else ""
[docs]
def dict_sort(
    data: dict[_KT, _VT], ordered: bool = False, reverse: bool = False
) -> dict[_KT, _VT] | collections.OrderedDict[_KT, _VT]:
    """Return a copy of a dict with the keys sorted.

    Examples:
        >>> import platform
        >>> from collections import OrderedDict
        >>> from nodeps import dict_sort
        >>>
        >>> d = {"b": 2, "a": 1, "c": 3}
        >>> dict_sort(d)
        {'a': 1, 'b': 2, 'c': 3}
        >>> dict_sort(d, reverse=True)
        {'c': 3, 'b': 2, 'a': 1}
        >>> v = platform.python_version()
        >>> if "rc" not in v:
        ...     # noinspection PyTypeHints
        ...     assert dict_sort(d, ordered=True) == OrderedDict([('a', 1), ('b', 2), ('c', 3)])

    Args:
        data: dict to be sorted.
        ordered: return an OrderedDict instead of a dict.
        reverse: sort keys in descending order.

    Returns:
        Union[dict, collections.OrderedDict]: Dict sorted
    """
    result = {}
    for key in sorted(data.keys(), reverse=reverse):
        result[key] = data[key]
    return collections.OrderedDict(result) if ordered else result
[docs]
def dmg(src: AnyPath, dest: AnyPath) -> None:
    """Mount a dmg file, copy the first ``.app`` found in it to dest and detach it.

    Examples:
        >>> from nodeps import dmg
        >>> dmg("/tmp/JetBrains.dmg", "/tmp/JetBrains") # doctest: +SKIP

    Args:
        src: dmg file
        dest: path to copy to

    Raises:
        CmdError: if any of the commands fails

    Returns:
        None
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        cmd("hdiutil", "attach", "-mountpoint", tmpdir, "-nobrowse", "-quiet", src)
        try:
            # The image content is under the mount point: ``src`` is a file and can not be iterated.
            for item in Path(tmpdir).iterdir():
                if item.name.endswith(".app"):
                    cmd("cp", "-r", Path(tmpdir) / item.name, dest)
                    cmd("xattr", "-r", "-d", "com.apple.quarantine", dest)
                    break
        finally:
            # Always detach, otherwise the volume stays mounted on the temporary directory.
            cmd("hdiutil", "detach", tmpdir, "-force")
[docs]
def effect(apply: Callable, *args: Iterable) -> None:
    """Call a function, for its side effects, with every item of each iterable.

    Examples:
        >>> from types import SimpleNamespace
        >>> from nodeps import effect
        >>> simple = SimpleNamespace()
        >>> effect(lambda x: simple.__setattr__(x, dict()), 'a b', 'c')
        >>> assert simple.a == {}
        >>> assert simple.b == {}
        >>> assert simple.c == {}

    Args:
        apply: function called with each item.
        *args: iterables whose items are passed to the function.

    Returns:
        No Return.
    """
    groups = toiter(args)
    for group in groups:
        for element in group:
            apply(element)
[docs]
def elementadd(name: str | tuple[str, ...], closing: bool | None = False) -> str:
    """Wrap one or more names in angle brackets as HTML-like tags.

    Examples:
        >>> from nodeps import elementadd
        >>>
        >>> assert elementadd('light-black') == '<light-black>'
        >>> assert elementadd('light-black', closing=True) == '</light-black>'
        >>> assert elementadd(('green', 'bold',)) == '<green><bold>'
        >>> assert elementadd(('green', 'bold',), closing=True) == '</green></bold>'

    Args:
        name: text or iterable text.
        closing: True for closing/end tags, False for opening/start tags.

    Returns:
        Str with the tags concatenated in the order given
    """
    names = (name,) if isinstance(name, str) else name
    slash = "/" if closing else ""
    tags = []
    for tag in names:
        tags.append(f"<{slash}{tag}>")
    return "".join(tags)
[docs]
def elevate():
    """Replace the current process with the same command line run under sudo.

    Does nothing when already running as root (uid 0) or when ``SUDO`` is not set.

    Other https://github.com/netinvent/command_runner/blob/master/command_runner/elevate.py.
    """
    if os.getuid() == 0 or not SUDO:
        return
    # The first element of the argument list is the name of the new program (sudo):
    # passing ``sys.argv`` alone would drop the script from the elevated command line.
    os.execv(SUDO, [SUDO, *sys.argv])  # noqa: S606
[docs]
def envsh(
    path: AnyPath = ".env",
    missing_ok: bool = False,
    override: bool = True,
) -> dict[str, str]:
    """Source ``path``, located with ``Path.find_up``, and return the variables that changed.

    The file is sourced in a shell with ``set -a`` and the resulting environment
    is dumped with pickle by a python subprocess to a temporary file.

    Args:
        path: bash file to source or name relative to cwd upwards.
        missing_ok: do not raise exception if file not found.
        override: update os.environ with the changed values.

    Raises:
        FileNotFoundError: if the file is not found and ``missing_ok`` is False.

    Return:
        Dict with the variables whose value differs from os.environ
    """
    found = Path(path).find_up()
    if found is None:
        if not missing_ok:
            msg = f"{path=}"
            raise FileNotFoundError(msg)
        return {}

    # NOTE(review): paths are interpolated unquoted in the shell command - confirm they never contain spaces.
    with Path.tempfile() as tmp:
        code = f"import os, pickle; pickle.dump(dict(os.environ), open('{tmp}', 'wb'))"
        subprocess.check_call(
            f'set -a && . {found} && {sys.executable} -c "{code}"',
            shell=True,
        )
        # The pickle is produced by the subprocess started above.
        with tmp.open("rb") as f:
            environ = pickle.load(f)  # noqa: S301

        updated = {}
        for key, value in environ.items():
            if os.environ.get(key) != value and key not in ["PWD", "SHLVL", "_", "__CF_USER_TEXT_ENCODING", "LC_CTYPE"]:
                updated[key] = value

        if override:
            os.environ.update(updated)
        return updated
[docs]
def exec_module_from_file(file: Path | str, name: str | None = None) -> types.ModuleType:
    """Executes module from file location and registers it in :data:`sys.modules`.

    Examples:
        >>> import nodeps
        >>> from nodeps import exec_module_from_file
        >>> m = exec_module_from_file(nodeps.__file__)
        >>> assert m.__name__ == nodeps.__name__

    Args:
        file: file location
        name: module name (default: directory name for ``__init__.py``, file stem otherwise)

    Raises:
        ImportError: if a module spec or loader can not be obtained for the file

    Returns:
        Module instance
    """
    file = Path(file)
    # Parenthesized on purpose: ``name or a if cond else b`` parses as
    # ``(name or a) if cond else b`` and would ignore ``name`` for regular files.
    module_name = name or (file.parent.name if file.name == "__init__.py" else file.stem)
    spec = importlib.util.spec_from_file_location(module_name, file)
    if spec is None or spec.loader is None:
        msg = f"Can not load module {module_name!r} from {file}"
        raise ImportError(msg)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
[docs]
def filterm(
    d: MutableMapping[_KT, _VT], k: Callable[..., bool] = lambda x: True, v: Callable[..., bool] = lambda x: True
) -> MutableMapping[_KT, _VT]:
    """Filter a mutable mapping by key and value predicates.

    Examples:
        >>> from nodeps import filterm
        >>>
        >>> assert filterm({'d':1}) == {'d': 1}
        >>> # noinspection PyUnresolvedReferences
        >>> assert filterm({'d':1}, lambda x: x.startswith('_')) == {}
        >>> # noinspection PyUnresolvedReferences
        >>> assert filterm({'d': 1, '_a': 2}, lambda x: x.startswith('_'), lambda x: isinstance(x, int)) == {'_a': 2}

    Args:
        d: mapping to filter.
        k: predicate for keys (default: accept all).
        v: predicate for values (default: accept all).

    Returns:
        New mapping of the same class as ``d`` with the items accepted by both predicates
    """
    kept = {}
    for key, value in d.items():
        if k(key) and v(value):
            kept[key] = value
    # noinspection PyArgumentList
    return d.__class__(kept)
[docs]
def findfile(pattern, path: AnyPath = None) -> list[Path]:
    """Find files, recursively, whose name matches pattern.

    Examples:
        >>> from pathlib import Path
        >>> import nodeps
        >>> from nodeps import findfile, MACOS, LOCAL
        >>>
        >>> if MACOS and LOCAL:
        ...     assert Path(nodeps.__file__) in findfile("*.py")

    Args:
        pattern: :mod:`fnmatch` pattern applied to the file names
        path: directory to walk (default: cwd)

    Returns:
        list of files found
    """
    top = path or Path.cwd()
    return [
        Path(folder, filename)
        for folder, _, filenames in os.walk(top)
        for filename in filenames
        if fnmatch.fnmatch(filename, pattern)
    ]
[docs]
def findup(
    path: AnyPath = None,
    kind: PathIsLiteral = "is_file",
    name: str | Path = ".env",
    uppermost: bool = False,
) -> Path | None:
    """Find up if name exists or is file or directory.

    The search starts in the directory of ``path`` and goes up to, but not
    including, the root directory (root is only checked when it is the start).

    Examples:
        >>> import email
        >>> import email.mime
        >>> import nodeps
        >>> from nodeps import chdir, findup, parent, Path, Env
        >>>
        >>>
        >>> file = Path(email.mime.__file__)
        >>>
        >>> env = Env()
        >>> input_file = env.GITHUB_WORKSPACE if env.GITHUB_WORKSPACE else nodeps.__file__
        >>> with chdir(parent(input_file)):
        ...     pyproject_toml = findup(input_file, name="pyproject.toml")
        ...     assert pyproject_toml.is_file()
        >>>
        >>> with chdir(parent(email.mime.__file__)):
        ...     email_mime_py = findup(name="__init__.py")
        ...     assert email_mime_py.is_file()
        ...     assert email_mime_py == Path(email.mime.__file__)
        ...     email_py = findup(name="__init__.py", uppermost=True)
        ...     assert email_py.is_file()
        ...     assert email_py == Path(email.__file__)
        >>>
        >>> assert findup(file, kind="exists", name="__init__.py") == file.parent / "__init__.py"
        >>> assert findup(file, name="__init__.py") == file.parent / "__init__.py"
        >>> assert findup(file, name="__init__.py", uppermost=True) == file.parent.parent / "__init__.py"

    Args:
        path: CWD if None or Path.
        kind: Exists, file or directory (name of the Path method used for the check).
        name: File or directory name.
        uppermost: Find uppermost found if True (return the latest found if more than one) or first if False.

    Returns:
        Path if found, None otherwise (also when the start directory does not exist).
    """
    name = name.name if isinstance(name, Path) else name
    start = parent(path or Path.cwd())
    if start is None:
        # ``parent`` returns None when the path does not exist: nothing to search.
        return None
    root = Path("/")
    latest = None
    while True:
        candidate = start / name
        if getattr(candidate, kind)():
            if not uppermost:
                return candidate
            latest = candidate
        upper = start.parent
        # ``upper == start`` stops relative paths, whose parent chain ends at "."
        # and never reaches "/", which would otherwise loop forever.
        if upper == root or upper == start:
            return latest
        start = upper
[docs]
def firstfound(data: Iterable, apply: Callable) -> Any:
    """Return the first item in data for which apply returns a true value.

    Examples:
        >>> from nodeps import firstfound
        >>>
        >>> assert firstfound([1, 2, 3], lambda x: x == 2) == 2
        >>> assert firstfound([1, 2, 3], lambda x: x == 4) is None

    Args:
        data: iterable.
        apply: predicate called with each item.

    Returns:
        Value if found, None otherwise.
    """
    return next((candidate for candidate in data if apply(candidate)), None)
[docs]
def flatten(
    data: tuple | list | set,
    recurse: bool = False,
    unique: bool = False,
    sort: bool = True,
) -> tuple | list | set:
    """Flattens an Iterable.

    Only nested ``list`` items are expanded and items with a false value
    (``0``, ``""``, ``None``, empty containers) are dropped.

    Examples:
        >>> from nodeps import flatten
        >>>
        >>> assert flatten([1, 2, 3, [1, 5, 7, [2, 4, 1, ], 7, 6, ]]) == [1, 2, 3, 1, 5, 7, [2, 4, 1], 7, 6]
        >>> assert flatten([1, 2, 3, [1, 5, 7, [2, 4, 1, ], 7, 6, ]], recurse=True) == [1, 1, 1, 2, 2, 3, 4, 5, 6, 7, 7]
        >>> assert flatten((1, 2, 3, [1, 5, 7, [2, 4, 1, ], 7, 6, ]), unique=True) == (1, 2, 3, 4, 5, 6, 7)
        >>> assert flatten([3, [2, 1]], recurse=True, sort=False) == [3, 2, 1]

    Args:
        data: iterable
        recurse: expand nested lists at every level instead of only the first one
        unique: remove duplicates (implies ``recurse``)
        sort: sort the result; when the items can not be compared it is left unsorted.
            With ``sort=False`` the result is a list (or a set if ``unique``).

    Returns:
        Union[list, Iterable]:
    """
    if unique:
        recurse = True

    cls = data.__class__

    flat = []
    for item in data:
        if not item:
            continue
        if isinstance(item, list):
            # ``sort`` is passed down so nested lists keep their order when sort is False.
            flat.extend(flatten(item, recurse, unique, sort) if recurse else item)
        else:
            flat.append(item)

    value = set(flat) if unique else flat
    if sort:
        try:
            value = cls(sorted(value))
        except TypeError:
            # Items that can not be compared (i.e.: int and list).
            value = cls(value)
    return value
[docs]
def framesimple(data: inspect.FrameInfo | types.FrameType | types.TracebackType) -> FrameSimple | None:
    """Build a :class:`nodeps.FrameSimple` from a frame info, a frame or a traceback.

    Examples:
        >>> import inspect
        >>> from nodeps import Path
        >>> from nodeps import framesimple
        >>>
        >>> frameinfo = inspect.stack()[0]
        >>> finfo = framesimple(frameinfo)
        >>> ftype = framesimple(frameinfo.frame)
        >>> assert frameinfo.frame.f_code == finfo.code
        >>> assert frameinfo.frame == finfo.frame
        >>> assert frameinfo.filename == str(finfo.path)
        >>> assert frameinfo.lineno == finfo.lineno

    Args:
        data: frame info, frame or traceback.

    Returns:
        :class:`FrameSimple`, or None if data is not one of the supported types.
    """
    if isinstance(data, inspect.FrameInfo):
        frame, lineno = data.frame, data.lineno
        back = frame.f_back
    elif isinstance(data, types.FrameType):
        frame, lineno = data, data.f_lineno
        back = frame.f_back
    elif isinstance(data, types.TracebackType):
        frame, lineno = data.tb_frame, data.tb_lineno
        # For tracebacks ``back`` is the next traceback, not a frame.
        back = data.tb_next
    else:
        return None

    code = frame.f_code
    frame_globals = frame.f_globals
    frame_locals = frame.f_locals
    function = code.co_name
    # Locals take precedence over globals.
    namespace = frame_globals | frame_locals
    name = namespace.get("__name__") or function
    package = namespace.get("__package__") or name.split(".")[0]
    return FrameSimple(
        back=back,
        code=code,
        frame=frame,
        function=function,
        globals=frame_globals,
        lineno=lineno,
        locals=frame_locals,
        name=name,
        package=package,
        path=sourcepath(data),
        vars=namespace,
    )
[docs]
def from_latin9(*args) -> str:
    """Decode latin9 hex strings to text.

    With a single argument the string is split in pairs of hex digits, with
    several arguments each one is decoded as given.

    Examples:
        >>> from nodeps import from_latin9
        >>>
        >>> from_latin9("f1")
        'ñ'
        >>>
        >>> from_latin9("4a6f73e920416e746f6e696f205075e972746f6c6173204d6f6e7461f1e973")
        'José Antonio Puértolas Montañés'
        >>>
        >>> from_latin9("f1", "6f")
        'ño'

    Args:
        args: hex strings to convert from latin9

    Returns:
        str
    """
    if len(args) == 1:
        chunks = ["".join(pair) for pair in split_pairs(args[0])]
    else:
        chunks = args
    return "".join(bytes.fromhex(chunk).decode("latin9") for chunk in chunks)
[docs]
def fromiter(data, *args):
    """Collect attribute values from an iterable of objects in a dict by attribute name.

    Examples:
        >>> from types import SimpleNamespace as Simple
        >>> from nodeps import fromiter
        >>>
        >>> assert fromiter([Simple(a=1), Simple(b=1), Simple(a=2)], 'a', 'b', 'c') == {'a': [1, 2], 'b': [1]}
        >>> assert fromiter([Simple(a=1), Simple(b=1), Simple(a=2)], ('a', 'b', ), 'c') == {'a': [1, 2], 'b': [1]}
        >>> assert fromiter([Simple(a=1), Simple(b=1), Simple(a=2)], 'a b c') == {'a': [1, 2], 'b': [1]}

    Args:
        data: iterable of objects.
        *args: attribute names (each one is expanded with ``toiter``).

    Returns:
        Dict with the list of values found for each attribute; attributes
        without values are omitted.
    """
    collected = {}
    for group in args:
        for attr in toiter(group):
            collected[attr] = [getattr(obj, attr) for obj in data if hasattr(obj, attr)]
    return {attr: values for attr, values in collected.items() if values}
[docs]
def getpths() -> dict[str, Path] | None:
    """Get the ``.pth`` files under ``sitedir``.

    Examples:
        >>> from nodeps import getpths
        >>>
        >>> pths = getpths()
        >>> assert "distutils-precedence" in pths

    Returns:
        Dictionary with pth name (without version and extension) and file,
        None if the site directory can not be listed
    """
    try:
        # Keep this call in the function body: ``getsitedir`` inspects the stack by depth.
        s = getsitedir()
        names = os.listdir(s)
    except OSError:
        return None
    # Strips "-<version>..." or the ".pth" extension at the end of the name. The dot
    # is escaped and anchored so that names like "depth.pth" are not mangled.
    suffix = re.compile(r"(-[0-9].*|\.pth)$")
    return {suffix.sub("", name): Path(s / name) for name in names if name.endswith(".pth")}
[docs]
def getsitedir(index: int = 2) -> Path:
    """Get site directory from stack if imported by :mod:`site` in a ``.pth`` file or :mod:`sysconfig`.

    Examples:
        >>> from nodeps import getsitedir
        >>> assert "packages" in str(getsitedir())

    Args:
        index: stack depth of the frame whose ``sitedir`` local is read: 1 for the
            caller of this function, 2 (default) for the caller of the caller

    Returns:
        Path instance with site directory
    """
    try:
        s = sys._getframe(index).f_locals.get("sitedir")
    except ValueError:
        # The call stack is not deep enough (i.e.: called from module level).
        s = None
    if s is None:
        s = sysconfig.get_paths()["purelib"]
    return Path(s)
[docs]
def group_user(data: int | str = USER) -> GroupUser:
    """Group and User for a user name or a user id.

    Examples:
        >>> import os
        >>> import pathlib
        >>>
        >>> from nodeps import group_user
        >>> from nodeps import PW_USER, PW_ROOT, MACOS, LOCAL
        >>>
        >>> stat = pathlib.Path().stat()
        >>> gu = group_user()
        >>> assert gu.group.id == stat.st_gid and gu.user.id == stat.st_uid
        >>> gu = group_user(data=PW_USER.pw_uid)
        >>> actual_gname = gu.group.name
        >>> assert gu.user.name == PW_USER.pw_name
        >>> gu = group_user('root')
        >>> if MACOS and LOCAL:
        ...     assert gu.group.id != stat.st_gid and gu.user.id == 0
        >>> gu = group_user(data=0)
        >>> if MACOS and LOCAL:
        ...     assert gu.group.name != actual_gname and gu.user.name == 'root'

    Args:
        data: username or id (default: USER)

    Returns:
        GroupUser.
    """
    by_name = isinstance(data, str)
    field = "pw_name" if by_name else "pw_uid"
    # The entries already loaded for the user and root are checked before querying pwd.
    for known in (PW_USER, PW_ROOT):
        if data == getattr(known, field):
            entry = known
            break
    else:
        entry = pwd.getpwnam(data) if by_name else pwd.getpwuid(data)

    gid = entry.pw_gid
    return GroupUser(
        group=IdName(id=gid, name=grp.getgrgid(gid).gr_name),
        user=IdName(id=entry.pw_uid, name=entry.pw_name),
    )
[docs]
def gz(src: AnyPath, dest: AnyPath = ".") -> Path:
    """Extract a gzip compressed tar file under dest (default: current directory).

    Examples:
        >>> import os
        >>> import tempfile
        >>> from nodeps import Path, gz, tardir
        >>> cwd = Path.cwd()
        >>> with tempfile.TemporaryDirectory() as workdir:
        ...     os.chdir(workdir)
        ...     with tempfile.TemporaryDirectory() as compress:
        ...         file = Path(compress) / "test.txt"
        ...         _ = file.touch()
        ...         compressed = tardir(compress)
        ...         with tempfile.TemporaryDirectory() as uncompress:
        ...             uncompressed = gz(compressed, uncompress)
        ...             assert uncompressed.is_dir()
        ...             assert Path(uncompressed).joinpath(file.name).exists()
        >>> os.chdir(cwd)

    Args:
        src: file to uncompress
        dest: destination directory where the archive is extracted (default: current directory)

    Returns:
        Absolute Path of the parent directory of the first member of the archive
    """
    target = Path(dest)
    with tarfile.open(src, "r:gz") as archive:
        archive.extractall(target)
        first = archive.getmembers()[0]
        return (target / first.name).parent.absolute()
[docs]
def in_tox() -> bool:
    """Running in tox: ``.tox`` is part of the ``purelib`` path.

    Returns:
        True if ".tox" is in the ``purelib`` path of :func:`sysconfig.get_paths`.
    """
    purelib = sysconfig.get_paths()["purelib"]
    return ".tox" in purelib
[docs]
def indict(data: MutableMapping, items: MutableMapping | None = None, **kwargs: Any) -> bool:
    """Check that all key/value pairs of items and kwargs are in a flat dict.

    Examples:
        >>> from nodeps import indict
        >>> from nodeps.variables.builtin import BUILTIN
        >>>
        >>> assert indict(BUILTIN, {'iter': iter}, credits=credits) is True
        >>> assert indict(BUILTIN, {'iter': 'fake'}) is False
        >>> assert indict(BUILTIN, {'iter': iter}, credits='fake') is False
        >>> assert indict(BUILTIN, credits='fake') is False

    Args:
        data: dict to search.
        items: key/value pairs.
        **kwargs: key/value pairs (override ``items`` for the same key).

    Returns:
        True if all pairs in dict.
    """
    expected = (items if items else {}) | kwargs
    for key, value in expected.items():
        if not (key in data and value == data[key]):
            return False
    return True
[docs]
def iscoro(data: Any) -> bool:
    """Is coro?.

    Returns:
        True if data is an async generator, an async generator function,
        a coroutine or a coroutine function.
    """
    checks = (
        inspect.isasyncgen,
        inspect.isasyncgenfunction,
        asyncio.iscoroutine,
        inspect.iscoroutinefunction,
    )
    return any(check(data) for check in checks)
[docs]
def map_with_args(
    data: Any, func: Callable, /, *args, pred: Callable = lambda x: bool(x), split: str = " ", **kwargs
) -> list:
    """Filter data with ``yield_if`` and map the items with a function, args and kwargs.

    Examples:
        >>> from nodeps import map_with_args
        >>>
        >>> # noinspection PyUnresolvedReferences
        >>> def f(i, *ar, **kw):
        ...     return f'{i}: {[a(i) for a in ar]}, {", ".join([f"{k}: {v(i)}" for k, v in kw.items()])}'
        >>> map_with_args('0.1.2', f, int, list, pred=lambda x: x != '0', split='.', int=int, str=str)
        ["1: [1, ['1']], int: 1, str: 1", "2: [2, ['2']], int: 2, str: 2"]

    Args:
        data: data.
        func: final function to map, called as ``func(item, *args, **kwargs)``.
        *args: args to final map function.
        pred: pred to filter data before map.
        split: split for data str.
        **kwargs: kwargs to final map function.

    Returns:
        List with results.
    """
    results = []
    for item in yield_if(data, pred=pred, split=split):
        results.append(func(item, *args, **kwargs))
    return results
[docs]
def mip() -> str | None:
    """My Public IP, as reported by https://checkip.amazonaws.com.

    Examples:
        >>> from nodeps import mip
        >>>
        >>> mip() # doctest: +ELLIPSIS
        '...............'

    Raises:
        urllib.error.URLError: if the service can not be reached (5 seconds timeout)

    Returns:
        Body of the response decoded and stripped
    """
    # The response is a context manager: closing it releases the connection.
    with urllib.request.urlopen("https://checkip.amazonaws.com", timeout=5) as response:  # noqa: S310
        return response.read().strip().decode()
[docs]
def noexc(
    func: Callable[..., _T], *args: Any, default_: Any = None, exc_: ExcType = Exception, **kwargs: Any
) -> _T | Any:
    """Call a function returning a default value if an exception is raised.

    Examples:
        >>> from nodeps import noexc
        >>> assert noexc(dict(a=1).pop, 'b', default_=2, exc_=KeyError) == 2

    Args:
        func: callable.
        *args: args for the callable.
        default_: default value if exception is raised.
        exc_: exception or exceptions to suppress (default: Exception).
        **kwargs: kwargs for the callable.

    Returns:
        Any: Function return, or ``default_`` if ``exc_`` was raised.
    """
    try:
        outcome = func(*args, **kwargs)
    except exc_:
        return default_
    return outcome
[docs]
def parent(path: AnyPath = __file__, none: bool = True) -> Path | None:
    """Directory of a path: the parent for a file or the path itself for a directory.

    Examples:
        >>> from nodeps import parent
        >>>
        >>> parent("/bin/ls")
        Path('/bin')
        >>> parent("/bin")
        Path('/bin')
        >>> parent("/bin/foo", none=False)
        Path('/bin')
        >>> parent("/bin/foo")

    Args:
        path: file or dir.
        none: return None if it is not a file nor a directory (default: True),
            otherwise return the parent

    Returns:
        Path
    """
    candidate = Path(path)
    if candidate.is_file():
        return candidate.parent
    if candidate.is_dir():
        return candidate
    if none:
        return None
    return candidate.parent
[docs]
def printe(
*values: object,
sep: str | None = " ",
end: str | None = "\n",
flush: Literal[False] = False,
) -> None:
"""Print to sys.stderr."""
print(*values, sep=sep, end=end, file=sys.stderr, flush=flush)
builtins.printe = printe
[docs]
def returncode(c: str | list[str], shell: bool = True) -> int:
"""Runs command in shell and returns returncode showing stdout and stderr.
No exception is raised
Examples:
>>> from nodeps import returncode
>>>
>>> assert returncode("ls /bin/ls") == 0
>>> assert returncode("ls foo") in [1, 2]
Arguments:
c: command to run
shell: run in shell (default: True)
Returns:
return code
"""
return subprocess.call(c, shell=shell)
[docs]
def sourcepath(data: Any) -> Path:
    """Resolve the file path associated with an object.

    Lookup order: the ``"__file__"`` key for mutable mappings, ``filename`` for
    :class:`inspect.FrameInfo`, otherwise ``inspect.getsourcefile`` / ``inspect.getfile``.
    When none of these gives a value, ``str(data)`` is used as the path.

    Examples:
        >>> import asyncio
        >>> import inspect
        >>> from nodeps import Path
        >>> from nodeps import sourcepath
        >>>
        >>> finfo = inspect.stack()[0]
        >>> globs_locs = (finfo.frame.f_globals | finfo.frame.f_locals).copy()
        >>> assert sourcepath(sourcepath) == Path(__file__)
        >>> assert sourcepath(asyncio.__file__) == Path(asyncio.__file__)
        >>> assert sourcepath(dict(a=1)) == Path("{'a': 1}")

    Args:
        data: mapping, frame info, or any object accepted by :mod:`inspect`.

    Returns:
        Path.
    """
    if isinstance(data, MutableMapping):
        return Path(data.get("__file__") or str(data))
    if isinstance(data, inspect.FrameInfo):
        return Path(data.filename or str(data))
    try:
        located = inspect.getsourcefile(data) or inspect.getfile(data)
    except TypeError:
        # inspect rejects objects that are not modules, classes, functions, frames, ...
        return Path(str(data))
    return Path(located or str(data))
[docs]
def siteimported() -> str | None:
"""True if imported by :mod:`site` in a ``.pth`` file."""
s = None
_frame = sys._getframe()
while _frame and (s := _frame.f_locals.get("sitedir")) is None:
_frame = _frame.f_back
return s
[docs]
def split_pairs(text):
"""Split text in pairs for even length.
Examples:
>>> from nodeps import split_pairs
>>>
>>> split_pairs("123456")
[('1', '2'), ('3', '4'), ('5', '6')]
Args:
text: text to split in pairs
Returns:
text
"""
return list(zip(text[0::2], text[1::2], strict=True))
[docs]
def stdout(
    shell: AnyStr, keepends: bool = False, split: bool = False, cwd: Path | str | None = None
) -> list[str] | str | None:
    """Run a shell command and return its output, or None when it fails.

    The command is executed with ``subprocess.getstatusoutput`` after changing into ``cwd``.
    Output is returned only when the exit code is 0.
    The locale encoding is used to decode the output and a trailing newline is stripped.

    Examples:
        >>> from nodeps import stdout
        >>>
        >>> stdout("ls /bin/ls")
        '/bin/ls'
        >>> stdout("true")
        ''
        >>> stdout("ls foo")
        >>> stdout("ls /bin/ls", split=True)
        ['/bin/ls']

    Args:
        shell: command to be executed
        keepends: when ``split`` is true, keep the line breaks in the resulting list items.
        split: return a list of the output lines instead of a single string (default: False)
        cwd: directory to run the command in (default: current directory)

    Returns:
        Stdout or None if error.
    """
    with Path(cwd or "").cd():
        status, output = subprocess.getstatusoutput(shell)
        if status != 0:
            return None
        return output.splitlines(keepends=keepends) if split else output
[docs]
@contextlib.contextmanager
def stdquiet() -> Iterator[tuple[TextIO, TextIO]]:
    """Redirect stdout/stderr to StringIO objects to prevent console output from distutils commands.

    On exit the original streams are restored and both buffers are rewound to position 0,
    so the captured text can be read with ``.read()``.

    Yields:
        Stdout, Stderr: the two ``io.StringIO`` buffers that replace ``sys.stdout`` and ``sys.stderr``.
    """
    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    captured_stdout = sys.stdout = io.StringIO()
    captured_stderr = sys.stderr = io.StringIO()
    try:
        yield captured_stdout, captured_stderr
    finally:
        # Restore the real streams first: seek() raises ValueError on a buffer the caller
        # closed, and that must not leave sys.stdout/sys.stderr pointing at the buffers.
        sys.stdout, sys.stderr = saved_stdout, saved_stderr
        captured_stdout.seek(0)
        captured_stderr.seek(0)
[docs]
def suppress(
func: Callable[P, T],
*args: P.args,
exception: ExcType | None = Exception,
**kwargs: P.kwargs,
) -> T:
"""Try and supress exception.
Args:
func: function to call
*args: args to pass to func
exception: exception to suppress (default: Exception)
**kwargs: kwargs to pass to func
Returns:
result of func
"""
with contextlib.suppress(exception or Exception):
return func(*args, **kwargs)
[docs]
def syssudo(user: str = "root") -> subprocess.CompletedProcess | None:
    """Rerun the program as ``user`` via ``sudo -u`` when not already running as that user.

    The command executed is ``sudo -u <user> <sys.executable> <sys.argv...>``.

    Arguments:
        user: run as user (Default: "root")

    Returns:
        CompletedProcess if the current user is not the same as user, None otherwise
    """
    if ami(user):
        return None
    rerun = ["sudo", "-u", user, sys.executable, *sys.argv]
    return cmd(rerun)
[docs]
def tardir(src: AnyPath) -> Path:
    """Pack the files under directory ``src`` into ``<basename src>.tar.gz`` in the cwd.

    Only the files reported by ``os.walk`` are added, each under the path that the walk
    yields for it.

    Examples:
        >>> import os
        >>> import tempfile
        >>> from nodeps import Path, tardir
        >>> cwd = Path.cwd()
        >>> with tempfile.TemporaryDirectory() as workdir:
        ...     os.chdir(workdir)
        ...     with tempfile.TemporaryDirectory() as compress:
        ...         file = Path(compress) / "test.txt"
        ...         _ = file.touch()
        ...         compressed = tardir(compress)
        ...         with tempfile.TemporaryDirectory() as uncompress:
        ...             uncompressed = gz(compressed, uncompress)
        ...             assert uncompressed.is_dir()
        ...             assert Path(uncompressed).joinpath(file.name).exists()
        >>> os.chdir(cwd)

    Args:
        src: directory to compress

    Raises:
        FileNotFoundError: No such file or directory
        ValueError: Can't compress current working directory

    Returns:
        Compressed Absolute File Path
    """
    src = Path(src)
    if not src.exists():
        msg = f"{src}: No such file or directory"
        raise FileNotFoundError(msg)
    if src.resolve() == Path.cwd().resolve():
        msg = f"{src}: Can't compress current working directory"
        raise ValueError(msg)

    archive = Path(f"{src.name}.tar.gz")
    with tarfile.open(archive, "w:gz") as bundle:
        for folder, _subdirs, filenames in os.walk(src):
            for filename in filenames:
                bundle.add(Path(folder, filename))
    return archive.absolute()
[docs]
def tilde(path: AnyPath = ".") -> str:
    """Return ``path`` as text with the home directory replaced by ``~``.

    Every occurrence of ``str(Path.home())`` in the text is replaced, not only a leading one.

    Examples:
        >>> from nodeps import Path, tilde
        >>> assert tilde(f"{Path.home()}/file") == f"~/file"

    Arguments:
        path: path to replace (default: '.')

    Returns:
        str
    """
    text = str(path)
    home = str(Path.home())
    return text.replace(home, "~")
[docs]
def timestamp_now(file: Path | str):
    """Set both the access time and the modification time of ``file`` to the current time."""
    current = time.time()
    os.utime(file, times=(current, current))
[docs]
def to_camel(text: str, replace: bool = True) -> str:
    """Convert ``text`` to CamelCase.

    The text is split on ``"_"`` with ``toiter``, every piece is title-cased and the pieces are
    concatenated.

    Examples:
        >>> to_camel("__ignore_attr__")
        'IgnoreAttr'
        >>> to_camel("__ignore_attr__", replace=False)  # doctest: +SKIP
        '__Ignore_Attr__'

    Args:
        text: text to convert.
        replace: remove '_' (default: True)

    Returns:
        Camel text.
    """
    # noinspection PyTypeChecker
    titled = [str.title(piece) for piece in toiter(text, split="_")]
    camel = "".join(titled)
    if replace:
        return camel.replace("_", "")
    return camel
[docs]
def to_latin9(chars: str) -> str:
    """Encode each character as latin9 and return the concatenated hex digits.

    Examples:
        >>> from nodeps import AUTHOR
        >>> from nodeps import to_latin9
        >>>
        >>> to_latin9("ñ")
        'f1'
        >>>
        >>> to_latin9(AUTHOR)
        '4a6f73e920416e746f6e696f205075e972746f6c6173204d6f6e7461f1e973'

    Args:
        chars: chars to convert to latin9

    Returns:
        hex str
    """
    return "".join(char.encode("latin9").hex() for char in chars)
[docs]
def tomodules(obj: Any, suffix: bool = True) -> str:
    """Join the items of ``obj`` with dots, as in ``A.B.C``.

    A string is split on ``"/"`` when it contains one, otherwise on spaces.

    Examples:
        >>> from nodeps import tomodules
        >>> assert tomodules('a b c') == 'a.b.c'
        >>> assert tomodules('a b c.py') == 'a.b.c'
        >>> assert tomodules('a/b/c.py') == 'a.b.c'
        >>> assert tomodules(['a', 'b', 'c.py']) == 'a.b.c'
        >>> assert tomodules('a/b/c.py', suffix=False) == 'a.b.c.py'
        >>> assert tomodules(['a', 'b', 'c.py'], suffix=False) == 'a.b.c.py'

    Args:
        obj: iterable.
        suffix: remove the file suffix from each item (default: True).

    Returns:
        String A.B.C
    """
    separator = "/" if isinstance(obj, str) and "/" in obj else " "
    pieces = []
    for piece in toiter(obj, split=separator):
        extension = Path(piece).suffix if suffix else ""
        pieces.append(piece.removesuffix(extension))
    return ".".join(pieces)
[docs]
def urljson(
    data: str,
    rm: bool = False,
) -> dict:
    """Open an HTTPS url and return its body decoded as JSON, going through ``Path.pickle``.

    Examples:
        >>> import os
        >>> from nodeps import urljson
        >>> from nodeps import GIT
        >>> from nodeps import GITHUB_TOKEN
        >>> from nodeps import NODEPS_PROJECT_NAME
        >>>
        >>> if os.environ.get('GITHUB_TOKEN'):
        ...     github = urljson(f"https://api.github.com/repos/{GIT}/{NODEPS_PROJECT_NAME}")
        ...     assert github['name'] == NODEPS_PROJECT_NAME
        >>>
        >>> pypi = urljson(f"https://pypi.org/pypi/{NODEPS_PROJECT_NAME}/json")
        >>> assert pypi['info']['name'] == NODEPS_PROJECT_NAME

    Args:
        data: url
        rm: when false, a truthy value returned by ``Path.pickle(name=data)`` is returned without
            any network access; when true that lookup is skipped and ``rm`` is forwarded to
            ``Path.pickle`` (presumably to drop the stale cache entry; confirm in ``Path.pickle``).

    Raises:
        ValueError: if ``data`` is not an ``https://`` url.

    Returns:
        dict: whatever ``Path.pickle`` returns for the decoded JSON document.
    """
    if not rm and (rv := Path.pickle(name=data)):
        return rv

    # Require the full "https://" prefix; a bare startswith("https") also accepts strings
    # such as "httpsfoo".
    if not data.lower().startswith("https://"):
        msg = f"Non-HTTPS URL: {data}"
        raise ValueError(msg)

    request = urllib.request.Request(data)
    # Authenticate only when a token is configured, so an unset token is never sent as the
    # literal header "token None".
    # NOTE(review): this is a substring match, so the token is sent to any url that merely
    # contains "github"; consider matching on the host name instead.
    if GITHUB_TOKEN and "github" in data:
        request.add_header("Authorization", f"token {GITHUB_TOKEN}")
    with urllib.request.urlopen(request) as response:  # noqa: S310
        return Path.pickle(name=data, data=json.loads(response.read().decode()), rm=rm)
[docs]
def varname(index: int = 2, lower: bool = True, prefix: str | None = None, sep: str = "_") -> str | None:
    """Caller var name.

    Reads the source line of the frame at position ``index`` of ``inspect.stack()`` and
    extracts the name on the left-hand side of the assignment found on that line.

    Examples:
        >>> from dataclasses import dataclass
        >>> from nodeps import varname
        >>>
        >>> def function() -> str:
        ...     return varname()
        >>>
        >>> class ClassTest:
        ...     def __init__(self):
        ...         self.name = varname()
        ...
        ...     @property
        ...     def prop(self):
        ...         return varname()
        ...
        ...     # noinspection PyMethodMayBeStatic
        ...     def method(self):
        ...         return varname()
        >>>
        >>> @dataclass
        ... class DataClassTest:
        ...     def __post_init__(self):
        ...         self.name = varname()
        >>>
        >>> name = varname(1)
        >>> Function = function()
        >>> classtest = ClassTest()
        >>> method = classtest.method()
        >>> prop = classtest.prop
        >>> dataclasstest = DataClassTest()
        >>>
        >>> def test_var():
        ...     assert name == 'name'
        >>>
        >>> def test_function():
        ...     assert Function == function.__name__.lower()
        >>>
        >>> def test_class():
        ...     assert classtest.name == ClassTest.__name__.lower()
        >>>
        >>> def test_method():
        ...     assert classtest.method() == ClassTest.__name__.lower()
        ...     assert method == 'method'
        >>> def test_property():
        ...     assert classtest.prop == ClassTest.__name__.lower()
        ...     assert prop == 'prop'
        >>> def test_dataclass():
        ...     assert dataclasstest.name == DataClassTest.__name__.lower()

    .. code-block:: python

        class A:
            def __init__(self):
                self.instance = varname()

        a = A()

        var = varname(1)

    Args:
        index: position in ``inspect.stack()`` of the frame whose source line is parsed
            (0 is this function's own frame); default 2.
        lower: return the name lower-cased (default: True).
        prefix: prefix to add in front of the returned name.
        sep: the name is split on this separator and only the first piece is kept.

    Returns:
        Optional[str]: Var name, or None when nothing could be extracted or an
        IndexError/KeyError was raised while inspecting the stack.
    """
    # IndexError covers a stack shorter than ``index`` and an empty split result.
    # NOTE(review): ``code_context`` may be None when source is unavailable, which would raise
    # an unsuppressed TypeError - confirm whether that case can occur for callers.
    with contextlib.suppress(IndexError, KeyError):
        _stack = inspect.stack()
        # Name of the function running one frame below the inspected one.
        f = _stack[index - 1].function
        # Look one frame further out when that function is ``__post_init__``
        # (presumably to skip the dataclass-generated ``__init__`` frame - confirm).
        index = index + 1 if f == "__post_init__" else index
        # Take the text before " = ", drop a leading "assert ", keep the first space-separated
        # token, then strip from the character preceding the function name ``f`` to the end.
        if (line := textwrap.dedent(_stack[index].code_context[0])) and (
            var := re.sub(f"(.| ){f}.*", "", line.split(" = ")[0].replace("assert ", "").split(" ")[0])
        ):
            return (prefix if prefix else "") + (var.lower() if lower else var).split(sep=sep)[0]
    return None
[docs]
def which(data="sudo", raises: bool = False) -> str:
    """Locate an executable command or path.

    Examples:
        >>> from nodeps import which
        >>> if which():
        ...     assert "sudo" in which()
        >>> assert which('/bin/ls') == '/bin/ls'
        >>> which("foo", raises=True) # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        nodeps.modules.errors.CommandNotFoundError: foo

    Args:
        data: command or path (default: "sudo").
        raises: raise exception if command not found

    Raises:
        CommandNotFoundError: when ``raises`` is true and nothing executable was found.

    Returns:
        Cmd path or ""
    """
    located = shutil.which(data, mode=os.X_OK)
    if located:
        return located
    if raises:
        raise CommandNotFoundError(data)
    return ""
[docs]
def yield_if(
    data: Any,
    pred: Callable = lambda x: bool(x),
    split: str = " ",
    apply: Union[Callable, tuple[Callable, ...]] | None = None,  # noqa: UP007
) -> Generator:
    """Yield the items of ``data`` accepted by ``pred``, optionally transformed by ``apply``.

    Examples:
        >>> from nodeps import yield_if
        >>>
        >>> assert list(yield_if([True, None])) == [True]
        >>> assert list(yield_if('test1.test2', pred=lambda x: x.endswith('2'), split='.')) == ['test2']
        >>> assert list(yield_if('test1.test2', pred=lambda x: x.endswith('2'), split='.', \
        apply=lambda x: x.removeprefix('test'))) == ['2']
        >>> assert list(yield_if('test1.test2', pred=lambda x: x.endswith('2'), split='.', \
        apply=(lambda x: x.removeprefix('test'), lambda x: int(x)))) == [2]

    Args:
        data: data, turned into an iterable with ``toiter``.
        pred: predicate (default: if value)
        split: split char for str.
        apply: function, or functions applied in order, to each item that satisfies ``pred``.

    Returns:
        Yield values if condition is met and apply functions if provided.
    """
    for candidate in toiter(data, split=split):
        if not pred(candidate):
            continue
        result = candidate
        if apply:
            for transform in toiter(apply):
                result = transform(result)
        yield result
[docs]
def yield_last(data: Any, split: str = " ") -> Iterator[tuple[bool, Any, None]]:
    """Yield every item of ``data`` together with a flag telling whether it is the last one.

    Examples:
        >>> from nodeps import yield_last
        >>>
        >>> assert list(yield_last([True, None])) == [(False, True, None), (True, None, None)]
        >>> assert list(yield_last('first last')) == [(False, 'first', None), (True, 'last', None)]
        >>> assert list(yield_last('first.last', split='.')) == [(False, 'first', None), (True, 'last', None)]
        >>> assert list(yield_last(dict(first=1, last=2))) == [(False, 'first', 1), (True, 'last', 2)]

    Args:
        data: data, turned into an iterable with ``toiter``.
        split: split char for str.

    Returns:
        Yield 3-tuples ``(is_last, item, value)``; ``value`` is ``data.get(item)`` when the
        iterable is a mutable mapping and None otherwise.
    """
    items = toiter(data, split=split)
    is_mapping = isinstance(items, MutableMapping)
    size = len(items)
    for position, item in enumerate(items, start=1):
        value = items.get(item) if is_mapping else None
        yield position == size, item, value