Skip to content

feat: add support for Beacon API Event Stream#1848

Merged
KolbyML merged 2 commits into
ethereum:masterfrom
KolbyML:add-event-stream
May 23, 2025
Merged

feat: add support for Beacon API Event Stream#1848
KolbyML merged 2 commits into
ethereum:masterfrom
KolbyML:add-event-stream

Conversation

@KolbyML
Copy link
Copy Markdown
Member

@KolbyML KolbyML commented May 22, 2025

What was wrong?

This is a chunk from #1846

We need access the the CL's Beacon API event stream, to drive the ephemeral header bridge.

I only implement the events we need in this PR for the Ephemeral Header bridge.

How was it fixed?

By adding it

Todo in the future

The event stream should be used to drive the Beacon Bridge #1849
^ This isn't a priority for me, but it is an important change for the Beacon Bridge as it heavily will decrease the delays in propagating Optimistic updates and such

Copy link
Copy Markdown
Collaborator

@morph-dev morph-dev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly ok, but I have several suggestions that I think would be very nice.

:shipit:


/// Represents the topics for events that can be subscribed to. Not all event topics are listed in
/// this enum. More topics can be found in the documentation https://ethereum.github.io/beacon-APIs/#/Events/eventstream
pub enum EventTopics {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use: alloy_rpc_types_beacon::events::BeaconNodeEventTopic

The only reason I can think of is because it doesn't implement FromStr, but I don't see that being used anywhere else except at line 56, so we can inline that functionality there, e.g:

if &event.event_type == BeaconNodeEventTopic::ChainReorg.query_value() {
    ...
} else if ...  {
    ...
} else {
    ...
}

It's not as clean, but we don't have to define new type and we don't have to define constants ourselfs.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if you remove this enum, maybe rename file to event.rs as well.

}
}

pub enum DecodedEvent {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: DecodedEvent doesn't make much sense to me. I would just call it BeaconEvent

fn try_from(event: Event) -> Result<Self, Self::Error> {
match EventTopics::from_str(&event.event_type)? {
EventTopics::ChainReorg => {
Ok(serde_json::from_str(&event.data).map(DecodedEvent::ChainReorg)?)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This logic is duplicated several times, and it would even more if we add more topics

How about something like this (together with other suggestions):

impl BeaconEvent {
    fn from_json<T: DeserializeOwned>(
        json: &str,
        constructor: impl FnOnce(T) -> Self,
    ) -> Result<Self, serde_json::Error> {
        serde_json::from_str(json).map(constructor)
    }
}

impl TryFrom<Event> for BeaconEvent {
    type Error = serde_json::Error;

    fn try_from(event: Event) -> Result<Self, Self::Error> {
        if &event.event_type == BeaconNodeEventTopic::ChainReorg.query_value() {
            Self::from_json(&event.data, Self::ChainReorg),
        } else if ... {
            ...
        } else {
           Err(Self::Error::custom(format!(
                "Can't create BeaconEvent: unexpected event type: {}",
                &event.event_type,
            ))),
        }
    }
}

client_builder = client_builder.header(key.as_ref(), value.to_str()?)?;
}

Ok(client_builder.build().stream())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at #1846 how this is used, I think that most of the logic of processing the stream output should be done here (otherwise, everybody using this function will do similar logic). Something like this:

...
Ok(client_builder
    .build()
    .stream()
    .filter_map(|event| async move {
        let event = match event {
            Ok(SSE::Event(event)) => event,
            Ok(SSE::Connected(connection_details)) => {
                info!("Connected to SSE stream: {connection_details:?}");
                return None;
            }
            Ok(SSE::Comment(comment)) => {
                info!("Received comment: {comment:?}");
                return None;
            }
            Err(err) => {
                error!("Error receiving event: {err:?}");
                return None;
            }
        };
        match DecodedEvent::try_from(event) {
            Ok(event) => Some(event),
            Err(err)  => {
                error!("Failed to decode event: {err:?}");
                None
            }
        }
    })

Also, in this case, we should probably add stream_name: String argument and use it in logs, to better identify the stream (in case there are multiple streams being used in parallel).

@KolbyML KolbyML merged commit 7f71cc2 into ethereum:master May 23, 2025
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants