# -*- coding: utf-8 -*- # This code is part of the Ansible collection community.docker, but is an independent component. # This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) # # Copyright (c) 2016-2022 Docker, Inc. # # It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) # SPDX-License-Identifier: Apache-2.0 from __future__ import (absolute_import, division, print_function) __metaclass__ = type import base64 import json import os import os.path import shlex import string from datetime import datetime from ansible_collections.community.docker.plugins.module_utils.version import StrictVersion from ansible.module_utils.six import PY2, PY3, binary_type, integer_types, iteritems, string_types, text_type from .. import errors from .. import tls from ..constants import DEFAULT_HTTP_HOST from ..constants import DEFAULT_UNIX_SOCKET from ..constants import DEFAULT_NPIPE from ..constants import BYTE_UNITS if PY2: from urllib import splitnport from urlparse import urlparse else: from urllib.parse import splitnport, urlparse def create_ipam_pool(*args, **kwargs): raise errors.DeprecatedMethod( 'utils.create_ipam_pool has been removed. Please use a ' 'docker.types.IPAMPool object instead.' ) def create_ipam_config(*args, **kwargs): raise errors.DeprecatedMethod( 'utils.create_ipam_config has been removed. Please use a ' 'docker.types.IPAMConfig object instead.' ) def decode_json_header(header): data = base64.b64decode(header) if PY3: data = data.decode('utf-8') return json.loads(data) def compare_version(v1, v2): """Compare docker versions >>> v1 = '1.9' >>> v2 = '1.10' >>> compare_version(v1, v2) 1 >>> compare_version(v2, v1) -1 >>> compare_version(v2, v2) 0 """ s1 = StrictVersion(v1) s2 = StrictVersion(v2) if s1 == s2: return 0 elif s1 > s2: return -1 else: return 1 def version_lt(v1, v2): return compare_version(v1, v2) > 0 def version_gte(v1, v2): return not version_lt(v1, v2) def _convert_port_binding(binding): result = {'HostIp': '', 'HostPort': ''} if isinstance(binding, tuple): if len(binding) == 2: result['HostPort'] = binding[1] result['HostIp'] = binding[0] elif isinstance(binding[0], string_types): result['HostIp'] = binding[0] else: result['HostPort'] = binding[0] elif isinstance(binding, dict): if 'HostPort' in binding: result['HostPort'] = binding['HostPort'] if 'HostIp' in binding: result['HostIp'] = binding['HostIp'] else: raise ValueError(binding) else: result['HostPort'] = binding if result['HostPort'] is None: result['HostPort'] = '' else: result['HostPort'] = str(result['HostPort']) return result def convert_port_bindings(port_bindings): result = {} for k, v in iteritems(port_bindings): key = str(k) if '/' not in key: key += '/tcp' if isinstance(v, list): result[key] = [_convert_port_binding(binding) for binding in v] else: result[key] = [_convert_port_binding(v)] return result def convert_volume_binds(binds): if isinstance(binds, list): return binds result = [] for k, v in binds.items(): if isinstance(k, binary_type): k = k.decode('utf-8') if isinstance(v, dict): if 'ro' in v and 'mode' in v: raise ValueError( 'Binding cannot contain both "ro" and "mode": {0}' .format(repr(v)) ) bind = v['bind'] if isinstance(bind, binary_type): bind = bind.decode('utf-8') if 'ro' in v: mode = 'ro' if v['ro'] else 'rw' elif 'mode' in v: mode = v['mode'] else: mode = 'rw' result.append( text_type('{0}:{1}:{2}').format(k, bind, mode) ) else: if isinstance(v, binary_type): v = v.decode('utf-8') result.append( text_type('{0}:{1}:rw').format(k, v) ) return result def convert_tmpfs_mounts(tmpfs): if isinstance(tmpfs, dict): return tmpfs if not isinstance(tmpfs, list): raise ValueError( 'Expected tmpfs value to be either a list or a dict, found: {0}' .format(type(tmpfs).__name__) ) result = {} for mount in tmpfs: if isinstance(mount, string_types): if ":" in mount: name, options = mount.split(":", 1) else: name = mount options = "" else: raise ValueError( "Expected item in tmpfs list to be a string, found: {0}" .format(type(mount).__name__) ) result[name] = options return result def convert_service_networks(networks): if not networks: return networks if not isinstance(networks, list): raise TypeError('networks parameter must be a list.') result = [] for n in networks: if isinstance(n, string_types): n = {'Target': n} result.append(n) return result def parse_repository_tag(repo_name): parts = repo_name.rsplit('@', 1) if len(parts) == 2: return tuple(parts) parts = repo_name.rsplit(':', 1) if len(parts) == 2 and '/' not in parts[1]: return tuple(parts) return repo_name, None def parse_host(addr, is_win32=False, tls=False): path = '' port = None host = None # Sensible defaults if not addr and is_win32: return DEFAULT_NPIPE if not addr or addr.strip() == 'unix://': return DEFAULT_UNIX_SOCKET addr = addr.strip() parsed_url = urlparse(addr) proto = parsed_url.scheme if not proto or any(x not in string.ascii_letters + '+' for x in proto): # https://bugs.python.org/issue754016 parsed_url = urlparse('//' + addr, 'tcp') proto = 'tcp' if proto == 'fd': raise errors.DockerException('fd protocol is not implemented') # These protos are valid aliases for our library but not for the # official spec if proto == 'http' or proto == 'https': tls = proto == 'https' proto = 'tcp' elif proto == 'http+unix': proto = 'unix' if proto not in ('tcp', 'unix', 'npipe', 'ssh'): raise errors.DockerException( "Invalid bind address protocol: {0}".format(addr) ) if proto == 'tcp' and not parsed_url.netloc: # "tcp://" is exceptionally disallowed by convention; # omitting a hostname for other protocols is fine raise errors.DockerException( 'Invalid bind address format: {0}'.format(addr) ) if any([ parsed_url.params, parsed_url.query, parsed_url.fragment, parsed_url.password ]): raise errors.DockerException( 'Invalid bind address format: {0}'.format(addr) ) if parsed_url.path and proto == 'ssh': raise errors.DockerException( 'Invalid bind address format: no path allowed for this protocol:' ' {0}'.format(addr) ) else: path = parsed_url.path if proto == 'unix' and parsed_url.hostname is not None: # For legacy reasons, we consider unix://path # to be valid and equivalent to unix:///path path = '/'.join((parsed_url.hostname, path)) if proto in ('tcp', 'ssh'): # parsed_url.hostname strips brackets from IPv6 addresses, # which can be problematic hence our use of splitnport() instead. host, port = splitnport(parsed_url.netloc) if port is None or port < 0: if proto != 'ssh': raise errors.DockerException( 'Invalid bind address format: port is required:' ' {0}'.format(addr) ) port = 22 if not host: host = DEFAULT_HTTP_HOST # Rewrite schemes to fit library internals (requests adapters) if proto == 'tcp': proto = 'http{0}'.format('s' if tls else '') elif proto == 'unix': proto = 'http+unix' if proto in ('http+unix', 'npipe'): return "{0}://{1}".format(proto, path).rstrip('/') return '{0}://{1}:{2}{3}'.format(proto, host, port, path).rstrip('/') def parse_devices(devices): device_list = [] for device in devices: if isinstance(device, dict): device_list.append(device) continue if not isinstance(device, string_types): raise errors.DockerException( 'Invalid device type {0}'.format(type(device)) ) device_mapping = device.split(':') if device_mapping: path_on_host = device_mapping[0] if len(device_mapping) > 1: path_in_container = device_mapping[1] else: path_in_container = path_on_host if len(device_mapping) > 2: permissions = device_mapping[2] else: permissions = 'rwm' device_list.append({ 'PathOnHost': path_on_host, 'PathInContainer': path_in_container, 'CgroupPermissions': permissions }) return device_list def kwargs_from_env(ssl_version=None, assert_hostname=None, environment=None): if not environment: environment = os.environ host = environment.get('DOCKER_HOST') # empty string for cert path is the same as unset. cert_path = environment.get('DOCKER_CERT_PATH') or None # empty string for tls verify counts as "false". # Any value or 'unset' counts as true. tls_verify = environment.get('DOCKER_TLS_VERIFY') if tls_verify == '': tls_verify = False else: tls_verify = tls_verify is not None enable_tls = cert_path or tls_verify params = {} if host: params['base_url'] = host if not enable_tls: return params if not cert_path: cert_path = os.path.join(os.path.expanduser('~'), '.docker') if not tls_verify and assert_hostname is None: # assert_hostname is a subset of TLS verification, # so if it's not set already then set it to false. assert_hostname = False params['tls'] = tls.TLSConfig( client_cert=(os.path.join(cert_path, 'cert.pem'), os.path.join(cert_path, 'key.pem')), ca_cert=os.path.join(cert_path, 'ca.pem'), verify=tls_verify, ssl_version=ssl_version, assert_hostname=assert_hostname, ) return params def convert_filters(filters): result = {} for k, v in iteritems(filters): if isinstance(v, bool): v = 'true' if v else 'false' if not isinstance(v, list): v = [v, ] result[k] = [ str(item) if not isinstance(item, string_types) else item for item in v ] return json.dumps(result) def datetime_to_timestamp(dt): """Convert a UTC datetime to a Unix timestamp""" delta = dt - datetime.utcfromtimestamp(0) return delta.seconds + delta.days * 24 * 3600 def parse_bytes(s): if isinstance(s, integer_types + (float,)): return s if len(s) == 0: return 0 if s[-2:-1].isalpha() and s[-1].isalpha(): if s[-1] == "b" or s[-1] == "B": s = s[:-1] units = BYTE_UNITS suffix = s[-1].lower() # Check if the variable is a string representation of an int # without a units part. Assuming that the units are bytes. if suffix.isdigit(): digits_part = s suffix = 'b' else: digits_part = s[:-1] if suffix in units.keys() or suffix.isdigit(): try: digits = float(digits_part) except ValueError: raise errors.DockerException( 'Failed converting the string value for memory ({0}) to' ' an integer.'.format(digits_part) ) # Reconvert to long for the final result s = int(digits * units[suffix]) else: raise errors.DockerException( 'The specified value for memory ({0}) should specify the' ' units. The postfix should be one of the `b` `k` `m` `g`' ' characters'.format(s) ) return s def normalize_links(links): if isinstance(links, dict): links = iteritems(links) return ['{0}:{1}'.format(k, v) if v else k for k, v in sorted(links)] def parse_env_file(env_file): """ Reads a line-separated environment file. The format of each line should be "key=value". """ environment = {} with open(env_file, 'r') as f: for line in f: if line[0] == '#': continue line = line.strip() if not line: continue parse_line = line.split('=', 1) if len(parse_line) == 2: k, v = parse_line environment[k] = v else: raise errors.DockerException( 'Invalid line in environment file {0}:\n{1}'.format( env_file, line)) return environment def split_command(command): if PY2 and not isinstance(command, binary_type): command = command.encode('utf-8') return shlex.split(command) def format_environment(environment): def format_env(key, value): if value is None: return key if isinstance(value, binary_type): value = value.decode('utf-8') return u'{key}={value}'.format(key=key, value=value) return [format_env(*var) for var in iteritems(environment)] def format_extra_hosts(extra_hosts, task=False): # Use format dictated by Swarm API if container is part of a task if task: return [ '{0} {1}'.format(v, k) for k, v in sorted(iteritems(extra_hosts)) ] return [ '{0}:{1}'.format(k, v) for k, v in sorted(iteritems(extra_hosts)) ] def create_host_config(self, *args, **kwargs): raise errors.DeprecatedMethod( 'utils.create_host_config has been removed. Please use a ' 'docker.types.HostConfig object instead.' )