# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.
"""
Koji Smoky Dingo - Client utilities for working with the Koji
build system
:author: Christopher O'Brien <obriencj@gmail.com>
:license: GPL v3
"""
from functools import partial
from koji import (
ClientSession, Fault, GenericError, ParameterError,
convertFault, read_config)
from koji_cli.lib import activate_session, ensure_connection
from logging import DEBUG, basicConfig
from typing import (
Any, Callable, Dict, Iterator, Iterable, List,
Optional, Sequence, TypeVar, Tuple, Union, cast)
from .common import chunkseq
from .types import (
ArchiveInfo, ArchiveInfos, ArchiveSpec,
BuildInfo, BuildSpec,
ChannelInfo, ChannelSpec,
HostInfo, HostSpec,
HubVersionSpec,
PackageInfo, PackageSpec,
RepoInfo, RepoSpec,
RPMInfo, RPMInfos, RPMSignature, RPMSpec,
TagInfo, TagSpec,
TargetInfo, TargetSpec,
TaskInfo, TaskSpec,
UserInfo, UserSpec, )
__all__ = (
"AnonClientSession",
"BadDingo",
"FeatureUnavailable",
"ManagedClientSession",
"NoSuchArchive",
"NoSuchBuild",
"NoSuchChannel",
"NoSuchContentGenerator",
"NoSuchHost",
"NoSuchPackage",
"NoSuchPermission",
"NoSuchRepo",
"NoSuchRPM",
"NoSuchTag",
"NoSuchTarget",
"NoSuchTask",
"NoSuchUser",
"NotPermitted",
"ProfileClientSession",
"as_archiveinfo",
"as_buildinfo",
"as_channelinfo",
"as_hostinfo",
"as_packageinfo",
"as_repoinfo",
"as_rpminfo",
"as_taginfo",
"as_targetinfo",
"as_taskinfo",
"as_userinfo",
"bulk_load",
"bulk_load_build_archives",
"bulk_load_build_rpms",
"bulk_load_builds",
"bulk_load_buildroot_archives",
"bulk_load_buildroot_rpms",
"bulk_load_buildroots",
"bulk_load_rpm_sigs",
"bulk_load_tags",
"bulk_load_tasks",
"bulk_load_users",
"hub_version",
"iter_bulk_load",
# "paged_query_history",
"version_check",
"version_require",
)
[docs]
class ManagedClientSession(ClientSession):
"""
A `koji.ClientSession` that can be used as via the ``with``
keyword to provide a managed session that will handle
authenticated login and logout.
:since: 1.0
"""
def __enter__(self):
self.activate()
return self
def __exit__(self, exc_type, _exc_val, _exc_tb):
self.logout()
if self.rsession:
self.rsession.close()
self.rsession = None
return (exc_type is None)
[docs]
def activate(self):
"""
Invokes `koji_cli.lib.activate_session` with this session's
options, which will trigger the appropriate login method.
:since: 2.0
"""
return activate_session(self, self.opts)
@property
def logger(self):
# a cached copy of `logging.getLogger('koji')`, assigned
# during `ClientSession.__init__` invocation. There are some
# code paths in the underlying ClientSession which will
# presume that logging handlers have been configured, without
# checking that they actually have been. This is likely
# because the koji command-line interface sets up that logger
# shortly after parsing initial CLI args. However, when a
# script uses a ClientSession, that setup won't have
# happened. Then if the script encounters an error along one
# of those code paths, an additional logging warning will be
# output after that path attempts to log at some unconfigured
# level. This property allows us to set a default
# configuration if one hasn't been given yet, just before the
# instance would attempt to use the logger.
logger = self._logger
if logger and not logger.handlers:
basicConfig()
# the koji CLI will use the --debug and --quiet options to
# determine the logger level. However, only the debug
# option is recorded as part of the session options. We'll
# mimic as much of the logging behavior as we can
opts = self.opts
if opts.get('debug') or opts.get('debug_xmlrpc'):
logger.setLevel(DEBUG)
return logger
@logger.setter
def logger(self, logger):
self._logger = logger
[docs]
class ProfileClientSession(ManagedClientSession):
"""
A `koji.ClientSession` which loads profile config information and
which can be used via tha ``with`` keyword.
:since: 1.0
"""
def __init__(self, profile: str = "koji"):
"""
:param profile: name of the koji profile to load from local
configuration locations
"""
conf = read_config(profile)
server = conf["server"]
super().__init__(server, opts=conf)
[docs]
class AnonClientSession(ProfileClientSession):
"""
A `koji.ClientSession` which loads profile config information and
which can be used via the ``with`` keyword.
Suitable for working with anonymous commands which do not require
authentication. Does not authenticate, and will only connect
lazily.
:since: 1.0
"""
def __enter__(self):
# we could always set up a connection ahead of time,
# but... the connection will be created when we make our first
# call, so let's be lazy instead.
# ensure_connection(self)
return self
[docs]
def activate(self):
"""
Ensures the anonymous session is connected, but does not attempt
to login.
:since: 2.0
"""
ensure_connection(self)
[docs]
class BadDingo(Exception):
"""
Generalized base class for exceptions raised from kojismokydingo.
This class and its subclasses are used to combine a fixed
complaint string with some specific information. This is a
convenience primarily for the CLI, but can also be used to track
more detailed situations where a requested data type wasn't
present on the koji hub, rather than just working with
`koji.GenericError`
:since: 1.0
"""
complaint: str = "Something bad happened"
def __str__(self):
orig = super().__str__()
return f"{self.complaint}: {orig}"
[docs]
class NoSuchBuild(BadDingo):
"""
A build was not found
:since: 1.0
"""
complaint = "No such build"
[docs]
class NoSuchHost(BadDingo):
"""
A host was not found
:since: 1.0
"""
complaint = "No such host"
[docs]
class NoSuchChannel(BadDingo):
"""
A channel was not found
:since: 1.0
"""
complaint = "No such builder channel"
[docs]
class NoSuchContentGenerator(BadDingo):
"""
A content generator was not found
:since: 1.0
"""
complaint = "No such content generator"
[docs]
class NoSuchPackage(BadDingo):
"""
A package was not found
:since: 1.1
"""
complaint = "No such package"
[docs]
class NoSuchTag(BadDingo):
"""
A tag was not found
:since: 1.0
"""
complaint = "No such tag"
[docs]
class NoSuchTarget(BadDingo):
"""
A target was not found
:since: 1.0
"""
complaint = "No such target"
[docs]
class NoSuchTask(BadDingo):
"""
A task was not found
:since: 1.0
"""
complaint = "No such task"
[docs]
class NoSuchUser(BadDingo):
"""
A user was not found
:since: 1.0
"""
complaint = "No such user"
[docs]
class NoSuchPermission(BadDingo):
"""
A permission was not found
:since: 1.0
"""
complaint = "No such permission"
[docs]
class NoSuchArchive(BadDingo):
"""
An archive was not found
:since: 1.0
"""
complaint = "No such archive"
[docs]
class NoSuchRepo(BadDingo):
"""
A repository was not found
:since: 1.1
"""
complaint = "No such repo"
[docs]
class NoSuchRPM(BadDingo):
"""
An RPM was not found
"""
complaint = "No such RPM"
[docs]
class NotPermitted(BadDingo):
"""
A required permission was not associated with the currently logged
in user account.
"""
complaint = "Insufficient permissions"
[docs]
class FeatureUnavailable(BadDingo):
"""
A given feature isn't available due to the version on the koji hub
"""
complaint = "The koji hub version doesn't support this feature"
KT = TypeVar('KT')
[docs]
def iter_bulk_load(
session: ClientSession,
loadfn: Callable[[Any], Any],
keys: Iterable[KT],
err: bool = True,
size: int = 100) -> Iterator[Tuple[KT, Any]]:
"""
Generic bulk loading generator. Invokes the given loadfn on each
key in keys using chunking multicalls limited to the specified
size.
Yields (key, result) pairs in order.
If err is True (default) then any faults will raise an exception.
If err is False, then a None will be substituted as the result for
the failing key.
:param session: The koji session
:param loadfn: The loading function, to be invoked in a multicall
arrangement. Will be called once with each given key from keys
:param keys: The sequence of keys to be used to invoke loadfn.
:param err: Whether to raise any underlying fault returns as
exceptions. Default, True
:param size: How many calls to loadfn to chunk up for each
multicall. Default, 100
:raises koji.GenericError: if err is True and an issue
occurrs while invoking the loadfn
:since: 1.0
"""
for key_chunk in chunkseq(keys, size):
session.multicall = True
for key in key_chunk:
loadfn(key)
for key, info in zip(key_chunk, session.multiCall(strict=err)):
if info:
if "faultCode" in info:
if err:
raise convertFault(Fault(**info)) # type: ignore
else:
yield key, None
else:
yield key, info[0] # type: ignore
else:
yield key, None
[docs]
def bulk_load(
session: ClientSession,
loadfn: Callable[[Any], Any],
keys: Iterable[Any],
err: bool = True,
size: int = 100,
results: Optional[dict] = None) -> Dict[Any, Any]:
"""
Generic bulk loading function. Invokes the given `loadfn` on each
key in `keys` using chunking multicalls limited to the specified
size.
Returns a dict associating the individual keys with the returned
value of loadfn. If `results` is specified, it must support
dict-like assignment via an update method, and will be used in
place of a newly allocated dict to store and return the results.
:param session: an active koji client session
:param loadfn: The loading function, to be invoked in a multicall
arrangement. Will be called once with each given key from `keys`
:param keys: The sequence of keys to be used to invoke `loadfn`.
These keys need to be individually hashable, or the `results`
value needs to be specified with an instance that accepts
assignmnet using these values as the key.
:param err: Whether to raise any underlying fault returns as
exceptions. Default, `True`
:param size: How many calls to `loadfn` to chunk up for each
multicall. Default, `100`
:param results: storage for `loadfn` results. If specified, must
support item assignment (like a dict) via an update method, and
it will be populated and then used as the return value for this
function. Default, a new dict will be allocated.
:raises koji.GenericError: if `err` is `True` and an issue
occurrs while invoking the `loadfn`
:since: 1.0
"""
results = {} if results is None else results
results.update(iter_bulk_load(session, loadfn, keys, err, size))
return results
[docs]
def bulk_load_builds(
session: ClientSession,
nvrs: Iterable[Union[str, int]],
err: bool = True,
size: int = 100,
results: Optional[dict] = None) -> Dict[Union[int, str],
BuildInfo]:
"""
Load many buildinfo dicts from a koji client session and a
sequence of NVRs.
Returns a dict associating the individual NVRs with their
resulting buildinfo.
If err is True (default) then any missing build info will raise a
`NoSuchBuild` exception. If err is False, then a None will be
substituted into the ordered dict for the result.
If results is non-None, it must support dict assignment, and will
be used in place of a newly allocated dict to store and return the
results.
:param nvrs: Sequence of build NVRs or build IDs to load
:param err: Raise an exception if an NVR fails to load. Default,
True.
:param size: Count of NVRs to load in a single multicall. Default,
100
:param results: mapping to store the results in. Default, produce
a new dict
:raises NoSuchBuild: if err is True and any of the given builds
could not be loaded
:since: 1.0
"""
results = {} if results is None else results
for key, info in iter_bulk_load(session, session.getBuild, nvrs,
False, size):
if err and not info:
raise NoSuchBuild(key)
else:
results[key] = info
return results
[docs]
def bulk_load_tasks(
session: ClientSession,
task_ids: Iterable[int],
request: bool = False,
err: bool = True,
size: int = 100,
results: Optional[dict] = None) -> Dict[int, TaskInfo]:
"""
Load many taskinfo dicts from a koji client session and a sequence
of task IDs.
Returns a dict associating the individual IDs with their resulting
taskinfo.
:param session: an active koji client session
:param task_ids: IDs of tasks to be loaded
:param request: if True then load the task's request data as
well. Default, False
:param err: raise an exception if a task fails to load. Default,
True
:param size: count of tasks to load in a single
multicall. Default, 100
:param results: mapping to store the results in. Default, produce
a new dict
:raises NoSuchTask: if err is True and a task couldn't be loaded
:since: 1.0
"""
results = {} if results is None else results
fn = partial(session.getTaskInfo, request=request, strict=False)
for key_chunk in chunkseq(task_ids, size):
for key, info in zip(key_chunk, fn(key_chunk)):
if err and not info:
raise NoSuchTask(key)
else:
results[key] = info
return results
[docs]
def bulk_load_rpm_sigs(
session: ClientSession,
rpm_ids: Iterable[int],
size: int = 100,
results: Optional[dict] = None) -> Dict[int, List[RPMSignature]]:
"""
Set up a chunking multicall to fetch the signatures for a list of
RPM via `session.queryRPMSigs` for each ID in rpm_ids.
Returns a dict associating the individual RPM IDs with their
resulting RPM signature lists.
If results is non-None, it must support a dict-like update method,
and will be used in place of a newly allocated dict to store and
return the results.
:since: 1.0
"""
results = {} if results is None else results
results.update(iter_bulk_load(session, session.queryRPMSigs,
rpm_ids, True, size))
return results
[docs]
def bulk_load_buildroot_archives(
session: ClientSession,
buildroot_ids: Iterable[int],
btype: Optional[str] = None,
size: int = 100,
results: Optional[dict] = None) -> Dict[int, List[ArchiveInfo]]:
"""
Set up a chunking multicall to fetch the archives of buildroots
via `session.listArchives` for each buildroot ID in buildrood_ids.
Returns a dict associating the individual buildroot IDs with their
resulting archive lists.
If results is non-None, it must support dict-like update method,
and will be used in place of a newly allocated dict to store and
return the results.
:since: 1.0
"""
results = {} if results is None else results
fn = lambda i: session.listArchives(componentBuildrootID=i, type=btype)
results.update(iter_bulk_load(session, fn, buildroot_ids, True, size))
return results
[docs]
def bulk_load_buildroot_rpms(
session: ClientSession,
buildroot_ids: Iterable[int],
size: int = 100,
results: Optional[dict] = None) -> Dict[int, List[RPMInfo]]:
"""
Set up a chunking multicall to fetch the RPMs of buildroots via
`session.listRPMs` for each buildroot ID in buildrood_ids.
Returns a dict associating the individual buildroot IDs with their
resulting RPM lists.
If results is non-None, it must support dict-like update method,
and will be used in place of a newly allocated dict to store and
return the results.
:since: 1.0
"""
results = {} if results is None else results
fn = lambda i: session.listRPMs(componentBuildrootID=i)
results.update(iter_bulk_load(session, fn, buildroot_ids, True, size))
return results
[docs]
def bulk_load_build_archives(
session: ClientSession,
build_ids: Iterable[int],
btype: Optional[str] = None,
size: int = 100,
results: Optional[dict] = None) -> Dict[int, List[ArchiveInfo]]:
"""
Set up a chunking multicall to fetch the archives of builds
via `session.listArchives` for each build ID in build_ids.
Returns a dict associating the individual build IDs with their
resulting archive lists.
If results is non-None, it must support dict-like update method,
and will be used in place of a newly allocated dict to store and
return the results.
:since: 1.0
"""
results = {} if results is None else results
fn = lambda i: session.listArchives(buildID=i, type=btype)
results.update(iter_bulk_load(session, fn, build_ids, True, size))
return results
[docs]
def bulk_load_build_rpms(
session: ClientSession,
build_ids: Iterable[int],
size: int = 100,
results: Optional[dict] = None) -> Dict[int, List[RPMInfo]]:
"""
Set up a chunking multicall to fetch the RPMs of builds via
`session.listRPMS` for each build ID in build_ids.
Returns a dict associating the individual build IDs with their
resulting RPM lists.
If results is non-None, it must support a dict-like update method,
and will be used in place of a newly allocated dict to store and
return the results.
:since: 1.0
"""
results = {} if results is None else results
results.update(iter_bulk_load(session, session.listRPMs,
build_ids, True, size))
return results
[docs]
def bulk_load_buildroots(
session: ClientSession,
broot_ids: Iterable[int],
size: int = 100,
results: Optional[dict] = None) -> Dict[int, dict]:
"""
Set up a chunking multicall to fetch the buildroot data via
`session.getBuildroot` for each ID in broot_ids.
Returns a dict associating the individual buildroot IDs with their
resulting buildroot info dicts.
If results is non-None, it must support a dict-like update method,
and will be used in place of a newly allocated dict to store and
return the results.
:since: 1.0
"""
results = {} if results is None else results
results.update(iter_bulk_load(session, session.getBuildroot,
broot_ids, True, size))
return results
[docs]
def bulk_load_users(
session: ClientSession,
users: Iterable[Union[int, str]],
err: bool = True,
size: int = 100,
results: Optional[dict] = None) -> Dict[Union[int, str],
UserInfo]:
"""
Load many userinfo dicts from a koji client session and a sequence of
user identifiers.
Returns a dict associating the individual identifiers with their
resulting userinfo.
If err is True (default) then any missing user info will raise a
NoSuchUser exception. If err is False, then a None will be
substituted into the ordered dict for the result.
If results is non-None, it must support dict assignment, and will
be used in place of a newly allocated dict to store and return the
results.
:param session: active koji session
:param users: user names or IDs to load
:param err: halt on problems and raise an exception. Default, True
:param size: number of users to load in a single
multicall. Default, 100
:param results: dict to store results in. Default, allocate a new
dict
:raises NoSuchUser: if err is True and a user could not be loaded
:since: 1.0
"""
users = tuple(users)
results = {} if results is None else results
if not users:
return results
# we need to identify which signature the getUser API will
# support. Unfortunately the change in signatures happened before
# there was a way to check the hub version. First we'll check
# whether there's already a cached answer as to which API is
# available
session_vars = vars(session)
new_get_user = session_vars.get("__ksd_new_get_user")
if new_get_user is None:
# there wasn't already an answer, so we'll have to find out
# ourselves. In this case we'll load the first user in the
# list of users separately, outside of a multicall, and using
# the as_userinfo function. This function will first try the
# newer signature. If successful, it will record
# __ksd_new_get_user as True, and we'll know to use the newer
# signature. If not, the function will retry with the older
# signature and set __ksd_new_get_user to False.
key = users[0]
users = users[1:]
try:
results[key] = as_userinfo(session, key)
except NoSuchUser:
if err:
raise
else:
results[key] = None
# the use of as_userinfo will have updated the
# __ksd_new_get_user sentinel attribute to either True or
# False
new_get_user = session_vars.get("__ksd_new_get_user")
if new_get_user:
fn = lambda u: session.getUser(u, False, True)
else:
fn = session.getUser
for key, info in iter_bulk_load(session, fn, users, False, size):
if err and not info:
raise NoSuchUser(key)
else:
results[key] = info
return results
[docs]
def as_buildinfo(
session: ClientSession,
build: BuildSpec) -> BuildInfo:
"""
Coerces a build value into a koji build info dict.
If build is an
* int, will attempt to load as a build ID
* str, will attempt to load as an NVR
* dict, will presume already a build info
:param session: active koji session
:param build: value to lookup
:raises NoSuchBuild: if the build value could not be resolved
into a build info dict
:since: 1.0
"""
if isinstance(build, (str, int)):
info = session.getBuild(build)
elif isinstance(build, dict):
info = build
else:
info = None
if not info:
raise NoSuchBuild(build)
return info
[docs]
def as_channelinfo(
session: ClientSession,
channel: ChannelSpec) -> ChannelInfo:
"""
Coerces a channel value into a koji channel info dict.
If channel is an
* int, will attempt to load as a channel ID
* str, will attempt to load as a channel name
* dict, will presume already a channel info
:param session: an active koji client session
:param channel: value to lookup
:raises NoSuchChannel: if the channel value could not be resolved
into a channel info dict
:since: 1.1
"""
if isinstance(channel, (str, int)):
info = session.getChannel(channel)
elif isinstance(channel, dict):
info = channel
else:
info = None
if not info:
raise NoSuchChannel(channel)
return info
[docs]
def as_taginfo(
session: ClientSession,
tag: TagSpec) -> TagInfo:
"""
Coerces a tag value into a koji tag info dict.
If tag is an
* int, will attempt to load as a tag ID
* str, will attempt to load as a tag name
* dict, will presume already a tag info
:param session: active koji session
:param tag: value to lookup
:raises NoSuchTag: if the tag value could not be resolved into a
tag info dict
:since: 1.0
"""
if isinstance(tag, (str, int)):
if version_check(session, (1, 23)):
info = session.getTag(tag, blocked=True)
else:
info = session.getTag(tag)
elif isinstance(tag, dict):
info = tag
else:
info = None
if not info:
raise NoSuchTag(tag)
return info
[docs]
def as_taskinfo(
session: ClientSession,
task: TaskSpec) -> TaskInfo:
"""
Coerces a task value into a koji task info dict.
If task is an
* int, will attempt to load as a task ID
* dict, will presume already a task info
Note that if this function does attempt to load a task, it will
request it with the task's request data as well.
:param session: active koji session
:param task: value to lookup
:raises NoSuchTask: if the task value could not be resolved
into a task info dict
:since: 1.0
"""
if isinstance(task, int):
info = session.getTaskInfo(task, True)
elif isinstance(task, dict):
info = task
else:
info = None
if not info:
raise NoSuchTask(task)
return info
[docs]
def as_targetinfo(
session: ClientSession,
target: TargetSpec) -> TargetInfo:
"""
Coerces a target value into a koji target info dict.
If target is an
* int, will attempt to load as a target ID
* str, will attempt to load as a target name
* dict, will presume already a target info
:param session: active koji session
:param target: value to lookup
:raises NoSuchTarget: if the target value could not be resolved
into a target info dict
:since: 1.0
"""
if isinstance(target, (str, int)):
info = session.getBuildTarget(target)
elif isinstance(target, dict):
info = target
else:
info = None
if not info:
raise NoSuchTarget(target)
return info
[docs]
def as_hostinfo(
session: ClientSession,
host: HostSpec) -> HostInfo:
"""
Coerces a host value into a host info dict.
If host is an:
* int, will attempt to load as a host ID
* str, will attempt to load as a host name
* dict, will presume already a host info
:param session: active koji session
:param host: value to lookup
:raises NoSuchHost: if the host value could not be resolved
into a host info dict
:since: 1.0
"""
if isinstance(host, (str, int)):
info = session.getHost(host)
elif isinstance(host, dict):
info = host
else:
info = None
if not info:
raise NoSuchHost(host)
return info
[docs]
def as_packageinfo(
session: ClientSession,
pkg: PackageSpec) -> PackageInfo:
"""
Coerces a host value into a host info dict.
If pkg is an:
* int, will attempt to load as a package ID
* str, will attempt to load as a package name
* dict, will presume already a package info
:param session: an active koji client session
:param pkg: value to lookup
:raises NoSuchPackage: if the pkg value could not be resolved into
a package info dict
:since: 1.1
"""
if isinstance(pkg, (str, int)):
info = session.getPackage(pkg)
elif isinstance(pkg, dict):
info = pkg
else:
info = None
if not info:
raise NoSuchPackage(pkg)
return info
[docs]
def as_archiveinfo(
session: ClientSession,
archive: ArchiveSpec) -> ArchiveInfo:
"""
Coerces an archive value into an archive info dict.
If archive is an:
* int, will attempt to load as an archive ID
* str, will attempt to load as the first-found archive matching
the given filename
* dict, will presume already an archive info
:param session: active koji session
:param archive: value to lookup
:raises NoSuchArchive: if the archive value could not be resolved
into an archive info dict
:since: 1.0
"""
if isinstance(archive, int):
info = session.getArchive(archive)
elif isinstance(archive, str):
found = session.listArchives(filename=archive)
info = found[0] if found else None
elif isinstance(archive, dict):
info = archive
else:
info = None
if not info:
raise NoSuchArchive(archive)
return info
[docs]
def as_repoinfo(
session: ClientSession,
repo: RepoSpec) -> RepoInfo:
"""
Coerces a repo value into a Repo info dict.
If repo is an:
* dict with name, will attempt to load the current repo from a
tag by that name
* str, will attempt to load the current repo from a tag by name
* int, will attempt to load the repo by ID
* dict, will presume already a repo info
:param session: active koji session
:param repo: value to lookup
:raises NoSuchRepo: if the repo value could not be resolved
into a repo info dict
:since: 1.1
"""
info: RepoInfo = None
if isinstance(repo, dict):
if "name" in repo:
tag = cast(TagInfo, repo)
repo = tag["name"]
else:
info = cast(RepoInfo, repo)
if isinstance(repo, str):
repotag = session.getRepo(repo)
if repotag is None:
raise NoSuchRepo(repo)
repo = repotag["id"]
if isinstance(repo, int):
info = session.repoInfo(repo)
if not info:
raise NoSuchRepo(repo)
return info
[docs]
def as_rpminfo(
session: ClientSession,
rpm: RPMSpec) -> RPMInfo:
"""
Coerces a host value into a RPM info dict.
If rpm is specified as an:
* int, will attempt to load as a RPM ID
* str, will attempt to load as a RPM NVRA
* dict, will presume already an RPM info
:param session: active koji session
:param rpm: value to lookup
:raises NoSuchRPM: if the rpm value could not be resolved
into a RPM info dict
:since: 1.0
"""
info: RPMInfo
if isinstance(rpm, (str, int)):
info = session.getRPM(rpm) # type: ignore
elif isinstance(rpm, dict):
info = rpm
else:
info = None
if not info:
raise NoSuchRPM(rpm)
return info
[docs]
def as_userinfo(
session: ClientSession,
user: UserSpec) -> UserInfo:
"""
Resolves user to a userinfo dict.
If user is a str or int, then getUser will be invoked. If user is
already a dict, it's presumed to be a userinfo already and it's
returned unaltered.
:param session: active koji session
:param user: Name, ID, or User Info describing a koji user
:raises NoSuchUser: when user cannot be found
:since: 1.0
"""
if isinstance(user, (str, int)):
session_vars = vars(session)
new_get_user = session_vars.get("__ksd_new_get_user")
if new_get_user:
# we've tried the new way and it worked, so keep doing it.
info = session.getUser(user, False, True)
elif new_get_user is None:
# an API incompatibility emerged at some point in Koji's
# past, so we need to try the new way first and fall back
# to the older signature if that fails. This happened
# before Koji hub started reporting its version, so we
# cannot use the version_check function to gate this.
try:
info = session.getUser(user, False, True)
session_vars["__ksd_new_get_user"] = True
except ParameterError:
info = session.getUser(user)
session_vars["__ksd_new_get_user"] = False
else:
# we've already tried the new way once and it didn't work.
info = session.getUser(user)
elif isinstance(user, dict):
info = user
else:
info = None
if not info:
raise NoSuchUser(user)
return info
# koji's queryHistory call doesn't support queryOpts for some reason
#
# def paged_query_history(
# session: ClientSession,
# table: str,
# pagesize: int = 100,
# **kwargs: Any):
#
# qopts = {
# "order": "create_event",
# "offset": 0,
# "limit": pagesize,
# }
# kwargs["tables"] = [table]
# kwargs["queryOpts"] = qopts
#
# while True:
# hist = session.queryHistory(**kwargs)[table]
# histlen = len(hist)
#
# if histlen == 0:
# break
#
# yield hist
#
# if histlen < pagesize:
# break
#
# qopts["offset"] += histlen
def _int(val):
if isinstance(val, str) and val.isdigit():
val = int(val)
return val
[docs]
def hub_version(
session: ClientSession) -> Tuple[int, ...]:
"""
Wrapper for ``session.getKojiVersion`` which caches the results on
the session and splits the value into a tuple of ints for easy
comparison.
If the getKojiVersion method isn't implemented on the hub, we
presume that we're version 1.22 ``(1, 22)`` which is the last
version before the getKojiVersion API was added.
If used with koji hub and client >= 1.35 then this value is farmed
from the Koji-Version header of XMLRPC responses, rather than
directly calling getKojiVersion.
:param session: active koji session
:since: 1.0
"""
# we need to use this instead of getattr as koji sessions will
# automatically create all missing properties as proxies to a
# remote hub method.
session_vars = vars(session)
hub_ver = session_vars.get("__ksd_hub_version", None)
if hub_ver is None:
if hasattr(type(session), "hub_version_str"):
# introduced in koji 1.35, koji client can determine the
# hub version via a header that is available in every
# response (including the login response). That saves us
# an explicit getKojiVersion call, so use it if
# possible. However, because client sessions dynamically
# bind every missing attr as a virtual call, we'll need to
# look on the session's class for the hub_version_str
# property rather than using vars or the session itself
hub_ver = session.hub_version_str
else:
try:
hub_ver = session.getKojiVersion()
except GenericError:
pass
if hub_ver is None:
hub_ver = (1, 22)
elif isinstance(hub_ver, str):
hub_ver = tuple(map(_int, hub_ver.split(".")))
session_vars["__ksd_hub_version"] = hub_ver
return hub_ver
[docs]
def version_check(
session: ClientSession,
minimum: HubVersionSpec = (1, 23)) -> bool:
"""
Verifies that the requested minimum version is met compared
against session.getKojiVersion.
If the getKojiVersion method isn't implemented on the hub, we
presume that we're version 1.22 (the last version before
getKojiVersion was added). Because of this, checking for minimum
versions lower than 1.23 will always return True.
Version is specified as a tuple of integers, eg. 1.23 is ``(1,
23)``
:param session: active koji session
:param minimum: Minimum version required. Default, ``(1, 23)``
:since: 1.0
"""
if isinstance(minimum, str):
minimum = tuple(map(_int, minimum.split(".")))
hub_ver = hub_version(session)
return bool(hub_ver and hub_ver >= minimum)
[docs]
def version_require(
session: ClientSession,
minimum: HubVersionSpec = (1, 23),
message: Optional[str] = None) -> bool:
"""
Verifies that the requested minimum version is met compared
against ``session.getKojiVersion()``
If the getKojiVersion method isn't implemented on the hub, we
presume that we're version 1.22 (the last version before
getKojiVersion was added). Because of this, checking for minimum
versions lower than 1.23 will always return True.
Version is specified as a tuple of integers, eg. 1.23 is ``(1,
23)``
If the version requirement is not met, a `FeatureUnavailable`
exception is raised, with the given message. If message is not
provided, a simple one is constructed based on the minimum value.
:param session: active koji session
:param minimum: Minimum version required. Default, ``(1, 23)``
:param message: Message to use in exception if version check
fails. Default, with a minimum of ``(1, 23)``, ``"requires >=
1.23"``
:raises FeatureUnavailable: If the minimum version is not met
:since: 1.0
"""
if version_check(session, minimum=minimum):
return True
if message is None:
if isinstance(minimum, (list, tuple)):
minimum = ".".join(str(m) for m in minimum)
message = f"requires >= {minimum}"
raise FeatureUnavailable(message)
#
# The end.