community.docker/plugins/module_utils/_api/credentials/store.py
Felix Fontein 117271579e
Make all doc fragments, module utils, and plugin utils private (#1144)
* Make all doc fragments, module utils, and plugin utils private.

* Remove some unused and no longer needed imports.

This hopefully also fixes the CI issues, which do not happen locally for me...

* Fix formatting.

* Try to make CI happy, again.

* Fix imports.

* Lint.
2025-10-07 07:32:33 +02:00

100 lines
3.6 KiB
Python

# -*- coding: utf-8 -*-
# This code is part of the Ansible collection community.docker, but is an independent component.
# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
#
# Copyright (c) 2016-2022 Docker, Inc.
#
# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
# SPDX-License-Identifier: Apache-2.0
# Note that this module util is **PRIVATE** to the collection. It can have breaking changes at any time.
# Do not use this from other collections or standalone plugins/modules!
from __future__ import annotations
import errno
import json
import subprocess
from . import constants, errors
from .utils import create_environment_dict, find_executable
class Store(object):
def __init__(self, program, environment=None):
"""Create a store object that acts as an interface to
perform the basic operations for storing, retrieving
and erasing credentials using `program`.
"""
self.program = constants.PROGRAM_PREFIX + program
self.exe = find_executable(self.program)
self.environment = environment
if self.exe is None:
raise errors.InitializationError(
f"{self.program} not installed or not available in PATH"
)
def get(self, server):
"""Retrieve credentials for `server`. If no credentials are found,
a `StoreError` will be raised.
"""
if not isinstance(server, bytes):
server = server.encode("utf-8")
data = self._execute("get", server)
result = json.loads(data.decode("utf-8"))
# docker-credential-pass will return an object for inexistent servers
# whereas other helpers will exit with returncode != 0. For
# consistency, if no significant data is returned,
# raise CredentialsNotFound
if result["Username"] == "" and result["Secret"] == "":
raise errors.CredentialsNotFound(
f"No matching credentials in {self.program}"
)
return result
def store(self, server, username, secret):
"""Store credentials for `server`. Raises a `StoreError` if an error
occurs.
"""
data_input = json.dumps(
{"ServerURL": server, "Username": username, "Secret": secret}
).encode("utf-8")
return self._execute("store", data_input)
def erase(self, server):
"""Erase credentials for `server`. Raises a `StoreError` if an error
occurs.
"""
if not isinstance(server, bytes):
server = server.encode("utf-8")
self._execute("erase", server)
def list(self):
"""List stored credentials. Requires v0.4.0+ of the helper."""
data = self._execute("list", None)
return json.loads(data.decode("utf-8"))
def _execute(self, subcmd, data_input):
output = None
env = create_environment_dict(self.environment)
try:
output = subprocess.check_output(
[self.exe, subcmd],
input=data_input,
env=env,
)
except subprocess.CalledProcessError as e:
raise errors.process_store_error(e, self.program)
except OSError as e:
if e.errno == errno.ENOENT:
raise errors.StoreError(
f"{self.program} not installed or not available in PATH"
)
else:
raise errors.StoreError(
f'Unexpected OS error "{e.strerror}", errno={e.errno}'
)
return output