From 59ed5c7f3e455c3aa54f1bff4168896845dc97ee Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 2 Dec 2025 14:36:07 +0100 Subject: [PATCH 1/2] fix: Synchronize charge operations to prevent race conditions --- src/apify/_charging.py | 98 +++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index e9aa90b9..94ab3311 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import math from dataclasses import dataclass from datetime import datetime, timezone @@ -137,6 +138,11 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False self.active = False + self._charge_lock = asyncio.Lock() + """Lock to synchronize charge operations and prevent race conditions between Actor.charge + and Actor.push_data calls. + """ + async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" # Validate config @@ -223,58 +229,60 @@ def calculate_chargeable() -> dict[str, int | None]: chargeable_within_limit=calculate_chargeable(), ) - # START OF CRITICAL SECTION - no awaits here + # Acquire lock to prevent race conditions between concurrent charge calls + # (e.g., when Actor.push_data with charging is called concurrently with Actor.charge). + async with self._charge_lock: + # START OF CRITICAL SECTION - # Determine the maximum amount of events that can be charged within the budget - max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) - charged_count = min(count, max_chargeable if max_chargeable is not None else count) + # Determine the maximum amount of events that can be charged within the budget + max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) + charged_count = min(count, max_chargeable if max_chargeable is not None else count) - if charged_count == 0: - return ChargeResult( - event_charge_limit_reached=True, - charged_count=0, - chargeable_within_limit=calculate_chargeable(), - ) + if charged_count == 0: + return ChargeResult( + event_charge_limit_reached=True, + charged_count=0, + chargeable_within_limit=calculate_chargeable(), + ) - pricing_info = self._pricing_info.get( - event_name, - PricingInfoItem( - price=Decimal() - if self._is_at_home - else Decimal(1), # Use a nonzero price for local development so that the maximum budget can be reached, - title=f"Unknown event '{event_name}'", - ), - ) + pricing_info = self._pricing_info.get( + event_name, + PricingInfoItem( + # Use a nonzero price for local development so that the maximum budget can be reached. + price=Decimal() if self._is_at_home else Decimal(1), + title=f"Unknown event '{event_name}'", + ), + ) - # Update the charging state - self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) - self._charging_state[event_name].charge_count += charged_count - self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + # Update the charging state + self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) + self._charging_state[event_name].charge_count += charged_count + self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + + # If running on the platform, call the charge endpoint + if self._is_at_home: + if self._actor_run_id is None: + raise RuntimeError('Actor run ID not configured') + + if event_name in self._pricing_info: + await self._client.run(self._actor_run_id).charge(event_name, charged_count) + else: + logger.warning(f"Attempting to charge for an unknown event '{event_name}'") + + # Log the charged operation (if enabled) + if self._charging_log_dataset: + await self._charging_log_dataset.push_data( + { + 'event_name': event_name, + 'event_title': pricing_info.title, + 'event_price_usd': round(pricing_info.price, 3), + 'charged_count': charged_count, + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + ) # END OF CRITICAL SECTION - # If running on the platform, call the charge endpoint - if self._is_at_home: - if self._actor_run_id is None: - raise RuntimeError('Actor run ID not configured') - - if event_name in self._pricing_info: - await self._client.run(self._actor_run_id).charge(event_name, charged_count) - else: - logger.warning(f"Attempting to charge for an unknown event '{event_name}'") - - # Log the charged operation (if enabled) - if self._charging_log_dataset: - await self._charging_log_dataset.push_data( - { - 'event_name': event_name, - 'event_title': pricing_info.title, - 'event_price_usd': round(pricing_info.price, 3), - 'charged_count': charged_count, - 'timestamp': datetime.now(timezone.utc).isoformat(), - } - ) - # If it is not possible to charge the full amount, log that fact if charged_count < count: subject = 'instance' if count == 1 else 'instances' From e34aeedd2e6434a829e57175535379b60533aea4 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 2 Dec 2025 14:51:03 +0100 Subject: [PATCH 2/2] address feedback --- src/apify/_actor.py | 46 +++++++++++++------- src/apify/_charging.py | 97 +++++++++++++++++++----------------------- 2 files changed, 74 insertions(+), 69 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 13f87f1a..bd281aff 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -354,6 +354,14 @@ def event_manager(self) -> EventManager: def _charging_manager_implementation(self) -> ChargingManagerImplementation: return ChargingManagerImplementation(self.configuration, self.apify_client) + @cached_property + def _charge_lock(self) -> asyncio.Lock: + """Lock to synchronize charge operations. + + Prevents race conditions between Actor.charge and Actor.push_data calls. + """ + return asyncio.Lock() + @cached_property def _storage_client(self) -> SmartApifyStorageClient: """Storage client used by the Actor. @@ -606,30 +614,34 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non data = data if isinstance(data, list) else [data] - max_charged_count = ( - self.get_charging_manager().calculate_max_event_charge_count_within_limit(charged_event_name) - if charged_event_name is not None - else None - ) + # No charging, just push the data without locking. + if charged_event_name is None: + dataset = await self.open_dataset() + await dataset.push_data(data) + return None - # Push as many items as we can charge for - pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data) + # If charging is requested, acquire the charge lock to prevent race conditions between concurrent + # push_data calls. We need to hold the lock for the entire push_data + charge sequence. + async with self._charge_lock: + max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit( + charged_event_name + ) - dataset = await self.open_dataset() + # Push as many items as we can charge for. + pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data) - if pushed_items_count < len(data): - await dataset.push_data(data[:pushed_items_count]) - elif pushed_items_count > 0: - await dataset.push_data(data) + dataset = await self.open_dataset() + + if pushed_items_count < len(data): + await dataset.push_data(data[:pushed_items_count]) + elif pushed_items_count > 0: + await dataset.push_data(data) - if charged_event_name: return await self.get_charging_manager().charge( event_name=charged_event_name, count=pushed_items_count, ) - return None - async def get_input(self) -> Any: """Get the Actor input value from the default key-value store associated with the current Actor run.""" self._raise_if_not_initialized() @@ -692,7 +704,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult: count: Number of events to charge for. """ self._raise_if_not_initialized() - return await self.get_charging_manager().charge(event_name, count) + # Acquire lock to prevent race conditions with concurrent charge/push_data calls. + async with self._charge_lock: + return await self.get_charging_manager().charge(event_name, count) @overload def on( diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 94ab3311..2cf16284 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import math from dataclasses import dataclass from datetime import datetime, timezone @@ -138,11 +137,6 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False self.active = False - self._charge_lock = asyncio.Lock() - """Lock to synchronize charge operations and prevent race conditions between Actor.charge - and Actor.push_data calls. - """ - async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" # Validate config @@ -229,60 +223,57 @@ def calculate_chargeable() -> dict[str, int | None]: chargeable_within_limit=calculate_chargeable(), ) - # Acquire lock to prevent race conditions between concurrent charge calls - # (e.g., when Actor.push_data with charging is called concurrently with Actor.charge). - async with self._charge_lock: - # START OF CRITICAL SECTION + # START OF CRITICAL SECTION - no awaits here - # Determine the maximum amount of events that can be charged within the budget - max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) - charged_count = min(count, max_chargeable if max_chargeable is not None else count) - - if charged_count == 0: - return ChargeResult( - event_charge_limit_reached=True, - charged_count=0, - chargeable_within_limit=calculate_chargeable(), - ) + # Determine the maximum amount of events that can be charged within the budget + max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) + charged_count = min(count, max_chargeable if max_chargeable is not None else count) - pricing_info = self._pricing_info.get( - event_name, - PricingInfoItem( - # Use a nonzero price for local development so that the maximum budget can be reached. - price=Decimal() if self._is_at_home else Decimal(1), - title=f"Unknown event '{event_name}'", - ), + if charged_count == 0: + return ChargeResult( + event_charge_limit_reached=True, + charged_count=0, + chargeable_within_limit=calculate_chargeable(), ) - # Update the charging state - self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) - self._charging_state[event_name].charge_count += charged_count - self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price - - # If running on the platform, call the charge endpoint - if self._is_at_home: - if self._actor_run_id is None: - raise RuntimeError('Actor run ID not configured') - - if event_name in self._pricing_info: - await self._client.run(self._actor_run_id).charge(event_name, charged_count) - else: - logger.warning(f"Attempting to charge for an unknown event '{event_name}'") - - # Log the charged operation (if enabled) - if self._charging_log_dataset: - await self._charging_log_dataset.push_data( - { - 'event_name': event_name, - 'event_title': pricing_info.title, - 'event_price_usd': round(pricing_info.price, 3), - 'charged_count': charged_count, - 'timestamp': datetime.now(timezone.utc).isoformat(), - } - ) + pricing_info = self._pricing_info.get( + event_name, + PricingInfoItem( + # Use a nonzero price for local development so that the maximum budget can be reached. + price=Decimal() if self._is_at_home else Decimal(1), + title=f"Unknown event '{event_name}'", + ), + ) + + # Update the charging state + self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) + self._charging_state[event_name].charge_count += charged_count + self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price # END OF CRITICAL SECTION + # If running on the platform, call the charge endpoint + if self._is_at_home: + if self._actor_run_id is None: + raise RuntimeError('Actor run ID not configured') + + if event_name in self._pricing_info: + await self._client.run(self._actor_run_id).charge(event_name, charged_count) + else: + logger.warning(f"Attempting to charge for an unknown event '{event_name}'") + + # Log the charged operation (if enabled) + if self._charging_log_dataset: + await self._charging_log_dataset.push_data( + { + 'event_name': event_name, + 'event_title': pricing_info.title, + 'event_price_usd': round(pricing_info.price, 3), + 'charged_count': charged_count, + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + ) + # If it is not possible to charge the full amount, log that fact if charged_count < count: subject = 'instance' if count == 1 else 'instances'