# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.
"""
Koji Smoky Dingo - Filtering Language Sifty Dingo
This is a mini-language based on S-Expressions used for filtering
sequences of dict data. The core language only supports some simple
logical constructs and a facility for setting and checking flags. The
language must be extended to add more predicates specific to the
schema of the data being filtered to become useful.
The Sifty Dingo mini-language has nothing to do with the Sifty
project, nor the Sieve email filtering language. I just thought that
Sifter and Sieve were good names for something that filters stuff.
:author: Christopher O'Brien <obriencj@gmail.com>
:license: GPL v3
"""
from abc import ABCMeta, abstractmethod, abstractproperty
from collections import OrderedDict
from functools import partial
from io import TextIOBase
from operator import itemgetter
from typing import (
    Any, Iterable, Callable, Dict, List, Sequence, Set,
    Tuple, Type, TypeVar, Union, )

from koji import ClientSession

from .. import BadDingo
from ..types import KeySpec
from .parse import (
    Glob, ItemPath, Matcher, Number, Reader, Regex, Symbol, SymbolGroup,
    convert_token, parse_exprs, )
# Names exported by ``from <this module> import *``. The module-level
# helpers ``gather_args`` and ``ST`` are deliberately not listed here.
__all__ = (
"DEFAULT_SIEVES",
"Flagged",
"Flagger",
"IntStrSieve",
"ItemPathSieve",
"ItemSieve",
"Logic",
"LogicAnd",
"LogicNot",
"LogicOr",
"MatcherSieve",
"Sieve",
"Sifter",
"SifterError",
"SymbolSieve",
"VariadicSieve",
"ensure_all_int_or_str",
"ensure_all_matcher",
"ensure_all_sieve",
"ensure_all_symbol",
"ensure_int",
"ensure_int_or_str",
"ensure_matcher",
"ensure_sieve",
"ensure_str",
"ensure_symbol",
)
[docs]
class SifterError(BadDingo):
    # Indicates a problem during the compilation of a Sifter, either
    # due to a syntactic problem or in the initialization of a Sieve
    # instance with incompatible parameter types.

    # NOTE(review): presumably BadDingo uses `complaint` as the headline
    # text when reporting the error -- confirm against BadDingo.
    complaint = "Error compiling Sifter"
[docs]
def ensure_symbol(
        value: Any,
        msg: str = None) -> Symbol:
    """
    Validate that value is a Symbol and hand it back unchanged.

    :param value: the object to validate

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be a symbol" is used

    :raises SifterError: if value is not a Symbol instance
    """

    if not isinstance(value, Symbol):
        prefix = msg or "Value must be a symbol"
        kind = type(value).__name__
        raise SifterError(f"{prefix}: {value!r} (type {kind})")

    return value
[docs]
def ensure_all_symbol(
        values: List[Any],
        expand: bool = True,
        msg: str = None) -> List[Symbol]:
    """
    Validate that every element of values is a Symbol, collecting them
    into a new list.

    When expand is True, each SymbolGroup element is iterated and its
    members are inlined into the result at that position. When expand
    is False a SymbolGroup is rejected like any other non-Symbol.

    :param values: the elements to validate

    :param expand: inline the members of any SymbolGroup rather than
      treating it as an error

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be a symbol" is used

    :raises SifterError: on the first element that is neither a Symbol
      nor (when expand is True) a SymbolGroup
    """

    symbols: List[Symbol] = []

    for item in values:
        if isinstance(item, Symbol):
            symbols.append(item)
            continue

        if expand and isinstance(item, SymbolGroup):
            symbols.extend(item)
            continue

        prefix = msg or "Value must be a symbol"
        kind = type(item).__name__
        raise SifterError(f"{prefix}: {item!r} (type {kind})")

    return symbols
[docs]
def ensure_str(
        value: Any,
        msg: str = None) -> str:
    """
    Validate that value is an int or a str (including subclasses of
    either) and return it converted with ``str()``.

    :param value: the object to validate and convert

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be a string" is used

    :raises SifterError: if value is neither an int nor a str
    """

    if not isinstance(value, (int, str)):
        prefix = msg or "Value must be a string"
        kind = type(value).__name__
        raise SifterError(f"{prefix}: {value!r} (type {kind})")

    return str(value)
[docs]
def ensure_int(
        value: Any,
        msg: str = None) -> int:
    """
    Validate that value is an int (or an instance of an int subclass)
    and return it converted with ``int()``.

    :param value: the object to validate and convert

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be an int" is used

    :raises SifterError: if value is not an int
    """

    if not isinstance(value, int):
        prefix = msg or "Value must be an int"
        kind = type(value).__name__
        raise SifterError(f"{prefix}: {value!r} (type {kind})")

    return int(value)
[docs]
def ensure_int_or_str(
        value: Any,
        msg: str = None) -> Union[int, str]:
    """
    Validate that value is an int or a str, and return it reduced to
    the plain builtin type: ints (and int subclasses) via ``int()``,
    strs (and str subclasses such as Symbol) via ``str()``.

    :param value: the value to coerce into an int or str

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be an int, Number, str, or Symbol" is used

    :raises SifterError: if value is neither an int nor a str
    """

    # int is tested before str, matching the precedence of the checks
    for builtin in (int, str):
        if isinstance(value, builtin):
            return builtin(value)

    prefix = msg or "Value must be an int, Number, str, or Symbol"
    kind = type(value).__name__
    raise SifterError(f"{prefix}: {value!r} (type {kind})")
[docs]
def ensure_all_int_or_str(
        values: Iterable[Any],
        msg: str = None) -> List[Union[int, str]]:
    """
    Apply `ensure_int_or_str` to each element of values in order and
    collect the coerced results in a new list.

    :param values: sequence of values to ensure or convert

    :param msg: optional error message prefix, forwarded unchanged to
      `ensure_int_or_str` for every element

    :raises SifterError: on the first element that is neither an int
      nor a str
    """

    coerced: List[Union[int, str]] = []
    for item in values:
        coerced.append(ensure_int_or_str(item, msg))
    return coerced
[docs]
def ensure_matcher(
        value: Any,
        msg: str = None) -> Union[str, Matcher]:
    """
    Validate that value is a str or a Matcher instance and hand it
    back unchanged.

    :param value: the object to validate

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be a string, regex, or glob" is used

    :raises SifterError: if value is neither a str nor a Matcher
    """

    acceptable = (str, Matcher)

    if not isinstance(value, acceptable):
        prefix = msg or "Value must be a string, regex, or glob"
        kind = type(value).__name__
        raise SifterError(f"{prefix}: {value!r} (type {kind})")

    return value
[docs]
def ensure_all_matcher(
        values: Iterable[Any],
        msg: str = None) -> List[Union[str, Matcher]]:
    """
    Apply `ensure_matcher` to each element of values in order and
    collect the results in a new list.

    :param values: the elements to validate

    :param msg: optional error message prefix, forwarded unchanged to
      `ensure_matcher` for every element

    :raises SifterError: on the first element that is neither a str
      nor a Matcher
    """

    matchers: List[Union[str, Matcher]] = []
    for item in values:
        matchers.append(ensure_matcher(item, msg))
    return matchers
[docs]
def ensure_sieve(
        value: Any,
        msg: str = None) -> 'Sieve':
    """
    Validate that value is a Sieve instance and hand it back unchanged.

    :param value: the object to validate

    :param msg: optional prefix for the error message; when empty or
      omitted, "Value must be a sieve expression" is used

    :raises SifterError: if value is not a Sieve instance
    """

    if not isinstance(value, Sieve):
        prefix = msg or "Value must be a sieve expression"
        kind = type(value).__name__
        raise SifterError(f"{prefix}: {value!r} (type {kind})")

    return value
[docs]
def ensure_all_sieve(
        values: Iterable[Any],
        msg: str = None) -> List['Sieve']:
    """
    Apply `ensure_sieve` to each element of values in order and collect
    the results in a new list.

    :param values: the elements to validate

    :param msg: optional error message prefix, forwarded unchanged to
      `ensure_sieve` for every element

    :raises SifterError: on the first element that is not a Sieve
    """

    sieves: List['Sieve'] = []
    for item in values:
        sieves.append(ensure_sieve(item, msg))
    return sieves
def gather_args(
        values: Iterable[str]) -> Tuple[list, dict]:
    """
    Split a flat sequence of values into positional and keyword
    arguments suitable for constructing a Sieve.

    A Symbol ending in ``:`` is treated as an option name (with its
    trailing colons stripped); the element that follows it is consumed
    as that option's value. Everything else is positional, in order.

    :param values: the converted argument values of a sieve expression

    :returns: an ``(args, kwds)`` pair

    :raises SifterError: if an option name is the final element and so
      has no value following it
    """

    positional: list = []
    keyword: dict = {}

    # unique marker so that a legitimate None value is not mistaken
    # for the end of the stream
    sentinel = object()

    stream = iter(values)
    for token in stream:
        if not (isinstance(token, Symbol) and token.endswith(":")):
            positional.append(token)
            continue

        option = token.rstrip(":")

        # pull the option's value from the same iterator so the outer
        # loop will not see it again
        following = next(stream, sentinel)  # type: ignore
        if following is sentinel:
            msg = f"Missing value for keyword argument {option}"
            raise SifterError(msg)

        keyword[option] = following

    return positional, keyword
# Type variable standing for the info dict type passed through Sifter
# and Sieve methods, so the annotated results use the same element type
# as the annotated inputs.
ST = TypeVar('ST')
[docs]
class Sifter():
    """
    A flagging data filter, compiled from an s-expression syntax.

    Sifter instances are callable, and when invoked with a session and
    a list of info dicts will perform filtering tests on the data to
    determine which items match the predicates from the source syntax.
    """

    def __init__(self,
                 sieves: Union[Dict[str, Type['Sieve']],
                               Iterable[Type['Sieve']]],
                 source: Union[str, Reader],
                 key: KeySpec = "id",
                 params: Dict[str, str] = None):
        """
        :param sieves: list of classes to use in compiling the source
          str. Each class should be a subclass of Sieve. The name
          attribute of each class is used as the lookup value when
          compiling a sieve expression. A dict mapping names directly
          to classes is also accepted and is used as-is.

        :param source: Source from which to parse Sieve expressions

        :param key: Unique hashable identifier key for the info
          dicts. This is used to deduplicate or otherwise correlate
          the incoming information. Default, use the "id" value. A
          non-callable key is wrapped with `operator.itemgetter`.

        :param params: Map of text substitutions for quoted strings

        :raises SifterError: if the source cannot be compiled into
          Sieve instances
        """

        if not callable(key):
            key = itemgetter(key)

        self.key: Callable = key
        self.params: Dict[str, str] = params or {}

        # {flagname: {data_id: bool}}
        self._flags: Dict[str, Dict[Any, bool]] = {}

        # {(cachename, data_id): {}}
        self._cache: Dict[Tuple[str, Any], Any] = {}

        sievedict: Dict[str, Type[Sieve]]

        if not isinstance(sieves, dict):
            # convert a list of sieves into a dict mapping the sieve
            # names and their aliases to the classes
            sieves = tuple(sieves)

            sievedict = {sieve.name: sieve  # type: ignore
                         for sieve in sieves}

            # aliases are applied after all of the names, so an alias
            # takes precedence over an identical name
            for sieve in sieves:
                for alias in sieve.aliases:
                    sievedict[alias] = sieve

            sieves = sievedict

        self._sieve_classes: Dict[str, Type[Sieve]] = sieves

        # a falsy source (eg. an empty string) compiles to no sieves
        exprs = self._compile(source) if source else []
        self._exprs = ensure_all_sieve(exprs)

    def sieve_exprs(self) -> List['Sieve']:
        """
        The list of Sieve expressions in this Sifter
        """

        return self._exprs

    def _compile(self, source: Union[Reader, str]):
        """
        Turns a source string into a list of Sieve instances

        :param source: a Reader to parse from, or any other value which
          will be converted via ``str()`` and wrapped in a Reader
        """

        if isinstance(source, Reader):
            reader = source
        else:
            reader = Reader(str(source))

        return [self._convert(p) for p in parse_exprs(reader)]

    def _convert_sieve_aliases(
            self,
            sym: Symbol,
            args: Tuple) -> Tuple[Symbol, Tuple]:
        """
        When there is no sieve with a matching name for sym, we check if
        it could be a convenience alias for some other forms.

        * ``(not-FOO ARGS...)`` becomes ``(not (FOO ARGS...))``
        * ``(!FOO ARGS...)`` becomes ``(not (FOO ARGS...))``
        * ``(BAR?)`` becomes ``(flagged BAR)``

        :param sym: the unmatched sieve name

        :param args: the unconverted arguments following the name

        :returns: the replacement name and arguments, or sym and args
          unchanged if no alias form applies
        """

        if sym.startswith("not-"):
            # converts (not-foo 1) into (not (foo 1))
            subexpr = [Symbol(sym[4:])]
            subexpr.extend(args)
            return Symbol("not"), (subexpr,)

        elif sym.startswith("!"):
            # converts (!foo 1) into (not (foo 1))
            subexpr = [Symbol(sym[1:])]
            subexpr.extend(args)
            return Symbol("not"), (subexpr,)

        elif sym.endswith("?") and not args:
            # converts (bar?) into (flagged bar)
            return Symbol("flagged"), (Symbol(sym[:-1]),)

        else:
            return sym, args

    def _convert(self, parsed):
        """
        Takes the simple parse tree and turns it into a series of nested
        Sieve instances

        * a list is compiled into an instance of the sieve class named
          by its first element (or the ``item`` sieve if that element
          is an ItemPath), with the remaining elements converted
          recursively and split into positional and keyword arguments
        * a Symbol of the form ``$NAME`` where NAME is in the params is
          replaced with the converted token of that param's value
        * any other str containing ``{`` is passed through
          ``str.format`` with the params
        * everything else is returned unchanged

        :raises SifterError: for an empty expression, an unknown sieve
          name, or a TypeError raised while instantiating the sieve
        """

        if isinstance(parsed, list):
            if not parsed:
                raise SifterError("Empty expression: ()")

            if isinstance(parsed[0], ItemPath):
                # a shortcut to the built-in 'item' sieve is to start the
                # sieve with an ItemPath
                name = Symbol("item")
                args = parsed
            else:
                name = ensure_symbol(parsed[0], "Sieve names must be symbols")
                args = parsed[1:]

            cls = self._sieve_classes.get(name)
            if cls is None:
                # no direct matches, so we'll look up syntactic
                # aliases. This is where conversion of the ! prefix
                # and the ? suffix would happen.
                newname, args = self._convert_sieve_aliases(name, args)
                cls = self._sieve_classes.get(newname)

            if cls is None:
                # even after converting for aliases we have no match, so
                # we cannot compile the sieve.
                raise SifterError(f"No such sieve: {name}")

            # looks for positional and option parameters from the tail
            # of the list.
            args, kwds = gather_args(map(self._convert, args))

            try:
                result = cls(self, *args, **kwds)
            except TypeError as te:
                # chain explicitly so the traceback identifies the
                # TypeError as the direct cause of the SifterError
                raise SifterError(
                    f"Error creating Sieve {name}: {te}") from te

        elif isinstance(parsed, Symbol):
            # Symbol must be tested ahead of the plain str branch below
            if parsed.startswith("$") and parsed[1:] in self.params:
                # this is a parameter reference, and should be
                # converted to the value of the parameter.
                result = convert_token(self.params[parsed[1:]])
                result = self._convert(result)
            else:
                result = parsed

        elif isinstance(parsed, str):
            if "{" in parsed:
                # strings can have {param_name} entries in them which
                # will allow for substitutions with parameters
                result = parsed.format(**self.params)
            else:
                result = parsed

        else:
            result = parsed

        return result

    def run(self,
            session: ClientSession,
            info_dicts: Iterable[ST]) -> Dict[str, List[ST]]:
        """
        Clears existing flags and runs contained sieves on the given
        info_dicts.

        Falsy entries in info_dicts are skipped, and entries sharing
        the same key are collapsed to the last one seen. Results of any
        top-level expression that is not a Flagger are flagged as
        ``"default"``.

        :returns: mapping of flag name to the list of info dicts which
          were given that flag
        """

        self._flags.clear()

        key = self.key
        data = {key(b): b for b in info_dicts if b}
        work = tuple(data.values())

        for expr in self._exprs:
            # a Flagger sets its own named flag, everything else is
            # recorded under the default flag
            autoflag = not isinstance(expr, Flagger)
            for binfo in expr(session, work):
                if autoflag:
                    self.set_flag("default", binfo)

        results = {}
        for flag, bids in self._flags.items():
            results[flag] = [data[bid] for bid in bids]

        return results

    def __call__(self,
                 session: ClientSession,
                 info_dicts: Iterable[ST]) -> Dict[str, List[ST]]:
        """
        Invokes run if there are any elements in info_dicts sequence. If
        there are not any elements, returns an empty dict.

        This bypassing of `run` would prevent the prep methods being
        invoked on any of the sieves.
        """

        work = tuple(info_dicts)
        return self.run(session, work) if work else {}

    def reset(self):
        """
        Clears flags and data caches
        """

        self._cache.clear()
        self._flags.clear()

    def is_flagged(
            self,
            flagname: str,
            data: ST) -> bool:
        """
        True if the data has been flagged with the given flagname, either
        via a ``(flag ...)`` sieve expression, or via `set_flag`
        """

        return ((flagname in self._flags) and
                (self._flags[flagname].get(self.key(data), False)))

    def set_flag(
            self,
            flagname: str,
            data: ST):
        """
        Records the given data as having been flagged with the given
        flagname.
        """

        bfl: Dict[Any, bool] = self._flags.get(flagname)
        if bfl is None:
            # we want to preserve the order
            self._flags[flagname] = bfl = {}

        bfl[self.key(data)] = True

    def get_cache(self, cachename, key) -> dict:
        """
        Flexible storage for caching data in a sifter. Sieves can use this
        to record data about individual info dicts, or to cache results
        from arbitrary koji session calls.

        The same dict is returned for a given cachename and key pair
        until it is cleared. This data is cleared when the `reset`
        method is invoked.
        """

        cachekey = (cachename, key)
        cch = self._cache.get(cachekey)
        if cch is None:
            cch = self._cache[cachekey] = {}
        return cch

    def get_info_cache(self, cachename, data) -> dict:
        """
        Cache associated with a particular info dict, looked up using
        this sifter's key for that info dict.

        This data is cleared when the `reset` method is invoked
        """

        return self.get_cache(cachename, self.key(data))
[docs]
class Sieve(metaclass=ABCMeta):
    """
    The abstract base type for all Sieve expressions.

    A Sieve is a callable instance which is passed a session and a
    sequence of info dicts, and returns a filtered subset of those
    info dicts.

    The default ``run`` implementation will trigger the `prep` method
    first, and then use the `check` method on each info dict to
    determine whether it should be included in the results or not.
    Subclasses can therefore easily write just the check method.

    The prep method is there in the event that additional queries
    should be called on the whole set of incoming data (enabling
    multicall optimizations).

    Sieves are typically instantiated by a Sifter when it compiles the
    sieve expression string.

    Sieve subclasses must provide a ``name`` class property or
    attribute. This property is the key used to define how the Sieve
    is invoked by the source. For example, a source of
    ``(check-enabled X)`` is going to expect that the Sifter has a
    Sieve class available with a name of `"check-enabled"`
    """

    # abstractproperty is deprecated since Python 3.3; stacking
    # property over abstractmethod is the documented replacement
    @property
    @abstractmethod
    def name(self) -> str:
        pass

    # additional names under which a Sifter will register this class
    aliases: Sequence[str] = ()

    def __init__(self,
                 sifter: Sifter,
                 *tokens, **options):
        """
        :param sifter: the Sifter this sieve belongs to; its key
          function is copied onto this sieve as ``key``

        :param tokens: positional arguments of the sieve expression

        :param options: keyword arguments of the sieve expression
        """

        self.sifter = sifter
        self.key = sifter.key
        self.tokens = tokens
        self.options = options

    def __call__(
            self,
            session: ClientSession,
            info_dicts: Iterable[ST]) -> Iterable[ST]:
        """
        Invokes `run` on a tuple copy of info_dicts and returns the
        results as a tuple. If info_dicts is empty then `run` is not
        invoked and the empty tuple is returned.
        """

        work = tuple(info_dicts)
        return tuple(self.run(session, work)) if work else work

    def __repr__(self):
        # renders in the source syntax: positional tokens first, then
        # each option as a "key:" marker followed by its value
        params = list(map(repr, self.tokens))
        for key, val in self.options.items():
            params.append(key + ":")
            params.append(repr(val))

        if params:
            e = " ".join(params)
            return f"({self.name} {e})"
        else:
            return f"({self.name})"

    def check(self,
              session: ClientSession,
              info: ST) -> bool:
        """
        Override to return True if the predicate matches the given
        info dict.

        This is used by the default `run` implementation in a filter.
        Only the info dicts which return True from this method will be
        included in the results.

        The base implementation returns None, so a sieve which relies
        on the default `run` without overriding this matches nothing.

        :param info: The info dict to be checked.
        """

        pass

    def prep(self,
             session: ClientSession,
             info_dicts: Iterable[ST]):
        """
        Override if some bulk pre-loading operations are necessary.

        This is used by the default `run` implementation to allow bulk
        operations to be performed over the entire set of info dicts
        to be filtered, rather than one at a time in the `check`
        method
        """

        return

    def run(self,
            session: ClientSession,
            info_dicts: Iterable[ST]) -> Iterable[ST]:
        """
        Use this Sieve instance to select and return a subset of the
        info_dicts sequence.

        info_dicts is passed first to `prep` and then iterated for
        `check`, so it needs to be re-iterable. `__call__` ensures this
        by passing a tuple. The result is a lazy filter object.
        """

        self.prep(session, info_dicts)
        return filter(partial(self.check, session), info_dicts)

    def get_cache(self, key: str) -> dict:
        """
        Gets a cache dict from the sifter using the name of this sieve
        and the given key (which must be hashable)

        The same cache dict will be returned for this key until the
        sifter has its `reset` method invoked.
        """

        return self.sifter.get_cache(self.name, key)

    def get_info_cache(self, info: ST) -> dict:
        """
        Gets a cache dict from the sifter using the name of this sieve and
        the sifter's designated key for the given info dict. The default
        sifter key will get the "id" value from the info dict.

        The same cache dict will be returned for this info dict until
        the sifter has its `reset` method invoked.
        """

        return self.sifter.get_info_cache(self.name, info)
[docs]
class MatcherSieve(Sieve):
    """
    Base for sieves whose positional arguments must each be a str or a
    Matcher. The arguments are validated via `ensure_all_matcher`
    before being stored as `tokens`.
    """

    def __init__(self, sifter, *tokens):
        # validation happens first, so a bad argument raises a
        # SifterError before the base initializer runs
        super().__init__(sifter, *ensure_all_matcher(tokens))
[docs]
class SymbolSieve(Sieve):
    """
    Base for sieves whose positional arguments must each be a Symbol.
    The arguments are validated via `ensure_all_symbol` (which expands
    any SymbolGroup by default) before being stored as `tokens`.
    """

    def __init__(self, sifter, *tokens):
        # validation happens first, so a bad argument raises a
        # SifterError before the base initializer runs
        super().__init__(sifter, *ensure_all_symbol(tokens))
[docs]
class IntStrSieve(Sieve):
    """
    Base for sieves whose positional arguments must each be an int or
    a str. The arguments are coerced via `ensure_all_int_or_str` before
    being stored as `tokens`.
    """

    def __init__(self, sifter, *tokens):
        # validation happens first, so a bad argument raises a
        # SifterError before the base initializer runs
        super().__init__(sifter, *ensure_all_int_or_str(tokens))
[docs]
class Logic(Sieve, metaclass=ABCMeta):
    # Common base for the logical combinators. All positional arguments
    # must themselves be Sieve instances, enforced by ensure_all_sieve.

    # the per-item check from Sieve is blanked out on this class
    check = None

    def __init__(self, sifter, *exprs):
        super().__init__(sifter, *ensure_all_sieve(exprs))
[docs]
class LogicAnd(Logic):
    """
    Usage: ``(and EXPR [EXPR...])``

    filters for info dicts which match all sub expressions.
    """

    name = "and"

    def run(self, session, info_dicts):
        """
        Feed info_dicts through each sub expression in turn, each one
        receiving only what the previous one let through. Stops early
        once nothing remains.
        """

        remaining = info_dicts

        for sieve in self.tokens:
            if remaining:
                remaining = sieve(session, remaining)
            else:
                break

        return remaining
[docs]
class LogicOr(Logic):
    """
    Usage: ``(or EXPR [EXPR...])``

    filters for info dicts which match any of the sub expressions.
    """

    name = "or"

    def run(self, session, info_dicts):
        """
        Offer the not-yet-matched info dicts to each sub expression in
        turn. Whatever a sub expression returns is moved from the
        pending set into the results, so later sub expressions only
        see what is still unmatched. Stops early once nothing is
        pending.
        """

        pending = {}
        for info in info_dicts:
            pending[self.key(info)] = info

        matched = {}

        for sieve in self.tokens:
            if not pending:
                break

            for hit in sieve(session, pending.values()):
                ident = self.key(hit)
                del pending[ident]
                matched[ident] = hit

        return matched.values()
[docs]
class LogicNot(Logic):
    """
    Usage: ``(not EXPR [EXPR...])``

    filters for info dicts which match none of the sub expressions.
    """

    name = "not"
    aliases = ("!", )

    def run(self, session, info_dicts):
        """
        Offer the surviving info dicts to each sub expression in turn,
        discarding whatever that sub expression returns. What is left
        at the end matched none of them. Stops early once nothing
        survives.
        """

        survivors = {}
        for info in info_dicts:
            survivors[self.key(info)] = info

        for sieve in self.tokens:
            if not survivors:
                break

            for hit in sieve(session, survivors.values()):
                ident = self.key(hit)
                del survivors[ident]

        return survivors.values()
[docs]
class Flagger(LogicAnd):
    """
    Usage: ``(flag NAME EXPR [EXPR...])``

    filters for info dicts which match all of the sub expressions, and
    marks them with the given named flag.
    """

    name = "flag"

    def __init__(self, sifter, flag, *exprs):
        # the sub expressions are validated before the flag name is
        super().__init__(sifter, *exprs)
        self.flag = ensure_symbol(flag)

    def run(self, session, info_dicts):
        """
        Run the sub expressions as an ``and``, then record this
        sieve's flag on the sifter for every info dict that came
        through. The same results are returned.
        """

        matched = super().run(session, info_dicts)

        mark = partial(self.sifter.set_flag, self.flag)
        for info in matched:
            mark(info)

        return matched

    def __repr__(self):
        exprs = " ".join(repr(tok) for tok in self.tokens)
        return "({} {!r} {})".format(self.name, self.flag, exprs)
[docs]
class VariadicSieve(Sieve, metaclass=ABCMeta):
    """
    Utility class which automatically applies an outer ``(or ...)`` when
    presented with more than one argument.

    This allows for example ``(name foo bar baz)`` to automatically
    become ``(or (name foo) (name bar) (name baz))`` while the
    ``name`` sieve only needs to be written to check for a single
    value.
    """

    def __new__(cls, sifter, *exprs):
        if len(exprs) <= 1:
            # zero or one argument: an ordinary instance of this class
            return object.__new__(cls)

        # more than one argument: build one instance per argument and
        # hand back a LogicOr over them in place of an instance of cls
        singles = [cls(sifter, expr) for expr in exprs]
        return LogicOr(sifter, *singles)

    def __init__(self, sifter, token):
        super().__init__(sifter, token)

        # convenience reference to the single argument
        self.token = token
[docs]
class Flagged(VariadicSieve):
    """
    Usage: ``(flagged NAME [NAME...])``

    filters for info dicts which have been marked with any of the
    given named flags
    """

    name = "flagged"
    aliases = ("?", )

    def __init__(self, sifter, name):
        # the flag name must be a Symbol; it is stored as `token`
        flagname = ensure_symbol(name)
        super().__init__(sifter, flagname)

    def check(self, session, info):
        """
        Ask the sifter whether info currently carries this sieve's
        flag.
        """

        sifter = self.sifter
        return sifter.is_flagged(self.token, info)
[docs]
class ItemSieve(VariadicSieve, metaclass=ABCMeta):
    """
    A VariadicSieve which performs a comparison by fetching a named
    key from the info dict.

    Subclasses must provide a `field` attribute which will be used as
    a key to fetch a comparison value from any checked info dicts.

    If a pattern is specified, then the predicate matches if the info
    dict has an item by the given field key, and the value of that
    item matches the pattern.

    If a pattern is absent then this predicate will only check that
    given field key exists and is not None.
    """

    # abstractproperty is deprecated since Python 3.3; stacking
    # property over abstractmethod is the documented replacement
    @property
    @abstractmethod
    def field(self):
        pass

    def __init__(self, sifter, pattern=None):
        """
        :param sifter: the Sifter this sieve belongs to

        :param pattern: optional str or Matcher to compare the field
          value against; None means only presence is checked

        :raises SifterError: if pattern is given and is neither a str
          nor a Matcher
        """

        if pattern is not None:
            pattern = ensure_matcher(pattern)
        super().__init__(sifter, pattern)

    def check(self, session, info):
        """
        True if the field of info is present and not None (no pattern),
        or is present and equal to the pattern according to the
        pattern's ``==``.
        """

        if self.token is None:
            return info.get(self.field) is not None
        else:
            # the pattern is the left operand so its own equality
            # implementation is the one consulted first
            return ((self.field in info) and
                    (self.token == info[self.field]))

    def __repr__(self):
        if self.token is None:
            return f"({self.name})"
        else:
            return f"({self.name} {self.token!r})"
[docs]
class ItemPathSieve(Sieve):
    """
    usage: ``(item PATH [VALUE...])``

    Resolves the given PATH on each element and checks that any of the
    given values match. If any do, the element passes. With no values
    given, the element passes if the path resolves to anything that is
    not None.
    """

    name = "item"

    def __init__(self, sifter, path, *values):
        # anything that is not already an ItemPath is wrapped in one
        # before the values are validated
        resolved = path if isinstance(path, ItemPath) else ItemPath(path)

        super().__init__(sifter, *ensure_all_matcher(values))
        self.path = resolved

    def check(self, session, data):
        """
        True if any value found by resolving the path on data equals
        any of this sieve's tokens, or, when there are no tokens, if
        any found value is not None.
        """

        found = self.path.get(data)

        if not self.tokens:
            return any(pathv is not None for pathv in found)

        # the token is the left operand so its own equality
        # implementation is the one consulted first
        return any(val == pathv
                   for pathv in found
                   for val in self.tokens)

    def __repr__(self):
        if not self.tokens:
            return f"({self.name} {self.path!s})"

        shown = " ".join(str(tok) for tok in self.tokens)
        return f"({self.name} {self.path!s} {shown})"
# The Sieve classes in this module which define a concrete ``name``:
# flagged, flag, item, and, or, not.
# NOTE(review): presumably the baseline list to pass (optionally
# extended with schema-specific sieves) as the `sieves` argument of
# Sifter -- confirm against callers.
DEFAULT_SIEVES: List[Type[Sieve]] = [
    Flagged, Flagger,
    ItemPathSieve,
    LogicAnd, LogicOr, LogicNot,
]
#
# The end.