Tests: implement system test framework

As discussed at length, this is the implementation of the new system
tests framework for shadow. This is a proof of concept that contains the
key elements to be able to run basic user (i.e. useradd, usermod) and
group (i.e. usermod) tests. If you like the framework the rest of the
functionality will be added in the future.

Some useful facts:
* It is implemented in python
* It is based on pytest and pytest-mh
* It works on all the distributions that are part of our CI
* It can be run in the cloud (VM or container) as well as on-premises
* After the execution of each test the environment is cleaned up
* Logs and other artifacts for failed tests are collected
* It has a rich API that can be extended and extended to cover new
  functionalities

Closes: https://github.com/shadow-maint/shadow/issues/835

Signed-off-by: Iker Pedrosa <ipedrosa@redhat.com>
This commit is contained in:
Iker Pedrosa 2024-10-07 15:44:17 +02:00 committed by Serge Hallyn
parent 6a2ab3d760
commit 128650dfd4
15 changed files with 1404 additions and 0 deletions

View File

View File

@ -0,0 +1,53 @@
from __future__ import annotations
from typing import Type
from pytest_mh import MultihostConfig, MultihostDomain, MultihostHost, MultihostRole
__all__ = [
"ShadowMultihostConfig",
"ShadowMultihostDomain",
]
class ShadowMultihostConfig(MultihostConfig):
@property
def id_to_domain_class(self) -> dict[str, Type[MultihostDomain]]:
"""
All domains are mapped to :class:`ShadowMultihostDomain`.
:rtype: Class name.
"""
return {"*": ShadowMultihostDomain}
class ShadowMultihostDomain(MultihostDomain[ShadowMultihostConfig]):
@property
def role_to_host_class(self) -> dict[str, Type[MultihostHost]]:
"""
Map roles to classes:
* shadow to ShadowHost
:rtype: Class name.
"""
from .hosts.shadow import ShadowHost
return {
"shadow": ShadowHost,
}
@property
def role_to_role_class(self) -> dict[str, Type[MultihostRole]]:
"""
Map roles to classes:
* shadow to Shadow
:rtype: Class name.
"""
from .roles.shadow import Shadow
return {
"shadow": Shadow,
}

View File

@ -0,0 +1,45 @@
"""Pytest fixtures."""
from __future__ import annotations
import os
import pytest
@pytest.fixture(scope="session")
def datadir(request: pytest.FixtureRequest) -> str:
"""
Data directory shared for all tests.
:return: Path to the data directory ``(root-pytest-dir)/data``.
:rtype: str
"""
return os.path.join(request.node.path, "data")
@pytest.fixture(scope="module")
def moduledatadir(datadir: str, request: pytest.FixtureRequest) -> str:
"""
Data directory shared for all tests within a single module.
:return: Path to the data directory ``(root-pytest-dir)/data/$module_name``.
:rtype: str
"""
name = request.module.__name__
return os.path.join(datadir, name)
@pytest.fixture(scope="function")
def testdatadir(moduledatadir: str, request: pytest.FixtureRequest) -> str:
"""
Data directory for current test.
:return: Path to the data directory ``(root-pytest-dir)/data/$module_name/$test_name``.
:rtype: str
"""
if not isinstance(request.node, pytest.Function):
raise TypeError(f"Excepted pytest.Function, got {type(request.node)}")
name = request.node.originalname
return os.path.join(moduledatadir, name)

View File

@ -0,0 +1,3 @@
"""shadow multihost hosts."""
from __future__ import annotations

View File

@ -0,0 +1,107 @@
"""Base classes and objects for shadow specific multihost hosts."""
from __future__ import annotations
import csv
from pytest_mh import MultihostBackupHost, MultihostHost
from pytest_mh.utils.fs import LinuxFileSystem
from ..config import ShadowMultihostDomain
__all__ = [
"BaseHost",
"BaseLinuxHost",
]
class BaseHost(MultihostBackupHost[ShadowMultihostDomain]):
"""
Base class for all shadow hosts.
"""
def __init__(self, *args, **kwargs) -> None:
# restore is handled in topology controllers
super().__init__(*args, **kwargs)
@property
def features(self) -> dict[str, bool]:
"""
Features supported by the host.
"""
return {}
class BaseLinuxHost(MultihostHost[ShadowMultihostDomain]):
"""
Base Linux host.
Adds linux specific reentrant utilities.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.fs: LinuxFileSystem = LinuxFileSystem(self)
self._os_release: dict = {}
self._distro_name: str = "unknown"
self._distro_major: int = 0
self._distro_minor: int = 0
def _distro_information(self):
"""
Pulls distro information from a host from /ets/os-release
"""
self.logger.info(f"Detecting distro information on {self.hostname}")
os_release = self.fs.read("/etc/os-release")
self._os_release = dict(csv.reader([x for x in os_release.splitlines() if x], delimiter="="))
if "NAME" in self._os_release:
self._distro_name = self._os_release["NAME"]
if "VERSION_ID" not in self._os_release:
return
if "." in self._os_release["VERSION_ID"]:
self._distro_major = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[0])
self._distro_minor = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[1])
else:
self._distro_major = int(self._os_release["VERSION_ID"])
@property
def distro_name(self) -> str:
"""
Host distribution
:return: Distribution name or "unknown"
:rtype: str
"""
# NAME item from os-release
if not self._os_release:
self._distro_information()
return self._distro_name
@property
def distro_major(self) -> int:
"""
Host distribution major version
:return: Major version
:rtype: int
"""
# First part of VERSION_ID from os-release
# Returns zero when could not detect
if not self._os_release:
self._distro_information()
return self._distro_major
@property
def distro_minor(self) -> int:
"""
Host distribution minor version
:return: Minor version
:rtype: int
"""
# Second part of VERSION_ID from os-release
# Returns zero when no minor version is present
if not self._os_release:
self._distro_information()
return self._distro_minor

View File

@ -0,0 +1,175 @@
"""shadow multihost host."""
from __future__ import annotations
from pathlib import PurePosixPath
from typing import Any
from pytest_mh.conn import ProcessLogLevel
from .base import BaseHost, BaseLinuxHost
__all__ = [
"ShadowHost",
]
class ShadowHost(BaseHost, BaseLinuxHost):
"""
shadow host object.
This is the host where the tests are run.
.. note::
Full backup and restore of shadow state is supported.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._features: dict[str, bool] | None = None
"""Features dictionary."""
self._backup_path: PurePosixPath | None = None
"""Path to backup files."""
self._verify_files: [dict[str, str]] = [
{"origin": "/etc/passwd", "backup": "passwd"},
{"origin": "/etc/shadow", "backup": "shadow"},
{"origin": "/etc/group", "backup": "group"},
{"origin": "/etc/gshadow", "backup": "gshadow"},
]
"""Files to verify for mismatch."""
def pytest_setup(self) -> None:
super().pytest_setup()
def start(self) -> None:
"""
Not supported.
:raises NotImplementedError: _description_
"""
raise NotImplementedError("Starting shadow service is not implemented.")
def stop(self) -> None:
"""
Not supported.
:raises NotImplementedError: _description_
"""
raise NotImplementedError("Stopping shadow service is not implemented.")
def backup(self) -> Any:
"""
Backup all shadow data.
:return: Backup data.
:rtype: Any
"""
self.logger.info("Creating backup of shadow host")
result = self.conn.run(
"""
set -ex
function backup {
if [ -d "$1" ] || [ -f "$1" ]; then
cp --force --archive "$1" "$2"
fi
}
path=`mktemp -d`
backup /etc/login.defs "$path/login.defs"
backup /etc/default/useradd "$path/useradd"
backup /etc/passwd "$path/passwd"
backup /etc/shadow "$path/shadow"
backup /etc/group "$path/group"
backup /etc/gshadow "$path/gshadow"
backup /etc/subuid "$path/subuid"
backup /etc/subgid "$path/subgid"
backup /home "$path/home"
backup /var/log/secure "$path/secure"
echo $path
""",
log_level=ProcessLogLevel.Error,
)
self._backup_path = PurePosixPath(result.stdout_lines[-1].strip())
return PurePosixPath(result.stdout_lines[-1].strip())
def restore(self, backup_data: Any | None) -> None:
"""
Restore all shadow data.
:return: Backup data.
:rtype: Any
"""
if backup_data is None:
return
if not isinstance(backup_data, PurePosixPath):
raise TypeError(f"Expected PurePosixPath, got {type(backup_data)}")
backup_path = str(backup_data)
self.logger.info(f"Restoring shadow data from {backup_path}")
self.conn.run(
f"""
set -ex
function restore {{
rm --force --recursive "$2"
if [ -d "$1" ] || [ -f "$1" ]; then
cp --force --archive "$1" "$2"
fi
}}
rm --force --recursive /var/log/secure
restore "{backup_path}/login.defs" /etc/login.defs
restore "{backup_path}/useradd" /etc/default/useradd
restore "{backup_path}/passwd" /etc/passwd
restore "{backup_path}/shadow" /etc/shadow
restore "{backup_path}/group" /etc/group
restore "{backup_path}/gshadow" /etc/gshadow
restore "{backup_path}/subuid" /etc/subuid
restore "{backup_path}/subgid" /etc/subgid
restore "{backup_path}/home" /home
restore "{backup_path}/secure" /var/log/secure
""",
log_level=ProcessLogLevel.Error,
)
def detect_file_mismatches(self) -> None:
"""
Shadow binaries modify a number of files, but usually do not modify all of them. This is why we add an
additional check at the end of the test to verify that the files that should not have been modified are still
intact.
"""
self.logger.info(f"Detecting mismatches in shadow files {self._backup_path}")
for x in self._verify_files:
result = self.conn.run(
f"""
set -ex
cmp {x['origin']} {self._backup_path}/{x['backup']}
""",
log_level=ProcessLogLevel.Error,
raise_on_error=False,
)
if result.rc != 0:
self.logger.error(f"File mismatch in '{x['origin']}' and '{self._backup_path}/{x['backup']}'")
result.throw()
def discard_file(self, origin: str) -> None:
"""
Discard modified files from the files that should be verified.
"""
for x in self._verify_files:
if x["origin"] == origin:
self._verify_files.remove(x)
break

View File

@ -0,0 +1,100 @@
"""Pytest fixtures."""
from __future__ import annotations
from functools import partial
import pytest
from pytest_mh import MultihostItemData, Topology
from .misc import to_list_of_strings
from .roles.base import BaseRole
from .topology import KnownTopology, KnownTopologyGroup
def pytest_configure(config: pytest.Config):
"""
Pytest hook: register multihost plugin.
"""
# register additional markers
config.addinivalue_line(
"markers",
"builtwith(feature): Run test only if shadow was built with given feature",
)
def builtwith(item: pytest.Function, requirements: dict[str, str], **kwargs: BaseRole):
def value_error(msg: str) -> ValueError:
return ValueError(f"{item.nodeid}::{item.originalname}: @pytest.mark.builtwith: {msg}")
errors: list[str] = []
for role, features in requirements.items():
if role not in kwargs:
raise value_error(f"unknown fixture '{role}'")
if not isinstance(kwargs[role], BaseRole):
raise value_error(f"fixture '{role}' is not instance of BaseRole")
obj = kwargs[role]
for feature in to_list_of_strings(features):
if feature not in obj.features:
raise value_error(f"unknown feature '{feature}' in '{role}'")
if not obj.features[feature]:
errors.append(f'{role} does not support "{feature}"')
if len(errors) == 1:
return (False, errors[0])
elif len(errors) > 1:
return (False, str(errors))
# All requirements were passed
return True
@pytest.hookimpl(tryfirst=True)
def pytest_runtest_setup(item: pytest.Item) -> None:
if not isinstance(item, pytest.Function):
raise TypeError(f"Unexpected item type: {type(item)}")
topology: list[Topology] = []
mh_item_data: MultihostItemData | None = MultihostItemData.GetData(item)
for mark in item.iter_markers("builtwith"):
requirements: dict[str, str] = {}
if len(mark.args) == 1 and not mark.kwargs:
# @pytest.mark.builtwith("feature_x")
# -> check if "feature_x" is supported by shadow
requirements["shadow"] = mark.args[0]
topology = []
elif not mark.args and mark.kwargs:
# @pytest.mark.builtwith(shadow="feature_x", another_host="feature_x") ->
# -> check if "feature_x" is supported by both shadow and another_host
requirements = dict(mark.kwargs)
topology = []
elif (
len(mark.args) == 1
and isinstance(mark.args[0], (Topology, KnownTopology, KnownTopologyGroup))
and mark.kwargs
):
# @pytest.mark.builtwith(KnownTopology.Shadow, shadow="feature_x") ->
# -> check if "feature_x" is supported by shadow only if the test runs on shadow topology
requirements = dict(mark.kwargs)
if isinstance(mark.args[0], Topology):
topology = [mark.args[0]]
elif isinstance(mark.args[0], KnownTopology):
topology = [mark.args[0].value.topology]
elif isinstance(mark.args[0], KnownTopologyGroup):
topology = [x.value.topology for x in mark.args[0].value]
else:
raise ValueError(f"{item.nodeid}::{item.originalname}: invalid arguments for @pytest.mark.builtwith")
if mh_item_data is None:
raise ValueError(f"{item.nodeid}::{item.originalname}: multihost item data is not set")
if mh_item_data.topology_mark is None:
raise ValueError(f"{item.nodeid}::{item.originalname}: multihost topology mark is not set")
if not topology or mh_item_data.topology_mark.topology in topology:
item.add_marker(pytest.mark.require(partial(builtwith, item=item, requirements=requirements)))

View File

@ -0,0 +1,42 @@
"""Miscellaneous functions."""
from __future__ import annotations
from typing import Any
def to_list(value: Any | list[Any] | None) -> list[Any]:
"""
Convert value into a list.
- if value is ``None`` then return an empty list
- if value is already a list then return it unchanged
- if value is not a list then return ``[value]``
:param value: Value that should be converted to a list.
:type value: Any | list[Any] | None
:return: List with the value as an element.
:rtype: list[Any]
"""
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def to_list_of_strings(value: Any | list[Any] | None) -> list[str]:
"""
Convert given list or single value to list of strings.
The ``value`` is first converted to a list and then ``str(item)`` is run on
each of its item.
:param value: Value to convert.
:type value: Any | list[Any] | None
:return: List of strings.
:rtype: list[str]
"""
return [str(x) for x in to_list(value)]

View File

@ -0,0 +1,42 @@
from __future__ import annotations
class ExpectScriptError(Exception):
"""
Expect script error.
Seeing this exception means that there is an unhandled path or other error
in the expect script that was executed. The script needs to be fixed.
"""
def __init__(self, code: int, msg: str | None = None) -> None:
"""
:param code: Expect script error code.
:type code: int
:param msg: Error message, defaults to None (translate error code to message)
:type msg: str | None, optional
"""
self.code: int = code
if msg is None:
msg = self.code_to_message(code)
super().__init__(msg)
def code_to_message(self, code: int) -> str:
"""
Translate expect script error codes used in this framework to message.
:param code: Expect script error code.
:type code: int
:return: Error message.
:rtype: str
"""
match code:
case 201:
return "Timeout, unexpected output"
case 202:
return "Unexpected end of file"
case 203:
return "Unexpected code path"
return "Unknown error code"

View File

@ -0,0 +1,3 @@
"""shadow multihost roles."""
from __future__ import annotations

View File

@ -0,0 +1,172 @@
"""Base classes and objects for shadow specific multihost roles."""
from __future__ import annotations
from typing import Any, Generic, TypeGuard, TypeVar
from pytest_mh import MultihostRole
from pytest_mh.cli import CLIBuilder
from pytest_mh.conn import Bash, Shell
from pytest_mh.conn.ssh import SSHClient
from pytest_mh.utils.coredumpd import Coredumpd
from pytest_mh.utils.firewall import Firewalld
from pytest_mh.utils.fs import LinuxFileSystem
from pytest_mh.utils.journald import JournaldUtils
from pytest_mh.utils.tc import LinuxTrafficControl
from ..hosts.base import BaseHost
from ..utils.tools import LinuxToolsUtils
HostType = TypeVar("HostType", bound=BaseHost)
RoleType = TypeVar("RoleType", bound=MultihostRole)
__all__ = [
"HostType",
"RoleType",
"DeleteAttribute",
"BaseObject",
"BaseRole",
"BaseLinuxRole",
]
class DeleteAttribute(object):
"""
This class is used to distinguish between setting an attribute to an empty
value and deleting it completely.
"""
pass
class BaseObject(Generic[HostType, RoleType]):
"""
Base class for object management classes (like users or groups).
It provides shortcuts to low level functionality to easily enable execution
of remote commands. It also defines multiple helper methods that are shared
across roles.
"""
def __init__(self, role: RoleType) -> None:
self.role: RoleType = role
"""Multihost role object."""
self.host: HostType = role.host
"""Multihost host object."""
self.cli: CLIBuilder = self.host.cli
"""Command line builder to easy build command line for execution."""
class BaseRole(MultihostRole[HostType]):
"""
Base role class. Roles are the main interface to the remote hosts that can
be directly accessed in test cases as fixtures.
All changes to the remote host that were done through the role object API
are automatically reverted when a test is finished.
"""
Delete: DeleteAttribute = DeleteAttribute()
"""
Use this to indicate that you want to delete an attribute instead of setting
it to an empty value.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def is_delete_attribute(self, value: Any) -> TypeGuard[DeleteAttribute]:
"""
Return ``True`` if the value is :attr:`DeleteAttribute`
:param value: Value to test.
:type value: Any
:return: Return ``True`` if the value is :attr:`DeleteAttribute`
:rtype: TypeGuard[DeleteAttribute]
"""
return isinstance(value, DeleteAttribute)
@property
def features(self) -> dict[str, bool]:
"""
Features supported by the role.
"""
return self.host.features
def ssh(self, user: str, password: str, *, shell: Shell | None = None) -> SSHClient:
"""
Open SSH connection to the host as given user.
:param user: Username.
:type user: str
:param password: User password.
:type password: str
:param shell: Shell that will run the commands, defaults to ``None`` (= ``Bash``)
:type shell: Shell | None, optional
:return: SSH client connection.
:rtype: SSHClient
"""
if shell is None:
shell = Bash()
host = self.host.hostname
port = 22
if isinstance(self.host.conn, SSHClient):
host = getattr(self.host.conn, "host", host)
port = getattr(self.host.conn, "port", 22)
return SSHClient(
host=host,
port=port,
user=user,
password=password,
shell=shell,
logger=self.logger,
)
class BaseLinuxRole(BaseRole[HostType]):
"""
Base linux role.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.fs: LinuxFileSystem = LinuxFileSystem(self.host)
"""
File system manipulation.
"""
self.firewall: Firewalld = Firewalld(self.host).postpone_setup()
"""
Configure firewall using firewalld.
"""
self.tc: LinuxTrafficControl = LinuxTrafficControl(self.host).postpone_setup()
"""
Traffic control manipulation.
"""
self.tools: LinuxToolsUtils = LinuxToolsUtils(self.host)
"""
Standard tools interface.
"""
self.journald: JournaldUtils = JournaldUtils(self.host)
"""
Journald utilities.
"""
coredumpd_config = self.host.config.get("coredumpd", {})
coredumpd_mode = coredumpd_config.get("mode", "ignore")
coredumpd_filter = coredumpd_config.get("filter", None)
self.coredumpd: Coredumpd = Coredumpd(self.host, self.fs, mode=coredumpd_mode, filter=coredumpd_filter)
"""
Coredumpd utilities.
"""

View File

@ -0,0 +1,129 @@
"""shadow multihost role."""
from __future__ import annotations
import shlex
from typing import Dict
from pytest_mh.conn import ProcessLogLevel, ProcessResult
from ..hosts.shadow import ShadowHost
from .base import BaseLinuxRole
__all__ = [
"Shadow",
]
class Shadow(BaseLinuxRole[ShadowHost]):
"""
shadow role.
Provides unified Python API for managing and testing shadow.
"""
def __init__(self, *args, **kwargs) -> None:
"""
Set up the environment.
"""
super().__init__(*args, **kwargs)
def teardown(self) -> None:
"""
Detect file mismatches before cleaning up the environment.
"""
self.host.detect_file_mismatches()
"""
Clean up the environment.
"""
super().teardown()
def _parse_args(self, *args) -> Dict[str, str]:
args_list = shlex.split(*args[0])
name = args_list[-1]
return {"name": name}
def useradd(self, *args) -> ProcessResult:
"""
Create user.
"""
args_dict = self._parse_args(args)
self.logger.info(f'Creating user "{args_dict["name"]}" on {self.host.hostname}')
cmd = self.host.conn.run("useradd " + args[0], log_level=ProcessLogLevel.Error)
self.host.discard_file("/etc/passwd")
self.host.discard_file("/etc/shadow")
self.host.discard_file("/etc/group")
self.host.discard_file("/etc/gshadow")
return cmd
def usermod(self, *args) -> ProcessResult:
"""
Modify user.
"""
args_dict = self._parse_args(args)
self.logger.info(f'Modifying user "{args_dict["name"]}" on {self.host.hostname}')
cmd = self.host.conn.run("usermod " + args[0], log_level=ProcessLogLevel.Error)
self.host.discard_file("/etc/passwd")
self.host.discard_file("/etc/shadow")
self.host.discard_file("/etc/group")
self.host.discard_file("/etc/gshadow")
return cmd
def userdel(self, *args) -> ProcessResult:
"""
Delete user.
"""
args_dict = self._parse_args(args)
self.logger.info(f'Deleting user "{args_dict["name"]}" on {self.host.hostname}')
cmd = self.host.conn.run("userdel " + args[0], log_level=ProcessLogLevel.Error)
self.host.discard_file("/etc/passwd")
self.host.discard_file("/etc/shadow")
self.host.discard_file("/etc/group")
self.host.discard_file("/etc/gshadow")
return cmd
def groupadd(self, *args) -> ProcessResult:
"""
Create group.
"""
args_dict = self._parse_args(args)
self.logger.info(f'Creating group "{args_dict["name"]}" on {self.host.hostname}')
cmd = self.host.conn.run("groupadd " + args[0], log_level=ProcessLogLevel.Error)
self.host.discard_file("/etc/group")
self.host.discard_file("/etc/gshadow")
return cmd
def groupmod(self, *args) -> ProcessResult:
"""
Modify group.
"""
args_dict = self._parse_args(args)
self.logger.info(f'Modifying group "{args_dict["name"]}" on {self.host.hostname}')
cmd = self.host.conn.run("groupmod " + args[0], log_level=ProcessLogLevel.Error)
self.host.discard_file("/etc/group")
self.host.discard_file("/etc/gshadow")
return cmd
def groupdel(self, *args) -> ProcessResult:
"""
Delete group.
"""
args_dict = self._parse_args(args)
self.logger.info(f'Deleting group "{args_dict["name"]}" on {self.host.hostname}')
cmd = self.host.conn.run("groupdel " + args[0], log_level=ProcessLogLevel.Error)
self.host.discard_file("/etc/group")
self.host.discard_file("/etc/gshadow")
return cmd

View File

@ -0,0 +1,55 @@
"""Predefined well-known topologies."""
from __future__ import annotations
from enum import unique
from typing import final
from pytest_mh import KnownTopologyBase, KnownTopologyGroupBase, Topology, TopologyDomain, TopologyMark
__all__ = [
"KnownTopology",
"KnownTopologyGroup",
]
@final
@unique
class KnownTopology(KnownTopologyBase):
"""
Well-known topologies that can be given to ``pytest.mark.topology``
directly. It is expected to use these values in favor of providing
custom marker values.
.. code-block:: python
:caption: Example usage
@pytest.mark.topology(KnownTopology.Shadow)
def test_ldap(shadow: Shadow):
assert True
"""
Shadow = TopologyMark(
name="shadow",
topology=Topology(TopologyDomain("shadow", shadow=1)),
fixtures=dict(shadow="shadow.shadow[0]"),
)
class KnownTopologyGroup(KnownTopologyGroupBase):
"""
Groups of well-known topologies that can be given to ``pytest.mark.topology``
directly. It is expected to use these values in favor of providing
custom marker values.
The test is parametrized and runs multiple times, once per each topology.
.. code-block:: python
:caption: Example usage (runs on Shadow topology)
@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_ldap(shadow: Shadow):
assert True
"""
AnyProvider = [KnownTopology.Shadow]

View File

@ -0,0 +1,3 @@
"""shadow multihost utils used by roles."""
from __future__ import annotations

View File

@ -0,0 +1,475 @@
"""Run various standard Linux commands on remote host."""
from __future__ import annotations
from typing import Any
import jc
from pytest_mh import MultihostHost, MultihostUtility
from pytest_mh.conn import Process
__all__ = [
"UnixObject",
"UnixUser",
"UnixGroup",
"IdEntry",
"PasswdEntry",
"GroupEntry",
"InitgroupsEntry",
"LinuxToolsUtils",
"KillCommand",
"GetentUtils",
]
class UnixObject(object):
"""
Generic Unix object.
"""
def __init__(self, id: int | None, name: str | None) -> None:
"""
:param id: Object ID.
:type id: int | None
:param name: Object name.
:type name: str | None
"""
self.id: int | None = id
"""
ID.
"""
self.name: str | None = name
"""
Name.
"""
def __str__(self) -> str:
return f'({self.id},"{self.name}")'
def __repr__(self) -> str:
return str(self)
def __eq__(self, o: object) -> bool:
if isinstance(o, str):
return o == self.name
elif isinstance(o, int):
return o == self.id
elif isinstance(o, tuple):
if len(o) != 2 or not isinstance(o[0], int) or not isinstance(o[1], str):
raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}")
(id, name) = o
return id == self.id and name == self.name
elif isinstance(o, UnixObject):
# Fallback to identity comparison
return NotImplemented
raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}")
class UnixUser(UnixObject):
"""
Unix user.
"""
pass
class UnixGroup(UnixObject):
"""
Unix group.
"""
pass
class IdEntry(object):
"""
Result of ``id``
"""
def __init__(self, user: UnixUser, group: UnixGroup, groups: list[UnixGroup]) -> None:
self.user: UnixUser = user
"""
User information.
"""
self.group: UnixGroup = group
"""
Primary group.
"""
self.groups: list[UnixGroup] = groups
"""
Secondary groups.
"""
def memberof(self, groups: int | str | tuple[int, str] | list[int | str | tuple[int, str]]) -> bool:
"""
Check if the user is member of give group(s).
Group specification can be either a single gid or group name. But it can
be also a tuple of (gid, name) where both gid and name must match or list
of groups where the user must be member of all given groups.
:param groups: _description_
:type groups: int | str | tuple
:return: _description_
:rtype: bool
"""
if isinstance(groups, (int, str, tuple)):
return groups in self.groups
return all(x in self.groups for x in groups)
def __str__(self) -> str:
return f"{{user={str(self.user)},group={str(self.group)},groups={str(self.groups)}}}"
def __repr__(self) -> str:
return str(self)
@classmethod
def FromDict(cls, d: dict[str, Any]) -> IdEntry:
user = UnixUser(d["uid"]["id"], d["uid"].get("name", None))
group = UnixGroup(d["gid"]["id"], d["gid"].get("name", None))
groups = []
for secondary_group in d["groups"]:
groups.append(UnixGroup(secondary_group["id"], secondary_group.get("name", None)))
return cls(user, group, groups)
@classmethod
def FromOutput(cls, stdout: str) -> IdEntry:
jcresult = jc.parse("id", stdout)
if not isinstance(jcresult, dict):
raise TypeError(f"Unexpected type: {type(jcresult)}, expecting dict")
return cls.FromDict(jcresult)
class PasswdEntry(object):
"""
Result of ``getent passwd``
"""
def __init__(self, name: str, password: str, uid: int, gid: int, gecos: str, home: str, shell: str) -> None:
self.name: str | None = name
"""
User name.
"""
self.password: str | None = password
"""
User password.
"""
self.uid: int = uid
"""
User id.
"""
self.gid: int = gid
"""
Group id.
"""
self.gecos: str | None = gecos
"""
GECOS.
"""
self.home: str | None = home
"""
Home directory.
"""
self.shell: str | None = shell
"""
Login shell.
"""
def __str__(self) -> str:
return f"({self.name}:{self.password}:{self.uid}:{self.gid}:{self.gecos}:{self.home}:{self.shell})"
def __repr__(self) -> str:
return str(self)
@classmethod
def FromDict(cls, d: dict[str, Any]) -> PasswdEntry:
return cls(
name=d.get("username", None),
password=d.get("password", None),
uid=d.get("uid", None),
gid=d.get("gid", None),
gecos=d.get("comment", None),
home=d.get("home", None),
shell=d.get("shell", None),
)
@classmethod
def FromOutput(cls, stdout: str) -> PasswdEntry:
result = jc.parse("passwd", stdout)
if not isinstance(result, list):
raise TypeError(f"Unexpected type: {type(result)}, expecting list")
if len(result) != 1:
raise ValueError("More then one entry was returned")
return cls.FromDict(result[0])
class GroupEntry(object):
"""
Result of ``getent group``
"""
def __init__(self, name: str, password: str, gid: int, members: list[str]) -> None:
self.name: str | None = name
"""
Group name.
"""
self.password: str | None = password
"""
Group password.
"""
self.gid: int = gid
"""
Group id.
"""
self.members: list[str] = members
"""
Group members.
"""
def __str__(self) -> str:
return f'({self.name}:{self.password}:{self.gid}:{",".join(self.members)})'
def __repr__(self) -> str:
return str(self)
@classmethod
def FromDict(cls, d: dict[str, Any]) -> GroupEntry:
return cls(
name=d.get("group_name", None),
password=d.get("password", None),
gid=d.get("gid", None),
members=d.get("members", []),
)
@classmethod
def FromOutput(cls, stdout: str) -> GroupEntry:
result = jc.parse("group", stdout)
if not isinstance(result, list):
raise TypeError(f"Unexpected type: {type(result)}, expecting list")
if len(result) != 1:
raise ValueError("More then one entry was returned")
return cls.FromDict(result[0])
class InitgroupsEntry(object):
"""
Result of ``getent initgroups``
If user does not exist or does not have any supplementary groups then ``self.groups`` is empty.
"""
def __init__(self, name: str, groups: list[int]) -> None:
self.name: str = name
"""
Exact username for which ``initgroups`` was called
"""
self.groups: list[int] = groups
"""
Group ids that ``name`` is member of.
"""
def __str__(self) -> str:
return f'({self.name}:{",".join([str(i) for i in self.groups])})'
def __repr__(self) -> str:
return str(self)
def memberof(self, groups: list[int]) -> bool:
"""
Check if the user is member of given groups.
This method checks only supplementary groups not the primary group.
:param groups: List of group ids
:type groups: list[int]
:return: If user is member of all given groups True, otherwise False.
:rtype: bool
"""
return all(x in self.groups for x in groups)
@classmethod
def FromDict(cls, d: dict[str, Any]) -> InitgroupsEntry:
return cls(
name=d["name"],
groups=d.get("groups", []),
)
@classmethod
def FromOutput(cls, stdout: str) -> InitgroupsEntry:
result: list[str] = stdout.split()
dictionary: dict[str, str | list[int]] = {}
dictionary["name"] = result[0]
if len(result) > 1:
dictionary["groups"] = [int(x) for x in result[1:]]
return cls.FromDict(dictionary)
class LinuxToolsUtils(MultihostUtility[MultihostHost]):
"""
Run various standard commands on remote host.
"""
def __init__(self, host: MultihostHost) -> None:
"""
:param host: Remote host.
:type host: MultihostHost
"""
super().__init__(host)
self.getent: GetentUtils = GetentUtils(host)
"""
Run ``getent`` command.
"""
def id(self, name: str | int) -> IdEntry | None:
"""
Run ``id`` command.
:param name: User name or id.
:type name: str | int
:return: id data, None if not found
:rtype: IdEntry | None
"""
command = self.host.conn.exec(["id", name], raise_on_error=False)
if command.rc != 0:
return None
return IdEntry.FromOutput(command.stdout)
def grep(self, pattern: str, paths: str | list[str], args: list[str] | None = None) -> bool:
"""
Run ``grep`` command.
:param pattern: Pattern to match.
:type pattern: str
:param paths: Paths to search.
:type paths: str | list[str]
:param args: Additional arguments to ``grep`` command, defaults to None.
:type args: list[str] | None, optional
:return: True if grep returned 0, False otherwise.
:rtype: bool
"""
if args is None:
args = []
paths = [paths] if isinstance(paths, str) else paths
command = self.host.conn.exec(["grep", *args, pattern, *paths])
return command.rc == 0
class KillCommand(object):
def __init__(self, host: MultihostHost, process: Process, pid: int) -> None:
self.host = host
self.process = process
self.pid = pid
self.__killed: bool = False
def kill(self) -> None:
if self.__killed:
return
self.host.conn.exec(["kill", self.pid])
self.__killed = True
def __enter__(self) -> KillCommand:
return self
def __exit__(self, exception_type, exception_value, traceback) -> None:
self.kill()
self.process.wait()
class GetentUtils(MultihostUtility[MultihostHost]):
"""
Interface to getent command.
"""
def __init__(self, host: MultihostHost) -> None:
"""
:param host: Remote host.
:type host: MultihostHost
"""
super().__init__(host)
def passwd(self, name: str | int, *, service: str | None = None) -> PasswdEntry | None:
"""
Call ``getent passwd $name``
:param name: User name or id.
:type name: str | int
:param service: Service used, defaults to None
:type service: str | None
:return: passwd data, None if not found
:rtype: PasswdEntry | None
"""
return self.__exec(PasswdEntry, "passwd", name, service)
def group(self, name: str | int, *, service: str | None = None) -> GroupEntry | None:
"""
Call ``getent group $name``
:param name: Group name or id.
:type name: str | int
:param service: Service used, defaults to None
:type service: str | None
:return: group data, None if not found
:rtype: PasswdEntry | None
"""
return self.__exec(GroupEntry, "group", name, service)
def initgroups(self, name: str, *, service: str | None = None) -> InitgroupsEntry:
"""
Call ``getent initgroups $name``
If ``name`` does not exist, group list is empty. This is standard behavior of ``getent initgroups``
:param name: User name.
:type name: str
:param service: Service used, defaults to None
:type service: str | None
:return: Initgroups data
:rtype: InitgroupsEntry
"""
return self.__exec(InitgroupsEntry, "initgroups", name, service)
def __exec(self, cls, cmd: str, name: str | int, service: str | None = None) -> Any:
args = []
if service is not None:
args = ["-s", service]
command = self.host.conn.exec(["getent", *args, cmd, name], raise_on_error=False)
if command.rc != 0:
return None
return cls.FromOutput(command.stdout)