# Copyright (c) 2019-2021, Felix Fontein # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later # Note that this module util is **PRIVATE** to the collection. It can have breaking changes at any time. # Do not use this from other collections or standalone plugins/modules! from __future__ import annotations import os import os.path import selectors import socket as pysocket import struct from ansible_collections.community.docker.plugins.module_utils._api.utils import ( socket as docker_socket, ) from ansible_collections.community.docker.plugins.module_utils._socket_helper import ( make_unblocking, shutdown_writing, write_to_socket, ) PARAMIKO_POLL_TIMEOUT = 0.01 # 10 milliseconds class DockerSocketHandlerBase: def __init__(self, sock, log=None): make_unblocking(sock) if log is not None: self._log = log else: self._log = lambda msg: True self._paramiko_read_workaround = hasattr( sock, "send_ready" ) and "paramiko" in str(type(sock)) self._sock = sock self._block_done_callback = None self._block_buffer = [] self._eof = False self._read_buffer = b"" self._write_buffer = b"" self._end_of_writing = False self._current_stream = None self._current_missing = 0 self._current_buffer = b"" self._selector = selectors.DefaultSelector() self._selector.register(self._sock, selectors.EVENT_READ) def __enter__(self): return self def __exit__(self, type_, value, tb): self._selector.close() def set_block_done_callback(self, block_done_callback): self._block_done_callback = block_done_callback if self._block_done_callback is not None: while self._block_buffer: elt = self._block_buffer.pop(0) self._block_done_callback(*elt) def _add_block(self, stream_id, data): if self._block_done_callback is not None: self._block_done_callback(stream_id, data) else: self._block_buffer.append((stream_id, data)) def _read(self): if self._eof: return if hasattr(self._sock, "recv"): try: data = self._sock.recv(262144) except Exception as e: # pylint: disable=broad-exception-caught # After calling self._sock.shutdown(), OpenSSL's/urllib3's # WrappedSocket seems to eventually raise ZeroReturnError in # case of EOF if "OpenSSL.SSL.ZeroReturnError" in str(type(e)): self._eof = True return raise elif isinstance(self._sock, getattr(pysocket, "SocketIO")): data = self._sock.read() else: data = os.read(self._sock.fileno()) if data is None: # no data available return self._log(f"read {len(data)} bytes") if len(data) == 0: # Stream EOF self._eof = True return self._read_buffer += data while len(self._read_buffer) > 0: if self._current_missing > 0: n = min(len(self._read_buffer), self._current_missing) self._current_buffer += self._read_buffer[:n] self._read_buffer = self._read_buffer[n:] self._current_missing -= n if self._current_missing == 0: self._add_block(self._current_stream, self._current_buffer) self._current_buffer = b"" if len(self._read_buffer) < 8: break self._current_stream, self._current_missing = struct.unpack( ">BxxxL", self._read_buffer[:8] ) self._read_buffer = self._read_buffer[8:] if self._current_missing < 0: # Stream EOF (as reported by docker daemon) self._eof = True break def _handle_end_of_writing(self): if self._end_of_writing and len(self._write_buffer) == 0: self._end_of_writing = False self._log("Shutting socket down for writing") shutdown_writing(self._sock, self._log) def _write(self): if len(self._write_buffer) > 0: written = write_to_socket(self._sock, self._write_buffer) self._write_buffer = self._write_buffer[written:] self._log(f"wrote {written} bytes, {len(self._write_buffer)} are left") if len(self._write_buffer) > 0: self._selector.modify( self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE ) else: self._selector.modify(self._sock, selectors.EVENT_READ) self._handle_end_of_writing() def select(self, timeout=None, _internal_recursion=False): if ( not _internal_recursion and self._paramiko_read_workaround and len(self._write_buffer) > 0 ): # When the SSH transport is used, Docker SDK for Python internally uses Paramiko, whose # Channel object supports select(), but only for reading # (https://github.com/paramiko/paramiko/issues/695). if self._sock.send_ready(): self._write() return True while timeout is None or timeout > PARAMIKO_POLL_TIMEOUT: result = self.select(PARAMIKO_POLL_TIMEOUT, _internal_recursion=True) if self._sock.send_ready(): self._read() result += 1 if result > 0: return True if timeout is not None: timeout -= PARAMIKO_POLL_TIMEOUT self._log(f"select... ({timeout})") events = self._selector.select(timeout) for key, event in events: if key.fileobj == self._sock: ev_read = event & selectors.EVENT_READ != 0 ev_write = event & selectors.EVENT_WRITE != 0 self._log(f"select event read:{ev_read} write:{ev_write}") if event & selectors.EVENT_READ != 0: self._read() if event & selectors.EVENT_WRITE != 0: self._write() result = len(events) if self._paramiko_read_workaround and len(self._write_buffer) > 0: if self._sock.send_ready(): self._write() result += 1 return result > 0 def is_eof(self): return self._eof def end_of_writing(self): self._end_of_writing = True self._handle_end_of_writing() def consume(self): stdout = [] stderr = [] def append_block(stream_id, data): if stream_id == docker_socket.STDOUT: stdout.append(data) elif stream_id == docker_socket.STDERR: stderr.append(data) else: raise ValueError(f"{stream_id} is not a valid stream ID") self.end_of_writing() self.set_block_done_callback(append_block) while not self._eof: self.select() return b"".join(stdout), b"".join(stderr) def write(self, str_to_write): self._write_buffer += str_to_write if len(self._write_buffer) == len(str_to_write): self._write() class DockerSocketHandlerModule(DockerSocketHandlerBase): def __init__(self, sock, module): super().__init__(sock, module.debug)