Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,14 @@ def event_manager(self) -> EventManager:
def _charging_manager_implementation(self) -> ChargingManagerImplementation:
return ChargingManagerImplementation(self.configuration, self.apify_client)

@cached_property
def _charge_lock(self) -> asyncio.Lock:
"""Lock to synchronize charge operations.

Prevents race conditions between Actor.charge and Actor.push_data calls.

Actor.charge acquires this lock around its call to the charging manager. Actor.push_data acquires it only
when a `charged_event_name` is given, and is meant to hold it for the whole limit calculation, dataset push
and charge sequence. Plain Actor.push_data calls without a charged event do not take the lock.

NOTE(review): assuming `cached_property` is `functools.cached_property`, the lock is created on first access
and then reused for the lifetime of the Actor instance - confirm against the module imports.
"""
return asyncio.Lock()

@cached_property
def _storage_client(self) -> SmartApifyStorageClient:
"""Storage client used by the Actor.
Expand Down Expand Up @@ -606,30 +614,34 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non

data = data if isinstance(data, list) else [data]

max_charged_count = (
self.get_charging_manager().calculate_max_event_charge_count_within_limit(charged_event_name)
if charged_event_name is not None
else None
)
# No charging, just push the data without locking.
if charged_event_name is None:
dataset = await self.open_dataset()
await dataset.push_data(data)
return None

# Push as many items as we can charge for
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
# If charging is requested, acquire the charge lock to prevent race conditions between concurrent
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
async with self._charge_lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, this push-and-charge sequence could still be interleaved with an `Actor.charge` call for an unrelated event, which would result in pushing more items than the user can afford.

A watertight solution would be to use a read-write lock, where a plain `Actor.charge` call acquires the read lock and `Actor.push_data` acquires the write lock.

But since there is no built-in read-write lock for asyncio, we should probably just leave this the way it is 🙂

max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
charged_event_name
)

dataset = await self.open_dataset()
# Push as many items as we can charge for.
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)

if pushed_items_count < len(data):
await dataset.push_data(data[:pushed_items_count])
elif pushed_items_count > 0:
await dataset.push_data(data)
dataset = await self.open_dataset()

if pushed_items_count < len(data):
await dataset.push_data(data[:pushed_items_count])
elif pushed_items_count > 0:
await dataset.push_data(data)

if charged_event_name:
return await self.get_charging_manager().charge(
event_name=charged_event_name,
count=pushed_items_count,
)

return None

async def get_input(self) -> Any:
"""Get the Actor input value from the default key-value store associated with the current Actor run."""
self._raise_if_not_initialized()
Expand Down Expand Up @@ -692,7 +704,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
count: Number of events to charge for.
"""
self._raise_if_not_initialized()
return await self.get_charging_manager().charge(event_name, count)
# Acquire lock to prevent race conditions with concurrent charge/push_data calls.
async with self._charge_lock:
return await self.get_charging_manager().charge(event_name, count)

@overload
def on(
Expand Down
5 changes: 2 additions & 3 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,8 @@ def calculate_chargeable() -> dict[str, int | None]:
pricing_info = self._pricing_info.get(
event_name,
PricingInfoItem(
price=Decimal()
if self._is_at_home
else Decimal(1), # Use a nonzero price for local development so that the maximum budget can be reached,
# Use a nonzero price for local development so that the maximum budget can be reached.
price=Decimal() if self._is_at_home else Decimal(1),
title=f"Unknown event '{event_name}'",
),
)
Expand Down