diff --git a/Doc/library/test.rst b/Doc/library/test.rst index 4e21e1ded82724c..e254f5585f63820 100644 --- a/Doc/library/test.rst +++ b/Doc/library/test.rst @@ -961,6 +961,59 @@ The :mod:`!test.support` module defines the following functions: :mod:`tracemalloc` is enabled. +.. currentmodule:: test.support.isolation + +.. decorator:: isolated() + + Decorator that runs the decorated test in isolation, in a fresh interpreter + subprocess, so that it does not share global or interpreter state with the + rest of the test run. It can decorate a test method or a whole + :class:`~unittest.TestCase` subclass. Decorated methods must take no extra + arguments. A failure, error or skip in the subprocess is reported for the + corresponding test, and individual :meth:`subtests + ` that fail or are skipped are reported + individually. A reported failure or error shows the original subprocess + traceback as the cause of the exception. + + When a **method** is decorated, only that method runs in a subprocess; all + fixtures (:meth:`~unittest.TestCase.setUp` / :meth:`~unittest.TestCase.tearDown`, + :meth:`~unittest.TestCase.setUpClass` / :meth:`~unittest.TestCase.tearDownClass` + and ``setUpModule()`` / ``tearDownModule()``) run both in the parent process + (as usual) and in the subprocess around the method. + + When a **class** is decorated, the whole class runs in a single subprocess, + and :meth:`~unittest.TestCase.setUpClass`, + :meth:`~unittest.TestCase.tearDownClass`, :meth:`~unittest.TestCase.setUp` + and :meth:`~unittest.TestCase.tearDown` run once each in the subprocess and + are skipped in the parent process. A failure or skip of + :meth:`~unittest.TestCase.setUpClass` in the subprocess is reported for the + whole class. ``setUpModule()`` cannot be controlled by a class decorator, + so it still runs in the parent process too; test it with + :data:`running_isolated` if needed. + + The subprocess inherits the enabled resources (``-u``), memory limit + (``-M``) and verbosity (``-v``) of the parent test run, so that + :func:`~test.support.requires_resource`, :func:`~test.support.requires`, + :func:`~test.support.bigmemtest` and the like behave consistently in both + processes. + + The test is skipped on platforms without subprocess support. + + +.. data:: running_isolated + + ``True`` while the code runs in the isolated subprocess spawned by + :func:`isolated`, and ``False`` otherwise (including in the parent process + and in a normal, non-isolated test run). Fixtures such as + :meth:`~unittest.TestCase.setUp`, :meth:`~unittest.TestCase.tearDown`, + :meth:`~unittest.TestCase.setUpClass`, :meth:`~unittest.TestCase.tearDownClass`, + ``setUpModule()`` and ``tearDownModule()`` can test it to choose which code + to run in the subprocess. + + +.. currentmodule:: test.support + + .. function:: check_free_after_iterating(test, iter, cls, args=()) Assert instances of *cls* are deallocated after iterating. diff --git a/Lib/test/_isolated_sample.py b/Lib/test/_isolated_sample.py new file mode 100644 index 000000000000000..2533479d25b0e2b --- /dev/null +++ b/Lib/test/_isolated_sample.py @@ -0,0 +1,74 @@ +"""Sample tests driven by test.test_support.TestIsolated. + +This module is imported, never run as a test file, so that +:func:`test.support.isolation.isolated` has a real, importable target to run in +a subprocess. Several of these tests fail, error or are skipped on purpose. +""" + +import time +import unittest +from test.support import isolation + +# A test in DurationSample sleeps this long in the subprocess; the parent +# replays it instantly, so a parent-reported duration close to this proves the +# subprocess timing was forwarded rather than the replay time measured. +DURATION_SLEEP = 0.2 + + +class MethodSample(unittest.TestCase): + + @isolation.isolated() + def test_pass(self): + self.assertTrue(isolation.running_isolated) + + @isolation.isolated() + def test_fail(self): + self.assertEqual(1, 2) + + @isolation.isolated() + def test_error(self): + raise RuntimeError('boom') + + @isolation.isolated() + def test_skip(self): + self.skipTest('nope') + + @isolation.isolated() + @unittest.expectedFailure + def test_expected_failure(self): + self.assertEqual(1, 2) + + @isolation.isolated() + @unittest.expectedFailure + def test_unexpected_success(self): + pass + + +@isolation.isolated() +class ClassSample(unittest.TestCase): + + def test_pass(self): + self.assertTrue(isolation.running_isolated) + + def test_fail(self): + self.assertEqual(1, 2) + + @unittest.expectedFailure + def test_expected_failure(self): + self.assertEqual(1, 2) + + +class SubtestSample(unittest.TestCase): + + @isolation.isolated() + def test_subtests(self): + for i in range(3): + with self.subTest(i=i): + self.assertNotEqual(i, 1) + + +@isolation.isolated() +class DurationSample(unittest.TestCase): + + def test_slow(self): + time.sleep(DURATION_SLEEP) diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py deleted file mode 100644 index 8be5bf8aa4f5469..000000000000000 --- a/Lib/test/audit-tests.py +++ /dev/null @@ -1,775 +0,0 @@ -"""This script contains the actual auditing tests. - -It should not be imported directly, but should be run by the test_audit -module with arguments identifying each test. - -""" - -import contextlib -import os -import sys -import unittest.mock -from test.support import swap_item - - -class TestHook: - """Used in standard hook tests to collect any logged events. - - Should be used in a with block to ensure that it has no impact - after the test completes. - """ - - def __init__(self, raise_on_events=None, exc_type=RuntimeError): - self.raise_on_events = raise_on_events or () - self.exc_type = exc_type - self.seen = [] - self.closed = False - - def __enter__(self, *a): - sys.addaudithook(self) - return self - - def __exit__(self, *a): - self.close() - - def close(self): - self.closed = True - - @property - def seen_events(self): - return [i[0] for i in self.seen] - - def __call__(self, event, args): - if self.closed: - return - self.seen.append((event, args)) - if event in self.raise_on_events: - raise self.exc_type("saw event " + event) - - -# Simple helpers, since we are not in unittest here -def assertEqual(x, y): - if x != y: - raise AssertionError(f"{x!r} should equal {y!r}") - - -def assertIn(el, series): - if el not in series: - raise AssertionError(f"{el!r} should be in {series!r}") - - -def assertNotIn(el, series): - if el in series: - raise AssertionError(f"{el!r} should not be in {series!r}") - - -def assertSequenceEqual(x, y): - if len(x) != len(y): - raise AssertionError(f"{x!r} should equal {y!r}") - if any(ix != iy for ix, iy in zip(x, y)): - raise AssertionError(f"{x!r} should equal {y!r}") - - -@contextlib.contextmanager -def assertRaises(ex_type): - try: - yield - assert False, f"expected {ex_type}" - except BaseException as ex: - if isinstance(ex, AssertionError): - raise - assert type(ex) is ex_type, f"{ex} should be {ex_type}" - - -def test_basic(): - with TestHook() as hook: - sys.audit("test_event", 1, 2, 3) - assertEqual(hook.seen[0][0], "test_event") - assertEqual(hook.seen[0][1], (1, 2, 3)) - - -def test_block_add_hook(): - # Raising an exception should prevent a new hook from being added, - # but will not propagate out. - with TestHook(raise_on_events="sys.addaudithook") as hook1: - with TestHook() as hook2: - sys.audit("test_event") - assertIn("test_event", hook1.seen_events) - assertNotIn("test_event", hook2.seen_events) - - -def test_block_add_hook_baseexception(): - # Raising BaseException will propagate out when adding a hook - with assertRaises(BaseException): - with TestHook( - raise_on_events="sys.addaudithook", exc_type=BaseException - ) as hook1: - # Adding this next hook should raise BaseException - with TestHook() as hook2: - pass - - -def test_marshal(): - import marshal - o = ("a", "b", "c", 1, 2, 3) - payload = marshal.dumps(o) - - with TestHook() as hook: - assertEqual(o, marshal.loads(marshal.dumps(o))) - - try: - with open("test-marshal.bin", "wb") as f: - marshal.dump(o, f) - with open("test-marshal.bin", "rb") as f: - assertEqual(o, marshal.load(f)) - finally: - os.unlink("test-marshal.bin") - - actual = [(a[0], a[1]) for e, a in hook.seen if e == "marshal.dumps"] - assertSequenceEqual(actual, [(o, marshal.version)] * 2) - - actual = [a[0] for e, a in hook.seen if e == "marshal.loads"] - assertSequenceEqual(actual, [payload]) - - actual = [e for e, a in hook.seen if e == "marshal.load"] - assertSequenceEqual(actual, ["marshal.load"]) - - -def test_pickle(): - import pickle - - class PicklePrint: - def __reduce_ex__(self, p): - return str, ("Pwned!",) - - payload_1 = pickle.dumps(PicklePrint()) - payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3)) - - # Before we add the hook, ensure our malicious pickle loads - assertEqual("Pwned!", pickle.loads(payload_1)) - - with TestHook(raise_on_events="pickle.find_class") as hook: - with assertRaises(RuntimeError): - # With the hook enabled, loading globals is not allowed - pickle.loads(payload_1) - # pickles with no globals are okay - pickle.loads(payload_2) - - -def test_monkeypatch(): - class A: - pass - - class B: - pass - - class C(A): - pass - - a = A() - - with TestHook() as hook: - # Catch name changes - C.__name__ = "X" - # Catch type changes - C.__bases__ = (B,) - # Ensure bypassing __setattr__ is still caught - type.__dict__["__bases__"].__set__(C, (B,)) - # Catch attribute replacement - C.__init__ = B.__init__ - # Catch attribute addition - C.new_attr = 123 - # Catch class changes - a.__class__ = B - - actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"] - assertSequenceEqual( - [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], actual - ) - - -def test_open(testfn): - # SSLContext.load_dh_params uses Py_fopen() rather than normal open() - try: - import ssl - - load_dh_params = ssl.create_default_context().load_dh_params - except ImportError: - load_dh_params = None - - try: - import readline - except ImportError: - readline = None - - def rl(name): - if readline: - return getattr(readline, name, None) - else: - return None - - try: - import _remote_debugging - except ImportError: - _remote_debugging = None - - def rd(name): - if _remote_debugging: - return getattr(_remote_debugging, name, None) - return None - - # Try a range of "open" functions. - # All of them should fail - with TestHook(raise_on_events={"open"}) as hook: - for fn, *args in [ - (open, testfn, "r"), - (open, sys.executable, "rb"), - (open, 3, "wb"), - (open, testfn, "w", -1, None, None, None, False, lambda *a: 1), - (load_dh_params, testfn), - (rl("read_history_file"), testfn), - (rl("read_history_file"), None), - (rl("write_history_file"), testfn), - (rl("write_history_file"), None), - (rl("append_history_file"), 0, testfn), - (rl("append_history_file"), 0, None), - (rl("read_init_file"), testfn), - (rl("read_init_file"), None), - (rd("BinaryWriter"), testfn, 1000, 0), - (rd("BinaryReader"), testfn), - ]: - if not fn: - continue - with assertRaises(RuntimeError): - try: - fn(*args) - except NotImplementedError: - if fn == load_dh_params: - # Not callable in some builds - load_dh_params = None - raise RuntimeError - else: - raise - - actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]] - actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]] - assertSequenceEqual( - [ - i - for i in [ - (testfn, "r"), - (sys.executable, "r"), - (3, "w"), - (testfn, "w"), - (testfn, "rb") if load_dh_params else None, - (testfn, "r") if readline else None, - ("~/.history", "r") if readline else None, - (testfn, "w") if readline else None, - ("~/.history", "w") if readline else None, - (testfn, "a") if rl("append_history_file") else None, - ("~/.history", "a") if rl("append_history_file") else None, - (testfn, "r") if readline else None, - ("", "r") if readline else None, - (testfn, "wb") if rd("BinaryWriter") else None, - (testfn, "rb") if rd("BinaryReader") else None, - ] - if i is not None - ], - actual_mode, - ) - assertSequenceEqual([], actual_flag) - - -def test_cantrace(): - traced = [] - - def trace(frame, event, *args): - if frame.f_code == TestHook.__call__.__code__: - traced.append(event) - - old = sys.settrace(trace) - try: - with TestHook() as hook: - # No traced call - eval("1") - - # No traced call - hook.__cantrace__ = False - eval("2") - - # One traced call - hook.__cantrace__ = True - eval("3") - - # Two traced calls (writing to private member, eval) - hook.__cantrace__ = 1 - eval("4") - - # One traced call (writing to private member) - hook.__cantrace__ = 0 - finally: - sys.settrace(old) - - assertSequenceEqual(["call"] * 4, traced) - - -def test_mmap(): - import mmap - - with TestHook() as hook: - mmap.mmap(-1, 8) - assertEqual(hook.seen[0][1][:2], (-1, 8)) - - -def test_ctypes_call_function(): - import ctypes - import _ctypes - - with TestHook() as hook: - _ctypes.call_function(ctypes._memmove_addr, (0, 0, 0)) - assert ("ctypes.call_function", (ctypes._memmove_addr, (0, 0, 0))) in hook.seen, f"{ctypes._memmove_addr=} {hook.seen=}" - - ctypes.CFUNCTYPE(ctypes.c_voidp)(ctypes._memset_addr)(1, 0, 0) - assert ("ctypes.call_function", (ctypes._memset_addr, (1, 0, 0))) in hook.seen, f"{ctypes._memset_addr=} {hook.seen=}" - - with TestHook() as hook: - ctypes.cast(ctypes.c_voidp(0), ctypes.POINTER(ctypes.c_char)) - assert "ctypes.call_function" in hook.seen_events - - with TestHook() as hook: - ctypes.string_at(id("ctypes.string_at") + 40) - assert "ctypes.call_function" in hook.seen_events - assert "ctypes.string_at" in hook.seen_events - - -def test_posixsubprocess(): - import multiprocessing.util - - exe = b"xxx" - args = [b"yyy", b"zzz"] - with TestHook() as hook: - multiprocessing.util.spawnv_passfds(exe, args, ()) - assert ("_posixsubprocess.fork_exec", ([exe], args, None)) in hook.seen - - -def test_excepthook(): - def excepthook(exc_type, exc_value, exc_tb): - if exc_type is not RuntimeError: - sys.__excepthook__(exc_type, exc_value, exc_tb) - - def hook(event, args): - if event == "sys.excepthook": - if not isinstance(args[2], args[1]): - raise TypeError(f"Expected isinstance({args[2]!r}, " f"{args[1]!r})") - if args[0] != excepthook: - raise ValueError(f"Expected {args[0]} == {excepthook}") - print(event, repr(args[2])) - - sys.addaudithook(hook) - sys.excepthook = excepthook - raise RuntimeError("fatal-error") - - -def test_unraisablehook(): - from _testcapi import err_formatunraisable - - def unraisablehook(hookargs): - pass - - def hook(event, args): - if event == "sys.unraisablehook": - if args[0] != unraisablehook: - raise ValueError(f"Expected {args[0]} == {unraisablehook}") - print(event, repr(args[1].exc_value), args[1].err_msg) - - sys.addaudithook(hook) - sys.unraisablehook = unraisablehook - err_formatunraisable(RuntimeError("nonfatal-error"), - "Exception ignored for audit hook test") - - -def test_winreg(): - from winreg import OpenKey, EnumKey, CloseKey, HKEY_LOCAL_MACHINE - - def hook(event, args): - if not event.startswith("winreg."): - return - print(event, *args) - - sys.addaudithook(hook) - - k = OpenKey(HKEY_LOCAL_MACHINE, "Software") - EnumKey(k, 0) - try: - EnumKey(k, 10000) - except OSError: - pass - else: - raise RuntimeError("Expected EnumKey(HKLM, 10000) to fail") - - kv = k.Detach() - CloseKey(kv) - - -def test_socket(): - import socket - - def hook(event, args): - if event.startswith("socket."): - print(event, *args) - - sys.addaudithook(hook) - - socket.gethostname() - - # Don't care if this fails, we just want the audit message - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - # Don't care if this fails, we just want the audit message - sock.bind(('127.0.0.1', 8080)) - except Exception: - pass - finally: - sock.close() - - -def test_gc(): - import gc - - def hook(event, args): - if event.startswith("gc."): - print(event, *args) - - sys.addaudithook(hook) - - gc.get_objects(generation=1) - - x = object() - y = [x] - - gc.get_referrers(x) - gc.get_referents(y) - - -def test_http_client(): - import http.client - - def hook(event, args): - if event.startswith("http.client."): - print(event, *args[1:]) - - sys.addaudithook(hook) - - conn = http.client.HTTPConnection('www.python.org') - try: - conn.request('GET', '/') - except OSError: - print('http.client.send', '[cannot send]') - finally: - conn.close() - - -def test_sqlite3(): - import sqlite3 - - def hook(event, *args): - if event.startswith("sqlite3."): - print(event, *args) - - sys.addaudithook(hook) - cx1 = sqlite3.connect(":memory:") - cx2 = sqlite3.Connection(":memory:") - - # Configured without --enable-loadable-sqlite-extensions - try: - if hasattr(sqlite3.Connection, "enable_load_extension"): - cx1.enable_load_extension(False) - try: - cx1.load_extension("test") - except sqlite3.OperationalError: - pass - else: - raise RuntimeError("Expected sqlite3.load_extension to fail") - finally: - cx1.close() - cx2.close() - -def test_sys_getframe(): - import sys - - def hook(event, args): - if event.startswith("sys."): - print(event, args[0].f_code.co_name) - - sys.addaudithook(hook) - sys._getframe() - - -def test_sys_getframemodulename(): - import sys - - def hook(event, args): - if event.startswith("sys."): - print(event, *args) - - sys.addaudithook(hook) - sys._getframemodulename() - - -def test_threading(): - import _thread - - def hook(event, args): - if event.startswith(("_thread.", "cpython.PyThreadState", "test.")): - print(event, args) - - sys.addaudithook(hook) - - lock = _thread.allocate_lock() - lock.acquire() - - class test_func: - def __repr__(self): return "" - def __call__(self): - sys.audit("test.test_func") - lock.release() - - i = _thread.start_new_thread(test_func(), ()) - lock.acquire() - - handle = _thread.start_joinable_thread(test_func()) - handle.join() - - -def test_threading_abort(): - # Ensures that aborting PyThreadState_New raises the correct exception - import _thread - - class ThreadNewAbortError(Exception): - pass - - def hook(event, args): - if event == "cpython.PyThreadState_New": - raise ThreadNewAbortError() - - sys.addaudithook(hook) - - try: - _thread.start_new_thread(lambda: None, ()) - except ThreadNewAbortError: - # Other exceptions are raised and the test will fail - pass - - -def test_wmi_exec_query(): - import _wmi - - def hook(event, args): - if event.startswith("_wmi."): - print(event, args[0]) - - sys.addaudithook(hook) - try: - _wmi.exec_query("SELECT * FROM Win32_OperatingSystem") - except WindowsError as e: - # gh-112278: WMI may be slow response when first called, but we still - # get the audit event, so just ignore the timeout - if e.winerror != 258: - raise - -def test_syslog(): - import syslog - - def hook(event, args): - if event.startswith("syslog."): - print(event, *args) - - sys.addaudithook(hook) - syslog.openlog('python') - syslog.syslog('test') - syslog.setlogmask(syslog.LOG_DEBUG) - syslog.closelog() - # implicit open - syslog.syslog('test2') - # open with default ident - syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0) - sys.argv = None - syslog.openlog() - syslog.closelog() - - -def test_not_in_gc(): - import gc - - hook = lambda *a: None - sys.addaudithook(hook) - - for o in gc.get_objects(): - if isinstance(o, list): - assert hook not in o - - -def test_time(mode): - import time - - def hook(event, args): - if event.startswith("time."): - if mode == 'print': - print(event, *args) - elif mode == 'fail': - raise AssertionError('hook failed') - sys.addaudithook(hook) - - time.sleep(0) - time.sleep(0.0625) # 1/16, a small exact float - try: - time.sleep(-1) - except ValueError: - pass - -def test_sys_monitoring_register_callback(): - import sys - - def hook(event, args): - if event.startswith("sys.monitoring"): - print(event, args) - - sys.addaudithook(hook) - sys.monitoring.register_callback(1, 1, None) - - -def test_winapi_createnamedpipe(pipe_name): - import _winapi - - def hook(event, args): - if event == "_winapi.CreateNamedPipe": - print(event, args) - - sys.addaudithook(hook) - _winapi.CreateNamedPipe(pipe_name, _winapi.PIPE_ACCESS_DUPLEX, 8, 2, 0, 0, 0, 0) - - -def test_assert_unicode(): - import sys - sys.addaudithook(lambda *args: None) - try: - sys.audit(9) - except TypeError: - pass - else: - raise RuntimeError("Expected sys.audit(9) to fail.") - -def test_sys_remote_exec(): - import tempfile - - pid = os.getpid() - event_pid = -1 - event_script_path = "" - remote_event_script_path = "" - def hook(event, args): - if event not in ["sys.remote_exec", "cpython.remote_debugger_script"]: - return - print(event, args) - match event: - case "sys.remote_exec": - nonlocal event_pid, event_script_path - event_pid = args[0] - event_script_path = args[1] - case "cpython.remote_debugger_script": - nonlocal remote_event_script_path - remote_event_script_path = args[0] - - sys.addaudithook(hook) - with tempfile.NamedTemporaryFile(mode='w+', delete=True) as tmp_file: - tmp_file.write("a = 1+1\n") - tmp_file.flush() - sys.remote_exec(pid, tmp_file.name) - assertEqual(event_pid, pid) - assertEqual(event_script_path, tmp_file.name) - assertEqual(remote_event_script_path, tmp_file.name) - -def test_import_module(): - import importlib - - with TestHook() as hook: - importlib.import_module("importlib") # already imported, won't get logged - importlib.import_module("email") # standard library module - importlib.import_module("pythoninfo") # random module - importlib.import_module(".audit_test_data.submodule", "test") # relative import - importlib.import_module("test.audit_test_data.submodule2") # absolute import - importlib.import_module("_testcapi") # extension module - - actual = [a for e, a in hook.seen if e == "import"] - assertSequenceEqual( - [ - ("email", None, sys.path, sys.meta_path, sys.path_hooks), - ("pythoninfo", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data.submodule", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data.submodule2", None, sys.path, sys.meta_path, sys.path_hooks), - ("_testcapi", None, sys.path, sys.meta_path, sys.path_hooks), - ("_testcapi", unittest.mock.ANY, None, None, None) - ], - actual, - ) - -def test_builtin__import__(): - import importlib # noqa: F401 - - with TestHook() as hook: - __import__("importlib") - __import__("email") - __import__("pythoninfo") - __import__("audit_test_data.submodule", level=1, globals={"__package__": "test"}) - __import__("test.audit_test_data.submodule2") - __import__("_testcapi") - - actual = [a for e, a in hook.seen if e == "import"] - assertSequenceEqual( - [ - ("email", None, sys.path, sys.meta_path, sys.path_hooks), - ("pythoninfo", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data.submodule", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data.submodule2", None, sys.path, sys.meta_path, sys.path_hooks), - ("_testcapi", None, sys.path, sys.meta_path, sys.path_hooks), - ("_testcapi", unittest.mock.ANY, None, None, None) - ], - actual, - ) - -def test_import_statement(): - import importlib # noqa: F401 - # Set __package__ so relative imports work - with swap_item(globals(), "__package__", "test"): - with TestHook() as hook: - import importlib # noqa: F401 - import email # noqa: F401 - import pythoninfo # noqa: F401 - from .audit_test_data import submodule # noqa: F401 - import test.audit_test_data.submodule2 # noqa: F401 - import _testcapi # noqa: F401 - - actual = [a for e, a in hook.seen if e == "import"] - # Import statement ordering is different because the package is - # loaded first and then the submodule - assertSequenceEqual( - [ - ("email", None, sys.path, sys.meta_path, sys.path_hooks), - ("pythoninfo", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data.submodule", None, sys.path, sys.meta_path, sys.path_hooks), - ("test.audit_test_data.submodule2", None, sys.path, sys.meta_path, sys.path_hooks), - ("_testcapi", None, sys.path, sys.meta_path, sys.path_hooks), - ("_testcapi", unittest.mock.ANY, None, None, None) - ], - actual, - ) - -if __name__ == "__main__": - from test.support import suppress_msvcrt_asserts - - suppress_msvcrt_asserts() - - test = sys.argv[1] - globals()[test](*sys.argv[2:]) diff --git a/Lib/test/support/isolation.py b/Lib/test/support/isolation.py new file mode 100644 index 000000000000000..fe0e5df0ec8ba45 --- /dev/null +++ b/Lib/test/support/isolation.py @@ -0,0 +1,286 @@ +"""Run tests in isolated subprocesses (the test.support.isolation.isolated decorator). + +A failure, error or skip that happens in the subprocess is replayed in the +parent process so that the test runner records it. The original (subprocess) +traceback is attached as the cause of the replayed exception, the same way +:mod:`concurrent.futures` surfaces tracebacks from worker processes. +""" + +import functools +import os +import sys +import unittest + +# Mark this module's frames as belonging to the test machinery, so that +# unittest strips them from reported tracebacks (see TestResult._clean_tracebacks +# in Lib/unittest/result.py). Only the original subprocess traceback, attached +# as the cause, is then shown -- not the parent-side replay frames. +__unittest = True + +# Environment variable set in the child process so that the decorated test +# method runs its real body instead of spawning yet another subprocess. +_RUN_IN_SUBPROCESS_ENV = '_PYTHON_RUN_IN_SUBPROCESS' + +# Environment variable carrying (as JSON) the regrtest-configured test.support +# state, so that the bare subprocess honors -u, -M, -v, etc. like the parent. +_CONFIG_ENV = '_PYTHON_ISOLATED_CONFIG' + +# test.support globals set by regrtest (libregrtest/setup.py) that affect how +# tests run and which are skipped at runtime in the subprocess. +_PROPAGATED_CONFIG = ( + 'use_resources', # -u (is_resource_enabled/requires) + 'max_memuse', 'real_max_memuse', # -M (bigmemtest) + 'verbose', # -v + 'failfast', # -f +) + +def _child_config(): + import test.support as support + return {name: getattr(support, name) for name in _PROPAGATED_CONFIG} + +def _apply_child_config(): + """Mirror the parent's test.support configuration in the subprocess. + + Called by subprocess_runner before loading the test, so that import-time + decorators (e.g. requires_resource) and runtime checks see the same -u/-M/-v + configuration as the parent process. + """ + import json + import test.support as support + data = os.environ.get(_CONFIG_ENV) + if data: + for name, value in json.loads(data).items(): + setattr(support, name, value) + +# True while running inside the isolated subprocess spawned by @isolated(). +# setUp()/tearDown() and the class- and module-level fixtures can test it to +# decide which code to run in the subprocess as opposed to the parent process. +running_isolated = bool(os.environ.get(_RUN_IN_SUBPROCESS_ENV)) + + +class _RemoteTraceback(Exception): + """Carry a formatted traceback string from the subprocess for display. + + Attached as the ``__cause__`` of the replayed failure/error, so that the + original traceback is shown by the traceback machinery. + """ + def __init__(self, tb): + self.tb = tb + + def __str__(self): + return self.tb + + +class _SubprocessTestError(Exception): + """Replay a subprocess error (as opposed to a failure) in the parent.""" + + +def _remote(detail): + # Wrap the subprocess traceback the way concurrent.futures does, so it is + # clearly delimited when shown as the cause. + return _RemoteTraceback(f'\n"""\n{detail}"""') + + +def _check_subprocess_support(): + # isolated() always runs the test in a subprocess, so skip (in the parent) + # on platforms that do not support spawning one. + import test.support as support + if not support.has_subprocess_support: + raise unittest.SkipTest('requires subprocess support') + + +def _run_in_subprocess(module, qualname): + """Run module.qualname (a test method or class) in a fresh subprocess. + + Return ``(payload, output, returncode)``, where *payload* is the decoded + ``{'outcomes': ..., 'durations': ...}`` mapping from the subprocess, or + ``None`` if it did not run to completion (crash, import error, ...). + """ + import json + import subprocess + import tempfile + env = dict(os.environ) + env[_RUN_IN_SUBPROCESS_ENV] = '1' + env[_CONFIG_ENV] = json.dumps(_child_config()) + fd, result_path = tempfile.mkstemp(suffix='.json') + os.close(fd) + try: + cmd = [sys.executable, '-m', 'test.support.subprocess_runner', + module, qualname, result_path] + proc = subprocess.run(cmd, capture_output=True, text=True, env=env) + try: + with open(result_path, encoding='utf-8') as f: + payload = json.load(f) + except (OSError, ValueError): + payload = None + finally: + try: + os.unlink(result_path) + except OSError: + pass + return payload, (proc.stdout or '') + (proc.stderr or ''), proc.returncode + + +def _replay_outcome(test, outcome): + kind = outcome['kind'] + detail = outcome['detail'] + if kind == 'skipped': + test.skipTest(detail) # the detail is the skip reason, not a traceback + elif kind in ('failure', 'expected_failure'): + # An expected failure is replayed like a failure: the wrapper carries + # the @expectedFailure marker (copied by functools.wraps), so the parent + # records the raised exception as an expectedFailure, not a failure. + exc = test.failureException('test failed in the subprocess') + raise exc from _remote(detail) + else: # 'error' + exc = _SubprocessTestError('test failed in the subprocess') + raise exc from _remote(detail) + + +def _replay_outcomes(test, outcomes): + # Replay each subtest outcome in its own subTest() context so that they are + # reported individually, then replay the whole-test outcome (if any). + main = [] + for outcome in outcomes: + if outcome['subtest']: + with test.subTest(outcome['desc']): + _replay_outcome(test, outcome) + else: + main.append(outcome) + for outcome in main: + _replay_outcome(test, outcome) + + +def _raise_fixture_outcome(outcome): + # Reproduce a setUpClass()/setUpModule() failure or skip from the + # subprocess in a parent-process fixture, so it applies to every test. + if outcome['kind'] == 'skipped': + raise unittest.SkipTest(outcome['detail']) + exc = _SubprocessTestError('class failed in the subprocess') + raise exc from _remote(outcome['detail']) + + +def _isolate_method(func): + @functools.wraps(func) + def wrapper(self, /, *args, **kwargs): + if running_isolated: + # Already running in the subprocess: run the real test. + return func(self, *args, **kwargs) + _check_subprocess_support() + cls = type(self) + qualname = f'{cls.__qualname__}.{func.__name__}' + payload, output, returncode = _run_in_subprocess(cls.__module__, + qualname) + if payload is None: + exc = _SubprocessTestError( + f'test did not complete in a subprocess (exit code {returncode})') + raise exc from _remote(output) + # The parent measures this method's own duration (the real cost of the + # isolated run, subprocess startup included), so nothing to forward here. + _replay_outcomes(self, payload['outcomes']) + return wrapper + + +def _isolate_class(cls): + # Unwrap to the plain functions: the replacements below call them with the + # runtime cls, so a subclass of an isolated class runs the fixtures bound to + # itself (a bound classmethod would freeze the decoration-time class). + orig_setUpClass = cls.setUpClass.__func__ + orig_tearDownClass = cls.tearDownClass.__func__ + orig_setUp = cls.setUp + orig_tearDown = cls.tearDown + orig_addDuration = getattr(cls, '_addDuration', None) + + def setUpClass(cls): + if running_isolated: + orig_setUpClass(cls) + return + _check_subprocess_support() + # Run the whole class in a single subprocess and stash the outcomes + # for the wrapped test methods to replay. + payload, output, returncode = _run_in_subprocess(cls.__module__, + cls.__qualname__) + if payload is None: + exc = _SubprocessTestError( + f'class did not complete in a subprocess (exit code {returncode})') + raise exc from _remote(output) + by_id = {} + for outcome in payload['outcomes']: + if outcome['fixture']: + # A setUpClass()/setUpModule() failure or skip: apply it to the + # whole class by raising it here, in the parent's setUpClass(). + _raise_fixture_outcome(outcome) + by_id.setdefault(outcome['id'], []).append(outcome) + cls._isolated_outcomes = by_id + cls._isolated_durations = dict(payload.get('durations', ())) + + def tearDownClass(cls): + if running_isolated: + orig_tearDownClass(cls) + else: + cls._isolated_outcomes = None + cls._isolated_durations = None + + def setUp(self): + # In the parent the real test does not run, so neither should setUp(). + if running_isolated: + orig_setUp(self) + + def tearDown(self): + if running_isolated: + orig_tearDown(self) + + def _addDuration(self, result, elapsed): + # In the parent, report the per-test duration measured in the subprocess + # rather than the replay time (subprocess startup is paid once, in + # setUpClass). + if not running_isolated: + durations = getattr(type(self), '_isolated_durations', None) or {} + elapsed = durations.get(self.id(), elapsed) + orig_addDuration(self, result, elapsed) + + def replay(self): + by_id = getattr(type(self), '_isolated_outcomes', None) or {} + _replay_outcomes(self, by_id.get(self.id(), [])) + + cls.setUpClass = classmethod(setUpClass) + cls.tearDownClass = classmethod(tearDownClass) + cls.setUp = setUp + cls.tearDown = tearDown + if orig_addDuration is not None: + cls._addDuration = _addDuration + for name in unittest.TestLoader().getTestCaseNames(cls): + method = getattr(cls, name) + @functools.wraps(method) + def wrapper(self, /, *args, __func=method, **kwargs): + if running_isolated: + return __func(self, *args, **kwargs) + replay(self) + setattr(cls, name, wrapper) + return cls + + +def isolated(): + """Decorator to run a test method or class in isolation from the rest. + + The decorated test runs in a separate, fresh Python process, so it does not + share global or interpreter state with the rest of the test run. When a + :class:`~unittest.TestCase` subclass is decorated, the whole class runs in a + single subprocess and its ``setUpClass()``/``setUpModule()`` fixtures run + once there; when a method is decorated, only that method runs in a + subprocess. Decorated methods must take no extra arguments. + + A failure, error or skip of the whole test is reported for the test, and + individual subtests (:meth:`~unittest.TestCase.subTest`) that fail or are + skipped are reported individually. The original subprocess traceback is + shown as the cause of a reported failure or error. Use + :data:`running_isolated` in fixtures to choose what to run in the subprocess. + + The test is skipped on platforms without subprocess support, since it must + spawn one. + """ + def decorator(obj): + if isinstance(obj, type) and issubclass(obj, unittest.TestCase): + return _isolate_class(obj) + return _isolate_method(obj) + return decorator diff --git a/Lib/test/support/subprocess_runner.py b/Lib/test/support/subprocess_runner.py new file mode 100644 index 000000000000000..6eced4951171bea --- /dev/null +++ b/Lib/test/support/subprocess_runner.py @@ -0,0 +1,74 @@ +"""Run a single test method in this (sub)process and report the result. + +Invoked as ``python -m test.support.subprocess_runner MODULE QUALNAME OUTFILE`` +by :func:`test.support.isolation.isolated`. The outcome of the test (including +that of each individual subtest) is written as JSON to OUTFILE. This module is +not meant to be imported. +""" + +import json +import sys +import unittest +from unittest.case import _SubTest + +if __name__ != '__main__': + raise ImportError('this module cannot be directly imported') + +if len(sys.argv) != 4: + print('usage: python -m test.support.subprocess_runner ' + 'MODULE QUALNAME OUTFILE', file=sys.stderr) + sys.exit(2) + +module = sys.argv[1] +qualname = sys.argv[2] +outfile = sys.argv[3] + +# Mirror the parent's regrtest configuration (-u, -M, -v, ...) before importing +# the test, so resource gating and bigmem sizing match the parent process. +from test.support.isolation import _apply_child_config +_apply_child_config() + + +class _Result(unittest.TestResult): + # Capture per-test durations keyed by test id, so the parent can report the + # subprocess timings instead of its own replay time. + def __init__(self): + super().__init__() + self.id_durations = [] + + def addDuration(self, test, elapsed): + super().addDuration(test, elapsed) + self.id_durations.append((test.id(), elapsed)) + + +suite = unittest.TestLoader().loadTestsFromName(f'{module}.{qualname}') +result = _Result() +suite.run(result) + + +def _outcome(kind, test, detail): + subtest = isinstance(test, _SubTest) + real = test.test_case if subtest else test + return { + 'kind': kind, + 'subtest': subtest, + 'desc': test._subDescription() if subtest else '', + # id() groups outcomes by test method; a non-TestCase (e.g. an + # _ErrorHolder) marks a setUpClass()/setUpModule() fixture failure. + 'id': real.id(), + 'fixture': not isinstance(real, unittest.TestCase), + 'detail': detail, + } + + +outcomes = [_outcome('failure', t, tb) for t, tb in result.failures] +outcomes += [_outcome('error', t, tb) for t, tb in result.errors] +outcomes += [_outcome('expected_failure', t, tb) + for t, tb in result.expectedFailures] +outcomes += [_outcome('skipped', t, reason) for t, reason in result.skipped] + +payload = {'outcomes': outcomes, 'durations': result.id_durations} +with open(outfile, 'w', encoding='utf-8') as f: + json.dump(payload, f) + +sys.exit(0) diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index db4e1eb9999c1fa..8ad5c450e1873b8 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -1,344 +1,903 @@ """Tests for sys.audit and sys.addaudithook + +``sys.addaudithook()`` cannot be undone, so each test needs a fresh +interpreter. ``@isolation.isolated()`` provides one and reports the result, which +lets these be ordinary ``TestCase`` methods using ``self.assert*`` directly in +the subprocess -- no separate driver script, and no parsing of events out of the +subprocess's stdout. """ -import subprocess +import os import sys import unittest from test import support -from test.support import import_helper -from test.support import os_helper +from test.support import isolation +from unittest.mock import ANY if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"): raise unittest.SkipTest("test only relevant when sys.audit is available") -AUDIT_TESTS_PY = support.findfile("audit-tests.py") + +class _Hook: + """Collect audited events; for use inside @isolated() tests. + + Should be used in a with block to ensure that it has no impact after the + test completes (although that matters less here, since each test runs in its + own interpreter). + """ + + def __init__(self, raise_on_events=None, exc_type=RuntimeError): + self.raise_on_events = raise_on_events or () + self.exc_type = exc_type + self.seen = [] + self.closed = False + + def __enter__(self): + sys.addaudithook(self) + return self + + def __exit__(self, *a): + self.closed = True + + @property + def seen_events(self): + return [ev for ev, args in self.seen] + + def __call__(self, event, args): + if self.closed: + return + self.seen.append((event, args)) + if event in self.raise_on_events: + raise self.exc_type("saw event " + event) + + +def requires_module(name): + """Skip the test, in the parent process, if module *name* is unavailable. + + Checking here means a missing module skips the test without first spawning + the @isolated() subprocess. + """ + import functools + + def decorator(func): + @functools.wraps(func) + def wrapper(self): + from test.support import import_helper + import_helper.import_module(name) + return func(self) + return wrapper + return decorator class AuditTest(unittest.TestCase): maxDiff = None - @support.requires_subprocess() - def run_test_in_subprocess(self, *args): - with subprocess.Popen( - [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args], - encoding="utf-8", - errors="backslashreplace", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) as p: - p.wait() - return p, p.stdout.read(), p.stderr.read() - - def do_test(self, *args): - proc, stdout, stderr = self.run_test_in_subprocess(*args) - - sys.stdout.write(stdout) - sys.stderr.write(stderr) - if proc.returncode: - self.fail(stderr) - - def run_python(self, *args, expect_stderr=False): - events = [] - proc, stdout, stderr = self.run_test_in_subprocess(*args) - if not expect_stderr or support.verbose: - sys.stderr.write(stderr) - return ( - proc.returncode, - [line.strip().partition(" ") for line in stdout.splitlines()], - stderr, - ) - + @isolation.isolated() def test_basic(self): - self.do_test("test_basic") + with _Hook() as hook: + sys.audit("test_event", 1, 2, 3) + self.assertEqual(hook.seen[0][0], "test_event") + self.assertEqual(hook.seen[0][1], (1, 2, 3)) + @isolation.isolated() def test_block_add_hook(self): - self.do_test("test_block_add_hook") - + # Raising in a hook for "sys.addaudithook" blocks the new hook but does + # not propagate out. + with _Hook(raise_on_events="sys.addaudithook") as hook1: + with _Hook() as hook2: + sys.audit("test_event") + self.assertIn("test_event", hook1.seen_events) + self.assertNotIn("test_event", hook2.seen_events) + + @isolation.isolated() def test_block_add_hook_baseexception(self): - self.do_test("test_block_add_hook_baseexception") - + # A BaseException raised while adding a hook propagates out. + with self.assertRaises(BaseException): + with _Hook(raise_on_events="sys.addaudithook", + exc_type=BaseException): + with _Hook(): + pass + + @isolation.isolated() def test_marshal(self): - import_helper.import_module("marshal") + import marshal + from test.support import os_helper + + o = ("a", "b", "c", 1, 2, 3) + payload = marshal.dumps(o) + + with _Hook() as hook: + self.assertEqual(o, marshal.loads(marshal.dumps(o))) - self.do_test("test_marshal") + try: + with open(os_helper.TESTFN, "wb") as f: + marshal.dump(o, f) + with open(os_helper.TESTFN, "rb") as f: + self.assertEqual(o, marshal.load(f)) + finally: + os_helper.unlink(os_helper.TESTFN) + actual = [(a[0], a[1]) for e, a in hook.seen if e == "marshal.dumps"] + self.assertEqual(actual, [(o, marshal.version)] * 2) + + actual = [a[0] for e, a in hook.seen if e == "marshal.loads"] + self.assertEqual(actual, [payload]) + + actual = [e for e, a in hook.seen if e == "marshal.load"] + self.assertEqual(actual, ["marshal.load"]) + + @isolation.isolated() def test_pickle(self): - import_helper.import_module("pickle") + import pickle + + class PicklePrint: + def __reduce_ex__(self, p): + return str, ("Pwned!",) - self.do_test("test_pickle") + payload_1 = pickle.dumps(PicklePrint()) + payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3)) + # Before we add the hook, ensure our malicious pickle loads + self.assertEqual("Pwned!", pickle.loads(payload_1)) + + with _Hook(raise_on_events="pickle.find_class"): + with self.assertRaises(RuntimeError): + # With the hook enabled, loading globals is not allowed + pickle.loads(payload_1) + # pickles with no globals are okay + pickle.loads(payload_2) + + @isolation.isolated() def test_monkeypatch(self): - self.do_test("test_monkeypatch") + class A: + pass + + class B: + pass + + class C(A): + pass + + a = A() + + with _Hook() as hook: + # Catch name changes + C.__name__ = "X" + # Catch type changes + C.__bases__ = (B,) + # Ensure bypassing __setattr__ is still caught + type.__dict__["__bases__"].__set__(C, (B,)) + # Catch attribute replacement + C.__init__ = B.__init__ + # Catch attribute addition + C.new_attr = 123 + # Catch class changes + a.__class__ = B + + actual = [(ar[0], ar[1]) for e, ar in hook.seen + if e == "object.__setattr__"] + self.assertEqual( + [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), + (a, "__class__")], + actual, + ) + @isolation.isolated() def test_open(self): - self.do_test("test_open", os_helper.TESTFN) + from test.support import os_helper + + testfn = os_helper.TESTFN + + # SSLContext.load_dh_params uses Py_fopen() rather than normal open() + try: + import ssl + + load_dh_params = ssl.create_default_context().load_dh_params + except ImportError: + load_dh_params = None + + try: + import readline + except ImportError: + readline = None + + def rl(name): + if readline: + return getattr(readline, name, None) + else: + return None + + try: + import _remote_debugging + except ImportError: + _remote_debugging = None + + def rd(name): + if _remote_debugging: + return getattr(_remote_debugging, name, None) + return None + + # Try a range of "open" functions. + # All of them should fail + with _Hook(raise_on_events={"open"}) as hook: + for fn, *args in [ + (open, testfn, "r"), + (open, sys.executable, "rb"), + (open, 3, "wb"), + (open, testfn, "w", -1, None, None, None, False, lambda *a: 1), + (load_dh_params, testfn), + (rl("read_history_file"), testfn), + (rl("read_history_file"), None), + (rl("write_history_file"), testfn), + (rl("write_history_file"), None), + (rl("append_history_file"), 0, testfn), + (rl("append_history_file"), 0, None), + (rl("read_init_file"), testfn), + (rl("read_init_file"), None), + (rd("BinaryWriter"), testfn, 1000, 0), + (rd("BinaryReader"), testfn), + ]: + if not fn: + continue + with self.assertRaises(RuntimeError): + try: + fn(*args) + except NotImplementedError: + if fn == load_dh_params: + # Not callable in some builds + load_dh_params = None + raise RuntimeError + else: + raise + + actual_mode = [(a[0], a[1]) for e, a in hook.seen + if e == "open" and a[1]] + actual_flag = [(a[0], a[2]) for e, a in hook.seen + if e == "open" and not a[1]] + self.assertEqual( + [ + i + for i in [ + (testfn, "r"), + (sys.executable, "r"), + (3, "w"), + (testfn, "w"), + (testfn, "rb") if load_dh_params else None, + (testfn, "r") if readline else None, + ("~/.history", "r") if readline else None, + (testfn, "w") if readline else None, + ("~/.history", "w") if readline else None, + (testfn, "a") if rl("append_history_file") else None, + ("~/.history", "a") if rl("append_history_file") else None, + (testfn, "r") if readline else None, + ("", "r") if readline else None, + (testfn, "wb") if rd("BinaryWriter") else None, + (testfn, "rb") if rd("BinaryReader") else None, + ] + if i is not None + ], + actual_mode, + ) + self.assertEqual([], actual_flag) + @isolation.isolated() def test_cantrace(self): - self.do_test("test_cantrace") + traced = [] + + def trace(frame, event, *args): + if frame.f_code == _Hook.__call__.__code__: + traced.append(event) + + old = sys.settrace(trace) + try: + with _Hook() as hook: + # No traced call + eval("1") + # No traced call + hook.__cantrace__ = False + eval("2") + + # One traced call + hook.__cantrace__ = True + eval("3") + + # Two traced calls (writing to private member, eval) + hook.__cantrace__ = 1 + eval("4") + + # One traced call (writing to private member) + hook.__cantrace__ = 0 + finally: + sys.settrace(old) + + self.assertEqual(["call"] * 4, traced) + + @isolation.isolated() def test_mmap(self): - self.do_test("test_mmap") + import mmap - def test_ctypes_call_function(self): - import_helper.import_module("ctypes") - self.do_test("test_ctypes_call_function") + with _Hook() as hook: + mmap.mmap(-1, 8) + self.assertEqual(hook.seen[0][1][:2], (-1, 8)) + @requires_module("ctypes") + @isolation.isolated() + def test_ctypes_call_function(self): + import ctypes + import _ctypes + + with _Hook() as hook: + _ctypes.call_function(ctypes._memmove_addr, (0, 0, 0)) + self.assertIn( + ("ctypes.call_function", (ctypes._memmove_addr, (0, 0, 0))), + hook.seen) + + ctypes.CFUNCTYPE(ctypes.c_voidp)(ctypes._memset_addr)(1, 0, 0) + self.assertIn( + ("ctypes.call_function", (ctypes._memset_addr, (1, 0, 0))), + hook.seen) + + with _Hook() as hook: + ctypes.cast(ctypes.c_voidp(0), ctypes.POINTER(ctypes.c_char)) + self.assertIn("ctypes.call_function", hook.seen_events) + + with _Hook() as hook: + ctypes.string_at(id("ctypes.string_at") + 40) + self.assertIn("ctypes.call_function", hook.seen_events) + self.assertIn("ctypes.string_at", hook.seen_events) + + @requires_module("_posixsubprocess") + @isolation.isolated() def test_posixsubprocess(self): - import_helper.import_module("_posixsubprocess") - self.do_test("test_posixsubprocess") + import multiprocessing.util - def test_excepthook(self): - returncode, events, stderr = self.run_python("test_excepthook") - if not returncode: - self.fail(f"Expected fatal exception\n{stderr}") + exe = b"xxx" + args = [b"yyy", b"zzz"] + with _Hook() as hook: + multiprocessing.util.spawnv_passfds(exe, args, ()) + self.assertIn( + ("_posixsubprocess.fork_exec", ([exe], args, None)), hook.seen) - self.assertSequenceEqual( - [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events + @support.requires_subprocess() + def test_excepthook(self): + import textwrap + from test.support import script_helper + + # The "sys.excepthook" audit event fires only for an exception that goes + # uncaught to the top level of the interpreter, so this case cannot use + # @isolated() (whose test body runs under unittest, which catches the + # exception). Run it as a small script instead. + code = textwrap.dedent("""\ + import sys + + def excepthook(exc_type, exc_value, exc_tb): + if exc_type is not RuntimeError: + sys.__excepthook__(exc_type, exc_value, exc_tb) + + def hook(event, args): + if event == "sys.excepthook": + if not isinstance(args[2], args[1]): + raise TypeError( + f"Expected isinstance({args[2]!r}, {args[1]!r})") + if args[0] != excepthook: + raise ValueError(f"Expected {args[0]} == {excepthook}") + print(event, repr(args[2])) + + sys.addaudithook(hook) + sys.excepthook = excepthook + raise RuntimeError("fatal-error") + """) + rc, stdout, stderr = script_helper.assert_python_failure("-c", code) + self.assertEqual( + stdout.strip().decode(), + "sys.excepthook RuntimeError('fatal-error')", + stderr.decode(), ) + @requires_module("_testcapi") + @isolation.isolated() def test_unraisablehook(self): - import_helper.import_module("_testcapi") - returncode, events, stderr = self.run_python("test_unraisablehook") - if returncode: - self.fail(stderr) + import _testcapi - self.assertEqual(events[0][0], "sys.unraisablehook") - self.assertEqual( - events[0][2], - "RuntimeError('nonfatal-error') Exception ignored for audit hook test", - ) + def unraisablehook(hookargs): + pass + + events = [] + def hook(event, args): + if event == "sys.unraisablehook": + events.append(args) + + sys.addaudithook(hook) + sys.unraisablehook = unraisablehook + _testcapi.err_formatunraisable( + RuntimeError("nonfatal-error"), + "Exception ignored for audit hook test") + + self.assertEqual(len(events), 1) + args = events[0] + self.assertIs(args[0], unraisablehook) + self.assertEqual(repr(args[1].exc_value), "RuntimeError('nonfatal-error')") + self.assertEqual(args[1].err_msg, + "Exception ignored for audit hook test") + + @requires_module("winreg") + @isolation.isolated() def test_winreg(self): - import_helper.import_module("winreg") - returncode, events, stderr = self.run_python("test_winreg") - if returncode: - self.fail(stderr) + import winreg + + events = [] + + def hook(event, args): + if event.startswith("winreg."): + events.append((event, args)) + + sys.addaudithook(hook) + + k = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, "Software") + winreg.EnumKey(k, 0) + with self.assertRaises(OSError): + winreg.EnumKey(k, 10000) + kv = k.Detach() + winreg.CloseKey(kv) self.assertEqual(events[0][0], "winreg.OpenKey") self.assertEqual(events[1][0], "winreg.OpenKey/result") - expected = events[1][2] - self.assertTrue(expected) - self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2]) - self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3]) - self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4]) - + hkey = events[1][1][0] + self.assertTrue(hkey) + self.assertEqual(events[2], ("winreg.EnumKey", (hkey, 0))) + self.assertEqual(events[3], ("winreg.EnumKey", (hkey, 10000))) + self.assertEqual(events[4], ("winreg.PyHKEY.Detach", (hkey,))) + + @requires_module("socket") + @isolation.isolated() def test_socket(self): - import_helper.import_module("socket") - returncode, events, stderr = self.run_python("test_socket") - if returncode: - self.fail(stderr) - - if support.verbose: - print(*events, sep='\n') - self.assertEqual(events[0][0], "socket.gethostname") - self.assertEqual(events[1][0], "socket.__new__") - self.assertEqual(events[2][0], "socket.bind") - self.assertEndsWith(events[2][2], "('127.0.0.1', 8080)") + import socket + + events = [] + + def hook(event, args): + if event.startswith("socket."): + events.append((event, args)) + sys.addaudithook(hook) + + socket.gethostname() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + # Don't care if this fails, we just want the audit message + sock.bind(('127.0.0.1', 8080)) + except OSError: + pass + finally: + sock.close() + + names = [e for e, a in events] + self.assertEqual(names[0], "socket.gethostname") + self.assertEqual(names[1], "socket.__new__") + self.assertEqual(names[2], "socket.bind") + self.assertEqual(events[2][1][-1], ('127.0.0.1', 8080)) + + @isolation.isolated() def test_gc(self): - returncode, events, stderr = self.run_python("test_gc") - if returncode: - self.fail(stderr) + import gc - if support.verbose: - print(*events, sep='\n') - self.assertEqual( - [event[0] for event in events], - ["gc.get_objects", "gc.get_referrers", "gc.get_referents"] - ) + events = [] + def hook(event, args): + if event.startswith("gc."): + events.append(event) + + sys.addaudithook(hook) + + gc.get_objects(generation=1) + + x = object() + y = [x] + + gc.get_referrers(x) + gc.get_referents(y) + + self.assertEqual( + events, + ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]) @support.requires_resource('network') + @requires_module("http.client") + @isolation.isolated() def test_http(self): - import_helper.import_module("http.client") - returncode, events, stderr = self.run_python("test_http_client") - if returncode: - self.fail(stderr) + import http.client - if support.verbose: - print(*events, sep='\n') - self.assertEqual(events[0][0], "http.client.connect") - self.assertEqual(events[0][2], "www.python.org 80") - self.assertEqual(events[1][0], "http.client.send") - if events[1][2] != '[cannot send]': - self.assertIn('HTTP', events[1][2]) + events = [] + + def hook(event, args): + if event.startswith("http.client."): + events.append((event, args[1:])) + + sys.addaudithook(hook) + + conn = http.client.HTTPConnection('www.python.org') + sent = True + try: + conn.request('GET', '/') + except OSError: + sent = False + finally: + conn.close() + self.assertEqual(events[0][0], "http.client.connect") + self.assertEqual(events[0][1], ('www.python.org', 80)) + if sent: + self.assertEqual(events[1][0], "http.client.send") + self.assertIn(b'HTTP', events[1][1][0]) + @requires_module("sqlite3") + @isolation.isolated() def test_sqlite3(self): - sqlite3 = import_helper.import_module("sqlite3") - returncode, events, stderr = self.run_python("test_sqlite3") - if returncode: - self.fail(stderr) - - if support.verbose: - print(*events, sep='\n') - actual = [ev[0] for ev in events] - expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2 + import sqlite3 + + events = [] + def hook(event, args): + if event.startswith("sqlite3."): + events.append(event) + + sys.addaudithook(hook) + cx1 = sqlite3.connect(":memory:") + cx2 = sqlite3.Connection(":memory:") + + # Configured without --enable-loadable-sqlite-extensions + try: + if hasattr(sqlite3.Connection, "enable_load_extension"): + cx1.enable_load_extension(False) + try: + cx1.load_extension("test") + except sqlite3.OperationalError: + pass + else: + self.fail("Expected sqlite3.load_extension to fail") + finally: + cx1.close() + cx2.close() + + expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2 if hasattr(sqlite3.Connection, "enable_load_extension"): expected += [ "sqlite3.enable_load_extension", "sqlite3.load_extension", ] - self.assertEqual(actual, expected) - + self.assertEqual(events, expected) + @isolation.isolated() def test_sys_getframe(self): - returncode, events, stderr = self.run_python("test_sys_getframe") - if returncode: - self.fail(stderr) + events = [] - if support.verbose: - print(*events, sep='\n') - actual = [(ev[0], ev[2]) for ev in events] - expected = [("sys._getframe", "test_sys_getframe")] + def hook(event, args): + if event.startswith("sys."): + events.append((event, args[0].f_code.co_name)) - self.assertEqual(actual, expected) + sys.addaudithook(hook) + sys._getframe() + self.assertEqual(events, [("sys._getframe", "test_sys_getframe")]) + @isolation.isolated() def test_sys_getframemodulename(self): - returncode, events, stderr = self.run_python("test_sys_getframemodulename") - if returncode: - self.fail(stderr) - - if support.verbose: - print(*events, sep='\n') - actual = [(ev[0], ev[2]) for ev in events] - expected = [("sys._getframemodulename", "0")] + events = [] - self.assertEqual(actual, expected) + def hook(event, args): + if event.startswith("sys."): + events.append((event, args)) + sys.addaudithook(hook) + sys._getframemodulename() + self.assertEqual(events, [("sys._getframemodulename", (0,))]) + @isolation.isolated() def test_threading(self): - returncode, events, stderr = self.run_python("test_threading") - if returncode: - self.fail(stderr) + import _thread - if support.verbose: - print(*events, sep='\n') - actual = [(ev[0], ev[2]) for ev in events] - expected = [ - ("_thread.start_new_thread", "(, (), None)"), - ("test.test_func", "()"), - ("_thread.start_joinable_thread", "(, 1, None)"), - ("test.test_func", "()"), - ] + events = [] - self.assertEqual(actual, expected) + def hook(event, args): + if event.startswith(("_thread.", "cpython.PyThreadState", "test.")): + events.append(event) + sys.addaudithook(hook) - def test_wmi_exec_query(self): - import_helper.import_module("_wmi") - returncode, events, stderr = self.run_python("test_wmi_exec_query") - if returncode: - self.fail(stderr) + lock = _thread.allocate_lock() + lock.acquire() - if support.verbose: - print(*events, sep='\n') - actual = [(ev[0], ev[2]) for ev in events] - expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")] + class test_func: + def __repr__(self): + return "" - self.assertEqual(actual, expected) + def __call__(self): + sys.audit("test.test_func") + lock.release() + + _thread.start_new_thread(test_func(), ()) + lock.acquire() + + handle = _thread.start_joinable_thread(test_func()) + handle.join() + + self.assertEqual( + events, + [ + "_thread.start_new_thread", + "test.test_func", + "_thread.start_joinable_thread", + "test.test_func", + ], + ) + + @requires_module("_wmi") + @isolation.isolated() + def test_wmi_exec_query(self): + import _wmi + events = [] + + def hook(event, args): + if event.startswith("_wmi."): + events.append((event, args)) + + sys.addaudithook(hook) + try: + _wmi.exec_query("SELECT * FROM Win32_OperatingSystem") + except OSError as e: + # gh-112278: WMI may be slow response when first called, but we still + # get the audit event, so just ignore the timeout + if e.winerror != 258: + raise + + self.assertEqual(len(events), 1) + self.assertEqual(events[0][0], "_wmi.exec_query") + self.assertEqual(events[0][1][0], + "SELECT * FROM Win32_OperatingSystem") + + @requires_module("syslog") + @isolation.isolated() def test_syslog(self): - syslog = import_helper.import_module("syslog") + import syslog + + events = [] - returncode, events, stderr = self.run_python("test_syslog") - if returncode: - self.fail(stderr) + def hook(event, args): + if event.startswith("syslog."): + events.append((event, args)) - if support.verbose: - print('Events:', *events, sep='\n ') + sys.addaudithook(hook) - self.assertSequenceEqual( + # The default ident is derived from sys.argv[0]. + default_ident = os.path.basename(sys.argv[0]) + + syslog.openlog('python') + syslog.syslog('test') + syslog.setlogmask(syslog.LOG_DEBUG) + syslog.closelog() + # implicit open + syslog.syslog('test2') + # open with default ident + syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0) + sys.argv = None + syslog.openlog() + syslog.closelog() + + self.assertEqual( events, - [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'), - ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'), - ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'), - ('syslog.closelog', '', ''), - ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'), - ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'), - ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'), - ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'), - ('syslog.closelog', '', '')] + [('syslog.openlog', ('python', 0, syslog.LOG_USER)), + ('syslog.syslog', (syslog.LOG_INFO, 'test')), + ('syslog.setlogmask', (syslog.LOG_DEBUG,)), + ('syslog.closelog', ()), + ('syslog.syslog', (syslog.LOG_INFO, 'test2')), + ('syslog.openlog', (default_ident, 0, syslog.LOG_USER)), + ('syslog.openlog', + (default_ident, syslog.LOG_NDELAY, syslog.LOG_LOCAL0)), + ('syslog.openlog', (None, 0, syslog.LOG_USER)), + ('syslog.closelog', ())], ) + @isolation.isolated() def test_not_in_gc(self): - returncode, _, stderr = self.run_python("test_not_in_gc") - if returncode: - self.fail(stderr) + import gc + + hook = lambda *a: None + sys.addaudithook(hook) + for o in gc.get_objects(): + if isinstance(o, list): + self.assertNotIn(hook, o) + + @isolation.isolated() def test_time(self): - returncode, events, stderr = self.run_python("test_time", "print") - if returncode: - self.fail(stderr) + import time + + events = [] + + def hook(event, args): + if event.startswith("time."): + events.append((event, args)) - if support.verbose: - print(*events, sep='\n') + sys.addaudithook(hook) - actual = [(ev[0], ev[2]) for ev in events] - expected = [("time.sleep", "0"), - ("time.sleep", "0.0625"), - ("time.sleep", "-1")] + time.sleep(0) + time.sleep(0.0625) # 1/16, a small exact float + with self.assertRaises(ValueError): + time.sleep(-1) - self.assertEqual(actual, expected) + self.assertEqual( + [(ev, args[0]) for ev, args in events], + [("time.sleep", 0), ("time.sleep", 0.0625), ("time.sleep", -1)], + ) + @isolation.isolated() def test_time_fail(self): - returncode, events, stderr = self.run_python("test_time", "fail", - expect_stderr=True) - self.assertNotEqual(returncode, 0) - self.assertIn('hook failed', stderr.splitlines()[-1]) + # A hook raising on an audited event propagates out of the operation. + import time + with self.assertRaises(AssertionError): + with _Hook(raise_on_events={"time.sleep"}, + exc_type=AssertionError): + time.sleep(0) + + @isolation.isolated() def test_sys_monitoring_register_callback(self): - returncode, events, stderr = self.run_python("test_sys_monitoring_register_callback") - if returncode: - self.fail(stderr) + events = [] - if support.verbose: - print(*events, sep='\n') - actual = [(ev[0], ev[2]) for ev in events] - expected = [("sys.monitoring.register_callback", "(None,)")] + def hook(event, args): + if event.startswith("sys.monitoring"): + events.append((event, args)) - self.assertEqual(actual, expected) + sys.addaudithook(hook) + sys.monitoring.register_callback(1, 1, None) + self.assertEqual( + events, [("sys.monitoring.register_callback", (None,))]) + @requires_module("_winapi") + @isolation.isolated() def test_winapi_createnamedpipe(self): - winapi = import_helper.import_module("_winapi") + import _winapi pipe_name = r"\\.\pipe\LOCAL\test_winapi_createnamed_pipe" - returncode, events, stderr = self.run_python("test_winapi_createnamedpipe", pipe_name) - if returncode: - self.fail(stderr) - if support.verbose: - print(*events, sep='\n') - actual = [(ev[0], ev[2]) for ev in events] - expected = [("_winapi.CreateNamedPipe", f"({pipe_name!r}, 3, 8)")] + events = [] + + def hook(event, args): + if event == "_winapi.CreateNamedPipe": + events.append((event, args)) + + sys.addaudithook(hook) + _winapi.CreateNamedPipe(pipe_name, _winapi.PIPE_ACCESS_DUPLEX, + 8, 2, 0, 0, 0, 0) - self.assertEqual(actual, expected) + self.assertEqual(events, [("_winapi.CreateNamedPipe", (pipe_name, 3, 8))]) + @isolation.isolated() def test_assert_unicode(self): # See gh-126018 - returncode, _, stderr = self.run_python("test_assert_unicode") - if returncode: - self.fail(stderr) + sys.addaudithook(lambda *args: None) + with self.assertRaises(TypeError): + sys.audit(9) @support.support_remote_exec_only @support.cpython_only + @isolation.isolated() def test_sys_remote_exec(self): - returncode, events, stderr = self.run_python("test_sys_remote_exec") - self.assertTrue(any(["sys.remote_exec" in event for event in events])) - self.assertTrue(any(["cpython.remote_debugger_script" in event for event in events])) - if returncode: - self.fail(stderr) + import tempfile + import time + pid = os.getpid() + events = [] + + def hook(event, args): + if event in ("sys.remote_exec", "cpython.remote_debugger_script"): + events.append((event, args)) + + sys.addaudithook(hook) + with tempfile.NamedTemporaryFile(mode='w+', delete=True) as tmp_file: + tmp_file.write("a = 1+1\n") + tmp_file.flush() + sys.remote_exec(pid, tmp_file.name) + # The remote-exec script runs as a pending call in this very + # interpreter; give it a chance to run before checking. + deadline = time.monotonic() + support.SHORT_TIMEOUT + while time.monotonic() < deadline: + if any(e == "cpython.remote_debugger_script" + for e, a in events): + break + time.sleep(0.01) + + by_event = dict(events) + self.assertIn("sys.remote_exec", by_event) + self.assertIn("cpython.remote_debugger_script", by_event) + self.assertEqual(by_event["sys.remote_exec"], + (pid, tmp_file.name)) + self.assertEqual(by_event["cpython.remote_debugger_script"], + (tmp_file.name,)) + + @isolation.isolated() def test_import_module(self): - self.do_test("test_import_module") + import importlib + with _Hook() as hook: + importlib.import_module("importlib") # imported, won't be logged + importlib.import_module("email") # standard library module + importlib.import_module("test.pythoninfo") # random module + importlib.import_module(".audit_test_data.submodule", "test") # relative + importlib.import_module("test.audit_test_data.submodule2") # absolute + importlib.import_module("_testcapi") # extension module + + actual = [a for e, a in hook.seen if e == "import"] + self.assertEqual( + actual, + [ + ("email", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.pythoninfo", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data.submodule", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data.submodule2", None, sys.path, sys.meta_path, sys.path_hooks), + ("_testcapi", None, sys.path, sys.meta_path, sys.path_hooks), + ("_testcapi", ANY, None, None, None), + ], + ) + + @isolation.isolated() def test_builtin__import__(self): - self.do_test("test_builtin__import__") + import importlib # noqa: F401 + + with _Hook() as hook: + __import__("importlib") + __import__("email") + __import__("test.pythoninfo") + __import__("audit_test_data.submodule", level=1, + globals={"__package__": "test"}) + __import__("test.audit_test_data.submodule2") + __import__("_testcapi") + + actual = [a for e, a in hook.seen if e == "import"] + self.assertEqual( + actual, + [ + ("email", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.pythoninfo", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data.submodule", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data.submodule2", None, sys.path, sys.meta_path, sys.path_hooks), + ("_testcapi", None, sys.path, sys.meta_path, sys.path_hooks), + ("_testcapi", ANY, None, None, None), + ], + ) + @isolation.isolated() def test_import_statement(self): - self.do_test("test_import_statement") + with _Hook() as hook: + import importlib # noqa: F401 + import email # noqa: F401 + import test.pythoninfo # noqa: F401 + from .audit_test_data import submodule # noqa: F401 + import test.audit_test_data.submodule2 # noqa: F401 + import _testcapi # noqa: F401 + + actual = [a for e, a in hook.seen if e == "import"] + # Import statement ordering is different because the package is + # loaded first and then the submodule + self.assertEqual( + actual, + [ + ("email", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.pythoninfo", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data.submodule", None, sys.path, sys.meta_path, sys.path_hooks), + ("test.audit_test_data.submodule2", None, sys.path, sys.meta_path, sys.path_hooks), + ("_testcapi", None, sys.path, sys.meta_path, sys.path_hooks), + ("_testcapi", ANY, None, None, None), + ], + ) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index d556f96bc532ed1..90a56cf86971ea0 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -19,6 +19,7 @@ import warnings from test import support +from test.support import isolation from test.support import hashlib_helper from test.support import import_helper from test.support import os_helper @@ -1075,5 +1076,106 @@ def test_disable_hash_md5_in_fips_mode_allow_all(self): self.assertIsInstance(h, self._hashlib.HASH) +class TestIsolated(unittest.TestCase): + # Drive the sample tests in test._isolated_sample (which really spawn + # subprocesses through @isolation.isolated()) under a private TestResult, + # and check that each subprocess outcome is replayed in the parent. + + @staticmethod + def _run(name): + suite = unittest.TestLoader().loadTestsFromName( + 'test._isolated_sample.' + name) + result = unittest.TestResult() + suite.run(result) + return result + + @staticmethod + def _names(items): + # Map outcome entries (which are (test, detail) pairs, except + # unexpectedSuccesses which are bare tests) to their method names. + names = [] + for item in items: + test = item[0] if isinstance(item, tuple) else item + names.append(test.id().rpartition('.')[2]) + return sorted(names) + + @support.requires_subprocess() + def test_method_outcomes(self): + result = self._run('MethodSample') + self.assertEqual(result.testsRun, 6) + self.assertEqual(self._names(result.failures), ['test_fail']) + self.assertEqual(self._names(result.errors), ['test_error']) + self.assertEqual(self._names(result.skipped), ['test_skip']) + self.assertEqual(self._names(result.expectedFailures), + ['test_expected_failure']) + self.assertEqual(self._names(result.unexpectedSuccesses), + ['test_unexpected_success']) + + @support.requires_subprocess() + def test_class_outcomes(self): + result = self._run('ClassSample') + self.assertEqual(result.testsRun, 3) + self.assertEqual(self._names(result.failures), ['test_fail']) + self.assertEqual(self._names(result.expectedFailures), + ['test_expected_failure']) + self.assertEqual(result.errors, []) + self.assertEqual(result.unexpectedSuccesses, []) + + @support.requires_subprocess() + def test_subtests_reported_individually(self): + result = self._run('SubtestSample') + self.assertEqual(result.testsRun, 1) + self.assertEqual(len(result.failures), 1) + test, _ = result.failures[0] + self.assertIn('i=1', str(test)) + + @support.requires_subprocess() + def test_skip_reason_propagated(self): + result = self._run('MethodSample.test_skip') + self.assertEqual([reason for _, reason in result.skipped], ['nope']) + + @support.requires_subprocess() + def test_subprocess_traceback_is_cause(self): + result = self._run('MethodSample.test_fail') + self.assertEqual(len(result.failures), 1) + _, tb = result.failures[0] + # The real assertion that failed in the subprocess is shown ... + self.assertIn('self.assertEqual(1, 2)', tb) + # ... as the direct cause of the replayed failure ... + self.assertIn('direct cause', tb) + # ... without leaking the parent-side replay frames. + self.assertNotIn('isolation.py', tb) + + @support.requires_subprocess() + def test_durations_forwarded_for_class(self): + from test._isolated_sample import DURATION_SLEEP + result = unittest.TestResult() + result.collectedDurations = [] + suite = unittest.TestLoader().loadTestsFromName( + 'test._isolated_sample.DurationSample') + suite.run(result) + # The duration reported in the parent is the one measured in the + # subprocess (around the sleep), not the near-instant replay time. + self.assertEqual(len(result.collectedDurations), 1) + name, elapsed = result.collectedDurations[0] + self.assertEqual(name.split()[0], 'test_slow') + self.assertGreaterEqual(elapsed, DURATION_SLEEP / 2) + + def test_skipped_without_subprocess_support(self): + # On a platform without subprocess support the test is skipped in the + # parent, before any subprocess is spawned. + calls = [] + orig = isolation._run_in_subprocess + with support.swap_attr(support, 'has_subprocess_support', False): + isolation._run_in_subprocess = lambda *a, **k: calls.append(a) + try: + result = self._run('MethodSample.test_pass') + finally: + isolation._run_in_subprocess = orig + self.assertEqual(result.testsRun, 1) + self.assertEqual(len(result.skipped), 1) + self.assertEqual(calls, []) + + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst b/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst new file mode 100644 index 000000000000000..c5559ba110de96a --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2026-06-29-10-14-09.gh-issue-152548.Khw9J7.rst @@ -0,0 +1,3 @@ +Add the :func:`test.support.isolated` decorator to run a test method or +``TestCase`` subclass in a fresh interpreter subprocess, isolated from the rest +of the test run.