Source code for coalib.misc.ContextManagers
import builtins
import os
import platform
import signal
import sys
import tempfile
import threading
from contextlib import closing, contextmanager
from io import StringIO
from coalib.misc.MutableValue import MutableValue
@contextmanager
[docs]def subprocess_timeout(sub_process, seconds, kill_pg=False):
"""
Kill subprocess if the sub process takes more the than the timeout.
:param sub_process: The sub process to run.
:param seconds: The number of seconds to allow the test to run for. If
set to 0 or a negative value, it waits indefinitely.
Floats can be used to specify units smaller than
seconds.
:param kill_pg: Boolean whether to kill the process group or only this
process. (not applicable for windows)
"""
timedout = MutableValue(False)
if seconds <= 0:
yield timedout
return
finished = threading.Event()
if platform.system() == 'Windows': # pragma: no cover
kill_pg = False
def kill_it():
finished.wait(seconds)
if not finished.is_set():
timedout.value = True
if kill_pg:
pgid = os.getpgid(sub_process.pid)
os.kill(sub_process.pid, signal.SIGINT)
if kill_pg:
os.killpg(pgid, signal.SIGINT)
thread = threading.Thread(name='timeout-killer', target=kill_it)
try:
thread.start()
yield timedout
finally:
finished.set()
thread.join()
@contextmanager
[docs]def replace_stdout(replacement):
"""
Replaces stdout with the replacement, yields back to the caller and then
reverts everything back.
"""
_stdout = sys.stdout
sys.stdout = replacement
try:
yield
finally:
sys.stdout = _stdout
@contextmanager
[docs]def replace_stderr(replacement):
"""
Replaces stderr with the replacement, yields back to the caller and then
reverts everything back.
"""
_stderr = sys.stderr
sys.stderr = replacement
try:
yield
finally:
sys.stderr = _stderr
@contextmanager
[docs]def suppress_stdout():
"""
Suppresses everything going to stdout.
"""
with open(os.devnull, 'w') as devnull, replace_stdout(devnull):
yield
@contextmanager
[docs]def retrieve_stdout():
"""
Yields a StringIO object from which one can read everything that was
printed to stdout. (It won't be printed to the real stdout!)
Example usage:
with retrieve_stdout() as stdout:
print("something") # Won't print to the console
what_was_printed = stdout.getvalue() # Save the value
"""
with closing(StringIO()) as sio, replace_stdout(sio):
oldprint = builtins.print
try:
# Overriding stdout doesn't work with libraries, this ensures even
# cached variables take this up. Well... it works.
def newprint(*args, **kwargs):
kwargs['file'] = sio
oldprint(*args, **kwargs)
builtins.print = newprint
yield sio
finally:
builtins.print = oldprint
@contextmanager
[docs]def retrieve_stderr():
"""
Yields a StringIO object from which one can read everything that was
printed to stderr. (It won't be printed to the real stderr!)
Example usage:
with retrieve_stderr() as stderr:
print("something") # Won't print to the console
what_was_printed = stderr.getvalue() # Save the value
"""
with closing(StringIO()) as sio, replace_stderr(sio):
oldprint = builtins.print
try:
# Overriding stderr doesn't work with libraries, this ensures even
# cached variables take this up. Well... it works.
def newprint(*args, **kwargs):
kwargs['file'] = sio
oldprint(*args, **kwargs)
builtins.print = newprint
yield sio
finally:
builtins.print = oldprint
@contextmanager
@contextmanager
[docs]def make_temp(suffix='', prefix='tmp', dir=None):
"""
Creates a temporary file with a closed stream and deletes it when done.
:return: A contextmanager retrieving the file path.
"""
temporary = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
os.close(temporary[0])
try:
yield temporary[1]
finally:
os.remove(temporary[1])
@contextmanager
[docs]def prepare_file(lines,
filename,
force_linebreaks=True,
create_tempfile=True,
tempfile_kwargs={}):
"""
Can create a temporary file (if filename is None) with the lines.
Can also add a trailing newline to each line specified if needed.
:param lines: The lines from the file. (list or tuple of strings)
:param filename: The filename to be prepared.
:param force_linebreaks: Whether to append newlines at each line if needed.
:param create_tempfile: Whether to save lines in tempfile if needed.
:param tempfile_kwargs: Kwargs passed to tempfile.mkstemp().
"""
if force_linebreaks:
lines = type(lines)(line if line.endswith('\n') else line+'\n'
for line in lines)
if not create_tempfile and filename is None:
filename = 'dummy_file_name'
if not isinstance(filename, str) and create_tempfile:
with make_temp(**tempfile_kwargs) as filename:
with open(filename, 'w', encoding='utf-8') as file:
file.writelines(lines)
yield lines, filename
else:
yield lines, filename
@contextmanager
[docs]def change_directory(path):
old_dir = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(old_dir)