diff --git a/Doc/library/test.rst b/Doc/library/test.rst index 4e21e1ded82724c..e254f5585f63820 100644 --- a/Doc/library/test.rst +++ b/Doc/library/test.rst @@ -961,6 +961,59 @@ The :mod:`!test.support` module defines the following functions: :mod:`tracemalloc` is enabled. +.. currentmodule:: test.support.isolation + +.. decorator:: isolated() + + Decorator that runs the decorated test in isolation, in a fresh interpreter + subprocess, so that it does not share global or interpreter state with the + rest of the test run. It can decorate a test method or a whole + :class:`~unittest.TestCase` subclass. Decorated methods must take no extra + arguments. A failure, error or skip in the subprocess is reported for the + corresponding test, and individual :meth:`subtests + ` that fail or are skipped are reported + individually. A reported failure or error shows the original subprocess + traceback as the cause of the exception. + + When a **method** is decorated, only that method runs in a subprocess; all + fixtures (:meth:`~unittest.TestCase.setUp` / :meth:`~unittest.TestCase.tearDown`, + :meth:`~unittest.TestCase.setUpClass` / :meth:`~unittest.TestCase.tearDownClass` + and ``setUpModule()`` / ``tearDownModule()``) run both in the parent process + (as usual) and in the subprocess around the method. + + When a **class** is decorated, the whole class runs in a single subprocess, + and :meth:`~unittest.TestCase.setUpClass`, + :meth:`~unittest.TestCase.tearDownClass`, :meth:`~unittest.TestCase.setUp` + and :meth:`~unittest.TestCase.tearDown` run once each in the subprocess and + are skipped in the parent process. A failure or skip of + :meth:`~unittest.TestCase.setUpClass` in the subprocess is reported for the + whole class. ``setUpModule()`` cannot be controlled by a class decorator, + so it still runs in the parent process too; test it with + :data:`running_isolated` if needed. + + The subprocess inherits the enabled resources (``-u``), memory limit + (``-M``) and verbosity (``-v``) of the parent test run, so that + :func:`~test.support.requires_resource`, :func:`~test.support.requires`, + :func:`~test.support.bigmemtest` and the like behave consistently in both + processes. + + The test is skipped on platforms without subprocess support. + + +.. data:: running_isolated + + ``True`` while the code runs in the isolated subprocess spawned by + :func:`isolated`, and ``False`` otherwise (including in the parent process + and in a normal, non-isolated test run). Fixtures such as + :meth:`~unittest.TestCase.setUp`, :meth:`~unittest.TestCase.tearDown`, + :meth:`~unittest.TestCase.setUpClass`, :meth:`~unittest.TestCase.tearDownClass`, + ``setUpModule()`` and ``tearDownModule()`` can test it to choose which code + to run in the subprocess. + + +.. currentmodule:: test.support + + .. function:: check_free_after_iterating(test, iter, cls, args=()) Assert instances of *cls* are deallocated after iterating. diff --git a/Lib/test/_isolated_sample.py b/Lib/test/_isolated_sample.py new file mode 100644 index 000000000000000..2533479d25b0e2b --- /dev/null +++ b/Lib/test/_isolated_sample.py @@ -0,0 +1,74 @@ +"""Sample tests driven by test.test_support.TestIsolated. + +This module is imported, never run as a test file, so that +:func:`test.support.isolation.isolated` has a real, importable target to run in +a subprocess. Several of these tests fail, error or are skipped on purpose. +""" + +import time +import unittest +from test.support import isolation + +# A test in DurationSample sleeps this long in the subprocess; the parent +# replays it instantly, so a parent-reported duration close to this proves the +# subprocess timing was forwarded rather than the replay time measured. +DURATION_SLEEP = 0.2 + + +class MethodSample(unittest.TestCase): + + @isolation.isolated() + def test_pass(self): + self.assertTrue(isolation.running_isolated) + + @isolation.isolated() + def test_fail(self): + self.assertEqual(1, 2) + + @isolation.isolated() + def test_error(self): + raise RuntimeError('boom') + + @isolation.isolated() + def test_skip(self): + self.skipTest('nope') + + @isolation.isolated() + @unittest.expectedFailure + def test_expected_failure(self): + self.assertEqual(1, 2) + + @isolation.isolated() + @unittest.expectedFailure + def test_unexpected_success(self): + pass + + +@isolation.isolated() +class ClassSample(unittest.TestCase): + + def test_pass(self): + self.assertTrue(isolation.running_isolated) + + def test_fail(self): + self.assertEqual(1, 2) + + @unittest.expectedFailure + def test_expected_failure(self): + self.assertEqual(1, 2) + + +class SubtestSample(unittest.TestCase): + + @isolation.isolated() + def test_subtests(self): + for i in range(3): + with self.subTest(i=i): + self.assertNotEqual(i, 1) + + +@isolation.isolated() +class DurationSample(unittest.TestCase): + + def test_slow(self): + time.sleep(DURATION_SLEEP) diff --git a/Lib/test/_test_eintr.py b/Lib/test/_test_eintr.py deleted file mode 100644 index 4a050792df73c4f..000000000000000 --- a/Lib/test/_test_eintr.py +++ /dev/null @@ -1,554 +0,0 @@ -""" -This test suite exercises some system calls subject to interruption with EINTR, -to check that it is actually handled transparently. -It is intended to be run by the main test suite within a child process, to -ensure there is no background thread running (so that signals are delivered to -the correct thread). -Signals are generated in-process using setitimer(ITIMER_REAL), which allows -sub-second periodicity (contrarily to signal()). -""" - -import contextlib -import faulthandler -import fcntl -import os -import platform -import select -import signal -import socket -import subprocess -import sys -import textwrap -import time -import unittest - -from test import support -from test.support import os_helper -from test.support import socket_helper - - -# gh-109592: Tolerate a difference of 20 ms when comparing timings -# (clock resolution) -CLOCK_RES = 0.020 - - -@contextlib.contextmanager -def kill_on_error(proc): - """Context manager killing the subprocess if a Python exception is raised.""" - with proc: - try: - yield proc - except: - proc.kill() - raise - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class EINTRBaseTest(unittest.TestCase): - """ Base class for EINTR tests. """ - - # delay for initial signal delivery - signal_delay = 0.1 - # signal delivery periodicity - signal_period = 0.1 - # default sleep time for tests - should obviously have: - # sleep_time > signal_period - sleep_time = 0.2 - - def sighandler(self, signum, frame): - self.signals += 1 - - def setUp(self): - self.signals = 0 - self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler) - signal.setitimer(signal.ITIMER_REAL, self.signal_delay, - self.signal_period) - - # Use faulthandler as watchdog to debug when a test hangs - # (timeout of 10 minutes) - faulthandler.dump_traceback_later(10 * 60, exit=True, - file=sys.__stderr__) - - @staticmethod - def stop_alarm(): - signal.setitimer(signal.ITIMER_REAL, 0, 0) - - def tearDown(self): - self.stop_alarm() - signal.signal(signal.SIGALRM, self.orig_handler) - faulthandler.cancel_dump_traceback_later() - - def subprocess(self, *args, **kw): - cmd_args = (sys.executable, '-c') + args - return subprocess.Popen(cmd_args, **kw) - - def check_elapsed_time(self, elapsed): - self.assertGreaterEqual(elapsed, self.sleep_time - CLOCK_RES) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class OSEINTRTest(EINTRBaseTest): - """ EINTR tests for the os module. """ - - def new_sleep_process(self): - code = f'import time; time.sleep({self.sleep_time!r})' - return self.subprocess(code) - - def _test_wait_multiple(self, wait_func): - num = 3 - processes = [self.new_sleep_process() for _ in range(num)] - for _ in range(num): - wait_func() - # Call the Popen method to avoid a ResourceWarning - for proc in processes: - proc.wait() - - def test_wait(self): - self._test_wait_multiple(os.wait) - - @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()') - def test_wait3(self): - self._test_wait_multiple(lambda: os.wait3(0)) - - def _test_wait_single(self, wait_func): - proc = self.new_sleep_process() - wait_func(proc.pid) - # Call the Popen method to avoid a ResourceWarning - proc.wait() - - def test_waitpid(self): - self._test_wait_single(lambda pid: os.waitpid(pid, 0)) - - @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()') - def test_wait4(self): - self._test_wait_single(lambda pid: os.wait4(pid, 0)) - - def _interrupted_reads(self): - """Make a fd which will force block on read of expected bytes.""" - rd, wr = os.pipe() - self.addCleanup(os.close, rd) - # wr closed explicitly by parent - - # the payload below are smaller than PIPE_BUF, hence the writes will be - # atomic - data = [b"hello", b"world", b"spam"] - - code = '\n'.join(( - 'import os, sys, time', - '', - 'wr = int(sys.argv[1])', - f'data = {data!r}', - f'sleep_time = {self.sleep_time!r}', - '', - 'for item in data:', - ' # let the parent block on read()', - ' time.sleep(sleep_time)', - ' os.write(wr, item)', - )) - - proc = self.subprocess(code, str(wr), pass_fds=[wr]) - with kill_on_error(proc): - os.close(wr) - for datum in data: - yield rd, datum - self.assertEqual(proc.wait(), 0) - - def test_read(self): - for fd, expected in self._interrupted_reads(): - self.assertEqual(expected, os.read(fd, len(expected))) - - def test_readinto(self): - for fd, expected in self._interrupted_reads(): - buffer = bytearray(len(expected)) - self.assertEqual(os.readinto(fd, buffer), len(expected)) - self.assertEqual(buffer, expected) - - def test_write(self): - rd, wr = os.pipe() - self.addCleanup(os.close, wr) - # rd closed explicitly by parent - - # we must write enough data for the write() to block - data = b"x" * support.PIPE_MAX_SIZE - - code = '\n'.join(( - 'import io, os, sys, time', - '', - 'rd = int(sys.argv[1])', - f'sleep_time = {self.sleep_time!r}', - f'data = b"x" * {support.PIPE_MAX_SIZE}', - 'data_len = len(data)', - '', - '# let the parent block on write()', - 'time.sleep(sleep_time)', - '', - 'read_data = io.BytesIO()', - 'while len(read_data.getvalue()) < data_len:', - ' chunk = os.read(rd, 2 * data_len)', - ' read_data.write(chunk)', - '', - 'value = read_data.getvalue()', - 'if value != data:', - ' raise Exception(f"read error: {len(value)}' - ' vs {data_len} bytes")', - )) - - proc = self.subprocess(code, str(rd), pass_fds=[rd]) - with kill_on_error(proc): - os.close(rd) - written = 0 - while written < len(data): - written += os.write(wr, memoryview(data)[written:]) - self.assertEqual(proc.wait(), 0) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class SocketEINTRTest(EINTRBaseTest): - """ EINTR tests for the socket module. """ - - @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()') - def _test_recv(self, recv_func): - rd, wr = socket.socketpair() - self.addCleanup(rd.close) - # wr closed explicitly by parent - - # single-byte payload guard us against partial recv - data = [b"x", b"y", b"z"] - - code = '\n'.join(( - 'import os, socket, sys, time', - '', - 'fd = int(sys.argv[1])', - f'family = {int(wr.family)}', - f'sock_type = {int(wr.type)}', - f'data = {data!r}', - f'sleep_time = {self.sleep_time!r}', - '', - 'wr = socket.fromfd(fd, family, sock_type)', - 'os.close(fd)', - '', - 'with wr:', - ' for item in data:', - ' # let the parent block on recv()', - ' time.sleep(sleep_time)', - ' wr.sendall(item)', - )) - - fd = wr.fileno() - proc = self.subprocess(code, str(fd), pass_fds=[fd]) - with kill_on_error(proc): - wr.close() - for item in data: - self.assertEqual(item, recv_func(rd, len(item))) - self.assertEqual(proc.wait(), 0) - - def test_recv(self): - self._test_recv(socket.socket.recv) - - @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()') - def test_recvmsg(self): - self._test_recv(lambda sock, data: sock.recvmsg(data)[0]) - - def _test_send(self, send_func): - rd, wr = socket.socketpair() - self.addCleanup(wr.close) - # rd closed explicitly by parent - - # we must send enough data for the send() to block - data = b"xyz" * (support.SOCK_MAX_SIZE // 3) - - code = '\n'.join(( - 'import os, socket, sys, time', - '', - 'fd = int(sys.argv[1])', - f'family = {int(rd.family)}', - f'sock_type = {int(rd.type)}', - f'sleep_time = {self.sleep_time!r}', - f'data = b"xyz" * {support.SOCK_MAX_SIZE // 3}', - 'data_len = len(data)', - '', - 'rd = socket.fromfd(fd, family, sock_type)', - 'os.close(fd)', - '', - 'with rd:', - ' # let the parent block on send()', - ' time.sleep(sleep_time)', - '', - ' received_data = bytearray(data_len)', - ' n = 0', - ' while n < data_len:', - ' n += rd.recv_into(memoryview(received_data)[n:])', - '', - 'if received_data != data:', - ' raise Exception(f"recv error: {len(received_data)}' - ' vs {data_len} bytes")', - )) - - fd = rd.fileno() - proc = self.subprocess(code, str(fd), pass_fds=[fd]) - with kill_on_error(proc): - rd.close() - written = 0 - while written < len(data): - sent = send_func(wr, memoryview(data)[written:]) - # sendall() returns None - written += len(data) if sent is None else sent - self.assertEqual(proc.wait(), 0) - - def test_send(self): - self._test_send(socket.socket.send) - - def test_sendall(self): - self._test_send(socket.socket.sendall) - - @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()') - def test_sendmsg(self): - self._test_send(lambda sock, data: sock.sendmsg([data])) - - def test_accept(self): - sock = socket.create_server((socket_helper.HOST, 0)) - self.addCleanup(sock.close) - port = sock.getsockname()[1] - - code = '\n'.join(( - 'import socket, time', - '', - f'host = {socket_helper.HOST!r}', - f'port = {port}', - f'sleep_time = {self.sleep_time!r}', - '', - '# let parent block on accept()', - 'time.sleep(sleep_time)', - 'with socket.create_connection((host, port)):', - ' time.sleep(sleep_time)', - )) - - proc = self.subprocess(code) - with kill_on_error(proc): - client_sock, _ = sock.accept() - client_sock.close() - self.assertEqual(proc.wait(), 0) - - # Issue #25122: There is a race condition in the FreeBSD kernel on - # handling signals in the FIFO device. Skip the test until the bug is - # fixed in the kernel. - # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162 - @support.requires_freebsd_version(10, 3) - @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') - def _test_open(self, do_open_close_reader, do_open_close_writer): - filename = os_helper.TESTFN - - # Use a fifo: until the child opens it for reading, the parent will - # block when trying to open it for writing. - os_helper.unlink(filename) - try: - os.mkfifo(filename) - except PermissionError as exc: - self.skipTest(f'os.mkfifo(): {exc!r}') - self.addCleanup(os_helper.unlink, filename) - - code = '\n'.join(( - 'import os, time', - '', - f'path = {filename!a}', - f'sleep_time = {self.sleep_time!r}', - '', - '# let the parent block', - 'time.sleep(sleep_time)', - '', - do_open_close_reader, - )) - - proc = self.subprocess(code) - with kill_on_error(proc): - do_open_close_writer(filename) - self.assertEqual(proc.wait(), 0) - - def python_open(self, path): - fp = open(path, 'w') - fp.close() - - @unittest.skipIf(sys.platform == "darwin", - "hangs under macOS; see bpo-25234, bpo-35363") - def test_open(self): - self._test_open("fp = open(path, 'r')\nfp.close()", - self.python_open) - - def os_open(self, path): - fd = os.open(path, os.O_WRONLY) - os.close(fd) - - @unittest.skipIf(sys.platform == "darwin", - "hangs under macOS; see bpo-25234, bpo-35363") - @unittest.skipIf(sys.platform.startswith('netbsd'), - "hangs on NetBSD; see gh-137397") - def test_os_open(self): - self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)", - self.os_open) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class TimeEINTRTest(EINTRBaseTest): - """ EINTR tests for the time module. """ - - def test_sleep(self): - t0 = time.monotonic() - time.sleep(self.sleep_time) - self.stop_alarm() - dt = time.monotonic() - t0 - self.check_elapsed_time(dt) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test -# is vulnerable to a race condition between the child and the parent processes. -@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), - 'need signal.pthread_sigmask()') -class SignalEINTRTest(EINTRBaseTest): - """ EINTR tests for the signal module. """ - - def check_sigwait(self, wait_func): - signum = signal.SIGUSR1 - - old_handler = signal.signal(signum, lambda *args: None) - self.addCleanup(signal.signal, signum, old_handler) - - code = '\n'.join(( - 'import os, time', - f'pid = {os.getpid()}', - f'signum = {int(signum)}', - f'sleep_time = {self.sleep_time!r}', - 'time.sleep(sleep_time)', - 'os.kill(pid, signum)', - )) - - signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) - self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) - - proc = self.subprocess(code) - with kill_on_error(proc): - wait_func(signum) - - self.assertEqual(proc.wait(), 0) - - @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), - 'need signal.sigwaitinfo()') - def test_sigwaitinfo(self): - def wait_func(signum): - signal.sigwaitinfo([signum]) - - self.check_sigwait(wait_func) - - @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), - 'need signal.sigwaitinfo()') - def test_sigtimedwait(self): - def wait_func(signum): - signal.sigtimedwait([signum], 120.0) - - self.check_sigwait(wait_func) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class SelectEINTRTest(EINTRBaseTest): - """ EINTR tests for the select module. """ - - def test_select(self): - t0 = time.monotonic() - select.select([], [], [], self.sleep_time) - dt = time.monotonic() - t0 - self.stop_alarm() - self.check_elapsed_time(dt) - - @unittest.skipIf(sys.platform == "darwin", - "poll may fail on macOS; see issue #28087") - @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll') - def test_poll(self): - poller = select.poll() - - t0 = time.monotonic() - poller.poll(self.sleep_time * 1e3) - dt = time.monotonic() - t0 - self.stop_alarm() - self.check_elapsed_time(dt) - - @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll') - def test_epoll(self): - poller = select.epoll() - self.addCleanup(poller.close) - - t0 = time.monotonic() - poller.poll(self.sleep_time) - dt = time.monotonic() - t0 - self.stop_alarm() - self.check_elapsed_time(dt) - - @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue') - def test_kqueue(self): - kqueue = select.kqueue() - self.addCleanup(kqueue.close) - - t0 = time.monotonic() - kqueue.control(None, 1, self.sleep_time) - dt = time.monotonic() - t0 - self.stop_alarm() - self.check_elapsed_time(dt) - - @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll') - def test_devpoll(self): - poller = select.devpoll() - self.addCleanup(poller.close) - - t0 = time.monotonic() - poller.poll(self.sleep_time * 1e3) - dt = time.monotonic() - t0 - self.stop_alarm() - self.check_elapsed_time(dt) - - -class FCNTLEINTRTest(EINTRBaseTest): - def _lock(self, lock_func, lock_name): - self.addCleanup(os_helper.unlink, os_helper.TESTFN) - rd1, wr1 = os.pipe() - rd2, wr2 = os.pipe() - for fd in (rd1, wr1, rd2, wr2): - self.addCleanup(os.close, fd) - code = textwrap.dedent(f""" - import fcntl, os, time - with open('{os_helper.TESTFN}', 'wb') as f: - fcntl.{lock_name}(f, fcntl.LOCK_EX) - os.write({wr1}, b"ok") - _ = os.read({rd2}, 2) # wait for parent process - time.sleep({self.sleep_time}) - """) - proc = self.subprocess(code, pass_fds=[wr1, rd2]) - with kill_on_error(proc): - with open(os_helper.TESTFN, 'wb') as f: - # synchronize the subprocess - ok = os.read(rd1, 2) - self.assertEqual(ok, b"ok") - - # notify the child that the parent is ready - start_time = time.monotonic() - os.write(wr2, b"go") - - # the child locked the file just a moment ago for 'sleep_time' seconds - # that means that the lock below will block for 'sleep_time' minus some - # potential context switch delay - lock_func(f, fcntl.LOCK_EX) - dt = time.monotonic() - start_time - self.stop_alarm() - self.check_elapsed_time(dt) - proc.wait() - - # Issue 35633: See https://bugs.python.org/issue35633#msg333662 - # skip test rather than accept PermissionError from all platforms - @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError") - def test_lockf(self): - self._lock(fcntl.lockf, "lockf") - - def test_flock(self): - self._lock(fcntl.flock, "flock") - - -if __name__ == "__main__": - unittest.main() diff --git a/Lib/test/support/isolation.py b/Lib/test/support/isolation.py new file mode 100644 index 000000000000000..fe0e5df0ec8ba45 --- /dev/null +++ b/Lib/test/support/isolation.py @@ -0,0 +1,286 @@ +"""Run tests in isolated subprocesses (the test.support.isolation.isolated decorator). + +A failure, error or skip that happens in the subprocess is replayed in the +parent process so that the test runner records it. The original (subprocess) +traceback is attached as the cause of the replayed exception, the same way +:mod:`concurrent.futures` surfaces tracebacks from worker processes. +""" + +import functools +import os +import sys +import unittest + +# Mark this module's frames as belonging to the test machinery, so that +# unittest strips them from reported tracebacks (see TestResult._clean_tracebacks +# in Lib/unittest/result.py). Only the original subprocess traceback, attached +# as the cause, is then shown -- not the parent-side replay frames. +__unittest = True + +# Environment variable set in the child process so that the decorated test +# method runs its real body instead of spawning yet another subprocess. +_RUN_IN_SUBPROCESS_ENV = '_PYTHON_RUN_IN_SUBPROCESS' + +# Environment variable carrying (as JSON) the regrtest-configured test.support +# state, so that the bare subprocess honors -u, -M, -v, etc. like the parent. +_CONFIG_ENV = '_PYTHON_ISOLATED_CONFIG' + +# test.support globals set by regrtest (libregrtest/setup.py) that affect how +# tests run and which are skipped at runtime in the subprocess. +_PROPAGATED_CONFIG = ( + 'use_resources', # -u (is_resource_enabled/requires) + 'max_memuse', 'real_max_memuse', # -M (bigmemtest) + 'verbose', # -v + 'failfast', # -f +) + +def _child_config(): + import test.support as support + return {name: getattr(support, name) for name in _PROPAGATED_CONFIG} + +def _apply_child_config(): + """Mirror the parent's test.support configuration in the subprocess. + + Called by subprocess_runner before loading the test, so that import-time + decorators (e.g. requires_resource) and runtime checks see the same -u/-M/-v + configuration as the parent process. + """ + import json + import test.support as support + data = os.environ.get(_CONFIG_ENV) + if data: + for name, value in json.loads(data).items(): + setattr(support, name, value) + +# True while running inside the isolated subprocess spawned by @isolated(). +# setUp()/tearDown() and the class- and module-level fixtures can test it to +# decide which code to run in the subprocess as opposed to the parent process. +running_isolated = bool(os.environ.get(_RUN_IN_SUBPROCESS_ENV)) + + +class _RemoteTraceback(Exception): + """Carry a formatted traceback string from the subprocess for display. + + Attached as the ``__cause__`` of the replayed failure/error, so that the + original traceback is shown by the traceback machinery. + """ + def __init__(self, tb): + self.tb = tb + + def __str__(self): + return self.tb + + +class _SubprocessTestError(Exception): + """Replay a subprocess error (as opposed to a failure) in the parent.""" + + +def _remote(detail): + # Wrap the subprocess traceback the way concurrent.futures does, so it is + # clearly delimited when shown as the cause. + return _RemoteTraceback(f'\n"""\n{detail}"""') + + +def _check_subprocess_support(): + # isolated() always runs the test in a subprocess, so skip (in the parent) + # on platforms that do not support spawning one. + import test.support as support + if not support.has_subprocess_support: + raise unittest.SkipTest('requires subprocess support') + + +def _run_in_subprocess(module, qualname): + """Run module.qualname (a test method or class) in a fresh subprocess. + + Return ``(payload, output, returncode)``, where *payload* is the decoded + ``{'outcomes': ..., 'durations': ...}`` mapping from the subprocess, or + ``None`` if it did not run to completion (crash, import error, ...). + """ + import json + import subprocess + import tempfile + env = dict(os.environ) + env[_RUN_IN_SUBPROCESS_ENV] = '1' + env[_CONFIG_ENV] = json.dumps(_child_config()) + fd, result_path = tempfile.mkstemp(suffix='.json') + os.close(fd) + try: + cmd = [sys.executable, '-m', 'test.support.subprocess_runner', + module, qualname, result_path] + proc = subprocess.run(cmd, capture_output=True, text=True, env=env) + try: + with open(result_path, encoding='utf-8') as f: + payload = json.load(f) + except (OSError, ValueError): + payload = None + finally: + try: + os.unlink(result_path) + except OSError: + pass + return payload, (proc.stdout or '') + (proc.stderr or ''), proc.returncode + + +def _replay_outcome(test, outcome): + kind = outcome['kind'] + detail = outcome['detail'] + if kind == 'skipped': + test.skipTest(detail) # the detail is the skip reason, not a traceback + elif kind in ('failure', 'expected_failure'): + # An expected failure is replayed like a failure: the wrapper carries + # the @expectedFailure marker (copied by functools.wraps), so the parent + # records the raised exception as an expectedFailure, not a failure. + exc = test.failureException('test failed in the subprocess') + raise exc from _remote(detail) + else: # 'error' + exc = _SubprocessTestError('test failed in the subprocess') + raise exc from _remote(detail) + + +def _replay_outcomes(test, outcomes): + # Replay each subtest outcome in its own subTest() context so that they are + # reported individually, then replay the whole-test outcome (if any). + main = [] + for outcome in outcomes: + if outcome['subtest']: + with test.subTest(outcome['desc']): + _replay_outcome(test, outcome) + else: + main.append(outcome) + for outcome in main: + _replay_outcome(test, outcome) + + +def _raise_fixture_outcome(outcome): + # Reproduce a setUpClass()/setUpModule() failure or skip from the + # subprocess in a parent-process fixture, so it applies to every test. + if outcome['kind'] == 'skipped': + raise unittest.SkipTest(outcome['detail']) + exc = _SubprocessTestError('class failed in the subprocess') + raise exc from _remote(outcome['detail']) + + +def _isolate_method(func): + @functools.wraps(func) + def wrapper(self, /, *args, **kwargs): + if running_isolated: + # Already running in the subprocess: run the real test. + return func(self, *args, **kwargs) + _check_subprocess_support() + cls = type(self) + qualname = f'{cls.__qualname__}.{func.__name__}' + payload, output, returncode = _run_in_subprocess(cls.__module__, + qualname) + if payload is None: + exc = _SubprocessTestError( + f'test did not complete in a subprocess (exit code {returncode})') + raise exc from _remote(output) + # The parent measures this method's own duration (the real cost of the + # isolated run, subprocess startup included), so nothing to forward here. + _replay_outcomes(self, payload['outcomes']) + return wrapper + + +def _isolate_class(cls): + # Unwrap to the plain functions: the replacements below call them with the + # runtime cls, so a subclass of an isolated class runs the fixtures bound to + # itself (a bound classmethod would freeze the decoration-time class). + orig_setUpClass = cls.setUpClass.__func__ + orig_tearDownClass = cls.tearDownClass.__func__ + orig_setUp = cls.setUp + orig_tearDown = cls.tearDown + orig_addDuration = getattr(cls, '_addDuration', None) + + def setUpClass(cls): + if running_isolated: + orig_setUpClass(cls) + return + _check_subprocess_support() + # Run the whole class in a single subprocess and stash the outcomes + # for the wrapped test methods to replay. + payload, output, returncode = _run_in_subprocess(cls.__module__, + cls.__qualname__) + if payload is None: + exc = _SubprocessTestError( + f'class did not complete in a subprocess (exit code {returncode})') + raise exc from _remote(output) + by_id = {} + for outcome in payload['outcomes']: + if outcome['fixture']: + # A setUpClass()/setUpModule() failure or skip: apply it to the + # whole class by raising it here, in the parent's setUpClass(). + _raise_fixture_outcome(outcome) + by_id.setdefault(outcome['id'], []).append(outcome) + cls._isolated_outcomes = by_id + cls._isolated_durations = dict(payload.get('durations', ())) + + def tearDownClass(cls): + if running_isolated: + orig_tearDownClass(cls) + else: + cls._isolated_outcomes = None + cls._isolated_durations = None + + def setUp(self): + # In the parent the real test does not run, so neither should setUp(). + if running_isolated: + orig_setUp(self) + + def tearDown(self): + if running_isolated: + orig_tearDown(self) + + def _addDuration(self, result, elapsed): + # In the parent, report the per-test duration measured in the subprocess + # rather than the replay time (subprocess startup is paid once, in + # setUpClass). + if not running_isolated: + durations = getattr(type(self), '_isolated_durations', None) or {} + elapsed = durations.get(self.id(), elapsed) + orig_addDuration(self, result, elapsed) + + def replay(self): + by_id = getattr(type(self), '_isolated_outcomes', None) or {} + _replay_outcomes(self, by_id.get(self.id(), [])) + + cls.setUpClass = classmethod(setUpClass) + cls.tearDownClass = classmethod(tearDownClass) + cls.setUp = setUp + cls.tearDown = tearDown + if orig_addDuration is not None: + cls._addDuration = _addDuration + for name in unittest.TestLoader().getTestCaseNames(cls): + method = getattr(cls, name) + @functools.wraps(method) + def wrapper(self, /, *args, __func=method, **kwargs): + if running_isolated: + return __func(self, *args, **kwargs) + replay(self) + setattr(cls, name, wrapper) + return cls + + +def isolated(): + """Decorator to run a test method or class in isolation from the rest. + + The decorated test runs in a separate, fresh Python process, so it does not + share global or interpreter state with the rest of the test run. When a + :class:`~unittest.TestCase` subclass is decorated, the whole class runs in a + single subprocess and its ``setUpClass()``/``setUpModule()`` fixtures run + once there; when a method is decorated, only that method runs in a + subprocess. Decorated methods must take no extra arguments. + + A failure, error or skip of the whole test is reported for the test, and + individual subtests (:meth:`~unittest.TestCase.subTest`) that fail or are + skipped are reported individually. The original subprocess traceback is + shown as the cause of a reported failure or error. Use + :data:`running_isolated` in fixtures to choose what to run in the subprocess. + + The test is skipped on platforms without subprocess support, since it must + spawn one. + """ + def decorator(obj): + if isinstance(obj, type) and issubclass(obj, unittest.TestCase): + return _isolate_class(obj) + return _isolate_method(obj) + return decorator diff --git a/Lib/test/support/subprocess_runner.py b/Lib/test/support/subprocess_runner.py new file mode 100644 index 000000000000000..6eced4951171bea --- /dev/null +++ b/Lib/test/support/subprocess_runner.py @@ -0,0 +1,74 @@ +"""Run a single test method in this (sub)process and report the result. + +Invoked as ``python -m test.support.subprocess_runner MODULE QUALNAME OUTFILE`` +by :func:`test.support.isolation.isolated`. The outcome of the test (including +that of each individual subtest) is written as JSON to OUTFILE. This module is +not meant to be imported. +""" + +import json +import sys +import unittest +from unittest.case import _SubTest + +if __name__ != '__main__': + raise ImportError('this module cannot be directly imported') + +if len(sys.argv) != 4: + print('usage: python -m test.support.subprocess_runner ' + 'MODULE QUALNAME OUTFILE', file=sys.stderr) + sys.exit(2) + +module = sys.argv[1] +qualname = sys.argv[2] +outfile = sys.argv[3] + +# Mirror the parent's regrtest configuration (-u, -M, -v, ...) before importing +# the test, so resource gating and bigmem sizing match the parent process. +from test.support.isolation import _apply_child_config +_apply_child_config() + + +class _Result(unittest.TestResult): + # Capture per-test durations keyed by test id, so the parent can report the + # subprocess timings instead of its own replay time. + def __init__(self): + super().__init__() + self.id_durations = [] + + def addDuration(self, test, elapsed): + super().addDuration(test, elapsed) + self.id_durations.append((test.id(), elapsed)) + + +suite = unittest.TestLoader().loadTestsFromName(f'{module}.{qualname}') +result = _Result() +suite.run(result) + + +def _outcome(kind, test, detail): + subtest = isinstance(test, _SubTest) + real = test.test_case if subtest else test + return { + 'kind': kind, + 'subtest': subtest, + 'desc': test._subDescription() if subtest else '', + # id() groups outcomes by test method; a non-TestCase (e.g. an + # _ErrorHolder) marks a setUpClass()/setUpModule() fixture failure. + 'id': real.id(), + 'fixture': not isinstance(real, unittest.TestCase), + 'detail': detail, + } + + +outcomes = [_outcome('failure', t, tb) for t, tb in result.failures] +outcomes += [_outcome('error', t, tb) for t, tb in result.errors] +outcomes += [_outcome('expected_failure', t, tb) + for t, tb in result.expectedFailures] +outcomes += [_outcome('skipped', t, reason) for t, reason in result.skipped] + +payload = {'outcomes': outcomes, 'durations': result.id_durations} +with open(outfile, 'w', encoding='utf-8') as f: + json.dump(payload, f) + +sys.exit(0) diff --git a/Lib/test/test_eintr.py b/Lib/test/test_eintr.py index 49b15f1a2dba92d..5642a58cfed049d 100644 --- a/Lib/test/test_eintr.py +++ b/Lib/test/test_eintr.py @@ -1,20 +1,567 @@ +""" +This test suite exercises some system calls subject to interruption with EINTR, +to check that it is actually handled transparently. +Each test class runs in a fresh subprocess (via @isolation.isolated()) to ensure +there is no background thread running, so that signals are delivered to the +correct thread. +Signals are generated in-process using setitimer(ITIMER_REAL), which allows +sub-second periodicity (contrarily to signal()). +""" + +import contextlib +import faulthandler +import fcntl import os +import platform +import select import signal +import socket +import subprocess +import sys +import textwrap +import time import unittest + from test import support -from test.support import script_helper +from test.support import isolation +from test.support import os_helper +from test.support import socket_helper + + +# gh-109592: Tolerate a difference of 20 ms when comparing timings +# (clock resolution) +CLOCK_RES = 0.020 + + +@contextlib.contextmanager +def kill_on_error(proc): + """Context manager killing the subprocess if a Python exception is raised.""" + with proc: + try: + yield proc + except: + proc.kill() + raise + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class EINTRBaseTest(unittest.TestCase): + """ Base class for EINTR tests. """ + + # delay for initial signal delivery + signal_delay = 0.1 + # signal delivery periodicity + signal_period = 0.1 + # default sleep time for tests - should obviously have: + # sleep_time > signal_period + sleep_time = 0.2 + + def sighandler(self, signum, frame): + self.signals += 1 + + def setUp(self): + self.signals = 0 + self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler) + signal.setitimer(signal.ITIMER_REAL, self.signal_delay, + self.signal_period) + + # Use faulthandler as watchdog to debug when a test hangs + # (timeout of 10 minutes) + faulthandler.dump_traceback_later(10 * 60, exit=True, + file=sys.__stderr__) + + @staticmethod + def stop_alarm(): + signal.setitimer(signal.ITIMER_REAL, 0, 0) + + def tearDown(self): + self.stop_alarm() + signal.signal(signal.SIGALRM, self.orig_handler) + faulthandler.cancel_dump_traceback_later() + + def subprocess(self, *args, **kw): + cmd_args = (sys.executable, '-c') + args + return subprocess.Popen(cmd_args, **kw) + + def check_elapsed_time(self, elapsed): + self.assertGreaterEqual(elapsed, self.sleep_time - CLOCK_RES) + + +@isolation.isolated() +@support.requires_resource('walltime') +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class OSEINTRTest(EINTRBaseTest): + """ EINTR tests for the os module. """ + + def new_sleep_process(self): + code = f'import time; time.sleep({self.sleep_time!r})' + return self.subprocess(code) + + def _test_wait_multiple(self, wait_func): + num = 3 + processes = [self.new_sleep_process() for _ in range(num)] + for _ in range(num): + wait_func() + # Call the Popen method to avoid a ResourceWarning + for proc in processes: + proc.wait() + + def test_wait(self): + self._test_wait_multiple(os.wait) + + @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()') + def test_wait3(self): + self._test_wait_multiple(lambda: os.wait3(0)) + + def _test_wait_single(self, wait_func): + proc = self.new_sleep_process() + wait_func(proc.pid) + # Call the Popen method to avoid a ResourceWarning + proc.wait() + + def test_waitpid(self): + self._test_wait_single(lambda pid: os.waitpid(pid, 0)) + + @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()') + def test_wait4(self): + self._test_wait_single(lambda pid: os.wait4(pid, 0)) + + def _interrupted_reads(self): + """Make a fd which will force block on read of expected bytes.""" + rd, wr = os.pipe() + self.addCleanup(os.close, rd) + # wr closed explicitly by parent + + # the payload below are smaller than PIPE_BUF, hence the writes will be + # atomic + data = [b"hello", b"world", b"spam"] + + code = '\n'.join(( + 'import os, sys, time', + '', + 'wr = int(sys.argv[1])', + f'data = {data!r}', + f'sleep_time = {self.sleep_time!r}', + '', + 'for item in data:', + ' # let the parent block on read()', + ' time.sleep(sleep_time)', + ' os.write(wr, item)', + )) + + proc = self.subprocess(code, str(wr), pass_fds=[wr]) + with kill_on_error(proc): + os.close(wr) + for datum in data: + yield rd, datum + self.assertEqual(proc.wait(), 0) + + def test_read(self): + for fd, expected in self._interrupted_reads(): + self.assertEqual(expected, os.read(fd, len(expected))) + + def test_readinto(self): + for fd, expected in self._interrupted_reads(): + buffer = bytearray(len(expected)) + self.assertEqual(os.readinto(fd, buffer), len(expected)) + self.assertEqual(buffer, expected) + + def test_write(self): + rd, wr = os.pipe() + self.addCleanup(os.close, wr) + # rd closed explicitly by parent + + # we must write enough data for the write() to block + data = b"x" * support.PIPE_MAX_SIZE + + code = '\n'.join(( + 'import io, os, sys, time', + '', + 'rd = int(sys.argv[1])', + f'sleep_time = {self.sleep_time!r}', + f'data = b"x" * {support.PIPE_MAX_SIZE}', + 'data_len = len(data)', + '', + '# let the parent block on write()', + 'time.sleep(sleep_time)', + '', + 'read_data = io.BytesIO()', + 'while len(read_data.getvalue()) < data_len:', + ' chunk = os.read(rd, 2 * data_len)', + ' read_data.write(chunk)', + '', + 'value = read_data.getvalue()', + 'if value != data:', + ' raise Exception(f"read error: {len(value)}' + ' vs {data_len} bytes")', + )) + + proc = self.subprocess(code, str(rd), pass_fds=[rd]) + with kill_on_error(proc): + os.close(rd) + written = 0 + while written < len(data): + written += os.write(wr, memoryview(data)[written:]) + self.assertEqual(proc.wait(), 0) + + +@isolation.isolated() +@support.requires_resource('walltime') +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class SocketEINTRTest(EINTRBaseTest): + """ EINTR tests for the socket module. """ + + @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()') + def _test_recv(self, recv_func): + rd, wr = socket.socketpair() + self.addCleanup(rd.close) + # wr closed explicitly by parent + + # single-byte payload guard us against partial recv + data = [b"x", b"y", b"z"] + + code = '\n'.join(( + 'import os, socket, sys, time', + '', + 'fd = int(sys.argv[1])', + f'family = {int(wr.family)}', + f'sock_type = {int(wr.type)}', + f'data = {data!r}', + f'sleep_time = {self.sleep_time!r}', + '', + 'wr = socket.fromfd(fd, family, sock_type)', + 'os.close(fd)', + '', + 'with wr:', + ' for item in data:', + ' # let the parent block on recv()', + ' time.sleep(sleep_time)', + ' wr.sendall(item)', + )) + + fd = wr.fileno() + proc = self.subprocess(code, str(fd), pass_fds=[fd]) + with kill_on_error(proc): + wr.close() + for item in data: + self.assertEqual(item, recv_func(rd, len(item))) + self.assertEqual(proc.wait(), 0) + + def test_recv(self): + self._test_recv(socket.socket.recv) + + @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()') + def test_recvmsg(self): + self._test_recv(lambda sock, data: sock.recvmsg(data)[0]) + + def _test_send(self, send_func): + rd, wr = socket.socketpair() + self.addCleanup(wr.close) + # rd closed explicitly by parent + + # we must send enough data for the send() to block + data = b"xyz" * (support.SOCK_MAX_SIZE // 3) + + code = '\n'.join(( + 'import os, socket, sys, time', + '', + 'fd = int(sys.argv[1])', + f'family = {int(rd.family)}', + f'sock_type = {int(rd.type)}', + f'sleep_time = {self.sleep_time!r}', + f'data = b"xyz" * {support.SOCK_MAX_SIZE // 3}', + 'data_len = len(data)', + '', + 'rd = socket.fromfd(fd, family, sock_type)', + 'os.close(fd)', + '', + 'with rd:', + ' # let the parent block on send()', + ' time.sleep(sleep_time)', + '', + ' received_data = bytearray(data_len)', + ' n = 0', + ' while n < data_len:', + ' n += rd.recv_into(memoryview(received_data)[n:])', + '', + 'if received_data != data:', + ' raise Exception(f"recv error: {len(received_data)}' + ' vs {data_len} bytes")', + )) + + fd = rd.fileno() + proc = self.subprocess(code, str(fd), pass_fds=[fd]) + with kill_on_error(proc): + rd.close() + written = 0 + while written < len(data): + sent = send_func(wr, memoryview(data)[written:]) + # sendall() returns None + written += len(data) if sent is None else sent + self.assertEqual(proc.wait(), 0) + + def test_send(self): + self._test_send(socket.socket.send) + + def test_sendall(self): + self._test_send(socket.socket.sendall) + + @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()') + def test_sendmsg(self): + self._test_send(lambda sock, data: sock.sendmsg([data])) + + def test_accept(self): + sock = socket.create_server((socket_helper.HOST, 0)) + self.addCleanup(sock.close) + port = sock.getsockname()[1] + + code = '\n'.join(( + 'import socket, time', + '', + f'host = {socket_helper.HOST!r}', + f'port = {port}', + f'sleep_time = {self.sleep_time!r}', + '', + '# let parent block on accept()', + 'time.sleep(sleep_time)', + 'with socket.create_connection((host, port)):', + ' time.sleep(sleep_time)', + )) + + proc = self.subprocess(code) + with kill_on_error(proc): + client_sock, _ = sock.accept() + client_sock.close() + self.assertEqual(proc.wait(), 0) + + # Issue #25122: There is a race condition in the FreeBSD kernel on + # handling signals in the FIFO device. Skip the test until the bug is + # fixed in the kernel. + # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162 + @support.requires_freebsd_version(10, 3) + @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') + def _test_open(self, do_open_close_reader, do_open_close_writer): + filename = os_helper.TESTFN + + # Use a fifo: until the child opens it for reading, the parent will + # block when trying to open it for writing. + os_helper.unlink(filename) + try: + os.mkfifo(filename) + except PermissionError as exc: + self.skipTest(f'os.mkfifo(): {exc!r}') + self.addCleanup(os_helper.unlink, filename) + + code = '\n'.join(( + 'import os, time', + '', + f'path = {filename!a}', + f'sleep_time = {self.sleep_time!r}', + '', + '# let the parent block', + 'time.sleep(sleep_time)', + '', + do_open_close_reader, + )) + + proc = self.subprocess(code) + with kill_on_error(proc): + do_open_close_writer(filename) + self.assertEqual(proc.wait(), 0) + + def python_open(self, path): + fp = open(path, 'w') + fp.close() + + @unittest.skipIf(sys.platform == "darwin", + "hangs under macOS; see bpo-25234, bpo-35363") + def test_open(self): + self._test_open("fp = open(path, 'r')\nfp.close()", + self.python_open) + + def os_open(self, path): + fd = os.open(path, os.O_WRONLY) + os.close(fd) + + @unittest.skipIf(sys.platform == "darwin", + "hangs under macOS; see bpo-25234, bpo-35363") + @unittest.skipIf(sys.platform.startswith('netbsd'), + "hangs on NetBSD; see gh-137397") + def test_os_open(self): + self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)", + self.os_open) + + +@isolation.isolated() +@support.requires_resource('walltime') +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class TimeEINTRTest(EINTRBaseTest): + """ EINTR tests for the time module. """ + + def test_sleep(self): + t0 = time.monotonic() + time.sleep(self.sleep_time) + self.stop_alarm() + dt = time.monotonic() - t0 + self.check_elapsed_time(dt) + + +@isolation.isolated() +@support.requires_resource('walltime') +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test +# is vulnerable to a race condition between the child and the parent processes. +@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') +class SignalEINTRTest(EINTRBaseTest): + """ EINTR tests for the signal module. """ + + def check_sigwait(self, wait_func): + signum = signal.SIGUSR1 + + old_handler = signal.signal(signum, lambda *args: None) + self.addCleanup(signal.signal, signum, old_handler) + + code = '\n'.join(( + 'import os, time', + f'pid = {os.getpid()}', + f'signum = {int(signum)}', + f'sleep_time = {self.sleep_time!r}', + 'time.sleep(sleep_time)', + 'os.kill(pid, signum)', + )) + + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) + + proc = self.subprocess(code) + with kill_on_error(proc): + wait_func(signum) + + self.assertEqual(proc.wait(), 0) + + @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), + 'need signal.sigwaitinfo()') + def test_sigwaitinfo(self): + def wait_func(signum): + signal.sigwaitinfo([signum]) + + self.check_sigwait(wait_func) + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigwaitinfo()') + def test_sigtimedwait(self): + def wait_func(signum): + signal.sigtimedwait([signum], 120.0) + + self.check_sigwait(wait_func) + + +@isolation.isolated() +@support.requires_resource('walltime') +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class SelectEINTRTest(EINTRBaseTest): + """ EINTR tests for the select module. """ + + def test_select(self): + t0 = time.monotonic() + select.select([], [], [], self.sleep_time) + dt = time.monotonic() - t0 + self.stop_alarm() + self.check_elapsed_time(dt) + + @unittest.skipIf(sys.platform == "darwin", + "poll may fail on macOS; see issue #28087") + @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll') + def test_poll(self): + poller = select.poll() + + t0 = time.monotonic() + poller.poll(self.sleep_time * 1e3) + dt = time.monotonic() - t0 + self.stop_alarm() + self.check_elapsed_time(dt) + + @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll') + def test_epoll(self): + poller = select.epoll() + self.addCleanup(poller.close) + + t0 = time.monotonic() + poller.poll(self.sleep_time) + dt = time.monotonic() - t0 + self.stop_alarm() + self.check_elapsed_time(dt) + + @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue') + def test_kqueue(self): + kqueue = select.kqueue() + self.addCleanup(kqueue.close) + + t0 = time.monotonic() + kqueue.control(None, 1, self.sleep_time) + dt = time.monotonic() - t0 + self.stop_alarm() + self.check_elapsed_time(dt) + + @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll') + def test_devpoll(self): + poller = select.devpoll() + self.addCleanup(poller.close) + + t0 = time.monotonic() + poller.poll(self.sleep_time * 1e3) + dt = time.monotonic() - t0 + self.stop_alarm() + self.check_elapsed_time(dt) + + +@isolation.isolated() +@support.requires_resource('walltime') +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class FCNTLEINTRTest(EINTRBaseTest): + def _lock(self, lock_func, lock_name): + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + rd1, wr1 = os.pipe() + rd2, wr2 = os.pipe() + for fd in (rd1, wr1, rd2, wr2): + self.addCleanup(os.close, fd) + code = textwrap.dedent(f""" + import fcntl, os, time + with open('{os_helper.TESTFN}', 'wb') as f: + fcntl.{lock_name}(f, fcntl.LOCK_EX) + os.write({wr1}, b"ok") + _ = os.read({rd2}, 2) # wait for parent process + time.sleep({self.sleep_time}) + """) + proc = self.subprocess(code, pass_fds=[wr1, rd2]) + with kill_on_error(proc): + with open(os_helper.TESTFN, 'wb') as f: + # synchronize the subprocess + ok = os.read(rd1, 2) + self.assertEqual(ok, b"ok") + + # notify the child that the parent is ready + start_time = time.monotonic() + os.write(wr2, b"go") + # the child locked the file just a moment ago for 'sleep_time' seconds + # that means that the lock below will block for 'sleep_time' minus some + # potential context switch delay + lock_func(f, fcntl.LOCK_EX) + dt = time.monotonic() - start_time + self.stop_alarm() + self.check_elapsed_time(dt) + proc.wait() -@unittest.skipUnless(os.name == "posix", "only supported on Unix") -class EINTRTests(unittest.TestCase): + # Issue 35633: See https://bugs.python.org/issue35633#msg333662 + # skip test rather than accept PermissionError from all platforms + @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError") + def test_lockf(self): + self._lock(fcntl.lockf, "lockf") - @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") - @support.requires_resource('walltime') - def test_all(self): - # Run the tester in a sub-process, to make sure there is only one - # thread (for reliable signal delivery). - script = support.findfile("_test_eintr.py") - script_helper.run_test_script(script) + def test_flock(self): + self._lock(fcntl.flock, "flock") if __name__ == "__main__": diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index d556f96bc532ed1..90a56cf86971ea0 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -19,6 +19,7 @@ import warnings from test import support +from test.support import isolation from test.support import hashlib_helper from test.support import import_helper from test.support import os_helper @@ -1075,5 +1076,106 @@ def test_disable_hash_md5_in_fips_mode_allow_all(self): self.assertIsInstance(h, self._hashlib.HASH) +class TestIsolated(unittest.TestCase): + # Drive the sample tests in test._isolated_sample (which really spawn + # subprocesses through @isolation.isolated()) under a private TestResult, + # and check that each subprocess outcome is replayed in the parent. + + @staticmethod + def _run(name): + suite = unittest.TestLoader().loadTestsFromName( + 'test._isolated_sample.' + name) + result = unittest.TestResult() + suite.run(result) + return result + + @staticmethod + def _names(items): + # Map outcome entries (which are (test, detail) pairs, except + # unexpectedSuccesses which are bare tests) to their method names. + names = [] + for item in items: + test = item[0] if isinstance(item, tuple) else item + names.append(test.id().rpartition('.')[2]) + return sorted(names) + + @support.requires_subprocess() + def test_method_outcomes(self): + result = self._run('MethodSample') + self.assertEqual(result.testsRun, 6) + self.assertEqual(self._names(result.failures), ['test_fail']) + self.assertEqual(self._names(result.errors), ['test_error']) + self.assertEqual(self._names(result.skipped), ['test_skip']) + self.assertEqual(self._names(result.expectedFailures), + ['test_expected_failure']) + self.assertEqual(self._names(result.unexpectedSuccesses), + ['test_unexpected_success']) + + @support.requires_subprocess() + def test_class_outcomes(self): + result = self._run('ClassSample') + self.assertEqual(result.testsRun, 3) + self.assertEqual(self._names(result.failures), ['test_fail']) + self.assertEqual(self._names(result.expectedFailures), + ['test_expected_failure']) + self.assertEqual(result.errors, []) + self.assertEqual(result.unexpectedSuccesses, []) + + @support.requires_subprocess() + def test_subtests_reported_individually(self): + result = self._run('SubtestSample') + self.assertEqual(result.testsRun, 1) + self.assertEqual(len(result.failures), 1) + test, _ = result.failures[0] + self.assertIn('i=1', str(test)) + + @support.requires_subprocess() + def test_skip_reason_propagated(self): + result = self._run('MethodSample.test_skip') + self.assertEqual([reason for _, reason in result.skipped], ['nope']) + + @support.requires_subprocess() + def test_subprocess_traceback_is_cause(self): + result = self._run('MethodSample.test_fail') + self.assertEqual(len(result.failures), 1) + _, tb = result.failures[0] + # The real assertion that failed in the subprocess is shown ... + self.assertIn('self.assertEqual(1, 2)', tb) + # ... as the direct cause of the replayed failure ... + self.assertIn('direct cause', tb) + # ... without leaking the parent-side replay frames. + self.assertNotIn('isolation.py', tb) + + @support.requires_subprocess() + def test_durations_forwarded_for_class(self): + from test._isolated_sample import DURATION_SLEEP + result = unittest.TestResult() + result.collectedDurations = [] + suite = unittest.TestLoader().loadTestsFromName( + 'test._isolated_sample.DurationSample') + suite.run(result) + # The duration reported in the parent is the one measured in the + # subprocess (around the sleep), not the near-instant replay time. + self.assertEqual(len(result.collectedDurations), 1) + name, elapsed = result.collectedDurations[0] + self.assertEqual(name.split()[0], 'test_slow') + self.assertGreaterEqual(elapsed, DURATION_SLEEP / 2) + + def test_skipped_without_subprocess_support(self): + # On a platform without subprocess support the test is skipped in the + # parent, before any subprocess is spawned. + calls = [] + orig = isolation._run_in_subprocess + with support.swap_attr(support, 'has_subprocess_support', False): + isolation._run_in_subprocess = lambda *a, **k: calls.append(a) + try: + result = self._run('MethodSample.test_pass') + finally: + isolation._run_in_subprocess = orig + self.assertEqual(result.testsRun, 1) + self.assertEqual(len(result.skipped), 1) + self.assertEqual(calls, []) + + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst b/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst new file mode 100644 index 000000000000000..c5559ba110de96a --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst @@ -0,0 +1,3 @@ +Add the :func:`test.support.isolated` decorator to run a test method or +``TestCase`` subclass in a fresh interpreter subprocess, isolated from the rest +of the test run.