Fix _image_lookup to handle combined tag@digest format

The parse_repository_tag fix alone is not sufficient because Docker stores
RepoTags and RepoDigests separately. When looking up an image with combined
tag@digest (e.g., nginx:1.21@sha256:abc...), the _image_lookup function must
split the combined format and match BOTH RepoTags (for the tag) AND
RepoDigests (for the digest).

Docker stores:
- RepoTags: ["nginx:1.21"]
- RepoDigests: ["nginx@sha256:abc..."]

But NEVER stores the combined format. The previous code would construct:
- lookup = "nginx:1.21@sha256:abc..." (never matches RepoTags)
- lookup_digest = "nginx@1.21@sha256:abc..." (never matches RepoDigests)

This fix:
1. Adds filter_images_by_tag() helper function to _util.py to avoid code
   duplication between _common.py and _common_api.py
2. Detects combined tag@digest format in the tag parameter
3. Splits into tag_part and digest_part
4. Constructs proper lookups for both RepoTags and RepoDigests
5. Requires BOTH to match for successful image lookup

Without this fix, image_label_mismatch: ignore fails because the image
cannot be found, resulting in no image labels being included in expected
labels comparison.

Includes comprehensive unit tests in test__util.py covering all scenarios
including edge cases for multiple @ symbols and empty tag parts.
This commit is contained in:
Paul Berruti 2025-11-22 19:57:35 -08:00
parent 9e666af1ab
commit 21288cda7e
4 changed files with 270 additions and 24 deletions

View File

@ -27,6 +27,7 @@ from ansible_collections.community.docker.plugins.module_utils._util import (
DOCKER_COMMON_ARGS,
DOCKER_MUTUALLY_EXCLUSIVE,
DOCKER_REQUIRED_TOGETHER,
filter_images_by_tag,
sanitize_result,
update_tls_hostname,
)
@ -563,18 +564,7 @@ class AnsibleDockerClientBase(Client):
response = self.images(name=name)
except Exception as exc: # pylint: disable=broad-exception-caught
self.fail(f"Error searching for image {name} - {exc}")
images = response
if tag:
lookup = f"{name}:{tag}"
lookup_digest = f"{name}@{tag}"
images = []
for image in response:
tags = image.get("RepoTags")
digests = image.get("RepoDigests")
if (tags and lookup in tags) or (digests and lookup_digest in digests):
images = [image]
break
return images
return filter_images_by_tag(name, tag, response)
def pull_image(
self, name: str, tag: str = "latest", image_platform: str | None = None

View File

@ -56,6 +56,7 @@ from ansible_collections.community.docker.plugins.module_utils._util import (
DOCKER_COMMON_ARGS,
DOCKER_MUTUALLY_EXCLUSIVE,
DOCKER_REQUIRED_TOGETHER,
filter_images_by_tag,
sanitize_result,
update_tls_hostname,
)
@ -435,18 +436,7 @@ class AnsibleDockerClientBase(Client):
images = self.get_json("/images/json", params=params)
except Exception as exc: # pylint: disable=broad-exception-caught
self.fail(f"Error searching for image {name} - {exc}")
if tag:
lookup = f"{name}:{tag}"
lookup_digest = f"{name}@{tag}"
response = images
images = []
for image in response:
tags = image.get("RepoTags")
digests = image.get("RepoDigests")
if (tags and lookup in tags) or (digests and lookup_digest in digests):
images = [image]
break
return images
return filter_images_by_tag(name, tag, images)
def find_image(self, name: str, tag: str | None) -> dict[str, t.Any] | None:
"""

View File

@ -98,6 +98,54 @@ def is_image_name_id(name: str) -> bool:
return bool(re.match("^sha256:[0-9a-fA-F]{64}$", name))
def filter_images_by_tag(
name: str, tag: str | None, images: list[dict[str, t.Any]]
) -> list[dict[str, t.Any]]:
"""
Filter a list of images by name and tag.
Docker stores RepoTags and RepoDigests separately, never as combined references.
This function handles three formats:
- Tag-only: "v1.0" -> matches "repo:v1.0" in RepoTags
- Digest-only: "sha256:abc..." -> matches "repo@sha256:abc..." in RepoDigests
- Combined tag@digest: "v1.0@sha256:abc..." -> matches BOTH RepoTags AND RepoDigests
Args:
name: The repository name (e.g., "nginx" or "ghcr.io/user/repo")
tag: The tag, digest, or combined tag@digest (e.g., "v1.0", "sha256:abc...", "v1.0@sha256:abc...")
images: List of image dicts from Docker API with RepoTags and RepoDigests keys
Returns:
Filtered list of matching images (typically 0 or 1 items)
"""
if not tag:
return images
# Handle combined tag@digest format (e.g., "v1.0@sha256:abc123")
# The @ is for digest, but if it doesn't start with sha256:, it's combined format
if "@" in tag and not tag.startswith("sha256:"):
tag_part, digest_part = tag.split("@", 1)
lookup_tag = f"{name}:{tag_part}"
lookup_digest = f"{name}@{digest_part}"
for image in images:
repo_tags = image.get("RepoTags") or []
repo_digests = image.get("RepoDigests") or []
# For combined format, match BOTH tag AND digest
if lookup_tag in repo_tags and lookup_digest in repo_digests:
return [image]
return []
# Original logic for tag-only or digest-only
lookup = f"{name}:{tag}"
lookup_digest = f"{name}@{tag}"
for image in images:
repo_tags = image.get("RepoTags")
repo_digests = image.get("RepoDigests")
if (repo_tags and lookup in repo_tags) or (repo_digests and lookup_digest in repo_digests):
return [image]
return []
def is_valid_tag(tag: str, allow_empty: bool = False) -> bool:
"""Check whether the given string is a valid docker tag name."""
if not tag:

View File

@ -12,6 +12,7 @@ from ansible_collections.community.docker.plugins.module_utils._util import (
compare_dict_allow_more_present,
compare_generic,
convert_duration_to_nanosecond,
filter_images_by_tag,
parse_healthcheck,
)
@ -468,3 +469,220 @@ def test_parse_healthcheck() -> None:
)
assert result == {"test": ["CMD-SHELL", "sleep 1"], "interval": 3662003004000}
assert disabled is False
# ========== filter_images_by_tag tests ==========
# Docker stores RepoTags and RepoDigests separately, never combined.
# This function handles tag-only, digest-only, and combined tag@digest formats.
SHA = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
def _create_mock_image(
repo_tags: list[str] | None, repo_digests: list[str] | None
) -> dict[str, t.Any]:
"""Helper to create mock image data as returned by Docker API."""
return {
"Id": "sha256:abc123",
"RepoTags": repo_tags,
"RepoDigests": repo_digests,
}
class TestFilterImagesByTag:
"""Test cases for filter_images_by_tag function."""
# ========== Tag-only tests ==========
def test_tag_only_matches_repo_tags(self) -> None:
"""Tag-only reference should match in RepoTags."""
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + SHA],
)
result = filter_images_by_tag("nginx", "1.21", [image])
assert len(result) == 1
assert result[0]["Id"] == "sha256:abc123"
def test_tag_only_no_match(self) -> None:
"""Tag-only reference should not match if tag is different."""
image = _create_mock_image(
repo_tags=["nginx:1.20"],
repo_digests=["nginx@sha256:" + SHA],
)
result = filter_images_by_tag("nginx", "1.21", [image])
assert len(result) == 0
def test_tag_only_with_registry(self) -> None:
"""Tag-only with registry prefix should match correctly."""
image = _create_mock_image(
repo_tags=["ghcr.io/user/repo:v1.0"],
repo_digests=["ghcr.io/user/repo@sha256:" + SHA],
)
result = filter_images_by_tag("ghcr.io/user/repo", "v1.0", [image])
assert len(result) == 1
# ========== Digest-only tests ==========
def test_digest_only_matches_repo_digests(self) -> None:
"""Digest-only reference should match in RepoDigests."""
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + SHA],
)
result = filter_images_by_tag("nginx", f"sha256:{SHA}", [image])
assert len(result) == 1
def test_digest_only_no_match(self) -> None:
"""Digest-only reference should not match if digest is different."""
other_sha = "a" * 64
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + SHA],
)
result = filter_images_by_tag("nginx", f"sha256:{other_sha}", [image])
assert len(result) == 0
# ========== Combined tag@digest tests ==========
def test_combined_tag_digest_matches_both(self) -> None:
"""Combined tag@digest should match when BOTH tag AND digest match."""
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + SHA],
)
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 1
assert result[0]["Id"] == "sha256:abc123"
def test_combined_tag_digest_with_registry(self) -> None:
"""Combined tag@digest with registry should match correctly."""
image = _create_mock_image(
repo_tags=["ghcr.io/gethomepage/homepage:v1.7"],
repo_digests=["ghcr.io/gethomepage/homepage@sha256:" + SHA],
)
result = filter_images_by_tag(
"ghcr.io/gethomepage/homepage", f"v1.7@sha256:{SHA}", [image]
)
assert len(result) == 1
def test_combined_tag_digest_fails_if_tag_mismatch(self) -> None:
"""Combined tag@digest should NOT match if tag doesn't match."""
image = _create_mock_image(
repo_tags=["nginx:1.20"], # Different tag
repo_digests=["nginx@sha256:" + SHA],
)
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 0
def test_combined_tag_digest_fails_if_digest_mismatch(self) -> None:
"""Combined tag@digest should NOT match if digest doesn't match."""
other_sha = "a" * 64
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + other_sha], # Different digest
)
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 0
def test_combined_tag_digest_fails_if_both_mismatch(self) -> None:
"""Combined tag@digest should NOT match if both tag and digest don't match."""
other_sha = "a" * 64
image = _create_mock_image(
repo_tags=["nginx:1.20"], # Different tag
repo_digests=["nginx@sha256:" + other_sha], # Different digest
)
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 0
# ========== Edge cases ==========
def test_empty_repo_tags(self) -> None:
"""Handle images where RepoTags is empty or None."""
image = _create_mock_image(
repo_tags=None,
repo_digests=["nginx@sha256:" + SHA],
)
# Combined format should not match if RepoTags is empty
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 0
def test_empty_repo_digests(self) -> None:
"""Handle images where RepoDigests is empty or None."""
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=None,
)
# Combined format should not match if RepoDigests is empty
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 0
def test_multiple_tags(self) -> None:
"""Image with multiple tags should match on any of them."""
image = _create_mock_image(
repo_tags=["nginx:1.21", "nginx:latest", "nginx:stable"],
repo_digests=["nginx@sha256:" + SHA],
)
# Should match on 1.21
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 1
# Should also match on latest
result = filter_images_by_tag("nginx", f"latest@sha256:{SHA}", [image])
assert len(result) == 1
def test_multiple_digests(self) -> None:
"""Image with multiple digests should match on any of them."""
other_sha = "b" * 64
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + SHA, "nginx@sha256:" + other_sha],
)
# Should match on first digest
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 1
# Should also match on second digest
result = filter_images_by_tag("nginx", f"1.21@sha256:{other_sha}", [image])
assert len(result) == 1
def test_port_in_registry_name(self) -> None:
"""Registry with port number should not be confused with tag."""
image = _create_mock_image(
repo_tags=["localhost:5000/myapp:v2.0"],
repo_digests=["localhost:5000/myapp@sha256:" + SHA],
)
result = filter_images_by_tag(
"localhost:5000/myapp", f"v2.0@sha256:{SHA}", [image]
)
assert len(result) == 1
def test_no_tag_returns_all_images(self) -> None:
"""When tag is None, all images should be returned."""
images = [
_create_mock_image(["nginx:1.21"], ["nginx@sha256:" + SHA]),
_create_mock_image(["nginx:1.20"], ["nginx@sha256:" + "a" * 64]),
]
result = filter_images_by_tag("nginx", None, images)
assert len(result) == 2
# ========== Additional edge cases ==========
def test_multiple_at_symbols_in_digest(self) -> None:
"""Handle edge case where digest has extra content after sha."""
# Using split("@", 1) should handle this correctly
image = _create_mock_image(
repo_tags=["nginx:1.21"],
repo_digests=["nginx@sha256:" + SHA],
)
# The sha256:... part should be treated as the digest
result = filter_images_by_tag("nginx", f"1.21@sha256:{SHA}", [image])
assert len(result) == 1
def test_empty_tag_part_in_combined_format(self) -> None:
"""Handle edge case where tag part is empty like ':@sha256:...'."""
image = _create_mock_image(
repo_tags=["nginx:"], # Empty tag
repo_digests=["nginx@sha256:" + SHA],
)
# Should construct lookup_tag as "nginx:" which matches
result = filter_images_by_tag("nginx", f"@sha256:{SHA}", [image])
assert len(result) == 1