mirror of
https://github.com/ansible-collections/community.docker.git
synced 2026-03-16 12:06:29 +00:00
Address some consider-using-with.
This commit is contained in:
parent
9bb3f8c3b3
commit
c7399b7c38
@ -182,10 +182,10 @@ class Connection(ConnectionBase):
|
||||
old_version_subcommand = ["version"]
|
||||
|
||||
old_docker_cmd = [self.docker_cmd] + cmd_args + old_version_subcommand
|
||||
p = subprocess.Popen(
|
||||
with subprocess.Popen(
|
||||
old_docker_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
cmd_output, err = p.communicate()
|
||||
) as p:
|
||||
cmd_output, err = p.communicate()
|
||||
|
||||
return old_docker_cmd, to_native(cmd_output), err, p.returncode
|
||||
|
||||
@ -196,11 +196,11 @@ class Connection(ConnectionBase):
|
||||
new_version_subcommand = ["version", "--format", "'{{.Server.Version}}'"]
|
||||
|
||||
new_docker_cmd = [self.docker_cmd] + cmd_args + new_version_subcommand
|
||||
p = subprocess.Popen(
|
||||
with subprocess.Popen(
|
||||
new_docker_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
cmd_output, err = p.communicate()
|
||||
return new_docker_cmd, to_native(cmd_output), err, p.returncode
|
||||
) as p:
|
||||
cmd_output, err = p.communicate()
|
||||
return new_docker_cmd, to_native(cmd_output), err, p.returncode
|
||||
|
||||
def _get_docker_version(self):
|
||||
|
||||
@ -223,21 +223,20 @@ class Connection(ConnectionBase):
|
||||
container = self.get_option("remote_addr")
|
||||
if container in self._container_user_cache:
|
||||
return self._container_user_cache[container]
|
||||
p = subprocess.Popen(
|
||||
with subprocess.Popen(
|
||||
[self.docker_cmd, "inspect", "--format", "{{.Config.User}}", container],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
) as p:
|
||||
out, err = p.communicate()
|
||||
out = to_text(out, errors="surrogate_or_strict")
|
||||
|
||||
out, err = p.communicate()
|
||||
out = to_text(out, errors="surrogate_or_strict")
|
||||
|
||||
if p.returncode != 0:
|
||||
display.warning(
|
||||
f"unable to retrieve default user from docker container: {out} {to_text(err)}"
|
||||
)
|
||||
self._container_user_cache[container] = None
|
||||
return None
|
||||
if p.returncode != 0:
|
||||
display.warning(
|
||||
f"unable to retrieve default user from docker container: {out} {to_text(err)}"
|
||||
)
|
||||
self._container_user_cache[container] = None
|
||||
return None
|
||||
|
||||
# The default exec user is root, unless it was changed in the Dockerfile with USER
|
||||
user = out.strip() or "root"
|
||||
@ -392,87 +391,87 @@ class Connection(ConnectionBase):
|
||||
|
||||
local_cmd = [to_bytes(i, errors="surrogate_or_strict") for i in local_cmd]
|
||||
|
||||
p = subprocess.Popen(
|
||||
with subprocess.Popen(
|
||||
local_cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
display.debug("done running command with Popen()")
|
||||
) as p:
|
||||
display.debug("done running command with Popen()")
|
||||
|
||||
if self.become and self.become.expect_prompt() and sudoable:
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(p.stdout, selectors.EVENT_READ)
|
||||
selector.register(p.stderr, selectors.EVENT_READ)
|
||||
|
||||
become_output = b""
|
||||
try:
|
||||
while not self.become.check_success(
|
||||
become_output
|
||||
) and not self.become.check_password_prompt(become_output):
|
||||
events = selector.select(self.timeout)
|
||||
if not events:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"timeout waiting for privilege escalation password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
|
||||
chunks = b""
|
||||
for key, event in events:
|
||||
if key.fileobj == p.stdout:
|
||||
chunk = p.stdout.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
elif key.fileobj == p.stderr:
|
||||
chunk = p.stderr.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
|
||||
if not chunks:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"privilege output closed while waiting for password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
become_output += chunks
|
||||
finally:
|
||||
selector.close()
|
||||
|
||||
if not self.become.check_success(become_output):
|
||||
become_pass = self.become.get_option(
|
||||
"become_pass", playcontext=self._play_context
|
||||
if self.become and self.become.expect_prompt() and sudoable:
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
p.stdin.write(
|
||||
to_bytes(become_pass, errors="surrogate_or_strict") + b"\n"
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(p.stdout, selectors.EVENT_READ)
|
||||
selector.register(p.stderr, selectors.EVENT_READ)
|
||||
|
||||
display.debug("getting output with communicate()")
|
||||
stdout, stderr = p.communicate(in_data)
|
||||
display.debug("done communicating")
|
||||
become_output = b""
|
||||
try:
|
||||
while not self.become.check_success(
|
||||
become_output
|
||||
) and not self.become.check_password_prompt(become_output):
|
||||
events = selector.select(self.timeout)
|
||||
if not events:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"timeout waiting for privilege escalation password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
|
||||
display.debug("done with docker.exec_command()")
|
||||
return (p.returncode, stdout, stderr)
|
||||
chunks = b""
|
||||
for key, event in events:
|
||||
if key.fileobj == p.stdout:
|
||||
chunk = p.stdout.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
elif key.fileobj == p.stderr:
|
||||
chunk = p.stderr.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
|
||||
if not chunks:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"privilege output closed while waiting for password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
become_output += chunks
|
||||
finally:
|
||||
selector.close()
|
||||
|
||||
if not self.become.check_success(become_output):
|
||||
become_pass = self.become.get_option(
|
||||
"become_pass", playcontext=self._play_context
|
||||
)
|
||||
p.stdin.write(
|
||||
to_bytes(become_pass, errors="surrogate_or_strict") + b"\n"
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
|
||||
display.debug("getting output with communicate()")
|
||||
stdout, stderr = p.communicate(in_data)
|
||||
display.debug("done communicating")
|
||||
|
||||
display.debug("done with docker.exec_command()")
|
||||
return (p.returncode, stdout, stderr)
|
||||
|
||||
def _prefix_login_path(self, remote_path):
|
||||
"""Make sure that we put files into a standard path
|
||||
@ -559,45 +558,45 @@ class Connection(ConnectionBase):
|
||||
]
|
||||
args = [to_bytes(i, errors="surrogate_or_strict") for i in args]
|
||||
|
||||
p = subprocess.Popen(
|
||||
with subprocess.Popen(
|
||||
args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
p.communicate()
|
||||
) as p:
|
||||
p.communicate()
|
||||
|
||||
if getattr(self._shell, "_IS_WINDOWS", False):
|
||||
import ntpath
|
||||
if getattr(self._shell, "_IS_WINDOWS", False):
|
||||
import ntpath
|
||||
|
||||
actual_out_path = ntpath.join(out_dir, ntpath.basename(in_path))
|
||||
else:
|
||||
actual_out_path = os.path.join(out_dir, os.path.basename(in_path))
|
||||
actual_out_path = ntpath.join(out_dir, ntpath.basename(in_path))
|
||||
else:
|
||||
actual_out_path = os.path.join(out_dir, os.path.basename(in_path))
|
||||
|
||||
if p.returncode != 0:
|
||||
# Older docker does not have native support for fetching files command `cp`
|
||||
# If `cp` fails, try to use `dd` instead
|
||||
args = self._build_exec_cmd(
|
||||
[self._play_context.executable, "-c", f"dd if={in_path} bs={BUFSIZE}"]
|
||||
)
|
||||
args = [to_bytes(i, errors="surrogate_or_strict") for i in args]
|
||||
with open(
|
||||
to_bytes(actual_out_path, errors="surrogate_or_strict"), "wb"
|
||||
) as out_file:
|
||||
try:
|
||||
p = subprocess.Popen(
|
||||
args,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=out_file,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
except OSError:
|
||||
raise AnsibleError(
|
||||
"docker connection requires dd command in the container to put files"
|
||||
)
|
||||
stdout, stderr = p.communicate()
|
||||
if p.returncode != 0:
|
||||
# Older docker does not have native support for fetching files command `cp`
|
||||
# If `cp` fails, try to use `dd` instead
|
||||
args = self._build_exec_cmd(
|
||||
[self._play_context.executable, "-c", f"dd if={in_path} bs={BUFSIZE}"]
|
||||
)
|
||||
args = [to_bytes(i, errors="surrogate_or_strict") for i in args]
|
||||
with open(
|
||||
to_bytes(actual_out_path, errors="surrogate_or_strict"), "wb"
|
||||
) as out_file:
|
||||
try:
|
||||
pp = subprocess.Popen(
|
||||
args,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=out_file,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
except OSError:
|
||||
raise AnsibleError(
|
||||
"docker connection requires dd command in the container to put files"
|
||||
)
|
||||
stdout, stderr = pp.communicate()
|
||||
|
||||
if p.returncode != 0:
|
||||
raise AnsibleError(
|
||||
f"failed to fetch file {in_path} to {out_path}:\n{stdout}\n{stderr}"
|
||||
)
|
||||
if pp.returncode != 0:
|
||||
raise AnsibleError(
|
||||
f"failed to fetch file {in_path} to {out_path}:\n{stdout}\n{stderr}"
|
||||
)
|
||||
|
||||
# Rename if needed
|
||||
if actual_out_path != out_path:
|
||||
|
||||
@ -134,7 +134,7 @@ class Connection(ConnectionBase):
|
||||
except (IOError, OSError) as e:
|
||||
display.debug(f"Unable to open pty: {e}")
|
||||
|
||||
p = subprocess.Popen(
|
||||
with subprocess.Popen(
|
||||
cmd,
|
||||
shell=isinstance(cmd, (str, bytes)),
|
||||
executable=executable if isinstance(cmd, (str, bytes)) else None,
|
||||
@ -142,98 +142,97 @@ class Connection(ConnectionBase):
|
||||
stdin=stdin,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
) as p:
|
||||
# if we created a master, we can close the other half of the pty now, otherwise master is stdin
|
||||
if master is not None:
|
||||
os.close(stdin)
|
||||
|
||||
# if we created a master, we can close the other half of the pty now, otherwise master is stdin
|
||||
if master is not None:
|
||||
os.close(stdin)
|
||||
display.debug("done running command with Popen()")
|
||||
|
||||
display.debug("done running command with Popen()")
|
||||
|
||||
if self.become and self.become.expect_prompt() and sudoable:
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(p.stdout, selectors.EVENT_READ)
|
||||
selector.register(p.stderr, selectors.EVENT_READ)
|
||||
|
||||
become_output = b""
|
||||
try:
|
||||
while not self.become.check_success(
|
||||
become_output
|
||||
) and not self.become.check_password_prompt(become_output):
|
||||
events = selector.select(self._play_context.timeout)
|
||||
if not events:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"timeout waiting for privilege escalation password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
|
||||
chunks = b""
|
||||
for key, event in events:
|
||||
if key.fileobj == p.stdout:
|
||||
chunk = p.stdout.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
elif key.fileobj == p.stderr:
|
||||
chunk = p.stderr.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
|
||||
if not chunks:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"privilege output closed while waiting for password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
become_output += chunks
|
||||
finally:
|
||||
selector.close()
|
||||
|
||||
if not self.become.check_success(become_output):
|
||||
become_pass = self.become.get_option(
|
||||
"become_pass", playcontext=self._play_context
|
||||
if self.become and self.become.expect_prompt() and sudoable:
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
if master is None:
|
||||
p.stdin.write(
|
||||
to_bytes(become_pass, errors="surrogate_or_strict") + b"\n"
|
||||
)
|
||||
else:
|
||||
os.write(
|
||||
master,
|
||||
to_bytes(become_pass, errors="surrogate_or_strict") + b"\n",
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK,
|
||||
)
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(p.stdout, selectors.EVENT_READ)
|
||||
selector.register(p.stderr, selectors.EVENT_READ)
|
||||
|
||||
become_output = b""
|
||||
try:
|
||||
while not self.become.check_success(
|
||||
become_output
|
||||
) and not self.become.check_password_prompt(become_output):
|
||||
events = selector.select(self._play_context.timeout)
|
||||
if not events:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"timeout waiting for privilege escalation password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
|
||||
chunks = b""
|
||||
for key, event in events:
|
||||
if key.fileobj == p.stdout:
|
||||
chunk = p.stdout.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
elif key.fileobj == p.stderr:
|
||||
chunk = p.stderr.read()
|
||||
if chunk:
|
||||
chunks += chunk
|
||||
|
||||
if not chunks:
|
||||
stdout, stderr = p.communicate()
|
||||
raise AnsibleError(
|
||||
"privilege output closed while waiting for password prompt:\n"
|
||||
+ to_native(become_output)
|
||||
)
|
||||
become_output += chunks
|
||||
finally:
|
||||
selector.close()
|
||||
|
||||
if not self.become.check_success(become_output):
|
||||
become_pass = self.become.get_option(
|
||||
"become_pass", playcontext=self._play_context
|
||||
)
|
||||
if master is None:
|
||||
p.stdin.write(
|
||||
to_bytes(become_pass, errors="surrogate_or_strict") + b"\n"
|
||||
)
|
||||
else:
|
||||
os.write(
|
||||
master,
|
||||
to_bytes(become_pass, errors="surrogate_or_strict") + b"\n",
|
||||
)
|
||||
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stdout,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
fcntl.fcntl(
|
||||
p.stderr,
|
||||
fcntl.F_SETFL,
|
||||
fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK,
|
||||
)
|
||||
|
||||
display.debug("getting output with communicate()")
|
||||
stdout, stderr = p.communicate(in_data)
|
||||
display.debug("done communicating")
|
||||
display.debug("getting output with communicate()")
|
||||
stdout, stderr = p.communicate(in_data)
|
||||
display.debug("done communicating")
|
||||
|
||||
# finally, close the other half of the pty, if it was created
|
||||
if master:
|
||||
os.close(master)
|
||||
# finally, close the other half of the pty, if it was created
|
||||
if master:
|
||||
os.close(master)
|
||||
|
||||
display.debug("done with nsenter.exec_command()")
|
||||
return (p.returncode, stdout, stderr)
|
||||
display.debug("done with nsenter.exec_command()")
|
||||
return (p.returncode, stdout, stderr)
|
||||
|
||||
def put_file(self, in_path, out_path):
|
||||
super().put_file(in_path, out_path)
|
||||
|
||||
@ -72,7 +72,7 @@ class SSHSocket(socket.socket):
|
||||
env.pop("LD_LIBRARY_PATH", None)
|
||||
env.pop("SSL_CERT_FILE", None)
|
||||
|
||||
self.proc = subprocess.Popen(
|
||||
self.proc = subprocess.Popen( # pylint: disable=consider-using-with
|
||||
args,
|
||||
env=env,
|
||||
stdout=subprocess.PIPE,
|
||||
|
||||
@ -125,19 +125,22 @@ def create_archive(root, files=None, fileobj=None, gzip=False, extra_files=None)
|
||||
|
||||
|
||||
def mkbuildcontext(dockerfile):
|
||||
f = tempfile.NamedTemporaryFile()
|
||||
t = tarfile.open(mode="w", fileobj=f)
|
||||
if isinstance(dockerfile, io.StringIO):
|
||||
raise TypeError("Please use io.BytesIO to create in-memory Dockerfiles")
|
||||
elif isinstance(dockerfile, io.BytesIO):
|
||||
dfinfo = tarfile.TarInfo("Dockerfile")
|
||||
dfinfo.size = len(dockerfile.getvalue())
|
||||
dockerfile.seek(0)
|
||||
else:
|
||||
dfinfo = t.gettarinfo(fileobj=dockerfile, arcname="Dockerfile")
|
||||
t.addfile(dfinfo, dockerfile)
|
||||
t.close()
|
||||
f.seek(0)
|
||||
f = tempfile.NamedTemporaryFile() # pylint: disable=consider-using-with
|
||||
try:
|
||||
with tarfile.open(mode="w", fileobj=f) as t:
|
||||
if isinstance(dockerfile, io.StringIO):
|
||||
raise TypeError("Please use io.BytesIO to create in-memory Dockerfiles")
|
||||
elif isinstance(dockerfile, io.BytesIO):
|
||||
dfinfo = tarfile.TarInfo("Dockerfile")
|
||||
dfinfo.size = len(dockerfile.getvalue())
|
||||
dockerfile.seek(0)
|
||||
else:
|
||||
dfinfo = t.gettarinfo(fileobj=dockerfile, arcname="Dockerfile")
|
||||
t.addfile(dfinfo, dockerfile)
|
||||
f.seek(0)
|
||||
except Exception: # noqa: E722
|
||||
f.close()
|
||||
raise
|
||||
return f
|
||||
|
||||
|
||||
|
||||
@ -417,8 +417,8 @@ class TarTest(unittest.TestCase):
|
||||
self.addCleanup(shutil.rmtree, base)
|
||||
|
||||
with tar(base, exclude=exclude) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert sorted(tar_data.getnames()) == sorted(expected_names)
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert sorted(tar_data.getnames()) == sorted(expected_names)
|
||||
|
||||
def test_tar_with_empty_directory(self):
|
||||
base = tempfile.mkdtemp()
|
||||
@ -426,8 +426,8 @@ class TarTest(unittest.TestCase):
|
||||
for d in ["foo", "bar"]:
|
||||
os.makedirs(os.path.join(base, d))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert sorted(tar_data.getnames()) == ["bar", "foo"]
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert sorted(tar_data.getnames()) == ["bar", "foo"]
|
||||
|
||||
@pytest.mark.skipif(
|
||||
IS_WINDOWS_PLATFORM or os.geteuid() == 0,
|
||||
@ -454,8 +454,8 @@ class TarTest(unittest.TestCase):
|
||||
os.makedirs(os.path.join(base, "bar"))
|
||||
os.symlink("../foo", os.path.join(base, "bar/foo"))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert sorted(tar_data.getnames()) == ["bar", "bar/foo", "foo"]
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert sorted(tar_data.getnames()) == ["bar", "bar/foo", "foo"]
|
||||
|
||||
@pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason="No symlinks on Windows")
|
||||
def test_tar_with_directory_symlinks(self):
|
||||
@ -465,8 +465,8 @@ class TarTest(unittest.TestCase):
|
||||
os.makedirs(os.path.join(base, d))
|
||||
os.symlink("../foo", os.path.join(base, "bar/foo"))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert sorted(tar_data.getnames()) == ["bar", "bar/foo", "foo"]
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert sorted(tar_data.getnames()) == ["bar", "bar/foo", "foo"]
|
||||
|
||||
@pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason="No symlinks on Windows")
|
||||
def test_tar_with_broken_symlinks(self):
|
||||
@ -477,8 +477,8 @@ class TarTest(unittest.TestCase):
|
||||
|
||||
os.symlink("../baz", os.path.join(base, "bar/foo"))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert sorted(tar_data.getnames()) == ["bar", "bar/foo", "foo"]
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert sorted(tar_data.getnames()) == ["bar", "bar/foo", "foo"]
|
||||
|
||||
@pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason="No UNIX sockets on Win32")
|
||||
def test_tar_socket_file(self):
|
||||
@ -490,8 +490,8 @@ class TarTest(unittest.TestCase):
|
||||
self.addCleanup(sock.close)
|
||||
sock.bind(os.path.join(base, "test.sock"))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert sorted(tar_data.getnames()) == ["bar", "foo"]
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert sorted(tar_data.getnames()) == ["bar", "foo"]
|
||||
|
||||
def tar_test_negative_mtime_bug(self):
|
||||
base = tempfile.mkdtemp()
|
||||
@ -501,9 +501,9 @@ class TarTest(unittest.TestCase):
|
||||
f.write("Invisible Full Moon")
|
||||
os.utime(filename, (12345, -3600.0))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
assert tar_data.getnames() == ["th.txt"]
|
||||
assert tar_data.getmember("th.txt").mtime == -3600
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
assert tar_data.getnames() == ["th.txt"]
|
||||
assert tar_data.getmember("th.txt").mtime == -3600
|
||||
|
||||
@pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason="No symlinks on Windows")
|
||||
def test_tar_directory_link(self):
|
||||
@ -513,8 +513,8 @@ class TarTest(unittest.TestCase):
|
||||
self.addCleanup(shutil.rmtree, base)
|
||||
os.symlink(os.path.join(base, "b"), os.path.join(base, "a/c/b"))
|
||||
with tar(base) as archive:
|
||||
tar_data = tarfile.open(fileobj=archive)
|
||||
names = tar_data.getnames()
|
||||
with tarfile.open(fileobj=archive) as tar_data:
|
||||
names = tar_data.getnames()
|
||||
for member in dirs + files:
|
||||
assert member in names
|
||||
assert "a/c/b" in names
|
||||
|
||||
@ -205,9 +205,8 @@ class ParseEnvFileTest(unittest.TestCase):
|
||||
of 'file_content' and returns the filename.
|
||||
Don't forget to unlink the file with os.unlink() after.
|
||||
"""
|
||||
local_tempfile = tempfile.NamedTemporaryFile(delete=False)
|
||||
local_tempfile.write(file_content.encode("UTF-8"))
|
||||
local_tempfile.close()
|
||||
with tempfile.NamedTemporaryFile(delete=False) as local_tempfile:
|
||||
local_tempfile.write(file_content.encode("UTF-8"))
|
||||
return local_tempfile.name
|
||||
|
||||
def test_parse_env_file_proper(self):
|
||||
|
||||
@ -50,8 +50,7 @@ def write_irrelevant_tar(file_name):
|
||||
:type file_name: str
|
||||
"""
|
||||
|
||||
tf = tarfile.open(file_name, "w")
|
||||
try:
|
||||
with tarfile.open(file_name, "w") as tf:
|
||||
with TemporaryFile() as f:
|
||||
f.write("Hello, world.".encode("utf-8"))
|
||||
|
||||
@ -60,6 +59,3 @@ def write_irrelevant_tar(file_name):
|
||||
|
||||
f.seek(0)
|
||||
tf.addfile(ti, f)
|
||||
|
||||
finally:
|
||||
tf.close()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user