Merged · Changes from 6 commits
54 changes: 2 additions & 52 deletions src/crawlee/_request.py
@@ -2,7 +2,6 @@

from collections.abc import Iterator, MutableMapping
from datetime import datetime
from decimal import Decimal
from enum import IntEnum
from typing import TYPE_CHECKING, Annotated, Any, cast

@@ -136,16 +135,8 @@ class BaseRequestData(BaseModel):
headers: Annotated[HttpHeaders, Field(default_factory=HttpHeaders)] = HttpHeaders()
"""HTTP request headers."""

payload: Annotated[
HttpPayload | None,
BeforeValidator(lambda v: v.encode() if isinstance(v, str) else v),
Collaborator: Isn't the BeforeValidator still needed when loading an old request file?

Collaborator (Author): Good point. We need both so that the payload is correctly serialized and deserialized to a file. Updated the e2e test.

PlainSerializer(lambda v: v.decode() if isinstance(v, bytes) else v),
] = None
"""HTTP request payload.

TODO: Re-check the need for `Validator` and `Serializer` once the issue is resolved.
https://github.com/apify/crawlee-python/issues/94
"""
payload: HttpPayload | None = None
"""HTTP request payload."""

user_data: Annotated[
dict[str, JsonSerializable], # Internally, the model contains `UserData`, this is just for convenience
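
A note on the thread above: this minimal sketch (a stand-in pydantic model, not the crawlee class) shows why both annotations matter for file round-trips. The PlainSerializer writes the bytes payload out as a JSON string, and the BeforeValidator re-encodes that string to bytes when an old request file is loaded.

from typing import Annotated

from pydantic import BaseModel, BeforeValidator, PlainSerializer


class PayloadModel(BaseModel):  # hypothetical stand-in for BaseRequestData
    payload: Annotated[
        bytes | None,
        # On load: persisted files store the payload as a JSON string; re-encode it.
        BeforeValidator(lambda v: v.encode() if isinstance(v, str) else v),
        # On dump: bytes are not JSON-serializable, so decode to a string.
        PlainSerializer(lambda v: v.decode() if isinstance(v, bytes) else v),
    ] = None


dumped = PayloadModel(payload=b'a=1').model_dump_json()  # '{"payload":"a=1"}'
assert PayloadModel.model_validate_json(dumped).payload == b'a=1'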
@@ -256,18 +247,6 @@ class Request(BaseRequestData):
"""A unique identifier for the request. Note that this is not used for deduplication, and should not be confused
with `unique_key`."""

json_: str | None = None
"""Deprecated internal field, do not use it.

Should be removed as part of https://github.com/apify/crawlee-python/issues/94.
"""

order_no: Decimal | None = None
"""Deprecated internal field, do not use it.

Should be removed as part of https://github.com/apify/crawlee-python/issues/94.
"""

@classmethod
def from_url(
cls,
@@ -436,35 +415,6 @@ def forefront(self) -> bool:
def forefront(self, new_value: bool) -> None:
self.crawlee_data.forefront = new_value

def __eq__(self, other: object) -> bool:
"""Compare all relevant fields of the `Request` class, excluding deprecated fields `json_` and `order_no`.

TODO: Remove this method once the issue is resolved.
https://github.com/apify/crawlee-python/issues/94
"""
if isinstance(other, Request):
return (
self.url == other.url
and self.unique_key == other.unique_key
and self.method == other.method
and self.headers == other.headers
and self.payload == other.payload
and self.user_data == other.user_data
and self.retry_count == other.retry_count
and self.no_retry == other.no_retry
and self.loaded_url == other.loaded_url
and self.handled_at == other.handled_at
and self.id == other.id
and self.label == other.label
and self.state == other.state
and self.max_retries == other.max_retries
and self.session_rotation_count == other.session_rotation_count
and self.enqueue_strategy == other.enqueue_strategy
and self.last_proxy_tier == other.last_proxy_tier
and self.forefront == other.forefront
)
return NotImplemented


class RequestWithLock(Request):
"""A crawling request with information about locks."""
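
With json_ and order_no gone from Request, the hand-written __eq__ removed above also loses its reason to exist: pydantic's default BaseModel.__eq__ already compares all declared fields, which is what the removed method spelled out by hand. A minimal illustration on a stand-in model (not the crawlee class):

from pydantic import BaseModel


class M(BaseModel):
    url: str
    retry_count: int = 0


assert M(url='https://example.com') == M(url='https://example.com')
assert M(url='https://example.com') != M(url='https://example.com', retry_count=1)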
27 changes: 27 additions & 0 deletions src/crawlee/base_storage_client/_models.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from datetime import datetime
from decimal import Decimal
from typing import Annotated, Any, Generic

from pydantic import BaseModel, BeforeValidator, ConfigDict, Field
@@ -230,3 +231,29 @@ class BatchRequestsOperationResponse(BaseModel):

processed_requests: Annotated[list[ProcessedRequest], Field(alias='processedRequests')]
unprocessed_requests: Annotated[list[UnprocessedRequest], Field(alias='unprocessedRequests')]


@docs_group('Data structures')
class InternalRequest(BaseModel):
"""Represents an request in queue client."""

model_config = ConfigDict(populate_by_name=True)

id: str

unique_key: str

order_no: Decimal | None = None

handled_at: datetime | None

request: Annotated[Request, Field(alias='json_')]

@classmethod
def from_request(cls, request: Request, id: str, order_no: Decimal | None) -> InternalRequest:
return cls(
unique_key=request.unique_key, id=id, handled_at=request.handled_at, order_no=order_no, request=request
)

def to_request(self) -> Request:
return self.request
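
A sketch of how this wrapper is intended to round-trip through storage (the import paths and values here are assumptions based on this diff, not verified against the package): dumping by alias writes the wrapped request under the legacy 'json_' key, and populate_by_name=True lets validation accept either name.

from decimal import Decimal

from crawlee import Request
from crawlee.base_storage_client._models import InternalRequest

req = Request.from_url('https://example.com')
internal = InternalRequest.from_request(request=req, id=req.id, order_no=Decimal(1))

# Dumping by alias stores the wrapped request under the legacy 'json_' key.
data = internal.model_dump(by_alias=True)
assert 'json_' in data

# populate_by_name=True lets validation accept either 'request' or 'json_'.
restored = InternalRequest.model_validate(data)
assert restored.to_request().url == 'https://example.com'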
10 changes: 3 additions & 7 deletions src/crawlee/memory_storage_client/_creation_management.py
@@ -6,7 +6,6 @@
import os
import pathlib
from datetime import datetime, timezone
from decimal import Decimal
from logging import getLogger
from typing import TYPE_CHECKING

@@ -15,10 +14,10 @@
from crawlee._utils.file import json_dumps
from crawlee.base_storage_client._models import (
DatasetMetadata,
InternalRequest,
KeyValueStoreMetadata,
KeyValueStoreRecord,
KeyValueStoreRecordMetadata,
Request,
RequestQueueMetadata,
)
from crawlee.storages._dataset import Dataset
@@ -361,7 +360,7 @@ def create_rq_from_directory(
pending_request_count = resource_info.pending_request_count

# Load request entries
entries: dict[str, Request] = {}
entries: dict[str, InternalRequest] = {}

for entry in os.scandir(storage_directory):
if entry.is_file():
@@ -371,10 +370,7 @@
with open(os.path.join(storage_directory, entry.name), encoding='utf-8') as f:
content = json.load(f)

request = Request(**content)
order_no = request.order_no
if order_no:
request.order_no = Decimal(order_no)
request = InternalRequest(**content)

entries[request.id] = request

82 changes: 29 additions & 53 deletions src/crawlee/memory_storage_client/_request_queue_client.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
import json
import os
import shutil
from datetime import datetime, timezone
@@ -15,7 +14,6 @@
from crawlee._types import StorageTypes
from crawlee._utils.crypto import crypto_random_object_id
from crawlee._utils.data_processing import (
filter_out_none_values_recursively,
raise_on_duplicate_storage,
raise_on_non_existing_storage,
)
@@ -24,6 +22,7 @@
from crawlee.base_storage_client import BaseRequestQueueClient
from crawlee.base_storage_client._models import (
BatchRequestsOperationResponse,
InternalRequest,
ProcessedRequest,
ProlongRequestLockResponse,
Request,
@@ -200,16 +199,14 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:

# Check that the request still exists and was not handled,
# in case something deleted it or marked it as handled concurrently
if request and request.order_no:
requests.append(request)

items = [request for item in requests if (request := self._json_to_request(item.json_))]
if request and not request.handled_at:
requests.append(request.to_request())

return RequestQueueHead(
limit=limit,
had_multiple_clients=False,
queue_modified_at=existing_queue_by_id._modified_at, # noqa: SLF001
items=items,
items=requests,
)

@override
@@ -240,39 +237,39 @@ async def add_request(
if existing_queue_by_id is None:
raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self.id)

request_model = await self._create_internal_request(request, forefront)
internal_request = await self._create_internal_request(request, forefront)

async with existing_queue_by_id.file_operation_lock:
existing_request_with_id = existing_queue_by_id.requests.get(request_model.id)
existing_request_with_id = existing_queue_by_id.requests.get(internal_request.id)

# We already have the request present, so we return information about it
if existing_request_with_id is not None:
await existing_queue_by_id.update_timestamps(has_been_modified=False)

return ProcessedRequest(
id=request_model.id,
unique_key=request_model.unique_key,
id=internal_request.id,
unique_key=internal_request.unique_key,
was_already_present=True,
was_already_handled=existing_request_with_id.order_no is None,
was_already_handled=existing_request_with_id.handled_at is not None,
)

existing_queue_by_id.requests[request_model.id] = request_model
if request_model.order_no is None:
existing_queue_by_id.requests[internal_request.id] = internal_request
if internal_request.handled_at:
existing_queue_by_id.handled_request_count += 1
else:
existing_queue_by_id.pending_request_count += 1
await existing_queue_by_id.update_timestamps(has_been_modified=True)
await self._persist_single_request_to_storage(
request=request_model,
request=internal_request,
entity_directory=existing_queue_by_id.resource_directory,
persist_storage=self._memory_storage_client.persist_storage,
)

# We return was_already_handled=False even though the request may have been added as handled,
# because that's how the API behaves.
return ProcessedRequest(
id=request_model.id,
unique_key=request_model.unique_key,
id=internal_request.id,
unique_key=internal_request.unique_key,
was_already_present=False,
was_already_handled=False,
)
@@ -292,8 +289,8 @@ async def get_request(self, request_id: str) -> Request | None:
async with existing_queue_by_id.file_operation_lock:
await existing_queue_by_id.update_timestamps(has_been_modified=False)

request: Request = existing_queue_by_id.requests.get(request_id)
return self._json_to_request(request.json_ if request is not None else None)
internal_request: InternalRequest | None = existing_queue_by_id.requests.get(request_id)
return internal_request.to_request() if internal_request else None

@override
async def update_request(
@@ -312,10 +309,10 @@ async def update_request(
if existing_queue_by_id is None:
raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self.id)

request_model = await self._create_internal_request(request, forefront)
internal_request = await self._create_internal_request(request, forefront)

# First we need to check the existing request to be able to return information about its handled state.
existing_request = existing_queue_by_id.requests.get(request_model.id)
existing_request = existing_queue_by_id.requests.get(internal_request.id)

# Undefined means that the request is not present in the queue.
# We need to insert it, to behave the same as the API.
@@ -325,11 +322,12 @@
async with existing_queue_by_id.file_operation_lock:
# When updating the request, we need to make sure that
# the handled counts are updated correctly in all cases.
existing_queue_by_id.requests[request_model.id] = request_model
existing_queue_by_id.requests[internal_request.id] = internal_request

pending_count_adjustment = 0
is_request_handled_state_changing = not isinstance(existing_request.order_no, type(request_model.order_no))
request_was_handled_before_update = existing_request.order_no is None
is_request_handled_state_changing = existing_request.handled_at != internal_request.handled_at

request_was_handled_before_update = existing_request.handled_at is not None

# We add 1 pending request if previous state was handled
if is_request_handled_state_changing:
@@ -339,14 +337,14 @@
existing_queue_by_id.handled_request_count -= pending_count_adjustment
await existing_queue_by_id.update_timestamps(has_been_modified=True)
await self._persist_single_request_to_storage(
request=request_model,
request=internal_request,
entity_directory=existing_queue_by_id.resource_directory,
persist_storage=self._memory_storage_client.persist_storage,
)

return ProcessedRequest(
id=request_model.id,
unique_key=request_model.unique_key,
id=internal_request.id,
unique_key=internal_request.unique_key,
was_already_present=True,
was_already_handled=request_was_handled_before_update,
)
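
The counter bookkeeping in this hunk reduces to a small state transition (note that the new check compares raw handled_at values, so two different non-None timestamps also count as a state change). A sketch under the hunk's own semantics, where a set handled_at means handled and None means pending; adjust_counts is a hypothetical helper, not part of the PR:

def adjust_counts(
    *, was_handled: bool, now_handled: bool, pending: int, handled: int
) -> tuple[int, int]:
    # No state change: counters stay as they are.
    if was_handled == now_handled:
        return pending, handled
    # Handled -> pending: one request re-enters the queue.
    if was_handled:
        return pending + 1, handled - 1
    # Pending -> handled: one request leaves the queue.
    return pending - 1, handled + 1


assert adjust_counts(was_handled=True, now_handled=False, pending=0, handled=1) == (1, 0)
assert adjust_counts(was_handled=False, now_handled=True, pending=1, handled=0) == (0, 1)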
Expand All @@ -368,7 +366,7 @@ async def delete_request(self, request_id: str) -> None:

if request:
del existing_queue_by_id.requests[request_id]
if request.order_no is None:
if request.handled_at:
existing_queue_by_id.handled_request_count -= 1
else:
existing_queue_by_id.pending_request_count -= 1
@@ -453,7 +451,7 @@ async def update_timestamps(self, *, has_been_modified: bool) -> None:
async def _persist_single_request_to_storage(
self,
*,
request: Request,
request: InternalRequest,
entity_directory: str,
persist_storage: bool,
) -> None:
@@ -501,18 +499,7 @@ async def _delete_request_file_from_storage(self, *, request_id: str, entity_dir
file_path = os.path.join(entity_directory, f'{request_id}.json')
await force_remove(file_path)

def _json_to_request(self, request_json: str | None) -> Request | None:
if request_json is None:
return None

request_dict = filter_out_none_values_recursively(json.loads(request_json))

if request_dict is None:
return None

return Request.model_validate(request_dict)

async def _create_internal_request(self, request: Request, forefront: bool | None) -> Request:
async def _create_internal_request(self, request: Request, forefront: bool | None) -> InternalRequest:
order_no = self._calculate_order_no(request, forefront)
id = unique_key_to_request_id(request.unique_key)

@@ -521,18 +508,7 @@ def _create_internal_request(self, request: Request, forefront: bool | Non
f'The request ID does not match the ID from the unique_key (request.id={request.id}, id={id}).'
)

request_kwargs = {
**(request.model_dump()),
'id': id,
'order_no': order_no,
}

del request_kwargs['json_']

return Request(
**request_kwargs,
json_=await json_dumps(request_kwargs),
)
return InternalRequest.from_request(request=request, id=id, order_no=order_no)

def _calculate_order_no(self, request: Request, forefront: bool | None) -> Decimal | None:
if request.handled_at is not None:
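
The diff cuts off inside _calculate_order_no. A plausible sketch of the ordering scheme (assumed for illustration, not taken from this PR): handled requests get no order number, and forefront requests get a negated timestamp so they sort ahead of anything already queued.

from datetime import datetime, timezone
from decimal import Decimal


def calculate_order_no(*, handled: bool, forefront: bool) -> Decimal | None:
    # Handled requests are no longer queued, so they carry no ordering key.
    if handled:
        return None
    # Decimal(str(...)) avoids binary-float artifacts in the stored key.
    timestamp = Decimal(str(datetime.now(timezone.utc).timestamp()))
    # A negative key sorts before every positive one, putting the request first.
    return -timestamp if forefront else timestamp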
2 changes: 1 addition & 1 deletion tests/unit/basic_crawler/test_basic_crawler.py
@@ -4,10 +4,10 @@
import asyncio
import json
import logging
import os
from collections import Counter
from dataclasses import dataclass
from datetime import timedelta
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any
from unittest.mock import AsyncMock, Mock