Address redefined-outer-name.

This commit is contained in:
Felix Fontein 2025-10-25 01:39:59 +02:00
parent 18f1ee0a26
commit 40c240ac2b
7 changed files with 29 additions and 30 deletions

View File

@ -384,7 +384,6 @@ disable=raw-checker-failed,
import-error, # TODO figure out why pylint cannot find the module import-error, # TODO figure out why pylint cannot find the module
no-name-in-module, # TODO figure out why pylint cannot find the module no-name-in-module, # TODO figure out why pylint cannot find the module
protected-access, protected-access,
redefined-outer-name, # needed for test fixtures
subprocess-popen-preexec-fn, subprocess-popen-preexec-fn,
unexpected-keyword-arg, unexpected-keyword-arg,
unused-argument, unused-argument,

View File

@ -98,7 +98,7 @@ def create_archive(
extra_files = extra_files or [] extra_files = extra_files or []
if not fileobj: if not fileobj:
fileobj = tempfile.NamedTemporaryFile() fileobj = tempfile.NamedTemporaryFile()
t = tarfile.open(mode="w:gz" if gzip else "w", fileobj=fileobj) tarf = tarfile.open(mode="w:gz" if gzip else "w", fileobj=fileobj)
if files is None: if files is None:
files = build_file_list(root) files = build_file_list(root)
extra_names = set(e[0] for e in extra_files) extra_names = set(e[0] for e in extra_files)
@ -108,7 +108,7 @@ def create_archive(
continue continue
full_path = os.path.join(root, path) full_path = os.path.join(root, path)
i = t.gettarinfo(full_path, arcname=path) i = tarf.gettarinfo(full_path, arcname=path)
if i is None: if i is None:
# This happens when we encounter a socket file. We can safely # This happens when we encounter a socket file. We can safely
# ignore it and proceed. # ignore it and proceed.
@ -126,20 +126,20 @@ def create_archive(
if i.isfile(): if i.isfile():
try: try:
with open(full_path, "rb") as f: with open(full_path, "rb") as f:
t.addfile(i, f) tarf.addfile(i, f)
except IOError as exc: except IOError as exc:
raise IOError(f"Can not read file in context: {full_path}") from exc raise IOError(f"Can not read file in context: {full_path}") from exc
else: else:
# Directories, FIFOs, symlinks... do not need to be read. # Directories, FIFOs, symlinks... do not need to be read.
t.addfile(i, None) tarf.addfile(i, None)
for name, contents in extra_files: for name, contents in extra_files:
info = tarfile.TarInfo(name) info = tarfile.TarInfo(name)
contents_encoded = contents.encode("utf-8") contents_encoded = contents.encode("utf-8")
info.size = len(contents_encoded) info.size = len(contents_encoded)
t.addfile(info, io.BytesIO(contents_encoded)) tarf.addfile(info, io.BytesIO(contents_encoded))
t.close() tarf.close()
fileobj.seek(0) fileobj.seek(0)
return fileobj return fileobj
@ -147,7 +147,7 @@ def create_archive(
def mkbuildcontext(dockerfile: io.BytesIO | t.IO[bytes]) -> t.IO[bytes]: def mkbuildcontext(dockerfile: io.BytesIO | t.IO[bytes]) -> t.IO[bytes]:
f = tempfile.NamedTemporaryFile() # pylint: disable=consider-using-with f = tempfile.NamedTemporaryFile() # pylint: disable=consider-using-with
try: try:
with tarfile.open(mode="w", fileobj=f) as t: with tarfile.open(mode="w", fileobj=f) as tarf:
if isinstance(dockerfile, io.StringIO): # type: ignore if isinstance(dockerfile, io.StringIO): # type: ignore
raise TypeError("Please use io.BytesIO to create in-memory Dockerfiles") raise TypeError("Please use io.BytesIO to create in-memory Dockerfiles")
if isinstance(dockerfile, io.BytesIO): if isinstance(dockerfile, io.BytesIO):
@ -155,8 +155,8 @@ def mkbuildcontext(dockerfile: io.BytesIO | t.IO[bytes]) -> t.IO[bytes]:
dfinfo.size = len(dockerfile.getvalue()) dfinfo.size = len(dockerfile.getvalue())
dockerfile.seek(0) dockerfile.seek(0)
else: else:
dfinfo = t.gettarinfo(fileobj=dockerfile, arcname="Dockerfile") dfinfo = tarf.gettarinfo(fileobj=dockerfile, arcname="Dockerfile")
t.addfile(dfinfo, dockerfile) tarf.addfile(dfinfo, dockerfile)
f.seek(0) f.seek(0)
except Exception: # noqa: E722 except Exception: # noqa: E722
f.close() f.close()

View File

@ -790,17 +790,17 @@ def _preprocess_mounts(
) -> dict[str, t.Any]: ) -> dict[str, t.Any]:
last: dict[str, str] = {} last: dict[str, str] = {}
def check_collision(t: str, name: str) -> None: def check_collision(target: str, name: str) -> None:
if t in last: if target in last:
if name == last[t]: if name == last[target]:
module.fail_json( module.fail_json(
msg=f'The mount point "{t}" appears twice in the {name} option' msg=f'The mount point "{target}" appears twice in the {name} option'
) )
else: else:
module.fail_json( module.fail_json(
msg=f'The mount point "{t}" appears both in the {name} and {last[t]} option' msg=f'The mount point "{target}" appears both in the {name} and {last[target]} option'
) )
last[t] = name last[target] = name
if "mounts" in values: if "mounts" in values:
mounts = [] mounts = []

View File

@ -24,14 +24,14 @@ if t.TYPE_CHECKING:
from collections.abc import Callable from collections.abc import Callable
@pytest.fixture(scope="module") @pytest.fixture(scope="module", name="templar")
def templar() -> Templar: def templar_fixture() -> Templar:
dataloader = create_autospec(DataLoader, instance=True) dataloader = create_autospec(DataLoader, instance=True)
return Templar(loader=dataloader) return Templar(loader=dataloader)
@pytest.fixture(scope="module") @pytest.fixture(scope="module", name="inventory")
def inventory(templar: Templar) -> InventoryModule: def inventory_fixture(templar: Templar) -> InventoryModule:
r = InventoryModule() r = InventoryModule()
r.inventory = InventoryData() r.inventory = InventoryData()
r.templar = templar r.templar = templar

View File

@ -52,7 +52,7 @@ if t.TYPE_CHECKING:
DEFAULT_TIMEOUT_SECONDS = constants.DEFAULT_TIMEOUT_SECONDS DEFAULT_TIMEOUT_SECONDS = constants.DEFAULT_TIMEOUT_SECONDS
def response( def create_response(
status_code: int = 200, status_code: int = 200,
content: bytes | dict[str, t.Any] | list[dict[str, t.Any]] = b"", content: bytes | dict[str, t.Any] | list[dict[str, t.Any]] = b"",
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
@ -95,7 +95,7 @@ def fake_resp(
if not key: if not key:
raise NotImplementedError(f"{method} {url}") raise NotImplementedError(f"{method} {url}")
status_code, content = fake_api.fake_responses[key]() status_code, content = fake_api.fake_responses[key]()
return response(status_code=status_code, content=content) return create_response(status_code=status_code, content=content)
fake_request = mock.Mock(side_effect=fake_resp) fake_request = mock.Mock(side_effect=fake_resp)
@ -328,26 +328,26 @@ class DockerApiTest(BaseAPIClientTest):
# pass `decode=False` to the helper # pass `decode=False` to the helper
raw_resp._fp.seek(0) raw_resp._fp.seek(0)
resp = response(status_code=status_code, content=content, raw=raw_resp) resp = create_response(status_code=status_code, content=content, raw=raw_resp)
result = next(self.client._stream_helper(resp)) result = next(self.client._stream_helper(resp))
assert result == content_str assert result == content_str
# pass `decode=True` to the helper # pass `decode=True` to the helper
raw_resp._fp.seek(0) raw_resp._fp.seek(0)
resp = response(status_code=status_code, content=content, raw=raw_resp) resp = create_response(status_code=status_code, content=content, raw=raw_resp)
result = next(self.client._stream_helper(resp, decode=True)) result = next(self.client._stream_helper(resp, decode=True))
assert result == content assert result == content
# non-chunked response, pass `decode=False` to the helper # non-chunked response, pass `decode=False` to the helper
setattr(raw_resp._fp, "chunked", False) setattr(raw_resp._fp, "chunked", False)
raw_resp._fp.seek(0) raw_resp._fp.seek(0)
resp = response(status_code=status_code, content=content, raw=raw_resp) resp = create_response(status_code=status_code, content=content, raw=raw_resp)
result = next(self.client._stream_helper(resp)) result = next(self.client._stream_helper(resp))
assert result == content_str.decode("utf-8") # type: ignore assert result == content_str.decode("utf-8") # type: ignore
# non-chunked response, pass `decode=True` to the helper # non-chunked response, pass `decode=True` to the helper
raw_resp._fp.seek(0) raw_resp._fp.seek(0)
resp = response(status_code=status_code, content=content, raw=raw_resp) resp = create_response(status_code=status_code, content=content, raw=raw_resp)
result = next(self.client._stream_helper(resp, decode=True)) result = next(self.client._stream_helper(resp, decode=True))
assert result == content assert result == content

View File

@ -22,8 +22,8 @@ from ..test_support.docker_image_archive_stubbing import (
) )
@pytest.fixture @pytest.fixture(name="tar_file_name")
def tar_file_name(tmpdir: t.Any) -> str: def tar_file_name_fixture(tmpdir: t.Any) -> str:
""" """
Return the name of a non-existing tar file in an existing temporary directory. Return the name of a non-existing tar file in an existing temporary directory.
""" """

View File

@ -37,8 +37,8 @@ def capture_logging(messages: list[str]) -> Callable[[str], None]:
return capture return capture
@pytest.fixture @pytest.fixture(name="tar_file_name")
def tar_file_name(tmpdir: t.Any) -> str: def tar_file_name_fixture(tmpdir: t.Any) -> str:
""" """
Return the name of a non-existing tar file in an existing temporary directory. Return the name of a non-existing tar file in an existing temporary directory.
""" """