diff --git a/Doc/library/test.rst b/Doc/library/test.rst index 4e21e1ded82724..e254f5585f6382 100644 --- a/Doc/library/test.rst +++ b/Doc/library/test.rst @@ -961,6 +961,59 @@ The :mod:`!test.support` module defines the following functions: :mod:`tracemalloc` is enabled. +.. currentmodule:: test.support.isolation + +.. decorator:: isolated() + + Decorator that runs the decorated test in isolation, in a fresh interpreter + subprocess, so that it does not share global or interpreter state with the + rest of the test run. It can decorate a test method or a whole + :class:`~unittest.TestCase` subclass. Decorated methods must take no extra + arguments. A failure, error or skip in the subprocess is reported for the + corresponding test, and individual :meth:`subtests + ` that fail or are skipped are reported + individually. A reported failure or error shows the original subprocess + traceback as the cause of the exception. + + When a **method** is decorated, only that method runs in a subprocess; all + fixtures (:meth:`~unittest.TestCase.setUp` / :meth:`~unittest.TestCase.tearDown`, + :meth:`~unittest.TestCase.setUpClass` / :meth:`~unittest.TestCase.tearDownClass` + and ``setUpModule()`` / ``tearDownModule()``) run both in the parent process + (as usual) and in the subprocess around the method. + + When a **class** is decorated, the whole class runs in a single subprocess, + and :meth:`~unittest.TestCase.setUpClass`, + :meth:`~unittest.TestCase.tearDownClass`, :meth:`~unittest.TestCase.setUp` + and :meth:`~unittest.TestCase.tearDown` run once each in the subprocess and + are skipped in the parent process. A failure or skip of + :meth:`~unittest.TestCase.setUpClass` in the subprocess is reported for the + whole class. ``setUpModule()`` cannot be controlled by a class decorator, + so it still runs in the parent process too; test it with + :data:`running_isolated` if needed. + + The subprocess inherits the enabled resources (``-u``), memory limit + (``-M``) and verbosity (``-v``) of the parent test run, so that + :func:`~test.support.requires_resource`, :func:`~test.support.requires`, + :func:`~test.support.bigmemtest` and the like behave consistently in both + processes. + + The test is skipped on platforms without subprocess support. + + +.. data:: running_isolated + + ``True`` while the code runs in the isolated subprocess spawned by + :func:`isolated`, and ``False`` otherwise (including in the parent process + and in a normal, non-isolated test run). Fixtures such as + :meth:`~unittest.TestCase.setUp`, :meth:`~unittest.TestCase.tearDown`, + :meth:`~unittest.TestCase.setUpClass`, :meth:`~unittest.TestCase.tearDownClass`, + ``setUpModule()`` and ``tearDownModule()`` can test it to choose which code + to run in the subprocess. + + +.. currentmodule:: test.support + + .. function:: check_free_after_iterating(test, iter, cls, args=()) Assert instances of *cls* are deallocated after iterating. diff --git a/Lib/test/_isolated_sample.py b/Lib/test/_isolated_sample.py new file mode 100644 index 00000000000000..2533479d25b0e2 --- /dev/null +++ b/Lib/test/_isolated_sample.py @@ -0,0 +1,74 @@ +"""Sample tests driven by test.test_support.TestIsolated. + +This module is imported, never run as a test file, so that +:func:`test.support.isolation.isolated` has a real, importable target to run in +a subprocess. Several of these tests fail, error or are skipped on purpose. +""" + +import time +import unittest +from test.support import isolation + +# A test in DurationSample sleeps this long in the subprocess; the parent +# replays it instantly, so a parent-reported duration close to this proves the +# subprocess timing was forwarded rather than the replay time measured. +DURATION_SLEEP = 0.2 + + +class MethodSample(unittest.TestCase): + + @isolation.isolated() + def test_pass(self): + self.assertTrue(isolation.running_isolated) + + @isolation.isolated() + def test_fail(self): + self.assertEqual(1, 2) + + @isolation.isolated() + def test_error(self): + raise RuntimeError('boom') + + @isolation.isolated() + def test_skip(self): + self.skipTest('nope') + + @isolation.isolated() + @unittest.expectedFailure + def test_expected_failure(self): + self.assertEqual(1, 2) + + @isolation.isolated() + @unittest.expectedFailure + def test_unexpected_success(self): + pass + + +@isolation.isolated() +class ClassSample(unittest.TestCase): + + def test_pass(self): + self.assertTrue(isolation.running_isolated) + + def test_fail(self): + self.assertEqual(1, 2) + + @unittest.expectedFailure + def test_expected_failure(self): + self.assertEqual(1, 2) + + +class SubtestSample(unittest.TestCase): + + @isolation.isolated() + def test_subtests(self): + for i in range(3): + with self.subTest(i=i): + self.assertNotEqual(i, 1) + + +@isolation.isolated() +class DurationSample(unittest.TestCase): + + def test_slow(self): + time.sleep(DURATION_SLEEP) diff --git a/Lib/test/support/isolation.py b/Lib/test/support/isolation.py new file mode 100644 index 00000000000000..fe0e5df0ec8ba4 --- /dev/null +++ b/Lib/test/support/isolation.py @@ -0,0 +1,286 @@ +"""Run tests in isolated subprocesses (the test.support.isolation.isolated decorator). + +A failure, error or skip that happens in the subprocess is replayed in the +parent process so that the test runner records it. The original (subprocess) +traceback is attached as the cause of the replayed exception, the same way +:mod:`concurrent.futures` surfaces tracebacks from worker processes. +""" + +import functools +import os +import sys +import unittest + +# Mark this module's frames as belonging to the test machinery, so that +# unittest strips them from reported tracebacks (see TestResult._clean_tracebacks +# in Lib/unittest/result.py). Only the original subprocess traceback, attached +# as the cause, is then shown -- not the parent-side replay frames. +__unittest = True + +# Environment variable set in the child process so that the decorated test +# method runs its real body instead of spawning yet another subprocess. +_RUN_IN_SUBPROCESS_ENV = '_PYTHON_RUN_IN_SUBPROCESS' + +# Environment variable carrying (as JSON) the regrtest-configured test.support +# state, so that the bare subprocess honors -u, -M, -v, etc. like the parent. +_CONFIG_ENV = '_PYTHON_ISOLATED_CONFIG' + +# test.support globals set by regrtest (libregrtest/setup.py) that affect how +# tests run and which are skipped at runtime in the subprocess. +_PROPAGATED_CONFIG = ( + 'use_resources', # -u (is_resource_enabled/requires) + 'max_memuse', 'real_max_memuse', # -M (bigmemtest) + 'verbose', # -v + 'failfast', # -f +) + +def _child_config(): + import test.support as support + return {name: getattr(support, name) for name in _PROPAGATED_CONFIG} + +def _apply_child_config(): + """Mirror the parent's test.support configuration in the subprocess. + + Called by subprocess_runner before loading the test, so that import-time + decorators (e.g. requires_resource) and runtime checks see the same -u/-M/-v + configuration as the parent process. + """ + import json + import test.support as support + data = os.environ.get(_CONFIG_ENV) + if data: + for name, value in json.loads(data).items(): + setattr(support, name, value) + +# True while running inside the isolated subprocess spawned by @isolated(). +# setUp()/tearDown() and the class- and module-level fixtures can test it to +# decide which code to run in the subprocess as opposed to the parent process. +running_isolated = bool(os.environ.get(_RUN_IN_SUBPROCESS_ENV)) + + +class _RemoteTraceback(Exception): + """Carry a formatted traceback string from the subprocess for display. + + Attached as the ``__cause__`` of the replayed failure/error, so that the + original traceback is shown by the traceback machinery. + """ + def __init__(self, tb): + self.tb = tb + + def __str__(self): + return self.tb + + +class _SubprocessTestError(Exception): + """Replay a subprocess error (as opposed to a failure) in the parent.""" + + +def _remote(detail): + # Wrap the subprocess traceback the way concurrent.futures does, so it is + # clearly delimited when shown as the cause. + return _RemoteTraceback(f'\n"""\n{detail}"""') + + +def _check_subprocess_support(): + # isolated() always runs the test in a subprocess, so skip (in the parent) + # on platforms that do not support spawning one. + import test.support as support + if not support.has_subprocess_support: + raise unittest.SkipTest('requires subprocess support') + + +def _run_in_subprocess(module, qualname): + """Run module.qualname (a test method or class) in a fresh subprocess. + + Return ``(payload, output, returncode)``, where *payload* is the decoded + ``{'outcomes': ..., 'durations': ...}`` mapping from the subprocess, or + ``None`` if it did not run to completion (crash, import error, ...). + """ + import json + import subprocess + import tempfile + env = dict(os.environ) + env[_RUN_IN_SUBPROCESS_ENV] = '1' + env[_CONFIG_ENV] = json.dumps(_child_config()) + fd, result_path = tempfile.mkstemp(suffix='.json') + os.close(fd) + try: + cmd = [sys.executable, '-m', 'test.support.subprocess_runner', + module, qualname, result_path] + proc = subprocess.run(cmd, capture_output=True, text=True, env=env) + try: + with open(result_path, encoding='utf-8') as f: + payload = json.load(f) + except (OSError, ValueError): + payload = None + finally: + try: + os.unlink(result_path) + except OSError: + pass + return payload, (proc.stdout or '') + (proc.stderr or ''), proc.returncode + + +def _replay_outcome(test, outcome): + kind = outcome['kind'] + detail = outcome['detail'] + if kind == 'skipped': + test.skipTest(detail) # the detail is the skip reason, not a traceback + elif kind in ('failure', 'expected_failure'): + # An expected failure is replayed like a failure: the wrapper carries + # the @expectedFailure marker (copied by functools.wraps), so the parent + # records the raised exception as an expectedFailure, not a failure. + exc = test.failureException('test failed in the subprocess') + raise exc from _remote(detail) + else: # 'error' + exc = _SubprocessTestError('test failed in the subprocess') + raise exc from _remote(detail) + + +def _replay_outcomes(test, outcomes): + # Replay each subtest outcome in its own subTest() context so that they are + # reported individually, then replay the whole-test outcome (if any). + main = [] + for outcome in outcomes: + if outcome['subtest']: + with test.subTest(outcome['desc']): + _replay_outcome(test, outcome) + else: + main.append(outcome) + for outcome in main: + _replay_outcome(test, outcome) + + +def _raise_fixture_outcome(outcome): + # Reproduce a setUpClass()/setUpModule() failure or skip from the + # subprocess in a parent-process fixture, so it applies to every test. + if outcome['kind'] == 'skipped': + raise unittest.SkipTest(outcome['detail']) + exc = _SubprocessTestError('class failed in the subprocess') + raise exc from _remote(outcome['detail']) + + +def _isolate_method(func): + @functools.wraps(func) + def wrapper(self, /, *args, **kwargs): + if running_isolated: + # Already running in the subprocess: run the real test. + return func(self, *args, **kwargs) + _check_subprocess_support() + cls = type(self) + qualname = f'{cls.__qualname__}.{func.__name__}' + payload, output, returncode = _run_in_subprocess(cls.__module__, + qualname) + if payload is None: + exc = _SubprocessTestError( + f'test did not complete in a subprocess (exit code {returncode})') + raise exc from _remote(output) + # The parent measures this method's own duration (the real cost of the + # isolated run, subprocess startup included), so nothing to forward here. + _replay_outcomes(self, payload['outcomes']) + return wrapper + + +def _isolate_class(cls): + # Unwrap to the plain functions: the replacements below call them with the + # runtime cls, so a subclass of an isolated class runs the fixtures bound to + # itself (a bound classmethod would freeze the decoration-time class). + orig_setUpClass = cls.setUpClass.__func__ + orig_tearDownClass = cls.tearDownClass.__func__ + orig_setUp = cls.setUp + orig_tearDown = cls.tearDown + orig_addDuration = getattr(cls, '_addDuration', None) + + def setUpClass(cls): + if running_isolated: + orig_setUpClass(cls) + return + _check_subprocess_support() + # Run the whole class in a single subprocess and stash the outcomes + # for the wrapped test methods to replay. + payload, output, returncode = _run_in_subprocess(cls.__module__, + cls.__qualname__) + if payload is None: + exc = _SubprocessTestError( + f'class did not complete in a subprocess (exit code {returncode})') + raise exc from _remote(output) + by_id = {} + for outcome in payload['outcomes']: + if outcome['fixture']: + # A setUpClass()/setUpModule() failure or skip: apply it to the + # whole class by raising it here, in the parent's setUpClass(). + _raise_fixture_outcome(outcome) + by_id.setdefault(outcome['id'], []).append(outcome) + cls._isolated_outcomes = by_id + cls._isolated_durations = dict(payload.get('durations', ())) + + def tearDownClass(cls): + if running_isolated: + orig_tearDownClass(cls) + else: + cls._isolated_outcomes = None + cls._isolated_durations = None + + def setUp(self): + # In the parent the real test does not run, so neither should setUp(). + if running_isolated: + orig_setUp(self) + + def tearDown(self): + if running_isolated: + orig_tearDown(self) + + def _addDuration(self, result, elapsed): + # In the parent, report the per-test duration measured in the subprocess + # rather than the replay time (subprocess startup is paid once, in + # setUpClass). + if not running_isolated: + durations = getattr(type(self), '_isolated_durations', None) or {} + elapsed = durations.get(self.id(), elapsed) + orig_addDuration(self, result, elapsed) + + def replay(self): + by_id = getattr(type(self), '_isolated_outcomes', None) or {} + _replay_outcomes(self, by_id.get(self.id(), [])) + + cls.setUpClass = classmethod(setUpClass) + cls.tearDownClass = classmethod(tearDownClass) + cls.setUp = setUp + cls.tearDown = tearDown + if orig_addDuration is not None: + cls._addDuration = _addDuration + for name in unittest.TestLoader().getTestCaseNames(cls): + method = getattr(cls, name) + @functools.wraps(method) + def wrapper(self, /, *args, __func=method, **kwargs): + if running_isolated: + return __func(self, *args, **kwargs) + replay(self) + setattr(cls, name, wrapper) + return cls + + +def isolated(): + """Decorator to run a test method or class in isolation from the rest. + + The decorated test runs in a separate, fresh Python process, so it does not + share global or interpreter state with the rest of the test run. When a + :class:`~unittest.TestCase` subclass is decorated, the whole class runs in a + single subprocess and its ``setUpClass()``/``setUpModule()`` fixtures run + once there; when a method is decorated, only that method runs in a + subprocess. Decorated methods must take no extra arguments. + + A failure, error or skip of the whole test is reported for the test, and + individual subtests (:meth:`~unittest.TestCase.subTest`) that fail or are + skipped are reported individually. The original subprocess traceback is + shown as the cause of a reported failure or error. Use + :data:`running_isolated` in fixtures to choose what to run in the subprocess. + + The test is skipped on platforms without subprocess support, since it must + spawn one. + """ + def decorator(obj): + if isinstance(obj, type) and issubclass(obj, unittest.TestCase): + return _isolate_class(obj) + return _isolate_method(obj) + return decorator diff --git a/Lib/test/support/subprocess_runner.py b/Lib/test/support/subprocess_runner.py new file mode 100644 index 00000000000000..6eced4951171be --- /dev/null +++ b/Lib/test/support/subprocess_runner.py @@ -0,0 +1,74 @@ +"""Run a single test method in this (sub)process and report the result. + +Invoked as ``python -m test.support.subprocess_runner MODULE QUALNAME OUTFILE`` +by :func:`test.support.isolation.isolated`. The outcome of the test (including +that of each individual subtest) is written as JSON to OUTFILE. This module is +not meant to be imported. +""" + +import json +import sys +import unittest +from unittest.case import _SubTest + +if __name__ != '__main__': + raise ImportError('this module cannot be directly imported') + +if len(sys.argv) != 4: + print('usage: python -m test.support.subprocess_runner ' + 'MODULE QUALNAME OUTFILE', file=sys.stderr) + sys.exit(2) + +module = sys.argv[1] +qualname = sys.argv[2] +outfile = sys.argv[3] + +# Mirror the parent's regrtest configuration (-u, -M, -v, ...) before importing +# the test, so resource gating and bigmem sizing match the parent process. +from test.support.isolation import _apply_child_config +_apply_child_config() + + +class _Result(unittest.TestResult): + # Capture per-test durations keyed by test id, so the parent can report the + # subprocess timings instead of its own replay time. + def __init__(self): + super().__init__() + self.id_durations = [] + + def addDuration(self, test, elapsed): + super().addDuration(test, elapsed) + self.id_durations.append((test.id(), elapsed)) + + +suite = unittest.TestLoader().loadTestsFromName(f'{module}.{qualname}') +result = _Result() +suite.run(result) + + +def _outcome(kind, test, detail): + subtest = isinstance(test, _SubTest) + real = test.test_case if subtest else test + return { + 'kind': kind, + 'subtest': subtest, + 'desc': test._subDescription() if subtest else '', + # id() groups outcomes by test method; a non-TestCase (e.g. an + # _ErrorHolder) marks a setUpClass()/setUpModule() fixture failure. + 'id': real.id(), + 'fixture': not isinstance(real, unittest.TestCase), + 'detail': detail, + } + + +outcomes = [_outcome('failure', t, tb) for t, tb in result.failures] +outcomes += [_outcome('error', t, tb) for t, tb in result.errors] +outcomes += [_outcome('expected_failure', t, tb) + for t, tb in result.expectedFailures] +outcomes += [_outcome('skipped', t, reason) for t, reason in result.skipped] + +payload = {'outcomes': outcomes, 'durations': result.id_durations} +with open(outfile, 'w', encoding='utf-8') as f: + json.dump(payload, f) + +sys.exit(0) diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 2cad18a69ff7c0..fb39ffc57da871 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -13,6 +13,7 @@ import time import unittest from test import support +from test.support import isolation from test.support import ( force_not_colorized, is_apple, is_apple_mobile, os_helper, threading_helper ) @@ -312,59 +313,51 @@ def test_set_wakeup_fd_blocking(self): @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class WakeupSignalTests(unittest.TestCase): - @unittest.skipIf(_testcapi is None, 'need _testcapi') - def check_wakeup(self, test_body, *signals, ordered=True): - # use a subprocess to have only one thread - code = """if 1: - import _testcapi - import os - import signal + def check_wakeup(self, test, *signals, ordered=True): + # The caller is decorated with @isolated(), so this runs in a fresh + # subprocess that has only one thread for the signal to be delivered to. import struct - signals = {!r} + signals = tuple(int(s) for s in signals) def handler(signum, frame): pass - def check_signum(signals): - data = os.read(read, len(signals)+1) - raised = struct.unpack('%uB' % len(data), data) - if not {!r}: - raised = set(raised) - signals = set(signals) - if raised != signals: - raise Exception("%r != %r" % (raised, signals)) - - {} - signal.signal(signal.SIGALRM, handler) read, write = os.pipe() os.set_blocking(write, False) signal.set_wakeup_fd(write) - test() - check_signum(signals) + test(read) + + data = os.read(read, len(signals) + 1) + raised = struct.unpack('%uB' % len(data), data) + if ordered: + self.assertEqual(raised, signals) + else: + self.assertEqual(set(raised), set(signals)) os.close(read) os.close(write) - """.format(tuple(map(int, signals)), ordered, test_body) - - assert_python_ok('-c', code) @unittest.skipIf(_testcapi is None, 'need _testcapi') @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") @force_not_colorized + @isolation.isolated() def test_wakeup_write_error(self): # Issue #16105: write() errors in the C signal handler should not # pass silently. - # Use a subprocess to have only one thread. - code = """if 1: - import _testcapi - import errno - import os - import signal - import sys - from test.support import captured_stderr + # Use @isolated() to run in a subprocess with only one thread. + r, w = os.pipe() + try: + os.write(r, b'x') + except OSError: + pass + else: + self.skipTest("OS doesn't report write() error on the read end of a pipe") + finally: + os.close(r) + os.close(w) def handler(signum, frame): 1/0 @@ -375,38 +368,23 @@ def handler(signum, frame): # Set wakeup_fd a read-only file descriptor to trigger the error signal.set_wakeup_fd(r) - try: - with captured_stderr() as err: + with support.captured_stderr() as err: + with self.assertRaises(ZeroDivisionError): signal.raise_signal(signal.SIGALRM) - except ZeroDivisionError: - # An ignored exception should have been printed out on stderr - err = err.getvalue() - if ('Exception ignored while trying to write to the signal wakeup fd' - not in err): - raise AssertionError(err) - if ('OSError: [Errno %d]' % errno.EBADF) not in err: - raise AssertionError(err) - else: - raise AssertionError("ZeroDivisionError not raised") + # An ignored exception should have been printed out on stderr + err = err.getvalue() + self.assertIn( + 'Exception ignored while trying to write to the signal wakeup fd', + err) + self.assertIn('OSError: [Errno %d]' % errno.EBADF, err) os.close(r) os.close(w) - """ - r, w = os.pipe() - try: - os.write(r, b'x') - except OSError: - pass - else: - self.skipTest("OS doesn't report write() error on the read end of a pipe") - finally: - os.close(r) - os.close(w) - - assert_python_ok('-c', code) + @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_wakeup_fd_early(self): - self.check_wakeup("""def test(): + def test(read): import select import time @@ -429,18 +407,19 @@ def handler(signum, frame): except InterruptSelect: pass else: - raise Exception("select() was not interrupted") + self.fail("select() was not interrupted") before_time = time.monotonic() select.select([read], [], [], TIMEOUT_FULL) after_time = time.monotonic() dt = after_time - before_time - if dt >= TIMEOUT_HALF: - raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) - """, signal.SIGALRM) + self.assertLess(dt, TIMEOUT_HALF) + self.check_wakeup(test, signal.SIGALRM) + @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_wakeup_fd_during(self): - self.check_wakeup("""def test(): + def test(read): import select import time @@ -462,27 +441,34 @@ def handler(signum, frame): except InterruptSelect: pass else: - raise Exception("select() was not interrupted") + self.fail("select() was not interrupted") after_time = time.monotonic() dt = after_time - before_time - if dt >= TIMEOUT_HALF: - raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) - """, signal.SIGALRM) + self.assertLess(dt, TIMEOUT_HALF) + self.check_wakeup(test, signal.SIGALRM) + @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_signum(self): - self.check_wakeup("""def test(): + def test(read): + def handler(signum, frame): + pass signal.signal(signal.SIGUSR1, handler) signal.raise_signal(signal.SIGUSR1) signal.raise_signal(signal.SIGALRM) - """, signal.SIGUSR1, signal.SIGALRM) + self.check_wakeup(test, signal.SIGUSR1, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 'need signal.pthread_sigmask()') + @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_pending(self): - self.check_wakeup("""def test(): + def test(read): signum1 = signal.SIGUSR1 signum2 = signal.SIGUSR2 + def handler(signum, frame): + pass signal.signal(signum1, handler) signal.signal(signum2, handler) @@ -491,20 +477,17 @@ def test_pending(self): signal.raise_signal(signum2) # Unblocking the 2 signals calls the C signal handler twice signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) - """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) + self.check_wakeup(test, signal.SIGUSR1, signal.SIGUSR2, ordered=False) @unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') class WakeupSocketSignalTests(unittest.TestCase): @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_socket(self): - # use a subprocess to have only one thread - code = """if 1: - import signal - import socket + # Use @isolated() to run in a subprocess with only one thread. import struct - import _testcapi signum = signal.SIGINT signals = (signum,) @@ -521,33 +504,21 @@ def handler(signum, frame): signal.raise_signal(signum) data = read.recv(1) - if not data: - raise Exception("no signum written") + self.assertTrue(data, "no signum written") raised = struct.unpack('B', data) - if raised != signals: - raise Exception("%r != %r" % (raised, signals)) + self.assertEqual(raised, signals) read.close() write.close() - """ - - assert_python_ok('-c', code) @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_send_error(self): - # Use a subprocess to have only one thread. + # Use @isolated() to run in a subprocess with only one thread. if os.name == 'nt': action = 'send' else: action = 'write' - code = """if 1: - import errno - import signal - import socket - import sys - import time - import _testcapi - from test.support import captured_stderr signum = signal.SIGINT @@ -566,31 +537,22 @@ def handler(signum, frame): read.close() write.close() - with captured_stderr() as err: + with support.captured_stderr() as err: signal.raise_signal(signum) - err = err.getvalue() - if ('Exception ignored while trying to {action} to the signal wakeup fd' - not in err): - raise AssertionError(err) - """.format(action=action) - assert_python_ok('-c', code) + self.assertIn( + 'Exception ignored while trying to %s to the signal wakeup fd' + % action, + err.getvalue()) @unittest.skipIf(_testcapi is None, 'need _testcapi') + @isolation.isolated() def test_warn_on_full_buffer(self): - # Use a subprocess to have only one thread. + # Use @isolated() to run in a subprocess with only one thread. if os.name == 'nt': action = 'send' else: action = 'write' - code = """if 1: - import errno - import signal - import socket - import sys - import time - import _testcapi - from test.support import captured_stderr signum = signal.SIGINT @@ -627,67 +589,46 @@ def handler(signum, frame): except (BlockingIOError, TimeoutError): pass - print(f"%s bytes written into the socketpair" % written, flush=True) - write.setblocking(False) - try: + with self.assertRaises(BlockingIOError, + msg="%s bytes failed to fill the socketpair " + "buffer" % written): write.send(b"x") - except BlockingIOError: - # The socketpair buffer seems full - pass - else: - raise AssertionError("%s bytes failed to fill the socketpair " - "buffer" % written) # By default, we get a warning when a signal arrives - msg = ('Exception ignored while trying to {action} ' - 'to the signal wakeup fd') + msg = ('Exception ignored while trying to %s ' + 'to the signal wakeup fd' % action) signal.set_wakeup_fd(write.fileno()) - with captured_stderr() as err: + with support.captured_stderr() as err: signal.raise_signal(signum) - - err = err.getvalue() - if msg not in err: - raise AssertionError("first set_wakeup_fd() test failed, " - "stderr: %r" % err) + self.assertIn(msg, err.getvalue(), + "first set_wakeup_fd() test failed") # And also if warn_on_full_buffer=True signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) - with captured_stderr() as err: + with support.captured_stderr() as err: signal.raise_signal(signum) - - err = err.getvalue() - if msg not in err: - raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " - "test failed, stderr: %r" % err) + self.assertIn(msg, err.getvalue(), + "set_wakeup_fd(warn_on_full_buffer=True) test failed") # But not if warn_on_full_buffer=False signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) - with captured_stderr() as err: + with support.captured_stderr() as err: signal.raise_signal(signum) - - err = err.getvalue() - if err != "": - raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " - "test failed, stderr: %r" % err) + self.assertEqual(err.getvalue(), "", + "set_wakeup_fd(warn_on_full_buffer=False) test failed") # And then check the default again, to make sure warn_on_full_buffer # settings don't leak across calls. signal.set_wakeup_fd(write.fileno()) - with captured_stderr() as err: + with support.captured_stderr() as err: signal.raise_signal(signum) - - err = err.getvalue() - if msg not in err: - raise AssertionError("second set_wakeup_fd() test failed, " - "stderr: %r" % err) - - """.format(action=action) - assert_python_ok('-c', code) + self.assertIn(msg, err.getvalue(), + "second set_wakeup_fd() test failed") @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index d556f96bc532ed..90a56cf86971ea 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -19,6 +19,7 @@ import warnings from test import support +from test.support import isolation from test.support import hashlib_helper from test.support import import_helper from test.support import os_helper @@ -1075,5 +1076,106 @@ def test_disable_hash_md5_in_fips_mode_allow_all(self): self.assertIsInstance(h, self._hashlib.HASH) +class TestIsolated(unittest.TestCase): + # Drive the sample tests in test._isolated_sample (which really spawn + # subprocesses through @isolation.isolated()) under a private TestResult, + # and check that each subprocess outcome is replayed in the parent. + + @staticmethod + def _run(name): + suite = unittest.TestLoader().loadTestsFromName( + 'test._isolated_sample.' + name) + result = unittest.TestResult() + suite.run(result) + return result + + @staticmethod + def _names(items): + # Map outcome entries (which are (test, detail) pairs, except + # unexpectedSuccesses which are bare tests) to their method names. + names = [] + for item in items: + test = item[0] if isinstance(item, tuple) else item + names.append(test.id().rpartition('.')[2]) + return sorted(names) + + @support.requires_subprocess() + def test_method_outcomes(self): + result = self._run('MethodSample') + self.assertEqual(result.testsRun, 6) + self.assertEqual(self._names(result.failures), ['test_fail']) + self.assertEqual(self._names(result.errors), ['test_error']) + self.assertEqual(self._names(result.skipped), ['test_skip']) + self.assertEqual(self._names(result.expectedFailures), + ['test_expected_failure']) + self.assertEqual(self._names(result.unexpectedSuccesses), + ['test_unexpected_success']) + + @support.requires_subprocess() + def test_class_outcomes(self): + result = self._run('ClassSample') + self.assertEqual(result.testsRun, 3) + self.assertEqual(self._names(result.failures), ['test_fail']) + self.assertEqual(self._names(result.expectedFailures), + ['test_expected_failure']) + self.assertEqual(result.errors, []) + self.assertEqual(result.unexpectedSuccesses, []) + + @support.requires_subprocess() + def test_subtests_reported_individually(self): + result = self._run('SubtestSample') + self.assertEqual(result.testsRun, 1) + self.assertEqual(len(result.failures), 1) + test, _ = result.failures[0] + self.assertIn('i=1', str(test)) + + @support.requires_subprocess() + def test_skip_reason_propagated(self): + result = self._run('MethodSample.test_skip') + self.assertEqual([reason for _, reason in result.skipped], ['nope']) + + @support.requires_subprocess() + def test_subprocess_traceback_is_cause(self): + result = self._run('MethodSample.test_fail') + self.assertEqual(len(result.failures), 1) + _, tb = result.failures[0] + # The real assertion that failed in the subprocess is shown ... + self.assertIn('self.assertEqual(1, 2)', tb) + # ... as the direct cause of the replayed failure ... + self.assertIn('direct cause', tb) + # ... without leaking the parent-side replay frames. + self.assertNotIn('isolation.py', tb) + + @support.requires_subprocess() + def test_durations_forwarded_for_class(self): + from test._isolated_sample import DURATION_SLEEP + result = unittest.TestResult() + result.collectedDurations = [] + suite = unittest.TestLoader().loadTestsFromName( + 'test._isolated_sample.DurationSample') + suite.run(result) + # The duration reported in the parent is the one measured in the + # subprocess (around the sleep), not the near-instant replay time. + self.assertEqual(len(result.collectedDurations), 1) + name, elapsed = result.collectedDurations[0] + self.assertEqual(name.split()[0], 'test_slow') + self.assertGreaterEqual(elapsed, DURATION_SLEEP / 2) + + def test_skipped_without_subprocess_support(self): + # On a platform without subprocess support the test is skipped in the + # parent, before any subprocess is spawned. + calls = [] + orig = isolation._run_in_subprocess + with support.swap_attr(support, 'has_subprocess_support', False): + isolation._run_in_subprocess = lambda *a, **k: calls.append(a) + try: + result = self._run('MethodSample.test_pass') + finally: + isolation._run_in_subprocess = orig + self.assertEqual(result.testsRun, 1) + self.assertEqual(len(result.skipped), 1) + self.assertEqual(calls, []) + + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst b/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst new file mode 100644 index 00000000000000..c5559ba110de96 --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst @@ -0,0 +1,3 @@ +Add the :func:`test.support.isolated` decorator to run a test method or +``TestCase`` subclass in a fresh interpreter subprocess, isolated from the rest +of the test run.