
Commit 96fc33e

fix: Dequeue items from RequestQueue in the correct order (#411)
1 parent 7babd18 commit 96fc33e

File tree

4 files changed: +212 −43 lines changed


src/crawlee/storages/base_storage.py

Lines changed: 0 additions & 3 deletions

@@ -10,9 +10,6 @@
 class BaseStorage(ABC):
     """Base class for storages."""

-    LABEL = 'Unknown'
-    """Human readable label of the storage."""
-
     @property
     @abstractmethod
     def id(self) -> str:

src/crawlee/storages/request_queue.py

Lines changed: 53 additions & 18 deletions

@@ -4,8 +4,7 @@
 from collections import OrderedDict
 from datetime import datetime, timedelta, timezone
 from logging import getLogger
-from typing import TYPE_CHECKING
-from typing import OrderedDict as OrderedDictType
+from typing import TYPE_CHECKING, Generic, TypedDict, TypeVar

 from typing_extensions import override

@@ -31,6 +30,40 @@

 logger = getLogger(__name__)

+__all__ = ['RequestQueue']
+
+
+T = TypeVar('T')
+
+
+class BoundedSet(Generic[T]):
+    """A simple set datastructure that removes the least recently accessed item when it reaches `max_length`."""
+
+    def __init__(self, max_length: int) -> None:
+        self._max_length = max_length
+        self._data = OrderedDict[T, object]()
+
+    def __contains__(self, item: T) -> bool:
+        found = item in self._data
+        if found:
+            self._data.move_to_end(item, last=True)
+        return found
+
+    def add(self, item: T) -> None:
+        self._data[item] = True
+        self._data.move_to_end(item)
+
+        if len(self._data) > self._max_length:
+            self._data.popitem(last=False)
+
+    def clear(self) -> None:
+        self._data.clear()
+
+
+class CachedRequest(TypedDict):
+    id: str
+    was_already_handled: bool
+

 class RequestQueue(BaseStorage, RequestProvider):
     """Represents a queue storage for HTTP requests to crawl.
@@ -97,12 +130,12 @@ def __init__(
         self._internal_timeout_seconds = 5 * 60
         self._assumed_total_count = 0
         self._assumed_handled_count = 0
-        self._queue_head_dict: OrderedDictType[str, str] = OrderedDict()
+        self._queue_head_dict: OrderedDict[str, str] = OrderedDict()
         self._query_queue_head_task: asyncio.Task | None = None
         self._in_progress: set[str] = set()
         self._last_activity = datetime.now(timezone.utc)
-        self._recently_handled: LRUCache[bool] = LRUCache(max_length=self._RECENTLY_HANDLED_CACHE_SIZE)
-        self._requests_cache: LRUCache[dict] = LRUCache(max_length=self._MAX_CACHED_REQUESTS)
+        self._recently_handled: BoundedSet[str] = BoundedSet(max_length=self._RECENTLY_HANDLED_CACHE_SIZE)
+        self._requests_cache: LRUCache[CachedRequest] = LRUCache(max_length=self._MAX_CACHED_REQUESTS)

     @override
     @property
@@ -125,13 +158,17 @@ async def open(
     ) -> RequestQueue:
         from crawlee.storages._creation_management import open_storage

-        return await open_storage(
+        storage = await open_storage(
             storage_class=cls,
             id=id,
             name=name,
             configuration=configuration,
         )

+        await storage._ensure_head_is_non_empty()  # noqa: SLF001 - accessing private members from factories is OK
+
+        return storage
+
     @override
     async def drop(self, *, timeout: timedelta | None = None) -> None:
         from crawlee.storages._creation_management import remove_storage_from_cache
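Priming the queue head in `open()` means a freshly opened queue already reflects requests persisted by an earlier run, so the first `fetch_next_request()` call honors the stored order. A rough usage sketch of the intended behavior, based only on the API exercised by the new test file below (anything beyond that is an assumption):

    import asyncio

    from crawlee.models import Request
    from crawlee.storages.request_queue import RequestQueue


    async def main() -> None:
        queue = await RequestQueue.open()
        await queue.add_request(Request.from_url(unique_key='0', url='http://example.com/0'))
        await queue.add_request(Request.from_url(unique_key='1', url='http://example.com/1'), forefront=True)

        # Forefront requests are dequeued first; the rest follow in FIFO order.
        first = await queue.fetch_next_request()
        assert first is not None
        assert first.unique_key == '1'


    asyncio.run(main())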
@@ -208,7 +245,7 @@ async def add_request(
             not is_handled
             and not was_already_present
             and request_id not in self._in_progress
-            and self._recently_handled.get(request_id) is None
+            and request_id not in self._recently_handled
         ):
             self._assumed_total_count += 1
             self._maybe_add_request_to_queue_head(request_id, forefront=forefront)
@@ -284,7 +321,7 @@ async def fetch_next_request(self) -> Request | None:
         Returns:
             The request or `None` if there are no more pending requests.
         """
-        await self.ensure_head_is_non_empty()
+        await self._ensure_head_is_non_empty()

         # We are likely done at this point.
         if len(self._queue_head_dict) == 0:
@@ -293,7 +330,7 @@ async def fetch_next_request(self) -> Request | None:
         next_request_id, _ = self._queue_head_dict.popitem(last=False)  # ~removeFirst()

         # This should never happen, but...
-        if next_request_id in self._in_progress or self._recently_handled.get(next_request_id):
+        if next_request_id in self._in_progress or next_request_id in self._recently_handled:
             logger.warning(
                 'Queue head returned a request that is already in progress?!',
                 extra={
@@ -343,7 +380,7 @@ async def fetch_next_request(self) -> Request | None:
                 'Request fetched from the beginning of queue was already handled',
                 extra={'nextRequestId': next_request_id},
             )
-            self._recently_handled[next_request_id] = True
+            self._recently_handled.add(next_request_id)
             return None

         return request
@@ -372,7 +409,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             processed_request.unique_key = request.unique_key

         self._in_progress.remove(request.id)
-        self._recently_handled[request.id] = True
+        self._recently_handled.add(request.id)

         if not processed_request.was_already_handled:
             self._assumed_handled_count += 1
@@ -431,7 +468,7 @@ async def is_empty(self) -> bool:
         Returns:
             bool: `True` if the next call to `RequestQueue.fetchNextRequest` would return `None`, otherwise `False`.
         """
-        await self.ensure_head_is_non_empty()
+        await self._ensure_head_is_non_empty()
         return len(self._queue_head_dict) == 0

     async def is_finished(self) -> bool:
@@ -457,7 +494,7 @@ async def is_finished(self) -> bool:

         # TODO: set ensure_consistency to True once the following issue is resolved:
         # https://github.com/apify/crawlee-python/issues/203
-        is_head_consistent = await self.ensure_head_is_non_empty(ensure_consistency=False)
+        is_head_consistent = await self._ensure_head_is_non_empty(ensure_consistency=False)
         return is_head_consistent and len(self._queue_head_dict) == 0 and self._in_progress_count() == 0

     async def get_info(self) -> RequestQueueMetadata | None:
@@ -472,7 +509,7 @@ async def get_handled_count(self) -> int:
     async def get_total_count(self) -> int:
         return self._assumed_total_count

-    async def ensure_head_is_non_empty(
+    async def _ensure_head_is_non_empty(
         self,
         *,
         ensure_consistency: bool = False,
@@ -556,7 +593,7 @@ async def ensure_head_is_non_empty(
             logger.info(f'Waiting for {delay_seconds} for queue finalization, to ensure data consistency.')
             await asyncio.sleep(delay_seconds)

-            return await self.ensure_head_is_non_empty(
+            return await self._ensure_head_is_non_empty(
                 ensure_consistency=ensure_consistency,
                 limit=next_limit,
                 iteration=iteration + 1,
@@ -578,8 +615,6 @@ def _reset(self) -> None:
     def _cache_request(self, cache_key: str, processed_request: ProcessedRequest) -> None:
         self._requests_cache[cache_key] = {
             'id': processed_request.id,
-            'is_handled': processed_request.was_already_handled,
-            'unique_key': processed_request.unique_key,
             'was_already_handled': processed_request.was_already_handled,
         }

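The cached payload now matches the `CachedRequest` TypedDict introduced above; the redundant `is_handled` and `unique_key` keys are dropped (the removed `is_handled` entry also mistakenly stored `was_already_handled`). With `_requests_cache` annotated as `LRUCache[CachedRequest]`, a type checker can reject stray keys, e.g. (hypothetical values):

    cached: CachedRequest = {
        'id': 'abc123',
        'was_already_handled': False,
    }
    # A checker such as mypy would flag the removed keys if they were reintroduced:
    # cached['is_handled'] = True  # error: TypedDict "CachedRequest" has no key "is_handled"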
@@ -595,7 +630,7 @@ async def _queue_query_head(self, limit: int) -> RequestQueueHeadState:
                 not request.id
                 or not request.unique_key
                 or request.id in self._in_progress
-                or self._recently_handled.get(request.id)
+                or request.id in self._recently_handled
             ):
                 continue

tests/unit/conftest.py

Lines changed: 31 additions & 22 deletions

@@ -4,7 +4,7 @@
 from __future__ import annotations

 import os
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Callable

 import pytest
 from proxy import Proxy
@@ -20,8 +20,35 @@
     from pathlib import Path


+@pytest.fixture()
+def reset_globals(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Callable[[], None]:
+    def reset() -> None:
+        # Set the environment variable for the local storage directory to the temporary path
+        monkeypatch.setenv('CRAWLEE_STORAGE_DIR', str(tmp_path))
+
+        # Reset the local and cloud clients in StorageClientManager
+        StorageClientManager._local_client = MemoryStorageClient()
+        StorageClientManager._cloud_client = None
+
+        # Remove global configuration instance - it may contain settings adjusted by a previous test
+        Configuration._default_instance = None
+
+        # Clear creation-related caches to ensure no state is carried over between tests
+        monkeypatch.setattr(_creation_management, '_cache_dataset_by_id', {})
+        monkeypatch.setattr(_creation_management, '_cache_dataset_by_name', {})
+        monkeypatch.setattr(_creation_management, '_cache_kvs_by_id', {})
+        monkeypatch.setattr(_creation_management, '_cache_kvs_by_name', {})
+        monkeypatch.setattr(_creation_management, '_cache_rq_by_id', {})
+        monkeypatch.setattr(_creation_management, '_cache_rq_by_name', {})
+
+        # Verify that the environment variable is set correctly
+        assert os.environ.get('CRAWLEE_STORAGE_DIR') == str(tmp_path)
+
+    return reset
+
+
 @pytest.fixture(autouse=True)
-def _isolate_test_environment(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+def _isolate_test_environment(reset_globals: Callable[[], None]) -> None:
     """Isolate tests by resetting the storage clients, clearing caches, and setting the environment variables.

     The fixture is applied automatically to all test cases.
@@ -30,26 +57,8 @@ def _isolate_test_environment(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
         monkeypatch: Test utility provided by pytest.
         tmp_path: A unique temporary directory path provided by pytest for test isolation.
     """
-    # Set the environment variable for the local storage directory to the temporary path
-    monkeypatch.setenv('CRAWLEE_STORAGE_DIR', str(tmp_path))
-
-    # Reset the local and cloud clients in StorageClientManager
-    StorageClientManager._local_client = MemoryStorageClient()
-    StorageClientManager._cloud_client = None
-
-    # Remove global configuration instance - it may contain settings adjusted by a previous test
-    Configuration._default_instance = None
-
-    # Clear creation-related caches to ensure no state is carried over between tests
-    monkeypatch.setattr(_creation_management, '_cache_dataset_by_id', {})
-    monkeypatch.setattr(_creation_management, '_cache_dataset_by_name', {})
-    monkeypatch.setattr(_creation_management, '_cache_kvs_by_id', {})
-    monkeypatch.setattr(_creation_management, '_cache_kvs_by_name', {})
-    monkeypatch.setattr(_creation_management, '_cache_rq_by_id', {})
-    monkeypatch.setattr(_creation_management, '_cache_rq_by_name', {})
-
-    # Verify that the environment variable is set correctly
-    assert os.environ.get('CRAWLEE_STORAGE_DIR') == str(tmp_path)
+
+    reset_globals()


 @pytest.fixture()
Lines changed: 128 additions & 0 deletions

@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Callable
+
+import pytest
+
+from crawlee.models import Request
+from crawlee.storage_client_manager import StorageClientManager
+from crawlee.storages.key_value_store import KeyValueStore
+from crawlee.storages.request_queue import RequestQueue
+
+
+@pytest.mark.parametrize('purge_on_start', [True, False])
+async def test_actor_memory_storage_client_key_value_store_e2e(
+    monkeypatch: pytest.MonkeyPatch,
+    purge_on_start: bool,  # noqa: FBT001
+    reset_globals: Callable[[], None],
+) -> None:
+    """This test simulates two clean runs using memory storage.
+    The second run attempts to access data created by the first one.
+    We run 2 configurations with different `purge_on_start`."""
+    # Configure purging env var
+    monkeypatch.setenv('CRAWLEE_PURGE_ON_START', f'{int(purge_on_start)}')
+    # Store old storage client so we have the object reference for comparison
+    old_client = StorageClientManager.get_storage_client()
+
+    old_default_kvs = await KeyValueStore.open()
+    old_non_default_kvs = await KeyValueStore.open(name='non-default')
+    # Create data in default and non-default key-value store
+    await old_default_kvs.set_value('test', 'default value')
+    await old_non_default_kvs.set_value('test', 'non-default value')
+
+    # We simulate another clean run, we expect the memory storage to read from the local data directory
+    # Default storages are purged based on purge_on_start parameter.
+    reset_globals()
+
+    # Check if we're using a different memory storage instance
+    assert old_client is not StorageClientManager.get_storage_client()
+    default_kvs = await KeyValueStore.open()
+    assert default_kvs is not old_default_kvs
+    non_default_kvs = await KeyValueStore.open(name='non-default')
+    assert non_default_kvs is not old_non_default_kvs
+    default_value = await default_kvs.get_value('test')
+
+    if purge_on_start:
+        assert default_value is None
+    else:
+        assert default_value == 'default value'
+
+    assert await non_default_kvs.get_value('test') == 'non-default value'
+
+
+@pytest.mark.parametrize('purge_on_start', [True, False])
+async def test_actor_memory_storage_client_request_queue_e2e(
+    monkeypatch: pytest.MonkeyPatch,
+    purge_on_start: bool,  # noqa: FBT001
+    reset_globals: Callable[[], None],
+) -> None:
+    """This test simulates two clean runs using memory storage.
+    The second run attempts to access data created by the first one.
+    We run 2 configurations with different `purge_on_start`."""
+    # Configure purging env var
+    monkeypatch.setenv('CRAWLEE_PURGE_ON_START', f'{int(purge_on_start)}')
+
+    # Add some requests to the default queue
+    default_queue = await RequestQueue.open()
+    for i in range(6):
+        # [0, 3] <- nothing special
+        # [1, 4] <- forefront=True
+        # [2, 5] <- handled=True
+        request_url = f'http://example.com/{i}'
+        forefront = i % 3 == 1
+        was_handled = i % 3 == 2
+        await default_queue.add_request(
+            Request.from_url(
+                unique_key=str(i),
+                url=request_url,
+                handled_at=datetime.now(timezone.utc) if was_handled else None,
+            ),
+            forefront=forefront,
+        )
+
+    # We simulate another clean run, we expect the memory storage to read from the local data directory
+    # Default storages are purged based on purge_on_start parameter.
+    reset_globals()
+
+    # Add some more requests to the default queue
+    default_queue = await RequestQueue.open()
+    for i in range(6, 12):
+        # [6, 9] <- nothing special
+        # [7, 10] <- forefront=True
+        # [8, 11] <- handled=True
+        request_url = f'http://example.com/{i}'
+        forefront = i % 3 == 1
+        was_handled = i % 3 == 2
+        await default_queue.add_request(
+            Request.from_url(
+                unique_key=str(i),
+                url=request_url,
+                handled_at=datetime.now(timezone.utc) if was_handled else None,
+            ),
+            forefront=forefront,
+        )
+
+    queue_info = await default_queue.get_info()
+    assert queue_info is not None
+
+    # If the queue was purged between the runs, only the requests from the second run should be present,
+    # in the right order
+    if purge_on_start:
+        assert queue_info.total_request_count == 6
+        assert queue_info.handled_request_count == 2
+
+        expected_pending_request_order = [10, 7, 6, 9]
+    # If the queue was NOT purged between the runs, all the requests should be in the queue in the right order
+    else:
+        assert queue_info.total_request_count == 12
+        assert queue_info.handled_request_count == 4
+
+        expected_pending_request_order = [10, 7, 4, 1, 0, 3, 6, 9]
+
+    actual_requests = list[Request]()
+    while req := await default_queue.fetch_next_request():
+        actual_requests.append(req)
+
+    assert [int(req.unique_key) for req in actual_requests] == expected_pending_request_order
+    assert [req.url for req in actual_requests] == [f'http://example.com/{req.unique_key}' for req in actual_requests]
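The expected orders follow from the head semantics this commit fixes: regular requests dequeue FIFO, each `forefront=True` request is pushed to the front of the head (so forefront requests come out newest first), and requests added as already handled never reappear. A small deque model reproduces the non-purged expectation above:

    from collections import deque

    # Model of the asserted head order: forefront requests go to the left
    # (newest first), regular requests append FIFO, handled ones never enter.
    head: deque[int] = deque()
    for i in range(12):
        if i % 3 == 2:      # [2, 5, 8, 11] were added as already handled
            continue
        if i % 3 == 1:      # [1, 4, 7, 10] were added with forefront=True
            head.appendleft(i)
        else:               # [0, 3, 6, 9] are plain FIFO requests
            head.append(i)

    assert list(head) == [10, 7, 4, 1, 0, 3, 6, 9]

With `purge_on_start`, only the second run's unhandled requests remain, yielding [10, 7, 6, 9] by the same rules.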
