# Source code for kojismokydingo.common

# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.


"""
Koji Smoky Dingo - Common Utils

Some simple functions used by the other modules.

:author: Christopher O'Brien <obriencj@gmail.com>
:license: GPL v3
"""


# Note: features implemented in this module should not be specific to
# working with koji. ie: nothing should require a session object or
# work with the koji-specific types (build info, tag info, etc)


import re

from configparser import ConfigParser
from datetime import datetime, timezone
from fnmatch import fnmatchcase
from functools import lru_cache
from glob import glob
from itertools import filterfalse, islice
from operator import itemgetter
from os.path import expanduser, isdir, join
from typing import (
    Any, Callable, Dict, Iterable, Iterator, List,
    Optional, Sequence, Tuple, TypeVar, Union, )

from .types import KeySpec


# appdirs is an optional dependency. When it cannot be imported the
# name is bound to None, and the find_cache_dir / find_config_dirs
# functions in this module fall back to hard-coded paths instead.
try:
    import appdirs
except ImportError:  # pragma: no cover
    appdirs = None

# The public API of this module: the names exported by a star-import.
__all__ = (
    "chunkseq",
    "escapable_replace",
    "fnmatches",
    "find_cache_dir",
    "find_config_dirs",
    "find_config_files",
    "get_plugin_config",
    "globfilter",
    "ichunkseq",
    "itemsgetter",
    "load_full_config",
    "load_plugin_config",
    "merge_extend",
    "parse_datetime",
    "unique",
    "update_extend",
)


def itemsgetter(
        key: Any,
        *keys: Any) -> Callable[[Any], Tuple]:
    """
    A variation on `operator.itemgetter` whose getter always produces
    a tuple of results, including when only a single key was
    requested.

    :param key: first key to fetch with the returned getter

    :param keys: additional keys to fetch with the returned getter

    :returns: a unary callable which, when called with a value,
      returns a tuple holding the result of looking up each of the
      specified keys on that value, in order.

    :since: 2.1
    """

    if not keys:
        # operator.itemgetter hands back the bare item when given a
        # single key, so wrap that one lookup in a 1-tuple ourselves
        return lambda v: (v[key], )

    return itemgetter(key, *keys)
def chunkseq(
        seq: Iterable,
        chunksize: int) -> Iterator[Iterable]:
    """
    Split a sequence into consecutive slices, each holding at most
    chunksize items.

    Tuples and lists are sliced directly, so their chunks are of the
    same type. Any other iterable is first copied into a list.

    :param seq: a sequence to chunk up

    :param chunksize: max length for chunks

    :since: 1.0
    """

    items = seq if isinstance(seq, (tuple, list)) else list(seq)

    # the starting offset of each chunk
    starts = range(0, len(items), chunksize)

    return (items[start:start + chunksize] for start in starts)
def ichunkseq(
        seq: Iterable,
        chunksize: int) -> Iterator[Iterable]:
    """
    Lazy counterpart to chunkseq. Yields a series of iterators, each
    producing up to chunksize items.

    Every chunk draws from the same underlying iterator, and only
    does so while that chunk is itself being iterated over. Each
    chunk must therefore be exhausted before moving on to the next
    one.

    :param seq: a sequence to chunk up

    :param chunksize: max length for chunks

    :since: 2.1
    """

    source = iter(seq)

    # the first item of a chunk is fetched up-front (it's how we know
    # there's still a chunk to produce), so only this many more
    # remain to be pulled by the chunk itself
    remainder = chunksize - 1

    def chunk_from(head):
        yield head
        yield from islice(source, remainder)

    for head in source:
        yield chunk_from(head)
def escapable_replace(
        orig: str,
        character: str,
        replacement: str) -> str:
    """
    Substitute a single-character token with replacement text. A
    doubled token acts as an escape, and produces one literal
    instance of the token character instead.

    Examples:

    * ``escapable_replace('Hello %', '%', 'World')`` returns ``"Hello
      World"``

    * ``escapable_replace('Hello %%', '%', 'World')`` returns ``"Hello
      %"``

    :param orig: Original text

    :param character: Single-character token.

    :param replacement: Replacement text

    :since: 1.0
    """

    assert len(character) == 1, "escapable_replace requires single characters"

    # Splitting on the doubled token pairs tokens up left-to-right,
    # so whatever tokens are left inside each piece are unescaped and
    # get substituted. The pieces are then stitched back together
    # with the single literal token that each escape stands for.
    pieces = orig.split(character * 2)
    return character.join(piece.replace(character, replacement)
                          for piece in pieces)
def fnmatches(
        value: str,
        patterns: Iterable[str],
        ignore_case: bool = False) -> bool:
    """
    Tests a value against a series of glob patterns, and reports
    whether at least one of them matches.

    :param value: string to be matched

    :param patterns: list of glob-style pattern strings

    :param ignore_case: if True, both the value and the patterns are
      lower-cased before comparison. Default, False

    :returns: True if any pattern matches the value, False otherwise

    :since: 1.0
    """

    if ignore_case:
        value = value.lower()
        patterns = [p.lower() for p in patterns]

    return any(fnmatchcase(value, pattern) for pattern in patterns)
KT = TypeVar('KT')


def update_extend(
        dict_orig: Dict[KT, List[Any]],
        *dict_additions: Dict[KT, List[Any]]) -> Dict[KT, List[Any]]:
    """
    Grow the list values of the original dict by appending the list
    values found under the same keys in each of the additions dicts.
    Keys not yet present in the original dict are given a new list.

    eg.
    ::

        A = {'a': [1, 2], 'b': [7], 'c': [10]}
        B = {'a': [3], 'b': [8, 9], 'd': [11]}
        update_extend(A, B)

        A
        >> {'a': [1, 2, 3], 'b': [7, 8, 9], 'c': [10], 'd': [11]}

    The values of dict_orig must support an extend method.

    :param dict_orig: The original dict, which may be mutated and
      whose values may be extended

    :param dict_additions: The additions dict. Will not be altered.

    :returns: The original dict instance

    :since: 1.0
    """

    # typing oddity: *dict_additions would more accurately be
    # Dict[Any, Iterable], but MyPy doesn't accept lists as iterable
    # when they appear as dict values.

    pairs = ((key, values)
             for additions in dict_additions
             for key, values in additions.items())

    for key, values in pairs:
        dict_orig.setdefault(key, []).extend(values)

    return dict_orig
def merge_extend(
        *dict_additions: Dict[KT, List[Any]]) -> Dict[KT, List[Any]]:
    """
    Non-mutating variation of `update_extend`. The results are
    collected into a brand new dict holding brand new lists, so none
    of the arguments are altered.

    :param dict_additions: The additions dict. Will not be altered.

    :returns: A new dict, whose values are new lists

    :since: 1.0
    """

    # starting from an empty dict means every list in the result is
    # one freshly created by update_extend
    merged: Dict[KT, List[Any]] = {}
    return update_extend(merged, *dict_additions)
GFT = TypeVar('GFT')


def globfilter(
        seq: Iterable[GFT],
        patterns: Iterable[str],
        key: Optional[KeySpec] = None,
        invert: bool = False,
        ignore_case: bool = False) -> Iterable[GFT]:
    """
    Lazily filters the members of seq, producing those which match
    any of the glob patterns specified.

    Patterns may be any iterable of glob-style pattern strings. It is
    read exactly once, up-front, so a one-shot iterator is acceptable.

    If key is specified, it is used to translate each item of seq
    into the string to be compared with the patterns. A key which is
    not callable is used as an index into each item, via
    `operator.itemgetter`.

    If invert is True, produces the non-matches rather than the
    matches.

    If ignore_case is True, the pattern comparison is case normalized.

    :param seq: series of objects to be filtered. Normally strings,
      but may be any type provided the key parameter is specified to
      provide a string for matching based on the given object.

    :param patterns: glob-style pattern strings. Members of seq which
      match any of these patterns are produced.

    :param key: A unary callable (or an index) which translates
      individual items on seq into the value to be matched against
      the patterns. Default, match against values in seq directly.

    :param invert: Invert the logic, producing the non-matches rather
      than the matches. Default, produces matches

    :param ignore_case: pattern comparison is case normalized if
      True. Default, False

    :returns: a lazy iterator over the selected members of seq

    :since: 1.0
    """

    # The patterns are consulted once per item in seq, so they must
    # be held in a re-iterable container. Without this a generator of
    # patterns would be drained by the first item tested, and every
    # later item would be compared against few or no patterns.
    #
    # Case normalization of the patterns also happens here, so that
    # it is only done a single time rather than once per item.
    if ignore_case:
        patterns = [p.lower() for p in patterns]
    else:
        patterns = list(patterns)

    if not (key is None or callable(key)):
        key = itemgetter(key)

    def test(val):
        if key is not None:
            val = key(val)
        if ignore_case:
            val = val.lower()
        return any(fnmatchcase(val, pattern) for pattern in patterns)

    return filterfalse(test, seq) if invert else filter(test, seq)
UT = TypeVar('UT')


def unique(
        sequence: Iterable[UT],
        key: Optional[KeySpec] = None) -> List[UT]:
    """
    Given a sequence, de-duplicate it into a new list, preserving
    order.

    In the event that the sequence contains non-hashable objects,
    `key` must be specified to produce a hashable unique identifier
    for the individual items in the sequence. This identifier is then
    used to perform the de-duplication.

    When a key is in use and several items share an identifier, the
    item kept is the last one with that identifier, and it is placed
    at the position where the identifier was first seen.

    :param sequence: series of hashable objects

    :param key: unary callable that produces a hashable identifying
      value. A key which is not callable (eg. a string or an integer
      index) is used to look up the identifier on each item, via
      `operator.itemgetter`. Default, use each object in sequence as
      its own identifier.

    :since: 1.0
    """

    # Compare against None rather than testing truthiness, otherwise
    # falsy key specs such as the index 0 would be silently ignored
    # and the items themselves would be used as identifiers.
    if key is None:
        return list(dict.fromkeys(sequence))

    if not callable(key):
        key = itemgetter(key)

    # a dict retains the insertion position of the first appearance
    # of each identifier, while later values replace earlier ones
    work = {key(v): v for v in sequence}
    return list(work.values())
# Ordered table of (compiled pattern, parser) pairs consulted by
# parse_datetime. The first pattern to match the text selects the
# parser which is used to convert it.
#
# The two variations with a colon in their timezone offset (+HH:MM)
# have the final colon removed before being handed to strptime.
#
# Integer timestamps and "now" are converted using utcfromtimestamp
# and utcnow, and so produce naive datetime values expressed in UTC.
# NOTE(review): both of those calls are deprecated as of Python 3.12.
# They are kept here because switching to timezone-aware values would
# change what callers receive.
DATETIME_FORMATS = (
    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6} .{3}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M:%S.%f %Z")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}[+-]\d{4}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M:%S.%f%z")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}[+-]\d{2}:\d{2}$"),
     lambda d: datetime.strptime("".join(d.rsplit(":", 1)),
                                 "%Y-%m-%d %H:%M:%S.%f%z")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} .{3}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M:%S %Z")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{4}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M:%S%z")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}$"),
     lambda d: datetime.strptime("".join(d.rsplit(":", 1)),
                                 "%Y-%m-%d %H:%M:%S%z")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M:%S")),

    (re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M")),

    (re.compile(r"\d{4}-\d{2}-\d{2}$"),
     lambda d: datetime.strptime(d, "%Y-%m-%d")),

    (re.compile(r"\d{4}-\d{2}$"),
     lambda d: datetime.strptime(d, "%Y-%m")),

    (re.compile(r"\d+$"),
     lambda d: datetime.utcfromtimestamp(int(d))),

    (re.compile("now$"),
     lambda d: datetime.utcnow()),
)


def parse_datetime(
        src: str,
        strict: bool = True) -> datetime:
    """
    Attempts to parse a datetime string in numerous ways based on
    pre-defined regex mappings

    Supported formats:
      - %Y-%m-%d %H:%M:%S.%f %Z
      - %Y-%m-%d %H:%M:%S.%f%z
      - %Y-%m-%d %H:%M:%S %Z
      - %Y-%m-%d %H:%M:%S%z
      - %Y-%m-%d %H:%M:%S
      - %Y-%m-%d %H:%M
      - %Y-%m-%d
      - %Y-%m

    Plus integer timestamps and the string ``"now"``

    Timezone offset formats (%z) may also be specified as either +HHMM
    or +HH:MM (the : will be removed)

    :param src: Date-time text to be parsed

    :param strict: Raise an exception if the date-time text cannot be
      parsed, whether because no known format matches or because the
      text has the shape of a known format but isn't a valid value
      for it (eg. ``"2021-13"``). If False, simply return `None` in
      both of those situations.

    :returns: the parsed datetime, or `None` if the text could not be
      parsed and strict is False

    :raises ValueError: if strict and src matches none of the
      pre-defined formats, or matches one without being a valid
      date-time in that format

    :since: 1.0
    """

    for pattern, parser in DATETIME_FORMATS:
        if not pattern.match(src):
            continue

        try:
            return parser(src)

        except (ValueError, OverflowError, OSError):
            # The text looked like this format, but couldn't be
            # converted (a month of 13, a timestamp too large for the
            # platform, etc). Strict callers get the original error,
            # while lenient callers are promised None in place of any
            # exception.
            if strict:
                raise
            return None

    if strict:
        raise ValueError(f"Invalid date-time format, {src!r}")

    return None
def find_cache_dir(usage: Optional[str] = None) -> str:
    """
    The user cache dir for koji-smoky-dingo for the given usage
    (eg. repoquery).

    Attempts to use the ``appdirs`` package if it is available,
    otherwise falls back to ``~/.cache/ksd/``

    :param usage: nested directory to apply to the cache path. If
      omitted or empty, the base cache dir is returned.

    :since: 2.1
    """

    if appdirs is None:
        user_cache_dir = expanduser("~/.cache/ksd/")
    else:
        user_cache_dir = appdirs.user_cache_dir("ksd")

    if usage:
        return join(user_cache_dir, usage)

    return user_cache_dir
def find_config_dirs() -> Tuple[str, str]:
    """
    Determines where koji-smoky-dingo configuration lives, as a tuple
    of the site configuration dir followed by the user configuration
    dir.

    The ``appdirs`` package is consulted if it is available, otherwise
    the result is ``/etc/xdg/ksd/`` and ``~/.config/ksd/``

    :since: 1.0
    """

    if appdirs is not None:
        return (appdirs.site_config_dir("ksd"),
                appdirs.user_config_dir("ksd"))

    return ("/etc/xdg/ksd/", expanduser("~/.config/ksd/"))
def find_config_files(
        dirs: Optional[Iterable[str]] = None) -> List[str]:
    """
    Collects the configuration files to be loaded, in the order they
    should be loaded.

    If `dirs` is specified, it must be a sequence of directory names,
    which will be searched in order. If unspecified, the result of
    `find_config_dirs` is searched.

    Only files with the extension ``.conf`` are considered. Files are
    listed in the order of the directory they were found in, and in
    sorted order within each directory. Entries in `dirs` which are
    not existing directories are skipped.

    :param dirs: list of directories to look for config files within

    :since: 1.0
    """

    if dirs is None:
        dirs = find_config_dirs()

    return [conffile
            for confdir in dirs if isdir(confdir)
            for conffile in sorted(glob(join(confdir, "*.conf")))]
@lru_cache(maxsize=1)
def _load_full_config(
        config_files: Tuple[str, ...]) -> ConfigParser:
    """
    Cached worker for `load_full_config`. Reads the given files, in
    order, into a new ConfigParser.

    Only the result for the most recently requested tuple of
    filenames is retained by the cache.

    :param config_files: filenames to load, as a tuple so that the
      argument is hashable for the cache
    """

    conf = ConfigParser()
    conf.read(config_files)
    return conf


def load_full_config(
        config_files: Optional[Iterable[str]] = None) -> ConfigParser:
    """
    Configuration object representing the full merged view of config
    files.

    If `config_files` is None, use the results of
    `find_config_files`. Otherwise, `config_files` must be a sequence
    of filenames. A single filename given as a bare string is treated
    as a sequence of that one filename.

    The loaded configuration is cached. Repeated calls using the same
    filenames return the same ConfigParser instance without reading
    the files again.

    :param config_files: configuration files to be loaded, in order.
      If not specified, the results of `find_config_files` will be
      used.

    :returns: a configuration representing a merged view of all config
      files

    :since: 1.0
    """

    # this is actually just a wrapper to a cached call, but we need to
    # convert to a hashable argument type first.

    if config_files is None:
        config_files = find_config_files()  # type: ignore

    elif isinstance(config_files, str):
        # a str is itself an iterable of str, so tuple() would break
        # a lone filename apart into one "filename" per character
        config_files = (config_files, )

    return _load_full_config(tuple(config_files))
def get_plugin_config(
        conf: ConfigParser,
        plugin: str,
        profile: Optional[str] = None) -> Dict[str, Any]:
    """
    Extracts the settings for a plugin from a loaded configuration.

    Settings come from the section named for the plugin. If a profile
    is given, settings from the section named ``plugin:profile`` are
    then layered on top, taking precedence over those of the plain
    plugin section. Sections which do not exist are skipped.

    :param conf: loaded configuration data

    :param plugin: plugin name

    :param profile: profile name, optional

    :returns: a new dict of the applicable settings

    :since: 1.0
    """

    # sections are listed from least to most specific, so that later
    # ones override earlier ones
    sections = [plugin]
    if profile is not None:
        sections.append(":".join((plugin, profile)))

    plugin_conf: Dict[str, Any] = {}
    for section in sections:
        if conf.has_section(section):
            plugin_conf.update(conf.items(section))

    return plugin_conf
def load_plugin_config(
        plugin: str,
        profile: Optional[str] = None) -> Dict[str, Any]:
    """
    Loads the full configuration, then extracts from it the settings
    for a given plugin, optionally specialized for a given profile.

    Profile-specific sections are denoted by a suffix on the section
    name, eg.

    ::

      [my_plugin]
      # this setting is for my_plugin on all profiles
      setting = foo

      [my_plugin:testing]
      # this setting is for my_plugin on the testing profile
      setting = bar

    :param plugin: plugin name

    :param profile: profile name

    :since: 1.0
    """

    return get_plugin_config(load_full_config(), plugin, profile)
#
# The end.