src/crawlee/_types.py (4 additions & 0 deletions)
@@ -399,6 +399,10 @@ async def push_data(
         **kwargs: Unpack[PushDataKwargs],
     ) -> None:
         """Track a call to the `push_data` context helper."""
+        from crawlee.storages._dataset import Dataset
+
+        await Dataset.check_and_serialize(data)
+
         self.push_data_calls.append(
             PushDataFunctionCall(
                 data=data,
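
With the serialization check hoisted into the tracked context helper, a bad item now fails inside the request handler, before the call is recorded in push_data_calls and long before anything reaches storage. A minimal sketch of what this guards against, reusing the oversized payload from the new test below:

import asyncio

from crawlee.storages._dataset import Dataset


async def main() -> None:
    # Roughly 15 MiB of string data, well past the dataset's per-item limit.
    oversized = {'hello': 'world' * 3 * 1024 * 1024}
    try:
        # check_and_serialize is a classmethod after this PR, so no
        # Dataset instance is needed at the call site.
        await Dataset.check_and_serialize(oversized)
    except ValueError as exc:
        print(exc)  # Data item is too large (size: ..., limit: ...)


asyncio.run(main())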
src/crawlee/storages/_dataset.py (6 additions & 5 deletions)
@@ -257,11 +257,11 @@ async def push_data(self, data: JsonSerializable, **kwargs: Unpack[PushDataKwargs]
         """
         # Handle singular items
         if not isinstance(data, list):
-            items = await self._check_and_serialize(data)
+            items = await self.check_and_serialize(data)
             return await self._resource_client.push_items(items, **kwargs)

         # Handle lists
-        payloads_generator = (await self._check_and_serialize(item, index) for index, item in enumerate(data))
+        payloads_generator = (await self.check_and_serialize(item, index) for index, item in enumerate(data))

         # Invoke client in series to preserve the order of data
         async for items in self._chunk_by_size(payloads_generator):
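
The public entry point itself is unchanged by the rename: a single item gets one serialization check and one push, while a list is checked item by item and pushed in ordered, size-bounded chunks. A small usage sketch, assuming the default storage backend:

import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open()

    # Singular item: one check, one push.
    await dataset.push_data({'url': 'http://example.tld/1'})

    # List: every item is checked, then pushed in order, chunked by size.
    await dataset.push_data([{'rank': i} for i in range(1000)])


asyncio.run(main())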
@@ -417,7 +417,8 @@ async def iterate_items(
         ):
             yield item

-    async def _check_and_serialize(self, item: JsonSerializable, index: int | None = None) -> str:
+    @classmethod
+    async def check_and_serialize(cls, item: JsonSerializable, index: int | None = None) -> str:
         """Serializes a given item to JSON, checks its serializability and size against a limit.

         Args:
@@ -438,8 +439,8 @@ async def check_and_serialize(cls, item: JsonSerializable, index: int | None = None) -> str:
             raise ValueError(f'Data item{s}is not serializable to JSON.') from exc

         payload_size = ByteSize(len(payload.encode('utf-8')))
-        if payload_size > self._EFFECTIVE_LIMIT_SIZE:
-            raise ValueError(f'Data item{s}is too large (size: {payload_size}, limit: {self._EFFECTIVE_LIMIT_SIZE})')
+        if payload_size > cls._EFFECTIVE_LIMIT_SIZE:
+            raise ValueError(f'Data item{s}is too large (size: {payload_size}, limit: {cls._EFFECTIVE_LIMIT_SIZE})')

         return payload

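
The renamed classmethod keeps its contract: it returns the JSON payload on success, and raises ValueError either when the item is not JSON-serializable or when the encoded payload exceeds _EFFECTIVE_LIMIT_SIZE; the optional index only enriches the error message. A quick sketch (the exact message wording is illustrative):

import asyncio

from crawlee.storages._dataset import Dataset


async def main() -> None:
    # Happy path: the serialized payload comes back as a string.
    payload = await Dataset.check_and_serialize({'hello': 'world'})
    assert isinstance(payload, str)

    # Unserializable input: ValueError, with the index woven into the message.
    try:
        await Dataset.check_and_serialize({'bad': object()}, index=3)
    except ValueError as exc:
        print(exc)  # e.g. 'Data item at index 3 is not serializable to JSON.'


asyncio.run(main())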
tests/unit/basic_crawler/test_basic_crawler.py (11 additions & 0 deletions)
@@ -645,6 +645,17 @@ async def handler(context: BasicCrawlingContext) -> None:
     assert exported_json_str == expected_json_str


+async def test_crawler_push_data_over_limit() -> None:
+    crawler = BasicCrawler()
+
+    @crawler.router.default_handler
+    async def handler(context: BasicCrawlingContext) -> None:
+        await context.push_data({'hello': 'world' * 3 * 1024 * 1024})
+
+    stats = await crawler.run(['http://example.tld/1'])
+    assert stats.requests_failed == 1
+
+
 async def test_context_update_kv_store() -> None:
     crawler = BasicCrawler()

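
For scale: the pushed value repeats the five-byte string 'world' 3 × 1024 × 1024 times, i.e. about 15.7 MB before JSON overhead, comfortably above the dataset's effective per-item limit (on the order of 9 MB minus a small safety buffer in Crawlee's defaults), so the eager check raises in the handler and the single request is reported as failed.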