# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.
"""
Some CLI adapters for working with Sifty Dingo filtering
:author: Christopher O'Brien <obriencj@gmail.com>
:license: GPL v3
"""
import os
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from functools import partial
from operator import attrgetter, itemgetter
from os.path import basename
from sys import version_info
from typing import (
TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Type, )
from . import open_output, printerr, resplit
from ..common import escapable_replace
from ..sift import DEFAULT_SIEVES, Sieve, Sifter, SifterError
from ..sift.builds import build_info_sieves
from ..sift.tags import tag_info_sieves
from ..types import KeySpec
# Pick the entry point machinery by interpreter version. From Python
# 3.11 on, importlib.metadata is used and a small shim provides the
# pkg_resources-style ``iter_entry_points`` call; older interpreters
# import both names from pkg_resources. The importlib branch is
# skipped while TYPE_CHECKING is true.
if version_info >= (3, 11):
    if not TYPE_CHECKING:
        from importlib.metadata import (
            EntryPoint, entry_points as _entry_points, )

        def iter_entry_points(group):
            """
            Stand-in for ``pkg_resources.iter_entry_points`` backed by
            ``importlib.metadata.entry_points``

            :param group: name of the entry point group to look up
            """

            return _entry_points(group=group)
else:
    from pkg_resources import EntryPoint, iter_entry_points
# Names exported by a star-import of this module.
# NOTE(review): "TagSifting" is listed here but no class of that name is
# defined in the visible source -- confirm it exists, otherwise a
# star-import of this module will fail.
__all__ = (
"BuildSifting",
"Sifting",
"TagSifting",
"output_sifted",
)
# Signature of the error handlers accepted by the entry point loaders
# in this module: called with the failing entry point and the exception
# that was raised. The loader tests the handler's return value for
# truthiness to decide whether to keep loading further entry points, so
# the return type is an optional bool rather than None.
OnErr = Callable[[EntryPoint, Exception], Optional[bool]]
def _entry_point_sieves(
        key: str,
        on_err: Optional[OnErr] = None) -> List[Type[Sieve]]:
    """
    Load all Sieve classes from entry points using the given
    key. Returns a list of Sieve subclasses that can be used to
    augment the predicates in a Sifter.

    The individual endpoints can resolve to any of the following:

    * a Sieve subclass
    * a list or tuple of Sieve subclasses
    * a callable which returns a Sieve subclass
    * a callable which returns a list or tuple of Sieve subclasses

    The entry points are sorted by their module name and name, and
    the results are combined into a list.

    If on_err is None, then any exceptions raised during the
    resolution of entry points into Sieves will simply be skipped.

    If on_err is not None, then it must be a callable that accepts two
    arguments: an entry point and an exception. This callable will be
    invoked if there is an exception raised during the resolution of
    the entry point into Sieves. If the callable returns True, loading
    of any additional entry points will continue. Otherwise no further
    entry points will be loaded.

    :param key: entry point key to load

    :param on_err: error handling function

    :returns: the collected Sieve subclasses, in entry point order
    """

    def is_sieve(ref) -> bool:
        # issubclass raises TypeError when handed anything that is not
        # a class (a list, a plain function, ...), so check for a class
        # first. Without this guard the list and callable forms
        # could never be loaded.
        return isinstance(ref, type) and issubclass(ref, Sieve)

    def sort_key(entry_point):
        # pkg_resources entry points expose ``module_name`` while
        # importlib.metadata entry points expose ``module``
        module = getattr(entry_point, "module_name", None)
        if module is None:
            module = getattr(entry_point, "module", "")
        return (module, entry_point.name)

    points = sorted(iter_entry_points(key), key=sort_key)

    collected: List[Type[Sieve]] = []

    for entry_point in points:
        try:
            ep_ref = entry_point.load()

            # a Sieve subclass is itself callable, so only invoke the
            # reference when it is not already a usable Sieve class
            if not is_sieve(ep_ref) and callable(ep_ref):
                ep_ref = ep_ref()

            if is_sieve(ep_ref):
                collected.append(ep_ref)

            elif isinstance(ep_ref, (list, tuple)):
                collected.extend(ep_ref)

            elif not isinstance(ep_ref, type):
                # neither a class nor a sequence of them; raise so that
                # it is reported through on_err like any other load
                # failure
                raise TypeError(
                    "entry point resolved to unsupported type"
                    f" {type(ep_ref).__name__}")

            # a class which is not a Sieve subclass is silently ignored

        except Exception as ex:
            # without a handler the failing entry point is skipped; with
            # one, a false return value stops loading altogether
            if on_err and not on_err(entry_point, ex):
                break

    return collected
def entry_point_tag_info_sieves(
        on_err: Optional[OnErr] = None) -> List[Type[Sieve]]:
    """
    Collect the sieves published under the
    ``koji_smoky_dingo_tag_sieves`` entry point group.

    :param on_err: error handling function, handed through unchanged
      to the entry point loader
    """

    group = "koji_smoky_dingo_tag_sieves"
    return _entry_point_sieves(group, on_err)
def entry_point_build_info_sieves(
        on_err: Optional[OnErr] = None) -> List[Type[Sieve]]:
    """
    Collect the sieves published under the
    ``koji_smoky_dingo_build_sieves`` entry point group.

    :param on_err: error handling function, handed through unchanged
      to the entry point loader
    """

    group = "koji_smoky_dingo_build_sieves"
    return _entry_point_sieves(group, on_err)
[docs]
class Sifting():
    """
    A mixin for SmokyDingo instances that wish to offer a sifter.
    """

    def sifter_arguments(
            self,
            parser: ArgumentParser) -> ArgumentParser:
        """
        Adds an argument group for loading a Sifter from either a
        text argument or a filename, for supplying its parameters, and
        for specifying output files for the expected flags from that
        Sifter's results.

        * ``--param/-P KEY=VALUE``
        * ``--env-params``
        * ``--output/-o FLAG:FILENAME``
        * ``--no-entry-points/-n``
        * ``--filter FILTER``
        * ``--filter-file FILTER_FILE``

        ``--filter`` and ``--filter-file`` are mutually exclusive.

        :param parser: the parser to add the argument group to

        :returns: the same parser
        """

        group = parser.add_argument_group("Filtering with Sifty sieves")

        group.add_argument(
            "--param", "-P",
            dest="params", metavar="KEY=VALUE",
            action="append", default=[],
            help="Provide compile-time values to the sifty"
            " filter expressions")

        group.add_argument(
            "--env-params",
            dest="use_env",
            action="store_true", default=False,
            help="Use environment vars for params left unassigned")

        group.add_argument(
            "--output", "-o",
            dest="outputs", metavar="FLAG:FILENAME",
            action="append", default=[],
            help="Divert results marked with the given FLAG to"
            " FILENAME. If FILENAME is '-', output to stdout."
            " The 'default' flag is output to stdout by default,"
            " and other flags are discarded")

        group.add_argument(
            "--no-entry-points", "-n",
            dest="entry_points",
            action="store_false", default=True,
            help="Disable loading of additional sieves from"
            " entry_points")

        # the filter source comes from exactly one of these two
        source = group.add_mutually_exclusive_group()

        source.add_argument(
            "--filter",
            metavar="FILTER",
            action="store", default=None,
            help="Use the given sifty filter predicates")

        source.add_argument(
            "--filter-file",
            metavar="FILTER_FILE",
            action="store", default=None,
            help="Load sifty filter predictes from file")

        return parser

    def default_params(self) -> Dict[str, str]:
        """
        The dict of params stored on this instance as
        ``_sifter_params``. It is created empty on first use, and the
        same dict object is returned by every later call.
        """

        stored: Dict[str, str]

        stored = getattr(self, "_sifter_params", None)
        if stored is not None:
            return stored

        stored = {}
        self._sifter_params = stored  # type: ignore
        return stored

    def get_params(
            self,
            options: Namespace) -> Dict[str, str]:
        """
        Builds the params for a sifter from the ``--param`` options.
        The values are layered onto the dict from `default_params`,
        which is updated in place and returned.

        A param given as ``KEY=VALUE`` is assigned that value. A
        param given as just ``KEY`` is left unassigned. Any param
        that is still unassigned afterwards is looked up in the
        environment when ``--env-params`` was set.

        :param options: parsed options, from a parser that has had
          `sifter_arguments` invoked on it

        :raises SifterError: if a param is left without a value
        """

        requested = options.params

        environ: Dict[str, str]
        environ = os.environ if options.use_env else {}  # type: ignore

        params: Dict[str, str] = self.default_params()

        for item in resplit(requested):
            name, assigned, value = item.partition("=")
            params[name] = value if assigned else None

        # only values of existing keys are replaced in this loop, so
        # iterating the dict while assigning is safe
        for name, value in params.items():
            if value is not None:
                continue

            if name not in environ:
                raise SifterError(f"param {name} is not defined")

            params[name] = environ[name]

        return params

    def get_outputs(
            self,
            options: Namespace) -> Dict[str, str]:
        """
        Produces a dict mapping flag names to output files based on the
        accumulated results of the ``--output FLAG:FILENAME`` argument.
        This dict can be used with `output_sifted` to record the results
        of a sifter to a collection of files.

        A ``FLAG`` given without a ``:FILENAME`` is mapped to ``"-"``,
        and an empty ``FILENAME`` is recorded as None. If the ``*``
        flag is given, the result is a defaultdict which produces that
        flag's destination for every flag not otherwise specified.
        Without it, the ``default`` flag is mapped to ``"-"`` unless
        it was given explicitly.

        :param options: parsed options, from a parser that has had
          `sifter_arguments` invoked on it
        """

        mapping: Dict[str, str] = {}

        for item in resplit(options.outputs):
            flag, given, dest = item.partition(":")
            if not given:
                dest = "-"
            mapping[flag] = dest or None

        if "*" in mapping:
            # the * output is the destination for every flag that has
            # no output of its own. Those flags aren't known yet at
            # this point, so hand back a defaultdict which falls back
            # to that destination, pre-populated with the mappings
            # that were given explicitly.
            fallback = mapping.pop("*")

            catchall: Dict[str, str] = defaultdict(lambda: fallback)
            catchall.update(mapping)
            return catchall

        mapping.setdefault("default", "-")
        return mapping

    def get_sieves(
            self,
            entry_points: bool = True) -> List[Type[Sieve]]:
        """
        The sieve classes to make available to the sifter. This
        implementation returns ``DEFAULT_SIEVES`` and does not use
        its argument.

        :param entry_points: whether sieves from entry points are
          wanted
        """

        return DEFAULT_SIEVES

    def get_sifter(
            self,
            options: Namespace) -> Sifter:
        """
        Produces a Sifter instance constructed from values in options.
        These options should have been generated from a parser that
        has had the `Sifting.sifter_arguments` invoked on it.

        The filter source is taken from ``--filter`` if given,
        otherwise it is read from the ``--filter-file`` file. If
        neither was given then None is returned.

        :param options: parsed options
        """

        source = options.filter

        if not source:
            filename = options.filter_file
            if not filename:
                return None

            with open(filename, "rt") as stream:
                source = stream.read()

        params = self.get_params(options)
        sieves = self.get_sieves(options.entry_points)

        return Sifter(sieves, source, params=params)
def _report_problem(msg, entry_point, exc):
    """
    Formats msg with the entry point and the exception, and prints
    the result via printerr.

    :param msg: format string with two positional ``{}`` fields

    :param entry_point: the entry point that failed to load

    :param exc: the exception that was raised

    :returns: True, always
    """

    report = msg.format(entry_point, exc)
    printerr(report)

    return True
[docs]
class BuildSifting(Sifting):
    """
    A Sifting mixin which offers the build info sieves.
    """

    def get_sieves(self, entry_points=True):
        """
        The sieves from `build_info_sieves`, optionally extended with
        those loaded from the build sieve entry points. Problems
        loading an individual entry point are reported via
        `_report_problem` and do not stop the remaining entry points
        from loading.

        :param entry_points: whether to also load sieves from entry
          points
        """

        sieves = build_info_sieves()

        if entry_points:
            msg = "Error loading build sieve from entry_point {} : {}"
            err = partial(_report_problem, msg)

            # build sieves come from the build entry point group, not
            # the tag one
            sieves.extend(entry_point_build_info_sieves(on_err=err))

        return sieves
[docs]
def output_sifted(
        results: Dict[str, List[dict]],
        key: KeySpec = "id",
        outputs: Optional[Dict[str, Optional[str]]] = None,
        sort: Optional[KeySpec] = None):
    """
    Records the results of a sifter to output. As sifter results are
    dicts, the `key` parameter can be either a unary callable or an
    index value to be used to fetch a simplified, printable
    representation from the dicts.

    `outputs` is a mapping of flag names to filenames using the rules
    of the :py:func:`open_output` function, with two additions
    handled here:

    * if the filename contains ``%`` it is substituted with the flag
      name via `escapable_replace`, after any slash, backslash or
      space in the flag name has been turned into an underscore
    * if the filename starts with ``@`` that character is removed and
      the output is opened for appending

    Every entry in `outputs` is opened, even when there are no
    results for its flag.

    :param results: results of invoking a Sifter on a set of data

    :param key: transformation to apply to the individual data
      elements prior to recording. Default, lookup the ``"id"`` index
      from the element.

    :param outputs: mapping of flags to destination filenames. If
      unspecified, the default flag will be written to stdout and the
      rest will be discarded. A destination may be None or empty, as
      produced by `Sifting.get_outputs` for an empty filename.

    :param sort: sorting to apply to the results in each flag. If
      unspecified, order is preserved. A callable is invoked with the
      results of a flag and must return them in the desired order;
      any other value is used as the index to sort the results by.
    """

    if not callable(key):
        key = itemgetter(key)

    if sort and not callable(sort):
        sort = partial(sorted, key=itemgetter(sort))

    if outputs is None:
        outputs = {"default": "-"}

    # kinda hackish but we need to populate the defaults in this case
    # because we really want to iterate over the outputs, not the
    # results in order to make sure every output gets written to, even
    # if it has no results.
    if isinstance(outputs, defaultdict):
        for flag in results:
            # the lookup itself inserts the fallback destination
            outputs[flag]

    # characters that are not wanted in a filename built from a flag
    unsafe = str.maketrans("/\\ ", "___")

    for flag, dest in outputs.items():
        append = False

        # a None or empty destination has nothing to substitute or
        # strip, and the string operations below would fail on None.
        # NOTE(review): such a destination is handed to open_output
        # unchanged; presumably it discards the output -- confirm.
        if dest:
            if "%" in dest:
                dest = escapable_replace(dest, "%", flag.translate(unsafe))

            if dest.startswith("@"):
                dest = dest[1:]
                append = True

        flagged = results.get(flag, ())
        if sort:
            flagged = sort(flagged)

        with open_output(dest, append) as dout:
            for res in map(key, flagged):
                print(res, file=dout)
#
# The end.