diff --git a/examples/eventsub_websocket/src/main.rs b/examples/eventsub_websocket/src/main.rs index cb58242f79..39a69f24e7 100644 --- a/examples/eventsub_websocket/src/main.rs +++ b/examples/eventsub_websocket/src/main.rs @@ -1,12 +1,12 @@ -#![warn(clippy::unwrap_in_result)] pub mod opts; pub mod util; pub mod websocket; use clap::Parser; pub use opts::Secret; +use reqwest::Client; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use opts::Opts; @@ -14,6 +14,19 @@ use eyre::Context; use twitch_api::{client::ClientDefault, HelixClient}; +static HELIX_CLIENT: LazyLock> = LazyLock::new(|| { + HelixClient::with_client( + ::default_client_with_name(Some( + "twitch-rs/eventsub" + .parse() + .wrap_err_with(|| "when creating header name") + .unwrap(), + )) + .wrap_err_with(|| "when creating client") + .unwrap(), + ) +}); + #[tokio::main] async fn main() -> Result<(), eyre::Report> { // Setup dotenv, tracing and error reporting with eyre @@ -39,15 +52,7 @@ async fn main() -> Result<(), eyre::Report> { /// Run the application pub async fn run(opts: Arc) -> eyre::Result<()> { // Create the HelixClient, which is used to make requests to the Twitch API - let client: HelixClient<_> = twitch_api::HelixClient::with_client( - ::default_client_with_name(Some( - "twitch-rs/eventsub" - .parse() - .wrap_err_with(|| "when creating header name") - .unwrap(), - )) - .wrap_err_with(|| "when creating client")?, - ); + let client: &'static HelixClient<_> = LazyLock::force(&HELIX_CLIENT); // Get the access token from the cli, dotenv or an oauth service let token: twitch_oauth2::UserToken = @@ -67,27 +72,5 @@ pub async fn run(opts: Arc) -> eyre::Result<()> { token.user_id.clone() }; - let websocket_client = websocket::WebsocketClient { - session_id: None, - token, - client, - user_id, - connect_url: twitch_api::TWITCH_EVENTSUB_WEBSOCKET_URL.clone(), - opts, - }; - - let websocket_client = tokio::spawn(async move { websocket_client.run().await }); - - tokio::try_join!(flatten(websocket_client))?; - Ok(()) -} - -async fn flatten( - handle: tokio::task::JoinHandle>, -) -> Result { - match handle.await { - Ok(Ok(result)) => Ok(result), - Ok(Err(err)) => Err(err), - Err(e) => Err(e).wrap_err_with(|| "handling failed"), - } + websocket::run(client, token, opts, user_id).await } diff --git a/examples/eventsub_websocket/src/util.rs b/examples/eventsub_websocket/src/util.rs index a2f1a858ad..43d6d8de29 100644 --- a/examples/eventsub_websocket/src/util.rs +++ b/examples/eventsub_websocket/src/util.rs @@ -63,7 +63,6 @@ pub async fn make_token<'a>( UserToken::from_token(client, token.into()) .await .context("could not get/make access token") - .map_err(Into::into) } /// Get an access token from either the cli, dotenv (via [clap::Arg::env]) or an oauth service diff --git a/examples/eventsub_websocket/src/websocket.rs b/examples/eventsub_websocket/src/websocket.rs index ba3e8fa50c..f81fc70f57 100644 --- a/examples/eventsub_websocket/src/websocket.rs +++ b/examples/eventsub_websocket/src/websocket.rs @@ -1,170 +1,348 @@ -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use eyre::Context; -use tokio_tungstenite::tungstenite; -use tracing::Instrument; +use futures::{stream::SplitStream, StreamExt}; +use reqwest::Client; +use tokio::{ + sync::{ + mpsc::{self, UnboundedSender}, + Mutex, + }, + task::{JoinError, JoinHandle}, + time::{Duration, Instant}, +}; +use tokio_tungstenite::{ + tungstenite::{client::IntoClientRequest, protocol::WebSocketConfig, Message as WsMessage}, + MaybeTlsStream, WebSocketStream, +}; use twitch_api::{ eventsub::{ self, - event::websocket::{EventsubWebsocketData, ReconnectPayload, SessionData, WelcomePayload}, - Event, + channel::{ChannelBanV1, ChannelUnbanV1}, + event::websocket::{EventsubWebsocketData, ReconnectPayload, WelcomePayload}, + Event, EventSubscription, Message, SessionData, Transport, }, - types::{self}, - HelixClient, + helix::eventsub::CreateEventSubSubscription, + twitch_oauth2::{TwitchToken, UserToken}, + types, HelixClient, }; -use twitch_oauth2::{TwitchToken, UserToken}; - -pub struct WebsocketClient { - /// The session id of the websocket connection - pub session_id: Option, - /// The token used to authenticate with the Twitch API - pub token: UserToken, - /// The client used to make requests to the Twitch API - pub client: HelixClient<'static, reqwest::Client>, - /// The user id of the channel we want to listen to - pub user_id: types::UserId, - /// The url to use for websocket - pub connect_url: url::Url, - pub opts: Arc, + +/// Connect to the websocket and return the stream +async fn connect( + request: impl IntoClientRequest + Unpin, +) -> Result>>, eyre::Error> { + tracing::info!("connecting to twitch"); + let config = Some(WebSocketConfig { + max_message_size: Some(64 << 20), // 64 MiB + max_frame_size: Some(16 << 20), // 16 MiB + accept_unmasked_frames: false, + ..WebSocketConfig::default() + }); + let socket = tokio_tungstenite::connect_async_with_config(request, config, false) + .await + .context("Can't connect")? + .0 + .split() + .1; + + Ok(socket) } -impl WebsocketClient { - /// Connect to the websocket and return the stream - pub async fn connect( - &self, - ) -> Result< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, - eyre::Error, - > { - tracing::info!("connecting to twitch"); - let config = tungstenite::protocol::WebSocketConfig { - max_message_size: Some(64 << 20), // 64 MiB - max_frame_size: Some(16 << 20), // 16 MiB - accept_unmasked_frames: false, - ..tungstenite::protocol::WebSocketConfig::default() - }; - let (socket, _) = - tokio_tungstenite::connect_async_with_config(&self.connect_url, Some(config), false) - .await - .context("Can't connect")?; +async fn refresh_if_expired( + token: Arc>, + helix_client: &HelixClient<'_, Client>, + _opts: &crate::Opts, +) { + let lock = token.lock().await; - Ok(socket) + if lock.expires_in() >= Duration::from_secs(60) { + return; } + let _client = helix_client.get_client(); - /// Run the websocket subscriber - #[tracing::instrument(name = "subscriber", skip_all, fields())] - pub async fn run(mut self) -> Result<(), eyre::Error> { - // Establish the stream - let mut s = self - .connect() - .await - .context("when establishing connection")?; - // Loop over the stream, processing messages as they come in. - loop { - tokio::select!( - Some(msg) = futures::StreamExt::next(&mut s) => { - let span = tracing::info_span!("message received", raw_message = ?msg); - let msg = match msg { - Err(tungstenite::Error::Protocol( - tungstenite::error::ProtocolError::ResetWithoutClosingHandshake, - )) => { - tracing::warn!( - "connection was sent an unexpected frame or was reset, reestablishing it" - ); - s = self - .connect().instrument(span) - .await - .context("when reestablishing connection")?; - continue - } - _ => msg.context("when getting message")?, - }; - self.process_message(msg).instrument(span).await? - }) + // TODO: token refresh logic is left up to the user + + drop(lock); +} + +async fn subscribe( + helix_client: &HelixClient<'_, Client>, + session_id: String, + token: &UserToken, + subscription: impl EventSubscription + Send, +) -> eyre::Result<()> { + let transport: Transport = Transport::websocket(session_id); + let _event_info: CreateEventSubSubscription<_> = helix_client + .create_eventsub_subscription(subscription, transport, token) + .await?; + Ok(()) +} + +/// action to perform on received message +enum Action { + /// do nothing with the message + Nothing, + /// reset the timeout and keep the connection alive + ResetKeepalive, + /// kill predecessor and swap the handle + KillPredecessor, + /// spawn successor and await death signal + AssignSuccessor(ActorHandle), +} + +async fn process_welcome( + subscribed: &AtomicBool, + token: &Mutex, + helix_client: &HelixClient<'_, Client>, + user_id: &types::UserId, + session: SessionData<'_>, +) -> eyre::Result<()> { + // if we're already subscribed, don't subscribe again + if subscribed.load(Ordering::Relaxed) { + return Ok(()); + } + let user_token = token.lock().await; + tokio::try_join!( + subscribe( + helix_client, + session.id.to_string(), + &user_token, + ChannelBanV1::broadcaster_user_id(user_id.clone()), + ), + subscribe( + helix_client, + session.id.to_string(), + &user_token, + ChannelUnbanV1::broadcaster_user_id(user_id.clone()), + ), + )?; + subscribed.store(true, Ordering::Relaxed); + Ok(()) +} + +/// Here is where you would handle the events you want to listen to +fn process_payload(event: Event) -> eyre::Result { + match event { + Event::ChannelBanV1(eventsub::Payload { message, .. }) => { + match message { + // not needed for websocket + Message::VerificationRequest(_) => unreachable!(), + Message::Revocation() => Err(eyre::eyre!("unexpected subscription revocation")), + Message::Notification(payload) => { + // do something useful with the payload + tracing::info!(?payload, "got ban event"); + + // new events reset keepalive timeout too + Ok(Action::ResetKeepalive) + } + _ => Ok(Action::Nothing), + } } + Event::ChannelUnbanV1(eventsub::Payload { message, .. }) => { + match message { + // not needed for websocket + Message::VerificationRequest(_) => unreachable!(), + Message::Revocation() => Err(eyre::eyre!("unexpected subscription revocation")), + Message::Notification(payload) => { + // do something useful with the payload + tracing::info!(?payload, "got unban event"); + + // new events reset keepalive timeout too + Ok(Action::ResetKeepalive) + } + _ => Ok(Action::Nothing), + } + } + _ => Ok(Action::Nothing), } +} - /// Process a message from the websocket - pub async fn process_message(&mut self, msg: tungstenite::Message) -> Result<(), eyre::Report> { - match msg { - tungstenite::Message::Text(s) => { - tracing::info!("{s}"); - // Parse the message into a [twitch_api::eventsub::EventsubWebsocketData] - match Event::parse_websocket(&s)? { - EventsubWebsocketData::Welcome { - payload: WelcomePayload { session }, - .. - } - | EventsubWebsocketData::Reconnect { - payload: ReconnectPayload { session }, - .. - } => { - self.process_welcome_message(session).await?; - Ok(()) +struct WebSocketConnection { + socket: SplitStream>>, + helix_client: &'static HelixClient<'static, Client>, + token: Arc>, + opts: Arc, + subscribed: Arc, + user_id: Arc, + kill_self_tx: UnboundedSender<()>, +} + +impl WebSocketConnection { + async fn receive_message(&mut self) -> eyre::Result> { + let Some(message) = self.socket.next().await else { + return Err(eyre::eyre!("websocket stream closed unexpectedly")); + }; + match message.context("tungstenite error")? { + WsMessage::Close(frame) => { + let reason = frame.map(|frame| frame.reason).unwrap_or_default(); + Err(eyre::eyre!( + "websocket stream closed unexpectedly with reason {reason}" + )) + } + WsMessage::Frame(_) => unreachable!(), + WsMessage::Ping(_) | WsMessage::Pong(_) => { + // no need to do anything as tungstenite automatically handles pings for you + // but refresh the token just in case + refresh_if_expired(self.token.clone(), self.helix_client, &self.opts).await; + Ok(None) + } + WsMessage::Binary(_) => unimplemented!(), + WsMessage::Text(payload) => Ok(Some(payload)), + } + } + + async fn process_message(&self, frame: String) -> eyre::Result { + let event_data = Event::parse_websocket(&frame).context("parsing error")?; + match event_data { + EventsubWebsocketData::Welcome { + payload: WelcomePayload { session }, + .. + } => { + process_welcome( + &self.subscribed, + &self.token, + self.helix_client, + &self.user_id, + session, + ) + .await?; + Ok(Action::KillPredecessor) + } + EventsubWebsocketData::Reconnect { + payload: ReconnectPayload { session }, + .. + } => { + let url: String = session.reconnect_url.unwrap().into_owned(); + let successor = ActorHandle::spawn( + url, + self.helix_client, + self.kill_self_tx.clone(), + self.token.clone(), + self.opts.clone(), + self.subscribed.clone(), + self.user_id.clone(), + ); + Ok(Action::AssignSuccessor(successor)) + } + EventsubWebsocketData::Keepalive { .. } => Ok(Action::ResetKeepalive), + EventsubWebsocketData::Revocation { metadata, .. } => { + eyre::bail!("got revocation: {metadata:?}") + } + EventsubWebsocketData::Notification { payload: event, .. } => process_payload(event), + _ => Ok(Action::Nothing), + } + } +} + +struct ActorHandle(JoinHandle>); + +impl ActorHandle { + pub fn spawn( + url: impl IntoClientRequest + Unpin + Send + 'static, + helix_client: &'static HelixClient<'_, Client>, + kill_predecessor_tx: UnboundedSender<()>, + token: Arc>, + opts: Arc, + subscribed: Arc, + user_id: Arc, + ) -> Self { + Self(tokio::spawn(async move { + let socket = connect(url).await?; + // If we receive a reconnect message we want to spawn a new connection to twitch. + // The already existing session should wait on the new session to receive a welcome message before being closed. + // https://dev.twitch.tv/docs/eventsub/handling-websocket-events/#reconnect-message + let (kill_self_tx, mut kill_self_rx) = mpsc::unbounded_channel::<()>(); + + let mut connection = WebSocketConnection { + socket, + helix_client, + token, + opts, + subscribed, + user_id, + kill_self_tx, + }; + + /// default keepalive duration is 10 seconds + const WINDOW: u64 = 10; + let mut timeout: Instant = Instant::now() + Duration::from_secs(WINDOW); + let mut successor: Option = None; + + loop { + tokio::select! { + biased; + result = kill_self_rx.recv() => { + result.unwrap(); + let Some(successor) = successor else { + // can't receive death signal from successor if it isn't spawned yet + unreachable!(); + }; + return Ok(successor); } - // Here is where you would handle the events you want to listen to - EventsubWebsocketData::Notification { - metadata: _, - payload, - } => { - match payload { - Event::ChannelBanV1(eventsub::Payload { message, .. }) => { - tracing::info!(?message, "got ban event"); - } - Event::ChannelUnbanV1(eventsub::Payload { message, .. }) => { - tracing::info!(?message, "got ban event"); - } - _ => {} + result = connection.receive_message() => if let Some(frame) = result? { + let side_effect = connection.process_message(frame).await?; + match side_effect { + Action::Nothing => {} + Action::ResetKeepalive => timeout = Instant::now() + Duration::from_secs(WINDOW), + Action::KillPredecessor => kill_predecessor_tx.send(())?, + Action::AssignSuccessor(actor_handle) => { + successor = Some(actor_handle); + }, } - Ok(()) - } - EventsubWebsocketData::Revocation { - metadata, - payload: _, - } => eyre::bail!("got revocation event: {metadata:?}"), - EventsubWebsocketData::Keepalive { - metadata: _, - payload: _, - } => Ok(()), - _ => Ok(()), + }, + _ = tokio::time::sleep_until(timeout) => eyre::bail!("connection timed out"), } } - tungstenite::Message::Close(_) => todo!(), - _ => Ok(()), - } + })) } - pub async fn process_welcome_message( - &mut self, - data: SessionData<'_>, - ) -> Result<(), eyre::Report> { - self.session_id = Some(data.id.to_string()); - if let Some(url) = data.reconnect_url { - self.connect_url = url.parse()?; - } - // check if the token is expired, if it is, request a new token. This only works if using a oauth service for getting a token - if self.token.is_elapsed() { - self.token = - crate::util::get_access_token(self.client.get_client(), &self.opts).await?; + pub async fn join(self) -> Result, JoinError> { self.0.await } +} + +pub async fn run( + helix_client: &'static HelixClient<'_, Client>, + token: UserToken, + opts: Arc, + user_id: types::UserId, +) -> eyre::Result<()> { + let url = twitch_api::TWITCH_EVENTSUB_WEBSOCKET_URL.clone(); + let token = Arc::new(Mutex::new(token)); + let user_id = Arc::new(user_id); + let subscribed = Arc::new(AtomicBool::new(false)); + + // since this is a root actor without a predecessor it has no previous connection to kill + // but we still need to give it a sender to satisfy the function signature. + // `_` and `_unused` have different semantics where `_` is dropped immediately and sender gets a recv error + let (dummy_tx, _unused_rx) = mpsc::unbounded_channel::<()>(); + let mut handle = ActorHandle::spawn( + url.clone(), + helix_client, + dummy_tx.clone(), + token.clone(), + opts.clone(), + subscribed.clone(), + user_id.clone(), + ); + + loop { + handle = match handle.join().await? { + Ok(handle) => handle, + Err(err) => { + subscribed.store(false, Ordering::Relaxed); + tracing::error!("{err}"); + ActorHandle::spawn( + url.clone(), + helix_client, + dummy_tx.clone(), + token.clone(), + opts.clone(), + subscribed.clone(), + user_id.clone(), + ) + } } - let transport = eventsub::Transport::websocket(data.id.clone()); - self.client - .create_eventsub_subscription( - eventsub::channel::ChannelBanV1::broadcaster_user_id(self.user_id.clone()), - transport.clone(), - &self.token, - ) - .await?; - self.client - .create_eventsub_subscription( - eventsub::channel::ChannelUnbanV1::broadcaster_user_id(self.user_id.clone()), - transport, - &self.token, - ) - .await?; - tracing::info!("listening to ban and unbans"); - Ok(()) } } diff --git a/src/eventsub/event/websocket.rs b/src/eventsub/event/websocket.rs index 5d6e0ced21..adfa7c11ba 100644 --- a/src/eventsub/event/websocket.rs +++ b/src/eventsub/event/websocket.rs @@ -56,7 +56,7 @@ pub struct SessionData<'a> { pub status: Cow<'a, str>, /// The maximum number of seconds that you should expect silence before receiving a keepalive message. For a welcome message, this is the number of seconds that you have to subscribe to an event after receiving the welcome message. If you don’t subscribe to an event within this window, the socket is disconnected. pub keepalive_timeout_seconds: Option, - /// The URL to reconnect to if you get a Reconnect message. Is set to null. + /// The URL to reconnect to if you get a Reconnect message. #[serde(borrow = "'a")] pub reconnect_url: Option>, /// The UTC date and time that the connection was created.