diff --git a/docs/upgrading/upgrading_to_v1.md b/docs/upgrading/upgrading_to_v1.md index c1c489fba5..3a03e8e1e7 100644 --- a/docs/upgrading/upgrading_to_v1.md +++ b/docs/upgrading/upgrading_to_v1.md @@ -102,6 +102,7 @@ Some older methods have been removed or replaced: - `from_storage_object` - removed; use the `open` method with either a `name` or `id` instead. - `get_info` and `storage_object` - replaced by the new `get_metadata` method. +- `get_request` now takes a `unique_key` argument instead of `request_id`, because the `id` field has been removed from `Request`. - `set_metadata` method has been removed. Some changes in the related model classes: @@ -200,6 +201,10 @@ We drop support for Python 3.9. The minimum supported version is now Python 3.10 The fields `persist_storage` and `persist_metadata` have been removed from the `Configuration`. Persistence is now determined only by which storage client class you use. +### Changes in Request + +`Request` objects no longer have an `id` field; all of its former usages now rely on the `unique_key` field. + ### Changes in HttpResponse The method `HttpResponse.read` is now asynchronous. This affects all HTTP-based crawlers. diff --git a/src/crawlee/_request.py b/src/crawlee/_request.py index 75859c1521..adf542f35d 100644 --- a/src/crawlee/_request.py +++ b/src/crawlee/_request.py @@ -11,7 +11,7 @@ from crawlee._types import EnqueueStrategy, HttpHeaders, HttpMethod, HttpPayload, JsonSerializable from crawlee._utils.crypto import crypto_random_object_id from crawlee._utils.docs import docs_group -from crawlee._utils.requests import compute_unique_key, unique_key_to_request_id +from crawlee._utils.requests import compute_unique_key from crawlee._utils.urls import validate_http_url if TYPE_CHECKING: @@ -165,10 +165,6 @@ class Request(BaseModel): model_config = ConfigDict(populate_by_name=True) - id: str - """A unique identifier for the request. Note that this is not used for deduplication, and should not be confused - with `unique_key`.""" - unique_key: Annotated[str, Field(alias='uniqueKey')] """A unique key identifying the request. Two requests with the same `unique_key` are considered as pointing to the same URL. @@ -239,7 +235,6 @@ def from_url( label: str | None = None, session_id: str | None = None, unique_key: str | None = None, - id: str | None = None, keep_url_fragment: bool = False, use_extended_unique_key: bool = False, always_enqueue: bool = False, @@ -264,8 +259,6 @@ def from_url( raised. unique_key: A unique key identifying the request. If not provided, it is automatically computed based on the URL and other parameters. Requests with the same `unique_key` are treated as identical. - id: A unique identifier for the request. If not provided, it is automatically generated from the - `unique_key`. keep_url_fragment: Determines whether the URL fragment (e.g., `#section`) should be included in the `unique_key` computation. This is only relevant when `unique_key` is not provided.
use_extended_unique_key: Determines whether to include the HTTP method, ID Session and payload in the @@ -296,12 +289,9 @@ def from_url( if always_enqueue: unique_key = f'{unique_key}_{crypto_random_object_id()}' - id = id or unique_key_to_request_id(unique_key) - request = cls( url=url, unique_key=unique_key, - id=id, method=method, headers=headers, payload=payload, diff --git a/src/crawlee/_utils/requests.py b/src/crawlee/_utils/requests.py index a3b152a51d..fa31d4621d 100644 --- a/src/crawlee/_utils/requests.py +++ b/src/crawlee/_utils/requests.py @@ -1,8 +1,5 @@ from __future__ import annotations -import re -from base64 import b64encode -from hashlib import sha256 from logging import getLogger from typing import TYPE_CHECKING @@ -16,29 +13,6 @@ logger = getLogger(__name__) -def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str: - """Generate a deterministic request ID based on a unique key. - - Args: - unique_key: The unique key to convert into a request ID. - request_id_length: The length of the request ID. - - Returns: - A URL-safe, truncated request ID based on the unique key. - """ - # Encode the unique key and compute its SHA-256 hash - hashed_key = sha256(unique_key.encode('utf-8')).digest() - - # Encode the hash in base64 and decode it to get a string - base64_encoded = b64encode(hashed_key).decode('utf-8') - - # Remove characters that are not URL-safe ('+', '/', or '=') - url_safe_key = re.sub(r'(\+|\/|=)', '', base64_encoded) - - # Truncate the key to the desired length - return url_safe_key[:request_id_length] - - def normalize_url(url: str, *, keep_url_fragment: bool = False) -> str: """Normalize a URL. diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 99d56bce95..d772934df0 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -1057,7 +1057,7 @@ async def _handle_request_retries( max_retries=3, ) await self._handle_failed_request(context, error) - self._statistics.record_request_processing_failure(request.id or request.unique_key) + self._statistics.record_request_processing_failure(request.unique_key) async def _handle_request_error(self, context: TCrawlingContext | BasicCrawlingContext, error: Exception) -> None: try: @@ -1274,7 +1274,7 @@ async def __run_task_function(self) -> None: if not (await self._is_allowed_based_on_robots_txt_file(request.url)): self._logger.warning( - f'Skipping request {request.url} ({request.id}) because it is disallowed based on robots.txt' + f'Skipping request {request.url} ({request.unique_key}) because it is disallowed based on robots.txt' ) await self._handle_skipped_request(request, 'robots_txt', need_mark=True) @@ -1300,8 +1300,7 @@ async def __run_task_function(self) -> None: ) self._context_result_map[context] = result - statistics_id = request.id or request.unique_key - self._statistics.record_request_processing_start(statistics_id) + self._statistics.record_request_processing_start(request.unique_key) try: request.state = RequestState.REQUEST_HANDLER @@ -1328,7 +1327,7 @@ async def __run_task_function(self) -> None: if context.session and context.session.is_usable: context.session.mark_good() - self._statistics.record_request_processing_finish(statistics_id) + self._statistics.record_request_processing_finish(request.unique_key) except RequestCollisionError as request_error: context.request.no_retry = True @@ -1374,7 +1373,7 @@ async def __run_task_function(self) -> None: ) await 
self._handle_failed_request(context, session_error) - self._statistics.record_request_processing_failure(statistics_id) + self._statistics.record_request_processing_failure(request.unique_key) except ContextPipelineInterruptedError as interrupted_error: self._logger.debug('The context pipeline was interrupted', exc_info=interrupted_error) diff --git a/src/crawlee/request_loaders/_request_list.py b/src/crawlee/request_loaders/_request_list.py index 4f8f8d947a..cd64752cf3 100644 --- a/src/crawlee/request_loaders/_request_list.py +++ b/src/crawlee/request_loaders/_request_list.py @@ -166,7 +166,7 @@ async def fetch_next_request(self) -> Request | None: return None state = await self._get_state() - state.in_progress.add(self._next[0].id) + state.in_progress.add(self._next[0].unique_key) self._assumed_total_count += 1 next_request = self._next[0] @@ -183,7 +183,7 @@ async def fetch_next_request(self) -> Request | None: async def mark_request_as_handled(self, request: Request) -> None: self._handled_count += 1 state = await self._get_state() - state.in_progress.remove(request.id) + state.in_progress.remove(request.unique_key) async def _ensure_next_request(self) -> None: await self._get_state() diff --git a/src/crawlee/request_loaders/_sitemap_request_loader.py b/src/crawlee/request_loaders/_sitemap_request_loader.py index f50ad6d816..def45c7cf7 100644 --- a/src/crawlee/request_loaders/_sitemap_request_loader.py +++ b/src/crawlee/request_loaders/_sitemap_request_loader.py @@ -153,15 +153,15 @@ async def fetch_next_request(self) -> Request | None: url = await self._url_queue.get() request = Request.from_url(url) - self._in_progress.add(request.id) + self._in_progress.add(request.unique_key) return request return None async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: """Mark a request as successfully handled.""" - if request.id in self._in_progress: - self._in_progress.remove(request.id) + if request.unique_key in self._in_progress: + self._in_progress.remove(request.unique_key) self._handled_count += 1 return None diff --git a/src/crawlee/storage_clients/_base/_request_queue_client.py b/src/crawlee/storage_clients/_base/_request_queue_client.py index c2fa638643..a993fcfdb3 100644 --- a/src/crawlee/storage_clients/_base/_request_queue_client.py +++ b/src/crawlee/storage_clients/_base/_request_queue_client.py @@ -63,11 +63,11 @@ async def add_batch_of_requests( """ @abstractmethod - async def get_request(self, request_id: str) -> Request | None: + async def get_request(self, unique_key: str) -> Request | None: """Retrieve a request from the queue. Args: - request_id: ID of the request to retrieve. + unique_key: Unique key of the request to retrieve. Returns: The retrieved request, or None, if it did not exist. 
diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index f5e0165d68..695392ad08 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -5,6 +5,7 @@ import shutil from collections import deque from datetime import datetime, timezone +from hashlib import sha256 from logging import getLogger from pathlib import Path from typing import TYPE_CHECKING @@ -43,16 +44,16 @@ class RequestQueueState(BaseModel): """Counter for forefront request ordering.""" forefront_requests: dict[str, int] = {} - """Mapping of forefront request IDs to their sequence numbers.""" + """Mapping of forefront request unique keys to their sequence numbers.""" regular_requests: dict[str, int] = {} - """Mapping of regular request IDs to their sequence numbers.""" + """Mapping of regular request unique keys to their sequence numbers.""" in_progress_requests: set[str] = set() - """Set of request IDs currently being processed.""" + """Set of request unique keys currently being processed.""" handled_requests: set[str] = set() - """Set of request IDs that have been handled.""" + """Set of request unique keys that have been handled.""" class FileSystemRequestQueueClient(RequestQueueClient): @@ -331,17 +332,17 @@ async def add_batch_of_requests( # If there is no existing request with the same unique key, add the new request. if existing_request is None: - request_path = self._get_request_path(request.id) + request_path = self._get_request_path(request.unique_key) # Add sequence number to ensure FIFO ordering using state. if forefront: sequence_number = state.forefront_sequence_counter state.forefront_sequence_counter += 1 - state.forefront_requests[request.id] = sequence_number + state.forefront_requests[request.unique_key] = sequence_number else: sequence_number = state.sequence_counter state.sequence_counter += 1 - state.regular_requests[request.id] = sequence_number + state.regular_requests[request.unique_key] = sequence_number # Save the clean request without extra fields request_data = await json_dumps(request.model_dump()) @@ -352,11 +353,10 @@ async def add_batch_of_requests( new_pending_request_count += 1 # Add to our index for subsequent requests in this batch - existing_unique_keys[request.unique_key] = self._get_request_path(request.id) + existing_unique_keys[request.unique_key] = self._get_request_path(request.unique_key) processed_requests.append( ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=False, was_already_handled=False, @@ -367,13 +367,12 @@ async def add_batch_of_requests( else: # Set the processed request flags. was_already_present = existing_request is not None - was_already_handled = existing_request.id in state.handled_requests + was_already_handled = existing_request.unique_key in state.handled_requests # If the request is already in the RQ and handled, just continue with the next one. 
if was_already_present and was_already_handled: processed_requests.append( ProcessedRequest( - id=existing_request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=True, @@ -385,22 +384,21 @@ async def add_batch_of_requests( # Update request type (forefront vs regular) in state if forefront: # Move from regular to forefront if needed - if existing_request.id in state.regular_requests: - state.regular_requests.pop(existing_request.id) - if existing_request.id not in state.forefront_requests: - state.forefront_requests[existing_request.id] = state.forefront_sequence_counter + if existing_request.unique_key in state.regular_requests: + state.regular_requests.pop(existing_request.unique_key) + if existing_request.unique_key not in state.forefront_requests: + state.forefront_requests[existing_request.unique_key] = state.forefront_sequence_counter state.forefront_sequence_counter += 1 elif ( - existing_request.id not in state.forefront_requests - and existing_request.id not in state.regular_requests + existing_request.unique_key not in state.forefront_requests + and existing_request.unique_key not in state.regular_requests ): # Keep as regular if not already forefront - state.regular_requests[existing_request.id] = state.sequence_counter + state.regular_requests[existing_request.unique_key] = state.sequence_counter state.sequence_counter += 1 processed_requests.append( ProcessedRequest( - id=existing_request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=False, @@ -437,17 +435,17 @@ async def add_batch_of_requests( ) @override - async def get_request(self, request_id: str) -> Request | None: + async def get_request(self, unique_key: str) -> Request | None: async with self._lock: - request_path = self._get_request_path(request_id) + request_path = self._get_request_path(unique_key) request = await self._parse_request_file(request_path) if request is None: - logger.warning(f'Request with ID "{request_id}" not found in the queue.') + logger.warning(f'Request with unique key "{unique_key}" not found in the queue.') return None state = self._state.current_value - state.in_progress_requests.add(request.id) + state.in_progress_requests.add(request.unique_key) await self._update_metadata(update_accessed_at=True) return request @@ -466,11 +464,11 @@ async def fetch_next_request(self) -> Request | None: candidate = self._request_cache.popleft() # Skip requests that are already in progress, however this should not happen. - if candidate.id not in state.in_progress_requests: + if candidate.unique_key not in state.in_progress_requests: next_request = candidate if next_request is not None: - state.in_progress_requests.add(next_request.id) + state.in_progress_requests.add(next_request.unique_key) return next_request @@ -481,8 +479,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | state = self._state.current_value # Check if the request is in progress. - if request.id not in state.in_progress_requests: - logger.warning(f'Marking request {request.id} as handled that is not in progress.') + if request.unique_key not in state.in_progress_requests: + logger.warning(f'Marking request {request.unique_key} as handled that is not in progress.') return None # Update the request's handled_at timestamp. @@ -490,18 +488,18 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | request.handled_at = datetime.now(timezone.utc) # Dump the updated request to the file. 
- request_path = self._get_request_path(request.id) + request_path = self._get_request_path(request.unique_key) if not await asyncio.to_thread(request_path.exists): - logger.warning(f'Request file for {request.id} does not exist, cannot mark as handled.') + logger.warning(f'Request file for {request.unique_key} does not exist, cannot mark as handled.') return None request_data = await json_dumps(request.model_dump()) await atomic_write(request_path, request_data) # Update state: remove from in-progress and add to handled. - state.in_progress_requests.discard(request.id) - state.handled_requests.add(request.id) + state.in_progress_requests.discard(request.unique_key) + state.handled_requests.add(request.unique_key) # Update RQ metadata. await self._update_metadata( @@ -512,7 +510,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | ) return ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=True, @@ -530,36 +527,36 @@ async def reclaim_request( state = self._state.current_value # Check if the request is in progress. - if request.id not in state.in_progress_requests: - logger.info(f'Reclaiming request {request.id} that is not in progress.') + if request.unique_key not in state.in_progress_requests: + logger.info(f'Reclaiming request {request.unique_key} that is not in progress.') return None - request_path = self._get_request_path(request.id) + request_path = self._get_request_path(request.unique_key) if not await asyncio.to_thread(request_path.exists): - logger.warning(f'Request file for {request.id} does not exist, cannot reclaim.') + logger.warning(f'Request file for {request.unique_key} does not exist, cannot reclaim.') return None # Update sequence number and state to ensure proper ordering. if forefront: # Remove from regular requests if it was there - state.regular_requests.pop(request.id, None) + state.regular_requests.pop(request.unique_key, None) sequence_number = state.forefront_sequence_counter state.forefront_sequence_counter += 1 - state.forefront_requests[request.id] = sequence_number + state.forefront_requests[request.unique_key] = sequence_number else: # Remove from forefront requests if it was there - state.forefront_requests.pop(request.id, None) + state.forefront_requests.pop(request.unique_key, None) sequence_number = state.sequence_counter state.sequence_counter += 1 - state.regular_requests[request.id] = sequence_number + state.regular_requests[request.unique_key] = sequence_number # Save the clean request without extra fields request_data = await json_dumps(request.model_dump()) await atomic_write(request_path, request_data) # Remove from in-progress. - state.in_progress_requests.discard(request.id) + state.in_progress_requests.discard(request.unique_key) # Update RQ metadata. await self._update_metadata( @@ -574,7 +571,6 @@ async def reclaim_request( self._request_cache.append(request) return ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=False, @@ -597,7 +593,7 @@ async def is_empty(self) -> bool: # If we have a cached requests, check them first (fast path). 
if self._request_cache: for req in self._request_cache: - if req.id not in state.handled_requests: + if req.unique_key not in state.handled_requests: self._is_empty_cache = False return False self._is_empty_cache = True @@ -617,16 +613,16 @@ async def is_empty(self) -> bool: self._is_empty_cache = True return True - def _get_request_path(self, request_id: str) -> Path: + def _get_request_path(self, unique_key: str) -> Path: """Get the path to a specific request file. Args: - request_id: The ID of the request. + unique_key: Unique key of the request. Returns: The path to the request file. """ - return self.path_to_rq / f'{request_id}.json' + return self.path_to_rq / f'{self._get_file_base_name_from_unique_key(unique_key)}.json' async def _update_metadata( self, @@ -699,23 +695,23 @@ async def _refresh_cache(self) -> None: continue # Skip handled requests - if request.id in state.handled_requests: + if request.unique_key in state.handled_requests: continue # Skip in-progress requests - if request.id in state.in_progress_requests: + if request.unique_key in state.in_progress_requests: continue # Determine if request is forefront or regular based on state - if request.id in state.forefront_requests: - sequence = state.forefront_requests[request.id] + if request.unique_key in state.forefront_requests: + sequence = state.forefront_requests[request.unique_key] forefront_requests.append((request, sequence)) - elif request.id in state.regular_requests: - sequence = state.regular_requests[request.id] + elif request.unique_key in state.regular_requests: + sequence = state.regular_requests[request.unique_key] regular_requests.append((request, sequence)) else: # Request not in state, skip it (might be orphaned) - logger.warning(f'Request {request.id} not found in state, skipping.') + logger.warning(f'Request {request.unique_key} not found in state, skipping.') continue # Sort forefront requests by sequence (newest first for LIFO behavior). @@ -807,11 +803,27 @@ async def _discover_existing_requests(self) -> None: continue # Add request to state as regular request (assign sequence numbers) - if request.id not in state.regular_requests and request.id not in state.forefront_requests: + if request.unique_key not in state.regular_requests and request.unique_key not in state.forefront_requests: # Assign as regular request with current sequence counter - state.regular_requests[request.id] = state.sequence_counter + state.regular_requests[request.unique_key] = state.sequence_counter state.sequence_counter += 1 # Check if request was already handled if request.handled_at is not None: - state.handled_requests.add(request.id) + state.handled_requests.add(request.unique_key) + + @staticmethod + def _get_file_base_name_from_unique_key(unique_key: str) -> str: + """Generate a deterministic file name for a unique_key. + + Args: + unique_key: Unique key to be used to generate filename. + + Returns: + A file name based on the unique_key. 
+ """ + # hexdigest produces filenames compliant strings + hashed_key = sha256(unique_key.encode('utf-8')).hexdigest() + name_length = 15 + # Truncate the key to the desired length + return hashed_key[:name_length] diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index 3041edd9e2..cf8a91897e 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -50,9 +50,6 @@ def __init__( self._in_progress_requests = dict[str, Request]() """In-progress requests are those that have been fetched but not yet marked as handled or reclaimed.""" - self._requests_by_id = dict[str, Request]() - """ID -> Request mapping for fast lookup by request ID.""" - self._requests_by_unique_key = dict[str, Request]() """Unique key -> Request mapping for fast lookup by unique key.""" @@ -102,7 +99,6 @@ async def open( async def drop(self) -> None: self._pending_requests.clear() self._handled_requests.clear() - self._requests_by_id.clear() self._requests_by_unique_key.clear() self._in_progress_requests.clear() @@ -118,7 +114,6 @@ async def drop(self) -> None: async def purge(self) -> None: self._pending_requests.clear() self._handled_requests.clear() - self._requests_by_id.clear() self._requests_by_unique_key.clear() self._in_progress_requests.clear() @@ -147,7 +142,6 @@ async def add_batch_of_requests( if was_already_handled: processed_requests.append( ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=True, @@ -163,7 +157,6 @@ async def add_batch_of_requests( self._pending_requests.remove(existing_request) # Update indexes. - self._requests_by_id[request.id] = request self._requests_by_unique_key[request.unique_key] = request # Add updated request back to queue. @@ -179,7 +172,6 @@ async def add_batch_of_requests( self._pending_requests.append(request) # Update indexes. - self._requests_by_id[request.id] = request self._requests_by_unique_key[request.unique_key] = request await self._update_metadata( @@ -189,7 +181,6 @@ async def add_batch_of_requests( processed_requests.append( ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=was_already_present, was_already_handled=False, @@ -213,25 +204,25 @@ async def fetch_next_request(self) -> Request | None: continue # Skip if already in progress (shouldn't happen, but safety check). - if request.id in self._in_progress_requests: + if request.unique_key in self._in_progress_requests: self._pending_requests.appendleft(request) break # Mark as in progress. - self._in_progress_requests[request.id] = request + self._in_progress_requests[request.unique_key] = request return request return None @override - async def get_request(self, request_id: str) -> Request | None: + async def get_request(self, unique_key: str) -> Request | None: await self._update_metadata(update_accessed_at=True) - return self._requests_by_id.get(request_id) + return self._requests_by_unique_key.get(unique_key) @override async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: # Check if the request is in progress. - if request.id not in self._in_progress_requests: + if request.unique_key not in self._in_progress_requests: return None # Set handled_at timestamp if not already set. 
@@ -239,14 +230,13 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | request.handled_at = datetime.now(timezone.utc) # Move request to handled storage. - self._handled_requests[request.id] = request + self._handled_requests[request.unique_key] = request - # Update indexes (keep the request in indexes for get_request to work). - self._requests_by_id[request.id] = request + # Update index (keep the request in indexes for get_request to work). self._requests_by_unique_key[request.unique_key] = request # Remove from in-progress. - del self._in_progress_requests[request.id] + del self._in_progress_requests[request.unique_key] # Update metadata. await self._update_metadata( @@ -256,7 +246,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | ) return ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=True, @@ -270,11 +259,11 @@ async def reclaim_request( forefront: bool = False, ) -> ProcessedRequest | None: # Check if the request is in progress. - if request.id not in self._in_progress_requests: + if request.unique_key not in self._in_progress_requests: return None # Remove from in-progress. - del self._in_progress_requests[request.id] + del self._in_progress_requests[request.unique_key] # Add request back to pending queue. if forefront: @@ -286,7 +275,6 @@ async def reclaim_request( await self._update_metadata(update_modified_at=True) return ProcessedRequest( - id=request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=False, diff --git a/src/crawlee/storage_clients/models.py b/src/crawlee/storage_clients/models.py index a5a431fd94..bac8a4baed 100644 --- a/src/crawlee/storage_clients/models.py +++ b/src/crawlee/storage_clients/models.py @@ -137,7 +137,9 @@ class ProcessedRequest(BaseModel): model_config = ConfigDict(populate_by_name=True) - id: Annotated[str, Field(alias='requestId')] + id: Annotated[str | None, Field(alias='requestId', default=None)] = None + """Internal representation of the request by the storage client. Only some clients use id.""" + unique_key: Annotated[str, Field(alias='uniqueKey')] was_already_present: Annotated[bool, Field(alias='wasAlreadyPresent')] was_already_handled: Annotated[bool, Field(alias='wasAlreadyHandled')] diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index 2434c2b11a..068b5135f0 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -223,16 +223,16 @@ async def fetch_next_request(self) -> Request | None: """ return await self._client.fetch_next_request() - async def get_request(self, request_id: str) -> Request | None: + async def get_request(self, unique_key: str) -> Request | None: """Retrieve a specific request from the queue by its ID. Args: - request_id: The ID of the request to retrieve. + unique_key: Unique key of the request to retrieve. Returns: The request with the specified ID, or `None` if no such request exists. """ - return await self._client.get_request(request_id) + return await self._client.get_request(unique_key) async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: """Mark a request as handled after successful processing. 
diff --git a/tests/unit/_utils/test_requests.py b/tests/unit/_utils/test_requests.py index 348ae95948..8198909592 100644 --- a/tests/unit/_utils/test_requests.py +++ b/tests/unit/_utils/test_requests.py @@ -3,42 +3,7 @@ import pytest from crawlee._types import HttpHeaders -from crawlee._utils.requests import compute_unique_key, normalize_url, unique_key_to_request_id - - -def test_unique_key_to_request_id_length() -> None: - unique_key = 'exampleKey123' - request_id = unique_key_to_request_id(unique_key, request_id_length=15) - assert len(request_id) == 15, 'Request ID should have the correct length.' - - -def test_unique_key_to_request_id_consistency() -> None: - unique_key = 'consistentKey' - request_id_1 = unique_key_to_request_id(unique_key) - request_id_2 = unique_key_to_request_id(unique_key) - assert request_id_1 == request_id_2, 'The same unique key should generate consistent request IDs.' - - -@pytest.mark.parametrize( - ('unique_key', 'expected_request_id'), - [ - ('abc', 'ungWv48BzpBQUDe'), - ('uniqueKey', 'xiWPs083cree7mH'), - ('', '47DEQpj8HBSaTIm'), - ('测试中文', 'lKPdJkdvw8MXEUp'), - ('test+/=', 'XZRQjhoG0yjfnYD'), - ], - ids=[ - 'basic_abc', - 'keyword_uniqueKey', - 'empty_string', - 'non_ascii_characters', - 'url_unsafe_characters', - ], -) -def test_unique_key_to_request_id_matches_known_values(unique_key: str, expected_request_id: str) -> None: - request_id = unique_key_to_request_id(unique_key) - assert request_id == expected_request_id, f'Unique key "{unique_key}" should produce the expected request ID.' +from crawlee._utils.requests import compute_unique_key, normalize_url @pytest.mark.parametrize( diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 8df759a27f..7227504a95 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -150,7 +150,6 @@ async def test_add_request_string_url(rq: RequestQueue) -> None: result = await rq.add_request(url) # Verify request was added - assert result.id is not None assert result.unique_key is not None assert result.was_already_present is False assert result.was_already_handled is False @@ -168,7 +167,6 @@ async def test_add_request_object(rq: RequestQueue) -> None: result = await rq.add_request(request) # Verify request was added - assert result.id is not None assert result.unique_key is not None assert result.was_already_present is False assert result.was_already_handled is False @@ -371,12 +369,12 @@ async def test_get_request_by_id(rq: RequestQueue) -> None: """Test retrieving a request by its ID.""" # Add a request added_result = await rq.add_request('https://example.com') - request_id = added_result.id + unique_key = added_result.unique_key # Retrieve the request by ID - retrieved_request = await rq.get_request(request_id) + retrieved_request = await rq.get_request(unique_key) assert retrieved_request is not None - assert retrieved_request.id == request_id + assert retrieved_request.unique_key == unique_key assert retrieved_request.url == 'https://example.com' @@ -403,7 +401,7 @@ async def test_reclaim_request(rq: RequestQueue) -> None: # Verify we can fetch it again reclaimed_request = await rq.fetch_next_request() assert reclaimed_request is not None - assert reclaimed_request.id == request.id + assert reclaimed_request.unique_key == request.unique_key assert reclaimed_request.url == 'https://example.com'
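
The upgrading notes above describe the switch from `request_id` to `unique_key` only in prose. For illustration, here is a minimal sketch of the new lookup flow, modeled on the usage exercised in `tests/unit/storages/test_request_queue.py`; it assumes the default storage client and is not part of the patch itself:

```python
import asyncio

from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open()

    # add_request returns a ProcessedRequest; its `id` is now optional and only
    # populated by some storage clients, so keep the `unique_key` as the handle.
    processed = await rq.add_request('https://example.com')

    # get_request is now keyed by the request's unique key instead of a request ID.
    request = await rq.get_request(processed.unique_key)
    assert request is not None
    assert request.unique_key == processed.unique_key

    await rq.drop()


asyncio.run(main())
```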