Skip to content
17 changes: 17 additions & 0 deletions Sources/Realtime/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {

do {
try await _clock.sleep(for: delay)

// Check if socket is still connected after delay
if socket.status != .connected {
logger?.debug(
"Socket disconnected during retry delay for channel '\(topic)', aborting subscription"
)
throw CancellationError()
}
} catch {
// If sleep is cancelled, break out of retry loop
logger?.debug("Subscription retry cancelled for channel '\(topic)'")
Expand Down Expand Up @@ -196,6 +204,12 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
return
}
await socket.connect()

// Verify connection succeeded after await
if socket.status != .connected {
logger?.debug("Socket failed to connect, cannot subscribe to channel \(topic)")
return
}
}

logger?.debug("Subscribing to channel \(topic)")
Expand Down Expand Up @@ -234,6 +248,9 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
logger?.debug("Unsubscribing from channel \(topic)")

await push(ChannelEvent.leave)

// Wait for server confirmation of unsubscription
_ = await statusChange.first { @Sendable in $0 == .unsubscribed }
}

@available(
Expand Down
216 changes: 140 additions & 76 deletions Sources/Realtime/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
var messageTask: Task<Void, Never>?

var connectionTask: Task<Void, Never>?
var reconnectTask: Task<Void, Never>?
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added reconnectTask field lacks explicit test coverage. Consider adding a test that verifies:

  1. reconnectTask is created when reconnect() is called
  2. reconnectTask is properly cancelled and set to nil in disconnect()
  3. Multiple reconnection attempts properly cancel previous reconnect tasks

While testMultipleReconnectionsHandleTaskLifecycleCorrectly tests reconnection cycles, it doesn't explicitly verify the reconnectTask lifecycle.

Copilot uses AI. Check for mistakes.
var channels: [String: RealtimeChannelV2] = [:]
var sendBuffer: [@Sendable () -> Void] = []

Expand Down Expand Up @@ -170,7 +171,10 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
mutableState.withValue {
$0.heartbeatTask?.cancel()
$0.messageTask?.cancel()
$0.connectionTask?.cancel()
$0.reconnectTask?.cancel()
$0.channels = [:]
$0.conn = nil
}
}

Expand All @@ -182,53 +186,77 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
}

func connect(reconnect: Bool) async {
if status == .disconnected {
let connectionTask = Task {
if reconnect {
try? await _clock.sleep(for: options.reconnectDelay)
// Check and create connection task atomically to prevent race conditions
let shouldConnect = mutableState.withValue { state -> Bool in
// If already connecting or connected, don't create a new connection task
if status == .connecting || status == .connected {
return false
}

if Task.isCancelled {
options.logger?.debug("Reconnect cancelled, returning")
return
}
}
// If there's already a connection task running, don't create another
if state.connectionTask != nil {
return false
}

if status == .connected {
options.logger?.debug("WebsSocket already connected")
return true
}

guard shouldConnect else {
// Wait for existing connection to complete
_ = await statusChange.first { @Sendable in $0 == .connected }
return
}

let connectionTask = Task {
if reconnect {
try? await _clock.sleep(for: options.reconnectDelay)

if Task.isCancelled {
options.logger?.debug("Reconnect cancelled, returning")
return
}
}

status = .connecting

do {
let conn = try await wsTransport(
Self.realtimeWebSocketURL(
baseURL: Self.realtimeBaseURL(url: url),
apikey: options.apikey,
logLevel: options.logLevel
),
options.headers.dictionary
)
mutableState.withValue { $0.conn = conn }
onConnected(reconnect: reconnect)
} catch {
onError(error)
}
if status == .connected {
options.logger?.debug("WebsSocket already connected")
return
}

mutableState.withValue {
$0.connectionTask = connectionTask
status = .connecting

do {
let conn = try await wsTransport(
Self.realtimeWebSocketURL(
baseURL: Self.realtimeBaseURL(url: url),
apikey: options.apikey,
logLevel: options.logLevel
),
options.headers.dictionary
)
mutableState.withValue { $0.conn = conn }
onConnected(reconnect: reconnect)
} catch {
onError(error)
}
}

mutableState.withValue {
$0.connectionTask = connectionTask
}

_ = await statusChange.first { @Sendable in $0 == .connected }
}

private func onConnected(reconnect: Bool) {
status = .connected
options.logger?.debug("Connected to realtime WebSocket")

// Start listeners before setting status to prevent race conditions
listenForMessages()
startHeartbeating()

// Now set status to connected
status = .connected

if reconnect {
rejoinChannels()
}
Expand Down Expand Up @@ -261,9 +289,14 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
}

private func reconnect(disconnectReason: String? = nil) {
Task {
disconnect(reason: disconnectReason)
await connect(reconnect: true)
// Cancel any existing reconnect task and create a new one
mutableState.withValue { state in
state.reconnectTask?.cancel()

state.reconnectTask = Task {
disconnect(reason: disconnectReason)
await connect(reconnect: true)
}
}
}

Expand Down Expand Up @@ -325,7 +358,13 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
await channel.unsubscribe()
}

if channels.isEmpty {
// Atomically remove channel and check if we should disconnect
let shouldDisconnect = mutableState.withValue { state -> Bool in
state.channels[channel.topic] = nil
return state.channels.isEmpty
}

if shouldDisconnect {
options.logger?.debug("No more subscribed channel in socket")
disconnect()
}
Expand Down Expand Up @@ -364,49 +403,57 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
}

private func listenForMessages() {
mutableState.withValue {
$0.messageTask?.cancel()
$0.messageTask = Task { [weak self] in
guard let self, let conn = self.conn else { return }

do {
for await event in conn.events {
if Task.isCancelled { return }

switch event {
case .binary:
self.options.logger?.error("Unsupported binary event received.")
break
case .text(let text):
let data = Data(text.utf8)
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
await onMessage(message)

if Task.isCancelled {
return
}

case .close(let code, let reason):
onClose(code: code, reason: reason)
// Capture conn inside the lock before creating the task
let conn = mutableState.withValue { state -> (any WebSocket)? in
state.messageTask?.cancel()
return state.conn
}

guard let conn else { return }

let messageTask = Task {
do {
for await event in conn.events {
if Task.isCancelled { return }

switch event {
case .binary:
self.options.logger?.error("Unsupported binary event received.")
break
case .text(let text):
let data = Data(text.utf8)
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
await onMessage(message)

if Task.isCancelled {
return
}

case .close(let code, let reason):
onClose(code: code, reason: reason)
}
} catch {
onError(error)
}
} catch {
onError(error)
}
}

mutableState.withValue {
$0.messageTask = messageTask
}
}

private func startHeartbeating() {
mutableState.withValue {
$0.heartbeatTask?.cancel()
$0.heartbeatTask = Task { [weak self, options] in
mutableState.withValue { state in
state.heartbeatTask?.cancel()

state.heartbeatTask = Task { [options] in
while !Task.isCancelled {
try? await _clock.sleep(for: options.heartbeatInterval)
if Task.isCancelled {
break
}
await self?.sendHeartbeat()
await self.sendHeartbeat()
}
}
}
Expand All @@ -418,22 +465,27 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
return
}

let pendingHeartbeatRef: String? = mutableState.withValue {
if $0.pendingHeartbeatRef != nil {
$0.pendingHeartbeatRef = nil
return nil
// Check if previous heartbeat is still pending (not acknowledged)
let shouldSendHeartbeat = mutableState.withValue { state -> Bool in
if state.pendingHeartbeatRef != nil {
// Previous heartbeat was not acknowledged - this is a timeout
return false
}

// No pending heartbeat, we can send a new one
let ref = makeRef()
$0.pendingHeartbeatRef = ref
return ref
state.pendingHeartbeatRef = ref
return true
}

if let pendingHeartbeatRef {
if shouldSendHeartbeat {
// Get the ref we just set
let heartbeatRef = mutableState.withValue { $0.pendingHeartbeatRef }!

push(
RealtimeMessageV2(
joinRef: nil,
ref: pendingHeartbeatRef,
ref: heartbeatRef,
topic: "phoenix",
event: "heartbeat",
payload: [:]
Expand All @@ -442,8 +494,13 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
heartbeatSubject.yield(.sent)
await setAuth()
} else {
options.logger?.debug("Heartbeat timeout")
// Timeout: previous heartbeat was never acknowledged
options.logger?.debug("Heartbeat timeout - previous heartbeat not acknowledged")
heartbeatSubject.yield(.timeout)

// Clear the pending ref before reconnecting
mutableState.withValue { $0.pendingHeartbeatRef = nil }

reconnect(disconnectReason: "heartbeat timeout")
}
}
Expand All @@ -460,8 +517,15 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
mutableState.withValue {
$0.ref = 0
$0.messageTask?.cancel()
$0.messageTask = nil
$0.heartbeatTask?.cancel()
$0.heartbeatTask = nil
$0.connectionTask?.cancel()
$0.connectionTask = nil
$0.reconnectTask?.cancel()
$0.reconnectTask = nil
$0.pendingHeartbeatRef = nil
$0.sendBuffer = []
$0.conn = nil
}

Expand All @@ -485,16 +549,16 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
return
}

mutableState.withValue { [token] in
$0.accessToken = token
mutableState.withValue { [tokenToSend] in
$0.accessToken = tokenToSend
}

for channel in channels.values {
if channel.status == .subscribed {
options.logger?.debug("Updating auth token for channel \(channel.topic)")
await channel.push(
ChannelEvent.accessToken,
payload: ["access_token": token.map { .string($0) } ?? .null]
payload: ["access_token": tokenToSend.map { .string($0) } ?? .null]
)
}
}
Expand Down
2 changes: 2 additions & 0 deletions Tests/RealtimeTests/FakeWebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ final class FakeWebSocket: WebSocket {
s.sentEvents.append(.close(code: code, reason: reason ?? ""))

s.isClosed = true
s.closeCode = code
s.closeReason = reason
if s.other?.isClosed == false {
s.other?._trigger(.close(code: code ?? 1005, reason: reason ?? ""))
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/RealtimeTests/RealtimeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,8 @@ import XCTest

await Task.megaYield()

// Verify that the message task was cancelled
XCTAssertTrue(sut.mutableState.messageTask?.isCancelled ?? false)
// Verify that the message task was cancelled and cleaned up
XCTAssertNil(sut.mutableState.messageTask, "Message task should be nil after disconnect")
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line uses a tab character for indentation instead of spaces. According to the project's .editorconfig, the codebase should use 2 spaces for indentation.

Suggested change
XCTAssertNil(sut.mutableState.messageTask, "Message task should be nil after disconnect")
XCTAssertNil(sut.mutableState.messageTask, "Message task should be nil after disconnect")

Copilot uses AI. Check for mistakes.
}

func testMultipleReconnectionsHandleTaskLifecycleCorrectly() async {
Expand Down
Loading