# Patch summary (text before the first "diff --git" header is not part of any hunk).
#
# src/apify/_actor.py
#   * Adds a cached `_charge_lock` property returning an `asyncio.Lock`.
#   * `push_data`: when `charged_event_name` is None the data is pushed to the dataset and
#     None is returned without taking the lock. Otherwise the lock is held across the whole
#     sequence: computing the chargeable item count, pushing the (possibly truncated) data,
#     and charging. Inside the lock the charging manager's `charge` is called directly, not
#     `Actor.charge`, which acquires the same `_charge_lock` itself.
#   * `charge`: wraps the charging manager's `charge` call in `async with self._charge_lock`.
#
# src/apify/_charging.py
#   * Moves the "nonzero price for local development" comment onto its own line above the
#     `price=` argument and joins the conditional expression onto one line; the expression
#     itself is unchanged.
#
# NOTE(review): this patch was stored with its line breaks and indentation collapsed. Line
# structure was restored so that each hunk matches the counts in its "@@" header; the
# indentation depth is inferred from the Python nesting - confirm against the target files
# before applying.
diff --git a/src/apify/_actor.py b/src/apify/_actor.py
index 13f87f1a..bd281aff 100644
--- a/src/apify/_actor.py
+++ b/src/apify/_actor.py
@@ -354,6 +354,14 @@ def event_manager(self) -> EventManager:
     def _charging_manager_implementation(self) -> ChargingManagerImplementation:
         return ChargingManagerImplementation(self.configuration, self.apify_client)
 
+    @cached_property
+    def _charge_lock(self) -> asyncio.Lock:
+        """Lock to synchronize charge operations.
+
+        Prevents race conditions between Actor.charge and Actor.push_data calls.
+        """
+        return asyncio.Lock()
+
     @cached_property
     def _storage_client(self) -> SmartApifyStorageClient:
         """Storage client used by the Actor.
@@ -606,30 +614,34 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non
 
         data = data if isinstance(data, list) else [data]
 
-        max_charged_count = (
-            self.get_charging_manager().calculate_max_event_charge_count_within_limit(charged_event_name)
-            if charged_event_name is not None
-            else None
-        )
+        # No charging, just push the data without locking.
+        if charged_event_name is None:
+            dataset = await self.open_dataset()
+            await dataset.push_data(data)
+            return None
 
-        # Push as many items as we can charge for
-        pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
+        # If charging is requested, acquire the charge lock to prevent race conditions between concurrent
+        # push_data calls. We need to hold the lock for the entire push_data + charge sequence.
+        async with self._charge_lock:
+            max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
+                charged_event_name
+            )
 
-        dataset = await self.open_dataset()
+            # Push as many items as we can charge for.
+            pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
 
-        if pushed_items_count < len(data):
-            await dataset.push_data(data[:pushed_items_count])
-        elif pushed_items_count > 0:
-            await dataset.push_data(data)
+            dataset = await self.open_dataset()
+
+            if pushed_items_count < len(data):
+                await dataset.push_data(data[:pushed_items_count])
+            elif pushed_items_count > 0:
+                await dataset.push_data(data)
 
-        if charged_event_name:
             return await self.get_charging_manager().charge(
                 event_name=charged_event_name,
                 count=pushed_items_count,
             )
 
-        return None
-
     async def get_input(self) -> Any:
         """Get the Actor input value from the default key-value store associated with the current Actor run."""
         self._raise_if_not_initialized()
@@ -692,7 +704,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
             count: Number of events to charge for.
         """
         self._raise_if_not_initialized()
-        return await self.get_charging_manager().charge(event_name, count)
+        # Acquire lock to prevent race conditions with concurrent charge/push_data calls.
+        async with self._charge_lock:
+            return await self.get_charging_manager().charge(event_name, count)
 
     @overload
     def on(
diff --git a/src/apify/_charging.py b/src/apify/_charging.py
index e9aa90b9..2cf16284 100644
--- a/src/apify/_charging.py
+++ b/src/apify/_charging.py
@@ -239,9 +239,8 @@ def calculate_chargeable() -> dict[str, int | None]:
         pricing_info = self._pricing_info.get(
             event_name,
             PricingInfoItem(
-                price=Decimal()
-                if self._is_at_home
-                else Decimal(1),  # Use a nonzero price for local development so that the maximum budget can be reached,
+                # Use a nonzero price for local development so that the maximum budget can be reached.
+                price=Decimal() if self._is_at_home else Decimal(1),
                 title=f"Unknown event '{event_name}'",
             ),
         )