mirror of
https://github.com/ansible-collections/community.docker.git
synced 2025-12-15 11:32:05 +00:00
Remove code that's not used. (#1209)
This commit is contained in:
parent
a869184ad4
commit
5d2b4085ec
@ -43,10 +43,8 @@ docker_version: str | None # pylint: disable=invalid-name
|
||||
|
||||
try:
|
||||
from docker import __version__ as docker_version
|
||||
from docker import auth
|
||||
from docker.errors import APIError, NotFound, TLSParameterError
|
||||
from docker.errors import APIError, TLSParameterError
|
||||
from docker.tls import TLSConfig
|
||||
from requests.exceptions import SSLError
|
||||
|
||||
if LooseVersion(docker_version) >= LooseVersion("3.0.0"):
|
||||
HAS_DOCKER_PY_3 = True # pylint: disable=invalid-name
|
||||
@ -391,242 +389,6 @@ class AnsibleDockerClientBase(Client):
|
||||
)
|
||||
self.fail(f"SSL Exception: {error}")
|
||||
|
||||
def get_container_by_id(self, container_id: str) -> dict[str, t.Any] | None:
|
||||
try:
|
||||
self.log(f"Inspecting container Id {container_id}")
|
||||
result = self.inspect_container(container=container_id)
|
||||
self.log("Completed container inspection")
|
||||
return result
|
||||
except NotFound:
|
||||
return None
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error inspecting container: {exc}")
|
||||
|
||||
def get_container(self, name: str | None) -> dict[str, t.Any] | None:
|
||||
"""
|
||||
Lookup a container and return the inspection results.
|
||||
"""
|
||||
if name is None:
|
||||
return None
|
||||
|
||||
search_name = name
|
||||
if not name.startswith("/"):
|
||||
search_name = "/" + name
|
||||
|
||||
result = None
|
||||
try:
|
||||
for container in self.containers(all=True):
|
||||
self.log(f"testing container: {container['Names']}")
|
||||
if (
|
||||
isinstance(container["Names"], list)
|
||||
and search_name in container["Names"]
|
||||
):
|
||||
result = container
|
||||
break
|
||||
if container["Id"].startswith(name):
|
||||
result = container
|
||||
break
|
||||
if container["Id"] == name:
|
||||
result = container
|
||||
break
|
||||
except SSLError as exc:
|
||||
self._handle_ssl_error(exc)
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error retrieving container list: {exc}")
|
||||
|
||||
if result is None:
|
||||
return None
|
||||
|
||||
return self.get_container_by_id(result["Id"])
|
||||
|
||||
def get_network(
|
||||
self, name: str | None = None, network_id: str | None = None
|
||||
) -> dict[str, t.Any] | None:
|
||||
"""
|
||||
Lookup a network and return the inspection results.
|
||||
"""
|
||||
if name is None and network_id is None:
|
||||
return None
|
||||
|
||||
result = None
|
||||
|
||||
if network_id is None:
|
||||
try:
|
||||
for network in self.networks():
|
||||
self.log(f"testing network: {network['Name']}")
|
||||
if name == network["Name"]:
|
||||
result = network
|
||||
break
|
||||
if network["Id"].startswith(name):
|
||||
result = network
|
||||
break
|
||||
except SSLError as exc:
|
||||
self._handle_ssl_error(exc)
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error retrieving network list: {exc}")
|
||||
|
||||
if result is not None:
|
||||
network_id = result["Id"]
|
||||
|
||||
if network_id is not None:
|
||||
try:
|
||||
self.log(f"Inspecting network Id {network_id}")
|
||||
result = self.inspect_network(network_id)
|
||||
self.log("Completed network inspection")
|
||||
except NotFound:
|
||||
return None
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error inspecting network: {exc}")
|
||||
|
||||
return result
|
||||
|
||||
def find_image(self, name: str, tag: str) -> dict[str, t.Any] | None:
|
||||
"""
|
||||
Lookup an image (by name and tag) and return the inspection results.
|
||||
"""
|
||||
if not name:
|
||||
return None
|
||||
|
||||
self.log(f"Find image {name}:{tag}")
|
||||
images = self._image_lookup(name, tag)
|
||||
if not images:
|
||||
# In API <= 1.20 seeing 'docker.io/<name>' as the name of images pulled from docker hub
|
||||
registry, repo_name = auth.resolve_repository_name(name)
|
||||
if registry == "docker.io":
|
||||
# If docker.io is explicitly there in name, the image
|
||||
# is not found in some cases (#41509)
|
||||
self.log(f"Check for docker.io image: {repo_name}")
|
||||
images = self._image_lookup(repo_name, tag)
|
||||
if not images and repo_name.startswith("library/"):
|
||||
# Sometimes library/xxx images are not found
|
||||
lookup = repo_name[len("library/") :]
|
||||
self.log(f"Check for docker.io image: {lookup}")
|
||||
images = self._image_lookup(lookup, tag)
|
||||
if not images:
|
||||
# Last case for some Docker versions: if docker.io was not there,
|
||||
# it can be that the image was not found either
|
||||
# (https://github.com/ansible/ansible/pull/15586)
|
||||
lookup = f"{registry}/{repo_name}"
|
||||
self.log(f"Check for docker.io image: {lookup}")
|
||||
images = self._image_lookup(lookup, tag)
|
||||
if not images and "/" not in repo_name:
|
||||
# This seems to be happening with podman-docker
|
||||
# (https://github.com/ansible-collections/community.docker/issues/291)
|
||||
lookup = f"{registry}/library/{repo_name}"
|
||||
self.log(f"Check for docker.io image: {lookup}")
|
||||
images = self._image_lookup(lookup, tag)
|
||||
|
||||
if len(images) > 1:
|
||||
self.fail(f"Daemon returned more than one result for {name}:{tag}")
|
||||
|
||||
if len(images) == 1:
|
||||
try:
|
||||
inspection = self.inspect_image(images[0]["Id"])
|
||||
except NotFound:
|
||||
self.log(f"Image {name}:{tag} not found.")
|
||||
return None
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error inspecting image {name}:{tag} - {exc}")
|
||||
return inspection
|
||||
|
||||
self.log(f"Image {name}:{tag} not found.")
|
||||
return None
|
||||
|
||||
def find_image_by_id(
|
||||
self, image_id: str, accept_missing_image: bool = False
|
||||
) -> dict[str, t.Any] | None:
|
||||
"""
|
||||
Lookup an image (by ID) and return the inspection results.
|
||||
"""
|
||||
if not image_id:
|
||||
return None
|
||||
|
||||
self.log(f"Find image {image_id} (by ID)")
|
||||
try:
|
||||
inspection = self.inspect_image(image_id)
|
||||
except NotFound as exc:
|
||||
if not accept_missing_image:
|
||||
self.fail(f"Error inspecting image ID {image_id} - {exc}")
|
||||
self.log(f"Image {image_id} not found.")
|
||||
return None
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error inspecting image ID {image_id} - {exc}")
|
||||
return inspection
|
||||
|
||||
def _image_lookup(self, name: str, tag: str) -> list[dict[str, t.Any]]:
|
||||
"""
|
||||
Including a tag in the name parameter sent to the Docker SDK for Python images method
|
||||
does not work consistently. Instead, get the result set for name and manually check
|
||||
if the tag exists.
|
||||
"""
|
||||
try:
|
||||
response = self.images(name=name)
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error searching for image {name} - {exc}")
|
||||
images = response
|
||||
if tag:
|
||||
lookup = f"{name}:{tag}"
|
||||
lookup_digest = f"{name}@{tag}"
|
||||
images = []
|
||||
for image in response:
|
||||
tags = image.get("RepoTags")
|
||||
digests = image.get("RepoDigests")
|
||||
if (tags and lookup in tags) or (digests and lookup_digest in digests):
|
||||
images = [image]
|
||||
break
|
||||
return images
|
||||
|
||||
def pull_image(
|
||||
self, name: str, tag: str = "latest", image_platform: str | None = None
|
||||
) -> tuple[dict[str, t.Any] | None, bool]:
|
||||
"""
|
||||
Pull an image
|
||||
"""
|
||||
kwargs = {
|
||||
"tag": tag,
|
||||
"stream": True,
|
||||
"decode": True,
|
||||
}
|
||||
if image_platform is not None:
|
||||
kwargs["platform"] = image_platform
|
||||
self.log(f"Pulling image {name}:{tag}")
|
||||
old_tag = self.find_image(name, tag)
|
||||
try:
|
||||
for line in self.pull(name, **kwargs):
|
||||
self.log(line, pretty_print=True)
|
||||
if line.get("error"):
|
||||
if line.get("errorDetail"):
|
||||
error_detail = line.get("errorDetail")
|
||||
self.fail(
|
||||
f"Error pulling {name} - code: {error_detail.get('code')} message: {error_detail.get('message')}"
|
||||
)
|
||||
else:
|
||||
self.fail(f"Error pulling {name} - {line.get('error')}")
|
||||
except Exception as exc: # pylint: disable=broad-exception-caught
|
||||
self.fail(f"Error pulling image {name}:{tag} - {exc}")
|
||||
|
||||
new_tag = self.find_image(name, tag)
|
||||
|
||||
return new_tag, old_tag == new_tag
|
||||
|
||||
def inspect_distribution(self, image: str, **kwargs: t.Any) -> dict[str, t.Any]:
|
||||
"""
|
||||
Get image digest by directly calling the Docker API when running Docker SDK < 4.0.0
|
||||
since prior versions did not support accessing private repositories.
|
||||
"""
|
||||
if self.docker_py_version < LooseVersion("4.0.0"):
|
||||
registry = auth.resolve_repository_name(image)[0]
|
||||
header = auth.get_config_header(self, registry)
|
||||
if header:
|
||||
return self._result(
|
||||
self._get(
|
||||
self._url("/distribution/{0}/json", image),
|
||||
headers={"X-Registry-Auth": header},
|
||||
),
|
||||
json=True,
|
||||
)
|
||||
return super().inspect_distribution(image, **kwargs)
|
||||
|
||||
|
||||
class AnsibleDockerClient(AnsibleDockerClientBase):
|
||||
def __init__(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user