# This code is part of the Ansible collection community.docker, but is an independent component. # This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) # # Copyright (c) 2016-2025 Docker, Inc. # # It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) # SPDX-License-Identifier: Apache-2.0 # Note that this module util is **PRIVATE** to the collection. It can have breaking changes at any time. # Do not use this from other collections or standalone plugins/modules! from __future__ import annotations import json import os from shutil import copyfile, rmtree from ..errors import ContextException from ..tls import TLSConfig from .config import ( get_context_host, get_meta_dir, get_meta_file, get_tls_dir, ) IN_MEMORY = "IN MEMORY" class Context: """A context.""" def __init__( self, name, orchestrator=None, host=None, endpoints=None, skip_tls_verify=False, tls=False, description=None, ): if not name: raise ValueError("Name not provided") self.name = name self.context_type = None self.orchestrator = orchestrator self.endpoints = {} self.tls_cfg = {} self.meta_path = IN_MEMORY self.tls_path = IN_MEMORY self.description = description if not endpoints: # set default docker endpoint if no endpoint is set default_endpoint = ( "docker" if (not orchestrator or orchestrator == "swarm") else orchestrator ) self.endpoints = { default_endpoint: { "Host": get_context_host(host, skip_tls_verify or tls), "SkipTLSVerify": skip_tls_verify, } } return # check docker endpoints for k, v in endpoints.items(): if not isinstance(v, dict): # unknown format raise ContextException( f"Unknown endpoint format for context {name}: {v}", ) self.endpoints[k] = v if k != "docker": continue self.endpoints[k]["Host"] = v.get( "Host", get_context_host(host, skip_tls_verify or tls) ) self.endpoints[k]["SkipTLSVerify"] = bool( v.get("SkipTLSVerify", skip_tls_verify) ) def set_endpoint( self, name="docker", host=None, tls_cfg=None, skip_tls_verify=False, def_namespace=None, ): self.endpoints[name] = { "Host": get_context_host(host, not skip_tls_verify or tls_cfg is not None), "SkipTLSVerify": skip_tls_verify, } if def_namespace: self.endpoints[name]["DefaultNamespace"] = def_namespace if tls_cfg: self.tls_cfg[name] = tls_cfg def inspect(self): return self.__call__() @classmethod def load_context(cls, name): meta = Context._load_meta(name) if meta: instance = cls( meta["Name"], orchestrator=meta["Metadata"].get("StackOrchestrator", None), endpoints=meta.get("Endpoints", None), description=meta["Metadata"].get("Description"), ) instance.context_type = meta["Metadata"].get("Type", None) instance._load_certs() instance.meta_path = get_meta_dir(name) return instance return None @classmethod def _load_meta(cls, name): meta_file = get_meta_file(name) if not os.path.isfile(meta_file): return None metadata = {} try: with open(meta_file, "rt", encoding="utf-8") as f: metadata = json.load(f) except (OSError, KeyError, ValueError) as e: # unknown format raise RuntimeError( f"Detected corrupted meta file for context {name} : {e}" ) from e # for docker endpoints, set defaults for # Host and SkipTLSVerify fields for k, v in metadata["Endpoints"].items(): if k != "docker": continue metadata["Endpoints"][k]["Host"] = v.get( "Host", get_context_host(None, False) ) metadata["Endpoints"][k]["SkipTLSVerify"] = bool( v.get("SkipTLSVerify", True) ) return metadata def _load_certs(self): certs = {} tls_dir = get_tls_dir(self.name) for endpoint in self.endpoints: if not os.path.isdir(os.path.join(tls_dir, endpoint)): continue ca_cert = None cert = None key = None for filename in os.listdir(os.path.join(tls_dir, endpoint)): if filename.startswith("ca"): ca_cert = os.path.join(tls_dir, endpoint, filename) elif filename.startswith("cert"): cert = os.path.join(tls_dir, endpoint, filename) elif filename.startswith("key"): key = os.path.join(tls_dir, endpoint, filename) if all([cert, key]) or ca_cert: verify = None if endpoint == "docker" and not self.endpoints["docker"].get( "SkipTLSVerify", False ): verify = True certs[endpoint] = TLSConfig( client_cert=(cert, key) if cert and key else None, ca_cert=ca_cert, verify=verify, ) self.tls_cfg = certs self.tls_path = tls_dir def save(self): meta_dir = get_meta_dir(self.name) if not os.path.isdir(meta_dir): os.makedirs(meta_dir) with open(get_meta_file(self.name), "wt", encoding="utf-8") as f: f.write(json.dumps(self.Metadata)) tls_dir = get_tls_dir(self.name) for endpoint, tls in self.tls_cfg.items(): if not os.path.isdir(os.path.join(tls_dir, endpoint)): os.makedirs(os.path.join(tls_dir, endpoint)) ca_file = tls.ca_cert if ca_file: copyfile( ca_file, os.path.join(tls_dir, endpoint, os.path.basename(ca_file)) ) if tls.cert: cert_file, key_file = tls.cert copyfile( cert_file, os.path.join(tls_dir, endpoint, os.path.basename(cert_file)), ) copyfile( key_file, os.path.join(tls_dir, endpoint, os.path.basename(key_file)), ) self.meta_path = get_meta_dir(self.name) self.tls_path = get_tls_dir(self.name) def remove(self): if os.path.isdir(self.meta_path): rmtree(self.meta_path) if os.path.isdir(self.tls_path): rmtree(self.tls_path) def __repr__(self): return f"<{self.__class__.__name__}: '{self.name}'>" def __str__(self): return json.dumps(self.__call__(), indent=2) def __call__(self): result = self.Metadata result.update(self.TLSMaterial) result.update(self.Storage) return result def is_docker_host(self): return self.context_type is None @property def Name(self): # pylint: disable=invalid-name return self.name @property def Host(self): # pylint: disable=invalid-name if not self.orchestrator or self.orchestrator == "swarm": endpoint = self.endpoints.get("docker", None) if endpoint: return endpoint.get("Host", None) return None return self.endpoints[self.orchestrator].get("Host", None) @property def Orchestrator(self): # pylint: disable=invalid-name return self.orchestrator @property def Metadata(self): # pylint: disable=invalid-name meta = {} if self.orchestrator: meta = {"StackOrchestrator": self.orchestrator} return {"Name": self.name, "Metadata": meta, "Endpoints": self.endpoints} @property def TLSConfig(self): # pylint: disable=invalid-name key = self.orchestrator if not key or key == "swarm": key = "docker" if key in self.tls_cfg: return self.tls_cfg[key] return None @property def TLSMaterial(self): # pylint: disable=invalid-name certs = {} for endpoint, tls in self.tls_cfg.items(): cert, key = tls.cert certs[endpoint] = list(map(os.path.basename, [tls.ca_cert, cert, key])) return {"TLSMaterial": certs} @property def Storage(self): # pylint: disable=invalid-name return {"Storage": {"MetadataPath": self.meta_path, "TLSPath": self.tls_path}}