|
7 | 7 | import threading |
8 | 8 | from asyncio import staggered, taskgroups, base_events, tasks |
9 | 9 | from unittest.mock import ANY |
10 | | -from test.support import os_helper, SHORT_TIMEOUT, busy_retry |
| 10 | +from test.support import os_helper, SHORT_TIMEOUT, busy_retry, requires_gil_enabled |
11 | 11 | from test.support.script_helper import make_script |
12 | 12 | from test.support.socket_helper import find_unused_port |
13 | 13 |
|
@@ -876,6 +876,126 @@ def test_self_trace(self): |
876 | 876 | ], |
877 | 877 | ) |
878 | 878 |
|
| 879 | + @skip_if_not_supported |
| 880 | + @unittest.skipIf( |
| 881 | + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, |
| 882 | + "Test only runs on Linux with process_vm_readv support", |
| 883 | + ) |
| 884 | + @requires_gil_enabled("Free threaded builds don't have an 'active thread'") |
| 885 | + def test_only_active_thread(self): |
| 886 | + # Test that only_active_thread parameter works correctly |
| 887 | + port = find_unused_port() |
| 888 | + script = textwrap.dedent( |
| 889 | + f"""\ |
| 890 | + import time, sys, socket, threading |
| 891 | +
|
| 892 | + # Connect to the test process |
| 893 | + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 894 | + sock.connect(('localhost', {port})) |
| 895 | +
|
| 896 | + def worker_thread(name, barrier, ready_event): |
| 897 | + barrier.wait() # Synchronize thread start |
| 898 | + ready_event.wait() # Wait for main thread signal |
| 899 | + # Sleep to keep thread alive |
| 900 | + time.sleep(10_000) |
| 901 | +
|
| 902 | + def main_work(): |
| 903 | + # Do busy work to hold the GIL |
| 904 | + sock.sendall(b"working\\n") |
| 905 | + count = 0 |
| 906 | + while count < 100000000: |
| 907 | + count += 1 |
| 908 | + if count % 10000000 == 0: |
| 909 | + pass # Keep main thread busy |
| 910 | + sock.sendall(b"done\\n") |
| 911 | +
|
| 912 | + # Create synchronization primitives |
| 913 | + num_threads = 3 |
| 914 | + barrier = threading.Barrier(num_threads + 1) # +1 for main thread |
| 915 | + ready_event = threading.Event() |
| 916 | +
|
| 917 | + # Start worker threads |
| 918 | + threads = [] |
| 919 | + for i in range(num_threads): |
| 920 | + t = threading.Thread(target=worker_thread, args=(f"Worker-{{i}}", barrier, ready_event)) |
| 921 | + t.start() |
| 922 | + threads.append(t) |
| 923 | +
|
| 924 | + # Wait for all threads to be ready |
| 925 | + barrier.wait() |
| 926 | +
|
| 927 | + # Signal ready to parent process |
| 928 | + sock.sendall(b"ready\\n") |
| 929 | +
|
| 930 | + # Signal threads to start waiting |
| 931 | + ready_event.set() |
| 932 | +
|
| 933 | + # Give threads time to start sleeping |
| 934 | + time.sleep(0.1) |
| 935 | +
|
| 936 | + # Now do busy work to hold the GIL |
| 937 | + main_work() |
| 938 | + """ |
| 939 | + ) |
| 940 | + |
| 941 | + with os_helper.temp_dir() as work_dir: |
| 942 | + script_dir = os.path.join(work_dir, "script_pkg") |
| 943 | + os.mkdir(script_dir) |
| 944 | + |
| 945 | + # Create a socket server to communicate with the target process |
| 946 | + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 947 | + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 948 | + server_socket.bind(("localhost", port)) |
| 949 | + server_socket.settimeout(SHORT_TIMEOUT) |
| 950 | + server_socket.listen(1) |
| 951 | + |
| 952 | + script_name = _make_test_script(script_dir, "script", script) |
| 953 | + client_socket = None |
| 954 | + try: |
| 955 | + p = subprocess.Popen([sys.executable, script_name]) |
| 956 | + client_socket, _ = server_socket.accept() |
| 957 | + server_socket.close() |
| 958 | + |
| 959 | + # Wait for ready signal |
| 960 | + response = b"" |
| 961 | + while b"ready" not in response: |
| 962 | + response += client_socket.recv(1024) |
| 963 | + |
| 964 | + # Wait for the main thread to start its busy work |
| 965 | + while b"working" not in response: |
| 966 | + response += client_socket.recv(1024) |
| 967 | + |
| 968 | + # Get stack trace with all threads |
| 969 | + unwinder_all = RemoteUnwinder(p.pid, all_threads=True) |
| 970 | + all_traces = unwinder_all.get_stack_trace() |
| 971 | + |
| 972 | + # Get stack trace with only GIL holder |
| 973 | + unwinder_gil = RemoteUnwinder(p.pid, only_active_thread=True) |
| 974 | + gil_traces = unwinder_gil.get_stack_trace() |
| 975 | + |
| 976 | + except PermissionError: |
| 977 | + self.skipTest( |
| 978 | + "Insufficient permissions to read the stack trace" |
| 979 | + ) |
| 980 | + finally: |
| 981 | + if client_socket is not None: |
| 982 | + client_socket.close() |
| 983 | + p.kill() |
| 984 | + p.terminate() |
| 985 | + p.wait(timeout=SHORT_TIMEOUT) |
| 986 | + |
| 987 | + # Verify we got multiple threads in all_traces |
| 988 | + self.assertGreater(len(all_traces), 1, "Should have multiple threads") |
| 989 | + |
| 990 | + # Verify we got exactly one thread in gil_traces |
| 991 | + self.assertEqual(len(gil_traces), 1, "Should have exactly one GIL holder") |
| 992 | + |
| 993 | + # The GIL holder should be in the all_traces list |
| 994 | + gil_thread_id = gil_traces[0][0] |
| 995 | + all_thread_ids = [trace[0] for trace in all_traces] |
| 996 | + self.assertIn(gil_thread_id, all_thread_ids, |
| 997 | + "GIL holder should be among all threads") |
| 998 | + |
879 | 999 |
|
880 | 1000 | if __name__ == "__main__": |
881 | 1001 | unittest.main() |
0 commit comments