Source code for kojismokydingo.users

# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.


"""
Koji Smoky Dingo - users and permissions

:author: Christopher O'Brien <obriencj@gmail.com>
:license: GPL v3
"""


from koji import ClientSession, GenericError, ParameterError
from operator import itemgetter
from time import asctime, localtime
from typing import List, Optional, Union, cast

from . import (
    NoSuchContentGenerator, NoSuchPermission, NoSuchUser,
    as_userinfo, bulk_load_users, version_check, )
from .types import (
    CGInfo, DecoratedPermInfo, DecoratedUserInfo, NamedCGInfo,
    PermSpec, PermUser, UserInfo, UserSpec, UserStatistics,
    UserGroup, UserType, )


# Public names of this module, i.e. what a star-import exposes. Each
# entry is a function defined below.
__all__ = (
    "collect_cg_access",
    "collect_cgs",
    "collect_perminfo",
    "collect_userinfo",
    "collect_userstats",
    "get_group_members",
    "get_user_groups",
)


def collect_userstats(
        session: ClientSession,
        user: UserSpec) -> UserStatistics:
    """
    Gather build, package, and task statistics for a user.

    The five underlying hub queries (three counts, plus the most
    recent build and most recent top-level task) are issued together
    in a single multicall.

    :param session: an active koji client session
    :param user: name of a user or their kerberos ID
    :returns: dict with the keys ``build_count``, ``package_count``,
      ``task_count``, ``last_build``, and ``last_task``. The two
      ``last_*`` values are None when the user has no such entry
    :raises NoSuchUser: if user is an ID or name which cannot be resolved
    :since: 2.0
    """

    uid = as_userinfo(session, user)['id']

    with session.multicall() as batch:
        n_builds = batch.listBuilds(
            userID=uid,
            queryOpts={'countOnly': True})

        n_packages = batch.listPackages(
            userID=uid,
            with_dups=True,
            queryOpts={'countOnly': True})

        # parent of None restricts the task queries to top-level tasks
        n_tasks = batch.listTasks(
            opts={'owner': uid, 'parent': None},
            queryOpts={'countOnly': True})

        newest_build = batch.listBuilds(
            userID=uid,
            queryOpts={'limit': 1, 'order': '-build_id'})

        newest_task = batch.listTasks(
            opts={'owner': uid, 'parent': None},
            queryOpts={'limit': 1, 'order': '-id'})

    # The countOnly queries produce an int rather than a list, which
    # the type annotations of the calls do not express, hence the
    # ignores below.
    build_total = n_builds.result
    package_total = n_packages.result
    task_total = n_tasks.result

    # the limit-1 queries produce a list holding at most one entry
    build_rows = newest_build.result
    task_rows = newest_task.result

    summary: UserStatistics = {
        'build_count': build_total,  # type: ignore
        'package_count': package_total,  # type: ignore
        'task_count': task_total,  # type: ignore
        'last_build': build_rows[0] if build_rows else None,
        'last_task': task_rows[0] if task_rows else None,
    }

    return summary


def get_user_groups(
        session: ClientSession,
        user: UserSpec) -> List[UserGroup]:
    """
    Find the groups which a user belongs to.

    When the hub passes the 1.35 version check, the ``getUserGroups``
    API is used. Otherwise the active ``user_groups`` history is
    fetched and filtered locally for entries matching the user.

    :param session: an active koji client session
    :param user: name or ID of a potential member
    :returns: list of groups that user is a member of
    :raises NoSuchUser: if user could not be found
    :since: 2.2
    """

    uid = as_userinfo(session, user)["id"]

    if not version_check(session, (1, 35)):
        # getUserGroups was added in 1.35, so fall back to scanning
        # all of the still-active membership records
        history = session.queryHistory(tables=["user_groups"], active=True)

        memberships = []
        for entry in history["user_groups"]:
            if entry["user_id"] == uid:
                memberships.append({
                    "name": entry["group.name"],
                    "id": entry["group_id"],
                })
        return memberships

    return session.getUserGroups(uid) or []


def get_group_members(
        session: ClientSession,
        user: UserSpec) -> List[UserInfo]:
    """
    An anonymous alternative to the ``getGroupMembers`` hub API call,
    which requires the admin permission prior to koji 1.35. On older
    hubs, ``queryHistory`` is used to find the still-active group
    additions instead.

    :param session: an active koji client session
    :param user: name or ID of a user group
    :returns: list of users that are members of the given group. Empty
      if the resolved user is not a group
    :raises NoSuchUser: if no matching user group was found
    :since: 2.2
    """

    group = as_userinfo(session, user)

    if group["usertype"] != UserType.GROUP:
        # Bail out early for non-groups: asking the hub about a
        # non-existent group ID makes it return every group membership
        return []

    gid = group["id"]

    if version_check(session, (1, 35)):
        # yields dicts with the keys id, krb_principals, name, usertype
        return session.getGroupMembers(gid) or []

    # queryHistory is anonymous, and returns a dict mapping each table
    # name to a list of events. Those events carry "user.name" and
    # "user_id" keys, the latter of which is enough to look up the
    # rest of the user information.
    # NOTE(review): the group's ID is passed as the `user` filter --
    # confirm the hub matches it against the group side of the record
    history = session.queryHistory(tables=["user_groups"],
                                   active=True, user=gid)

    member_ids = (event["user_id"] for event in history["user_groups"])
    loaded = bulk_load_users(session, member_ids, err=False)

    # with err=False some lookups may have come back empty; drop those
    return [member for member in loaded.values() if member]


def collect_userinfo(
        session: ClientSession,
        user: UserSpec,
        stats: bool = False,
        members: bool = False) -> DecoratedUserInfo:
    """
    Load a user's info and decorate it with their permissions and the
    content generators they have access to.

    An older-style ``'krb_principal'`` value (koji < 1.19) is mirrored
    into a ``'krb_principals'`` list (koji >= 1.19) so that callers
    can rely on the newer key.

    :param session: an active koji client session
    :param user: name of a user or their kerberos ID
    :param stats: collect user statistics. Only applies to normal users
    :param members: look up group members and memberships. For a
      normal user this stores their groups under ``ksd_groups``; for
      a group this stores its members under ``ksd_members``
    :raises NoSuchUser: if user is an ID or name which cannot be resolved
    :since: 2.2
    """

    info = cast(DecoratedUserInfo, as_userinfo(session, user))

    # Depending on the koji version, getUser produces either a
    # krb_principal or a krb_principals entry (or neither when
    # kerberos is not set up). Normalize on the newer krb_principals.
    # See https://pagure.io/koji/issue/1629
    if "krb_principal" in info:
        principal = info["krb_principal"]
        if principal:
            info["krb_principals"] = [principal]
        else:
            info["krb_principals"] = []

    uid = info["id"]

    info["permissions"] = session.getUserPerms(uid)
    info["content_generators"] = collect_cg_access(session, info)

    # a missing usertype is treated as a normal user
    kind = info.get("usertype", UserType.NORMAL)

    if kind == UserType.NORMAL:
        if stats:
            info["statistics"] = collect_userstats(session, info)
        if members:
            info["ksd_groups"] = get_user_groups(session, uid)

    elif kind == UserType.GROUP and members:
        info["ksd_members"] = get_group_members(session, uid)

    return info


def collect_cg_access(
        session: ClientSession,
        user: UserSpec) -> List[NamedCGInfo]:
    """
    Find the content generators which list the user among their
    users, i.e. those the user may run CGImport with.

    :param session: an active koji client session
    :param user: Name, ID, or userinfo dict
    :returns: the matching content generator infos, each with its own
      name added under the ``name`` key
    :raises NoSuchUser: if user is an ID or name which cannot be resolved
    :since: 1.0
    """

    username = as_userinfo(session, user)["name"]

    accessible = []

    for cgname, cginfo in session.listCGs().items():
        if username not in cginfo.get("users", ()):
            continue

        # the listing is keyed by CG name; record it in the info too
        named = cast(NamedCGInfo, cginfo)
        named["name"] = cgname
        accessible.append(named)

    return accessible


def collect_cgs(
        session: ClientSession,
        name: Optional[str] = None) -> List[NamedCGInfo]:
    """
    Collect content generator infos as a list, each augmented with
    its own name under the ``name`` key.

    :param session: an active koji client session
    :param name: only collect the given CG. Default, collect all
    :raises NoSuchContentGenerator: if name is specified and no
      content generator matches
    :since: 1.0
    """

    available = session.listCGs()

    if name:
        # narrow the listing down to just the requested generator
        if name not in available:
            raise NoSuchContentGenerator(name)
        available = {name: available[name]}

    collected = []

    # flatten the name-to-info mapping into a list of named infos
    for cgname, cginfo in available.items():
        named = cast(NamedCGInfo, cginfo)
        named["name"] = cgname
        collected.append(named)

    return collected


def collect_perminfo(
        session: ClientSession,
        permission: PermSpec) -> DecoratedPermInfo:
    """
    Look up a permission and decorate it with the still-active grants
    of that permission, taken from the ``user_perms`` history table.

    Each grant entry has its ``user.name`` and ``permission.name``
    keys renamed to ``user_name`` and ``permission_name``, and gains a
    ``create_date`` string rendered in local time from ``create_ts``.

    :param session: an active koji client session
    :param permission: the ID or name of the permission
    :returns: the permission info, with the grants stored under the
      ``users`` key
    :raises NoSuchPermission: if there is no matching permission found
    :since: 1.0
    """

    # an int is matched against permission IDs, anything else against
    # permission names
    key = "id" if isinstance(permission, int) else "name"

    matched = next((perm for perm in session.getAllPerms()
                    if perm[key] == permission), None)

    if matched is None:
        raise NoSuchPermission(permission)

    pinfo = cast(DecoratedPermInfo, matched)

    history = session.queryHistory(tables=["user_perms"],
                                   permission=permission,
                                   active=1)
    grants = history["user_perms"]

    for grant in grants:
        grant["user_name"] = grant.pop("user.name")
        grant["permission_name"] = grant.pop("permission.name")
        grant["create_date"] = asctime(localtime(grant["create_ts"]))

    pinfo["users"] = cast(List[PermUser], grants)

    return pinfo


#
# The end.