Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 104 additions & 96 deletions tests/unit/_utils/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

if TYPE_CHECKING:
from collections.abc import Callable
from multiprocessing.sharedctypes import Synchronized

# The `spawn` start method is used for all processes in the shared-memory estimation test. Unlike `fork`, it is safe
# to use from the multi-threaded pytest process (forking a multi-threaded process is deprecated since Python 3.12),
# but it requires all process targets to be picklable, module-level functions.
_ctx = get_context('spawn')

_EXTRA_MEMORY_SIZE = 1024 * 1024 * 100 # 100 MB


def test_get_memory_info_returns_valid_values() -> None:
Expand All @@ -26,6 +34,100 @@ def test_get_cpu_info_returns_valid_values() -> None:
assert 0 <= cpu_info.used_ratio <= 1


def _no_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
ready.wait()
measured.wait()


def _extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
memory = SharedMemory(size=_EXTRA_MEMORY_SIZE, create=True)
assert memory.buf is not None
memory.buf[:] = bytearray([255 for _ in range(_EXTRA_MEMORY_SIZE)])
print(f'Using the memory... {memory.buf[-1]}')
ready.wait()
measured.wait()
memory.close()
memory.unlink()


def _shared_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier, memory: SharedMemory) -> None:
assert memory.buf is not None
print(f'Using the memory... {memory.buf[-1]}')
ready.wait()
measured.wait()


def _get_additional_memory_estimation_while_running_processes(
*, target: Callable, count: int = 1, use_shared_memory: bool = False
) -> float:
processes = []
ready = _ctx.Barrier(parties=count + 1)
measured = _ctx.Barrier(parties=count + 1)
shared_memory: None | SharedMemory = None
memory_before = get_memory_info().current_size

if use_shared_memory:
shared_memory = SharedMemory(size=_EXTRA_MEMORY_SIZE, create=True)
assert shared_memory.buf is not None
shared_memory.buf[:] = bytearray([255 for _ in range(_EXTRA_MEMORY_SIZE)])
extra_args = [shared_memory]
else:
extra_args = []

for _ in range(count):
p = _ctx.Process(target=target, args=[ready, measured, *extra_args])
p.start()
processes.append(p)

ready.wait()
memory_during = get_memory_info().current_size
measured.wait()

for p in processes:
p.join()

if shared_memory:
shared_memory.close()
shared_memory.unlink()

return (memory_during - memory_before).to_mb() / count


def _parent_process(estimated_memory_expectation: Synchronized) -> None:
children_count = 4
# Memory calculation is not exact, so allow for some tolerance.
test_tolerance = 0.3

additional_memory_simple_child = _get_additional_memory_estimation_while_running_processes(
target=_no_extra_memory_child, count=children_count
)
additional_memory_extra_memory_child = (
_get_additional_memory_estimation_while_running_processes(target=_extra_memory_child, count=children_count)
- additional_memory_simple_child
)
additional_memory_shared_extra_memory_child = (
_get_additional_memory_estimation_while_running_processes(
target=_shared_extra_memory_child, count=children_count, use_shared_memory=True
)
- additional_memory_simple_child
)

memory_estimation_difference_ratio = (
abs((additional_memory_shared_extra_memory_child * children_count) - additional_memory_extra_memory_child)
/ additional_memory_extra_memory_child
)

estimated_memory_expectation.value = memory_estimation_difference_ratio < test_tolerance

if not estimated_memory_expectation.value:
print(
f'{additional_memory_shared_extra_memory_child=}\n'
f'{children_count=}\n'
f'{additional_memory_extra_memory_child=}\n'
f'{memory_estimation_difference_ratio=}'
)


@pytest.mark.skipif(sys.platform != 'linux', reason='Improved estimation available only on Linux')
def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None:
"""Test that memory usage estimation is not overestimating memory usage by counting shared memory multiple times.
Expand All @@ -38,103 +140,9 @@ def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None:
equal to additional_memory_size_estimate_per_unshared_memory_child where the additional shared memory is exactly
the same as the unshared memory.
"""
estimated_memory_expectation = _ctx.Value('b', False) # noqa: FBT003 # Common usage pattern for multiprocessing.Value

ctx = get_context('fork')
estimated_memory_expectation = ctx.Value('b', False) # noqa: FBT003 # Common usage pattern for multiprocessing.Value

def parent_process() -> None:
extra_memory_size = 1024 * 1024 * 100 # 100 MB
children_count = 4
# Memory calculation is not exact, so allow for some tolerance.
test_tolerance = 0.3

def no_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
ready.wait()
measured.wait()

def extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
memory = SharedMemory(size=extra_memory_size, create=True)
assert memory.buf is not None
memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)])
print(f'Using the memory... {memory.buf[-1]}')
ready.wait()
measured.wait()
memory.close()
memory.unlink()

def shared_extra_memory_child(
ready: synchronize.Barrier, measured: synchronize.Barrier, memory: SharedMemory
) -> None:
assert memory.buf is not None
print(f'Using the memory... {memory.buf[-1]}')
ready.wait()
measured.wait()

def get_additional_memory_estimation_while_running_processes(
*, target: Callable, count: int = 1, use_shared_memory: bool = False
) -> float:
processes = []
ready = ctx.Barrier(parties=count + 1)
measured = ctx.Barrier(parties=count + 1)
shared_memory: None | SharedMemory = None
memory_before = get_memory_info().current_size

if use_shared_memory:
shared_memory = SharedMemory(size=extra_memory_size, create=True)
assert shared_memory.buf is not None
shared_memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)])
extra_args = [shared_memory]
else:
extra_args = []

for _ in range(count):
p = ctx.Process(target=target, args=[ready, measured, *extra_args])
p.start()
processes.append(p)

ready.wait()
memory_during = get_memory_info().current_size
measured.wait()

for p in processes:
p.join()

if shared_memory:
shared_memory.close()
shared_memory.unlink()

return (memory_during - memory_before).to_mb() / count

additional_memory_simple_child = get_additional_memory_estimation_while_running_processes(
target=no_extra_memory_child, count=children_count
)
additional_memory_extra_memory_child = (
get_additional_memory_estimation_while_running_processes(target=extra_memory_child, count=children_count)
- additional_memory_simple_child
)
additional_memory_shared_extra_memory_child = (
get_additional_memory_estimation_while_running_processes(
target=shared_extra_memory_child, count=children_count, use_shared_memory=True
)
- additional_memory_simple_child
)

memory_estimation_difference_ratio = (
abs((additional_memory_shared_extra_memory_child * children_count) - additional_memory_extra_memory_child)
/ additional_memory_extra_memory_child
)

estimated_memory_expectation.value = memory_estimation_difference_ratio < test_tolerance

if not estimated_memory_expectation.value:
print(
f'{additional_memory_shared_extra_memory_child=}\n'
f'{children_count=}\n'
f'{additional_memory_extra_memory_child=}\n'
f'{memory_estimation_difference_ratio=}'
)

process = ctx.Process(target=parent_process)
process = _ctx.Process(target=_parent_process, args=(estimated_memory_expectation,))
process.start()
process.join()

Expand Down
6 changes: 6 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import multiprocessing
import os
import warnings
from typing import TYPE_CHECKING, Any, cast
Expand Down Expand Up @@ -28,6 +29,11 @@

from crawlee.http_clients._base import HttpClient

# Use the `spawn` start method for all processes created in tests via `multiprocessing` (`proxy.py` acceptors,
# `ProcessPoolExecutor` workers). The default `fork` method on Linux (Python <= 3.13) is unsafe in the multi-threaded
# pytest process and emits a `DeprecationWarning` on Python 3.12+. Python 3.14 already defaults to a non-fork method.
multiprocessing.set_start_method('spawn', force=True)


@pytest.fixture(autouse=True)
async def suppress_user_warning() -> AsyncGenerator[None, None]:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ async def test_global_and_local_event_manager_in_crawler_run() -> None:

crawler = BasicCrawler(event_manager=local_event_manager)

handler_call = AsyncMock()
handler_call = Mock()

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
Expand Down
Loading