diff --git a/plugins/module_utils/_image_name.py b/plugins/module_utils/_image_name.py new file mode 100644 index 00000000..84c801b7 --- /dev/null +++ b/plugins/module_utils/_image_name.py @@ -0,0 +1,117 @@ +# Copyright (c) 2025 Felix Fontein +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +# Note that this module util is **PRIVATE** to the collection. It can have breaking changes at any time. +# Do not use this from other collections or standalone plugins/modules! + +from __future__ import annotations + +import re +import typing as t +from dataclasses import dataclass + + +_PATH_RE = re.compile( + r"^[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*$" +) +_TAG_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$") +_DIGEST_RE = re.compile(r"^sha256:[0-9a-fA-F]{64}$") + + +def is_digest(name: str, allow_empty: bool = False) -> bool: + """Check whether the given name is in fact an image ID (hash).""" + if not name: + return allow_empty + return _DIGEST_RE.match(name) is not None + + +def is_tag(name: str, allow_empty: bool = False) -> bool: + """Check whether the given name can be an image tag.""" + if not name: + return allow_empty + return _TAG_RE.match(name) is not None + + +@dataclass +class ImageName: + registry: str | None + path: str + tag: str | None + digest: str | None + + @classmethod + def parse(cls, name: str) -> t.Self: + registry: str | None = None + tag: str | None = None + digest: str | None = None + parts = name.rsplit("@", 1) + if len(parts) == 2: + name, digest = parts + parts = name.rsplit(":", 1) + if len(parts) == 2 and "/" not in parts[1]: + name, tag = parts + parts = name.split("/", 1) + if len(parts) == 2 and ( + "." in parts[0] or ":" in parts[0] or parts[0] == "localhost" + ): + registry, name = parts + return cls(registry, name, tag, digest) + + def validate(self) -> t.Self: + if self.registry: + if self.registry[0] == "-" or self.registry[-1] == "-": + raise ValueError( + f'Invalid registry name ({self.registry}): must not begin or end with a "-".' + ) + if self.registry[-1] == ":": + raise ValueError( + f'Invalid registry name ({self.registry}): must not end with ":".' + ) + if not _PATH_RE.match(self.path): + raise ValueError(f"Invalid path ({self.path}).") + if self.tag and not is_tag(self.tag): + raise ValueError(f"Invalid tag ({self.tag}).") + if self.digest and not is_digest(self.digest): + raise ValueError(f"Invalid digest ({self.digest}).") + return self + + def combine(self) -> str: + parts = [] + if self.registry: + parts.append(self.registry) + if self.path: + parts.append("/") + parts.append(self.path) + if self.tag: + parts.append(":") + parts.append(self.tag) + if self.digest: + parts.append("@") + parts.append(self.digest) + return "".join(parts) + + def normalize(self) -> ImageName: + registry = self.registry + path = self.path + if registry in ("", None, "index.docker.io", "registry.hub.docker.com"): + registry = "docker.io" + if registry == "docker.io" and "/" not in path and path: + path = f"library/{path}" + return ImageName(registry, path, self.tag, self.digest) + + def get_hostname_and_port(self) -> tuple[str, int]: + if self.registry is None: + raise ValueError( + "Cannot get hostname when there is no registry. Normalize first!" + ) + if self.registry == "docker.io": + return "index.docker.io", 443 + parts = self.registry.split(":", 1) + if len(parts) == 2: + try: + port = int(parts[1]) + except (TypeError, ValueError) as exc: + raise ValueError(f"Cannot parse port {parts[1]!r}") from exc + return parts[0], port + return self.registry, 443 diff --git a/tests/unit/plugins/module_utils/test__image_name.py b/tests/unit/plugins/module_utils/test__image_name.py new file mode 100644 index 00000000..bb4d7f41 --- /dev/null +++ b/tests/unit/plugins/module_utils/test__image_name.py @@ -0,0 +1,272 @@ +# Copyright (c) 2025 Felix Fontein +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import re +import typing as t + +import pytest + +from ansible_collections.community.docker.plugins.module_utils._image_name import ( + ImageName, + is_digest, + is_tag, +) + + +TEST_IS_DIGEST: list[tuple[str, dict[str, t.Any], bool]] = [ + ("", {}, False), + ("", {"allow_empty": True}, True), + ("sha256:abc", {}, False), + (f"sha256:{'a' * 63}", {}, False), + (f"sha256:{'a' * 64}", {}, True), + (f"sha256:{'a' * 65}", {}, False), + ( + "sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + {}, + True, + ), + ("1.25.3", {}, False), +] + + +@pytest.mark.parametrize("name, kwargs, expected", TEST_IS_DIGEST) +def test_is_digest(name: str, kwargs: dict[str, t.Any], expected: bool) -> None: + assert is_digest(name, **kwargs) == expected + + +TEST_IS_TAG: list[tuple[str, dict[str, t.Any], bool]] = [ + ("", {}, False), + ("", {"allow_empty": True}, True), + ("foo", {}, True), + ("-foo", {}, False), + ("f" * 128, {}, True), + ("f" * 129, {}, False), + ( + "sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + {}, + False, + ), + ("1.25.3", {}, True), +] + + +@pytest.mark.parametrize("name, kwargs, expected", TEST_IS_TAG) +def test_is_tag(name: str, kwargs: dict[str, t.Any], expected: bool) -> None: + assert is_tag(name, **kwargs) == expected + + +TEST_IMAGE_NAME_VALIDATE_SUCCESS: list[ImageName] = [ + ImageName(registry="localhost", path="nginx", tag=None, digest=None), + ImageName(registry=None, path="nginx", tag="1.25.3", digest=None), + ImageName( + registry=None, + path="nginx", + tag=None, + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ImageName( + registry=None, + path="nginx", + tag="1.25.3", + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), +] + + +@pytest.mark.parametrize("data", TEST_IMAGE_NAME_VALIDATE_SUCCESS) +def test_imagename_validate_success(data: ImageName) -> None: + assert data.validate() is data + + +TEST_IMAGE_NAME_VALIDATE_FAILED: list[tuple[ImageName, str]] = [ + ( + ImageName(registry="-foo", path="", tag=None, digest=None), + 'Invalid registry name (-foo): must not begin or end with a "-".', + ), + ( + ImageName(registry="foo:", path="", tag=None, digest=None), + 'Invalid registry name (foo:): must not end with ":".', + ), + (ImageName(registry=None, path="", tag=None, digest=None), "Invalid path ()."), + (ImageName(registry=None, path="-", tag=None, digest=None), "Invalid path (-)."), + (ImageName(registry=None, path="/", tag=None, digest=None), "Invalid path (/)."), + (ImageName(registry=None, path="a", tag="-", digest=None), "Invalid tag (-)."), + (ImageName(registry=None, path="a", tag=None, digest="-"), "Invalid digest (-)."), +] + + +@pytest.mark.parametrize("data, expected", TEST_IMAGE_NAME_VALIDATE_FAILED) +def test_imagename_validate_failed(data: ImageName, expected: str) -> None: + with pytest.raises(ValueError, match=f"^{re.escape(expected)}$"): + data.validate() + + +TEST_IMAGE_NAME_PARSE: list[tuple[str, ImageName]] = [ + ("", ImageName(registry=None, path="", tag=None, digest=None)), + ("foo", ImageName(registry=None, path="foo", tag=None, digest=None)), + ("foo:5000", ImageName(registry=None, path="foo", tag="5000", digest=None)), + ("foo:5000/", ImageName(registry="foo:5000", path="", tag=None, digest=None)), + ("foo:5000/bar", ImageName(registry="foo:5000", path="bar", tag=None, digest=None)), + ("/bar", ImageName(registry=None, path="/bar", tag=None, digest=None)), + ( + "localhost/foo:5000", + ImageName(registry="localhost", path="foo", tag="5000", digest=None), + ), + ( + "foo.bar/baz:5000", + ImageName(registry="foo.bar", path="baz", tag="5000", digest=None), + ), + ( + "foo:bar/baz:bam:5000", + ImageName(registry="foo:bar", path="baz:bam", tag="5000", digest=None), + ), + ("foo:bar:baz", ImageName(registry=None, path="foo:bar", tag="baz", digest=None)), + ("foo@bar@baz", ImageName(registry=None, path="foo@bar", tag=None, digest="baz")), + ("nginx:1.25.3", ImageName(registry=None, path="nginx", tag="1.25.3", digest=None)), + ( + "nginx@sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ImageName( + registry=None, + path="nginx", + tag=None, + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ), + ( + "nginx:1.25.3@sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ImageName( + registry=None, + path="nginx", + tag="1.25.3", + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ), +] + + +@pytest.mark.parametrize("name, expected", TEST_IMAGE_NAME_PARSE) +def test_imagename_parse(name: str, expected: ImageName) -> None: + assert ImageName.parse(name) == expected + + +TEST_IMAGE_NAME_COMBINE: list[tuple[ImageName, str]] = [ + (ImageName(registry=None, path="", tag=None, digest=None), ""), + (ImageName(registry=None, path="nginx", tag="1.25.3", digest=None), "nginx:1.25.3"), + ( + ImageName( + registry=None, + path="nginx", + tag=None, + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + "nginx@sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ( + ImageName( + registry=None, + path="nginx", + tag="1.25.3", + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + "nginx:1.25.3@sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), +] + + +@pytest.mark.parametrize("data, expected", TEST_IMAGE_NAME_COMBINE) +def test_imagename_combine(data: ImageName, expected: str) -> None: + assert data.combine() == expected + + +TEST_IMAGE_NAME_NORMALIZE: list[tuple[ImageName, ImageName]] = [ + ( + ImageName(registry=None, path="", tag=None, digest=None), + ImageName(registry="docker.io", path="", tag=None, digest=None), + ), + ( + ImageName(registry="", path="", tag=None, digest=None), + ImageName(registry="docker.io", path="", tag=None, digest=None), + ), + ( + ImageName(registry="index.docker.io", path="", tag=None, digest=None), + ImageName(registry="docker.io", path="", tag=None, digest=None), + ), + ( + ImageName(registry="registry.hub.docker.com", path="", tag=None, digest=None), + ImageName(registry="docker.io", path="", tag=None, digest=None), + ), + ( + ImageName(registry=None, path="foo/bar", tag=None, digest=None), + ImageName(registry="docker.io", path="foo/bar", tag=None, digest=None), + ), + ( + ImageName(registry=None, path="nginx", tag="1.25.3", digest=None), + ImageName( + registry="docker.io", path="library/nginx", tag="1.25.3", digest=None + ), + ), + ( + ImageName( + registry=None, + path="nginx", + tag=None, + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ImageName( + registry="docker.io", + path="library/nginx", + tag=None, + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ), + ( + ImageName( + registry=None, + path="nginx", + tag="1.25.3", + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ImageName( + registry="docker.io", + path="library/nginx", + tag="1.25.3", + digest="sha256:d02f9b9db4d759ef27dc26b426b842ff2fb881c5c6079612d27ec36e36b132dd", + ), + ), +] + + +@pytest.mark.parametrize("data, expected", TEST_IMAGE_NAME_NORMALIZE) +def test_imagename_normalize(data: ImageName, expected: ImageName) -> None: + assert data.normalize() == expected + + +TEST_IMAGE_NAME_HOSTNAME_AND_PORT: list[tuple[ImageName, str, int]] = [ + ( + ImageName(registry="docker.io", path="", tag=None, digest=None), + "index.docker.io", + 443, + ), + (ImageName(registry="localhost", path="", tag=None, digest=None), "localhost", 443), + (ImageName(registry="foo:5000", path="", tag=None, digest=None), "foo", 5000), +] + + +@pytest.mark.parametrize( + "data, expected_hostname, expected_port", TEST_IMAGE_NAME_HOSTNAME_AND_PORT +) +def test_imagename_get_hostname_and_port( + data: ImageName, expected_hostname: str, expected_port: int +) -> None: + hostname, port = data.get_hostname_and_port() + assert hostname == expected_hostname + assert port == expected_port + + +def test_imagename_get_hostname_and_port_fail() -> None: + msg = "Cannot get hostname when there is no registry. Normalize first!" + with pytest.raises(ValueError, match=f"^{re.escape(msg)}$"): + ImageName(registry=None, path="", tag=None, digest=None).get_hostname_and_port()