@fressi-elastic fressi-elastic commented Sep 18, 2025

This implements a new AsyncActor base class intended to be used within an asyncio event loop.

The actor supports asynchronous requests, which can be sent either from an external system or from another actor, and handles them in a non-blocking async method that runs inside an event loop owned by the actor itself.

This introduces some new global functions to help implement a more dynamic, stateless workflow. Depending on the context (inside or outside an actor), they are implemented using an Actor or an ActorSystem.

  • actors.create(cls, requirements): creates an actor from anywhere. The actor is created from the current actor or using a pre-initialized ActorSystem.
  • actors.send(destination, message): a wrapper around AsyncActor.send or ActorSystem.tell, depending on whether it is called from an Actor or not.
  • actors.request(destination, message, timeout): a wrapper around Actor.send or ActorSystem.ask that sends a request and returns an awaitable response. It is implemented as an async coroutine and is intended to be used within an event loop. The target actor should be a subclass of AsyncActor. It returns an asyncio.Future for retrieving the result or error of the actor's receiveMessage method. When running inside an actor, the implementation is truly asynchronous, so actors can communicate with each other using non-blocking asyncio tasks. This should help implement stateless workflows that do not require setting static variables on actor instances to keep track of the current execution state.
  • actors.shutdown: shuts down the current actor system or the current actor. It is intended to be used as an atexit callback to cleanly shut down the actor system or the actor's async tasks.
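
As a rough illustration of the request/response pattern described for actors.request, here is a minimal sketch using only asyncio. EchoActor, its inbox queue, receive_message and the request helper are hypothetical stand-ins, not the classes or functions added in this PR, and no Thespian API is used.

```python
import asyncio


class EchoActor:
    """Hypothetical stand-in for an AsyncActor subclass (not the PR's class)."""

    def __init__(self):
        self.inbox = asyncio.Queue()

    async def receive_message(self, message):
        # Non-blocking handler: it may await I/O without blocking the event loop.
        await asyncio.sleep(0)
        if message == "boom":
            raise ValueError("cannot handle 'boom'")
        return f"echo: {message}"

    async def run(self):
        # Each request carries the future that will receive its result or error.
        while True:
            message, future = await self.inbox.get()
            try:
                result = await self.receive_message(message)
            except Exception as exc:
                # Skip futures already cancelled by a caller-side timeout.
                if not future.done():
                    future.set_exception(exc)
            else:
                if not future.done():
                    future.set_result(result)


async def request(actor, message, timeout=None):
    """Send a message and await the handler's result or error."""
    future = asyncio.get_running_loop().create_future()
    await actor.inbox.put((message, future))
    return await asyncio.wait_for(future, timeout)


async def main():
    actor = EchoActor()
    runner = asyncio.create_task(actor.run())
    try:
        reply = await request(actor, "hello", timeout=1.0)
        try:
            await request(actor, "boom", timeout=1.0)
            error = None
        except ValueError as exc:
            error = str(exc)
        return reply, error
    finally:
        runner.cancel()
```

The caller keeps its state in local variables across the await, which is the property the description refers to as a stateless workflow: nothing has to be stored on the actor instance between sending the request and receiving the reply.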

The workflow of the actors.request function is as follows:
[TBD]

@fressi-elastic
Contributor Author

This is in an early working stage. It requires more testing and better documentation.

@gbanasiak gbanasiak left a comment

Many thanks for this work. A lot of effort was put into this.

I'd like to capture some context for posterity.

This work was triggered by an intention to speed up corpus download and decompression. One of the ideas is to enable multi-part download instead of the existing single-part download. Today, corpus files are downloaded during track processing via DefaultTrackPreparator. The download is concurrent thanks to the Thespian actor structure. The work is dispatched by TrackPreparationActor and executed in a single background thread in TaskExecutionActor (see this diagram). There are as many TrackPreparationActor instances as load-driver hosts, and as many TaskExecutionActor instances as there are CPU cores on the load-driver hosts. The tasks to be executed are sent via Thespian DoTask messages. Each message contains a WorkerTask object with a callable and its parameters. The list of tasks is populated using the processor.on_prepare_track() generator (see here). Processors are taken from TrackProcessorRegistry. Only one TrackProcessor implementation defines a meaningful on_prepare_track() method, namely the aforementioned DefaultTrackPreparator. The DefaultTrackPreparator.on_prepare_track() generator yields a separate callable for every document corpus defined in the corpora section of the track definition.
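
For readers unfamiliar with this structure, the dispatch pattern can be sketched roughly as follows. This is a simplified illustration, not Rally's code: the WorkerTask dataclass, prepare_corpus, the dict-based track and the thread pool standing in for the TaskExecutionActor instances are all assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable


@dataclass
class WorkerTask:
    """Simplified stand-in: a callable plus its parameters."""
    func: Callable
    params: dict


def prepare_corpus(name):
    # Placeholder for downloading and decompressing one corpus.
    return f"prepared {name}"


def on_prepare_track(track):
    # Yield one (callable, params) pair per corpus in the track definition.
    for corpus in track["corpora"]:
        yield prepare_corpus, {"name": corpus["name"]}


def dispatch(track, workers=2):
    # Collect the tasks from the generator, then run them concurrently.
    tasks = [WorkerTask(func, params) for func, params in on_prepare_track(track)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in task order.
        return list(pool.map(lambda task: task.func(**task.params), tasks))
```

In this sketch the unit of concurrency is a whole corpus, which is why a single large corpus cannot be downloaded any faster without a further level of concurrency inside each task.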

After reviewing this structure, it dawned on me that the easiest way to implement yet another level of concurrency (multi-part download) would be to add an asyncio loop to the TaskExecutionActor background thread, similarly to what the Worker actor does via AsyncIoAdapter (see this diagram), and to introduce an asynchronous version of TrackProcessor.
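
To make the idea concrete, here is a minimal sketch of an event loop owned by a dedicated thread that fetches the parts of one file concurrently. It assumes a placeholder fetch_part coroutine in place of a real ranged HTTP request, and none of the names below (fetch_part, download_multipart, run_in_background_thread) exist in Rally.

```python
import asyncio
import threading


async def fetch_part(index, size):
    # Placeholder for a ranged HTTP request; returns `size` bytes for part `index`.
    await asyncio.sleep(0.01)
    return bytes([index]) * size


async def download_multipart(total_size, part_size):
    # Split the file into parts; the last part may be shorter.
    sizes = [min(part_size, total_size - start)
             for start in range(0, total_size, part_size)]
    # gather runs all parts concurrently and returns results in part order.
    parts = await asyncio.gather(*(fetch_part(i, s) for i, s in enumerate(sizes)))
    return b"".join(parts)


def run_in_background_thread(coro_factory):
    """Run a coroutine on an event loop owned by a dedicated thread."""
    outcome = {}

    def target():
        try:
            outcome["value"] = asyncio.run(coro_factory())
        except Exception as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]
```

Here the caller simply joins the thread to keep the example short; in an actor, the thread would keep running and report completion through a message instead.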

@fressi-elastic Have you considered the above option? Can you briefly describe how you intend to use the new AsyncActor class in the context of the structure described above? Will the download still be done via DefaultTrackPreparator?
