diff --git a/tests/unit/_utils/test_system.py b/tests/unit/_utils/test_system.py index 1813b151a6..e263b3c161 100644 --- a/tests/unit/_utils/test_system.py +++ b/tests/unit/_utils/test_system.py @@ -12,6 +12,14 @@ if TYPE_CHECKING: from collections.abc import Callable + from multiprocessing.sharedctypes import Synchronized + +# The `spawn` start method is used for all processes in the shared-memory estimation test. Unlike `fork`, it is safe +# to use from the multi-threaded pytest process (forking a multi-threaded process is deprecated since Python 3.12), +# but it requires all process targets to be picklable, module-level functions. +_ctx = get_context('spawn') + +_EXTRA_MEMORY_SIZE = 1024 * 1024 * 100 # 100 MB def test_get_memory_info_returns_valid_values() -> None: @@ -26,6 +34,100 @@ def test_get_cpu_info_returns_valid_values() -> None: assert 0 <= cpu_info.used_ratio <= 1 +def _no_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None: + ready.wait() + measured.wait() + + +def _extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None: + memory = SharedMemory(size=_EXTRA_MEMORY_SIZE, create=True) + assert memory.buf is not None + memory.buf[:] = bytearray([255 for _ in range(_EXTRA_MEMORY_SIZE)]) + print(f'Using the memory... {memory.buf[-1]}') + ready.wait() + measured.wait() + memory.close() + memory.unlink() + + +def _shared_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier, memory: SharedMemory) -> None: + assert memory.buf is not None + print(f'Using the memory... {memory.buf[-1]}') + ready.wait() + measured.wait() + + +def _get_additional_memory_estimation_while_running_processes( + *, target: Callable, count: int = 1, use_shared_memory: bool = False +) -> float: + processes = [] + ready = _ctx.Barrier(parties=count + 1) + measured = _ctx.Barrier(parties=count + 1) + shared_memory: None | SharedMemory = None + memory_before = get_memory_info().current_size + + if use_shared_memory: + shared_memory = SharedMemory(size=_EXTRA_MEMORY_SIZE, create=True) + assert shared_memory.buf is not None + shared_memory.buf[:] = bytearray([255 for _ in range(_EXTRA_MEMORY_SIZE)]) + extra_args = [shared_memory] + else: + extra_args = [] + + for _ in range(count): + p = _ctx.Process(target=target, args=[ready, measured, *extra_args]) + p.start() + processes.append(p) + + ready.wait() + memory_during = get_memory_info().current_size + measured.wait() + + for p in processes: + p.join() + + if shared_memory: + shared_memory.close() + shared_memory.unlink() + + return (memory_during - memory_before).to_mb() / count + + +def _parent_process(estimated_memory_expectation: Synchronized) -> None: + children_count = 4 + # Memory calculation is not exact, so allow for some tolerance. + test_tolerance = 0.3 + + additional_memory_simple_child = _get_additional_memory_estimation_while_running_processes( + target=_no_extra_memory_child, count=children_count + ) + additional_memory_extra_memory_child = ( + _get_additional_memory_estimation_while_running_processes(target=_extra_memory_child, count=children_count) + - additional_memory_simple_child + ) + additional_memory_shared_extra_memory_child = ( + _get_additional_memory_estimation_while_running_processes( + target=_shared_extra_memory_child, count=children_count, use_shared_memory=True + ) + - additional_memory_simple_child + ) + + memory_estimation_difference_ratio = ( + abs((additional_memory_shared_extra_memory_child * children_count) - additional_memory_extra_memory_child) + / additional_memory_extra_memory_child + ) + + estimated_memory_expectation.value = memory_estimation_difference_ratio < test_tolerance + + if not estimated_memory_expectation.value: + print( + f'{additional_memory_shared_extra_memory_child=}\n' + f'{children_count=}\n' + f'{additional_memory_extra_memory_child=}\n' + f'{memory_estimation_difference_ratio=}' + ) + + @pytest.mark.skipif(sys.platform != 'linux', reason='Improved estimation available only on Linux') def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None: """Test that memory usage estimation is not overestimating memory usage by counting shared memory multiple times. @@ -38,103 +140,9 @@ def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None: equal to additional_memory_size_estimate_per_unshared_memory_child where the additional shared memory is exactly the same as the unshared memory. """ + estimated_memory_expectation = _ctx.Value('b', False) # noqa: FBT003 # Common usage pattern for multiprocessing.Value - ctx = get_context('fork') - estimated_memory_expectation = ctx.Value('b', False) # noqa: FBT003 # Common usage pattern for multiprocessing.Value - - def parent_process() -> None: - extra_memory_size = 1024 * 1024 * 100 # 100 MB - children_count = 4 - # Memory calculation is not exact, so allow for some tolerance. - test_tolerance = 0.3 - - def no_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None: - ready.wait() - measured.wait() - - def extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None: - memory = SharedMemory(size=extra_memory_size, create=True) - assert memory.buf is not None - memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)]) - print(f'Using the memory... {memory.buf[-1]}') - ready.wait() - measured.wait() - memory.close() - memory.unlink() - - def shared_extra_memory_child( - ready: synchronize.Barrier, measured: synchronize.Barrier, memory: SharedMemory - ) -> None: - assert memory.buf is not None - print(f'Using the memory... {memory.buf[-1]}') - ready.wait() - measured.wait() - - def get_additional_memory_estimation_while_running_processes( - *, target: Callable, count: int = 1, use_shared_memory: bool = False - ) -> float: - processes = [] - ready = ctx.Barrier(parties=count + 1) - measured = ctx.Barrier(parties=count + 1) - shared_memory: None | SharedMemory = None - memory_before = get_memory_info().current_size - - if use_shared_memory: - shared_memory = SharedMemory(size=extra_memory_size, create=True) - assert shared_memory.buf is not None - shared_memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)]) - extra_args = [shared_memory] - else: - extra_args = [] - - for _ in range(count): - p = ctx.Process(target=target, args=[ready, measured, *extra_args]) - p.start() - processes.append(p) - - ready.wait() - memory_during = get_memory_info().current_size - measured.wait() - - for p in processes: - p.join() - - if shared_memory: - shared_memory.close() - shared_memory.unlink() - - return (memory_during - memory_before).to_mb() / count - - additional_memory_simple_child = get_additional_memory_estimation_while_running_processes( - target=no_extra_memory_child, count=children_count - ) - additional_memory_extra_memory_child = ( - get_additional_memory_estimation_while_running_processes(target=extra_memory_child, count=children_count) - - additional_memory_simple_child - ) - additional_memory_shared_extra_memory_child = ( - get_additional_memory_estimation_while_running_processes( - target=shared_extra_memory_child, count=children_count, use_shared_memory=True - ) - - additional_memory_simple_child - ) - - memory_estimation_difference_ratio = ( - abs((additional_memory_shared_extra_memory_child * children_count) - additional_memory_extra_memory_child) - / additional_memory_extra_memory_child - ) - - estimated_memory_expectation.value = memory_estimation_difference_ratio < test_tolerance - - if not estimated_memory_expectation.value: - print( - f'{additional_memory_shared_extra_memory_child=}\n' - f'{children_count=}\n' - f'{additional_memory_extra_memory_child=}\n' - f'{memory_estimation_difference_ratio=}' - ) - - process = ctx.Process(target=parent_process) + process = _ctx.Process(target=_parent_process, args=(estimated_memory_expectation,)) process.start() process.join() diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 58040a1c57..32e1f00017 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import multiprocessing import os import warnings from typing import TYPE_CHECKING, Any, cast @@ -28,6 +29,11 @@ from crawlee.http_clients._base import HttpClient +# Use the `spawn` start method for all processes created in tests via `multiprocessing` (`proxy.py` acceptors, +# `ProcessPoolExecutor` workers). The default `fork` method on Linux (Python <= 3.13) is unsafe in the multi-threaded +# pytest process and emits a `DeprecationWarning` on Python 3.12+. Python 3.14 already defaults to a non-fork method. +multiprocessing.set_start_method('spawn', force=True) + @pytest.fixture(autouse=True) async def suppress_user_warning() -> AsyncGenerator[None, None]: diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 6a39e83c12..dd62b77d84 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -2143,7 +2143,7 @@ async def test_global_and_local_event_manager_in_crawler_run() -> None: crawler = BasicCrawler(event_manager=local_event_manager) - handler_call = AsyncMock() + handler_call = Mock() @crawler.router.default_handler async def handler(context: BasicCrawlingContext) -> None: