Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,29 @@ package {{ sdk.namespace | caseDot }}.models
import kotlin.collections.Collection
import java.io.Closeable

data class RealtimeSubscriptionUpdate(
val channels: Collection<Any>? = null,
val queries: Collection<String>? = null
)

data class RealtimeSubscription(
/**
* Remove this subscription only. The WebSocket stays open so other subscriptions keep
* receiving events — use [Realtime.disconnect] for full teardown.
*/
val unsubscribe: () -> Unit,

/**
* Replace the channels and/or queries on this subscription without recreating it.
*/
val update: (RealtimeSubscriptionUpdate) -> Unit,

private val close: () -> Unit
) : Closeable {
/**
* Alias of [unsubscribe] that also tears the socket down when this was the last active
* subscription. Prefer [unsubscribe] plus [Realtime.disconnect] for explicit control.
*/
override fun close() = close.invoke()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package {{ sdk.namespace | caseDot }}.services
import {{ sdk.namespace | caseDot }}.Service
import {{ sdk.namespace | caseDot }}.Client
import {{ sdk.namespace | caseDot }}.Channel
import {{ sdk.namespace | caseDot }}.ID
import {{ sdk.namespace | caseDot }}.Query
import {{ sdk.namespace | caseDot }}.exceptions.{{ spec.title | caseUcfirst }}Exception
import {{ sdk.namespace | caseDot }}.extensions.forEachAsync
Expand Down Expand Up @@ -40,22 +41,14 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
private const val HEARTBEAT_INTERVAL = 20_000L // 20 seconds

private var socket: RealWebSocket? = null
// Slot-centric state: Map<slot, RealtimeCallback>
private val activeSubscriptions = ConcurrentHashMap<Int, RealtimeCallback>()
// Map slot index -> subscriptionId (from backend)
private val slotToSubscriptionId = ConcurrentHashMap<Int, String>()
// Inverse map: subscriptionId -> slot index (for O(1) lookup)
private val subscriptionIdToSlot = ConcurrentHashMap<String, Int>()
// Queue of slot snapshots per in-flight subscribe message (FIFO with server responses).
private val pendingSubscribeSlotsQueue = ArrayDeque<List<Int>>()
private val activeSubscriptions = ConcurrentHashMap<String, RealtimeCallback>()
private val pendingSubscribes = LinkedHashMap<String, Map<String, Any>>()

private var reconnectAttempts = 0
private val subscriptionsCounter = AtomicInteger(0)
private val socketGeneration = AtomicInteger(0)
private var reconnect = true
private var heartbeatJob: Job? = null

// Lock to coordinate multi-map updates (activeSubscriptions, slotToSubscriptionId, subscriptionIdToSlot)
private val subscriptionLock = Any()
}

Expand Down Expand Up @@ -104,35 +97,63 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
}

private fun sendSubscribeMessage() {
private fun sendUnsubscribeMessage(subscriptionIds: List<String>) {
val ws = socket ?: return
val rows = mutableListOf<Map<String, Any>>()
val slotsForMessage = mutableListOf<Int>()
val ids = subscriptionIds.filter { it.isNotEmpty() }
if (ids.isEmpty()) {
return
}
ws.send(
mapOf(
"type" to "unsubscribe",
"data" to ids.map { mapOf("subscriptionId" to it) }
).toJson()
)
}

synchronized(subscriptionLock) {
activeSubscriptions.forEach { (slot, subscription) ->
val queries = subscription.queries
val row = mutableMapOf<String, Any>(
"channels" to subscription.channels.toList(),
"queries" to queries.toList()
)
val knownSubscriptionId = slotToSubscriptionId[slot]
if (!knownSubscriptionId.isNullOrEmpty()) {
row["subscriptionId"] = knownSubscriptionId
}
rows.add(row)
slotsForMessage.add(slot)
private fun generateUniqueSubscriptionIdLocked(): String {
repeat(activeSubscriptions.size + 1) {
val id = ID.unique()
if (!activeSubscriptions.containsKey(id)) {
return id
}
}
throw {{ spec.title | caseUcfirst }}Exception("Failed to generate unique subscription id")
}

if (rows.isEmpty()) {
return
}
private fun enqueuePendingSubscribeLocked(subscriptionId: String) {
val subscription = activeSubscriptions[subscriptionId] ?: return
pendingSubscribes[subscriptionId] = mapOf(
"subscriptionId" to subscriptionId,
"channels" to subscription.channels.toList(),
"queries" to subscription.queries.toList()
)
}

/**
* Close the WebSocket connection and drop all active subscriptions client-side.
* Use this instead of calling [RealtimeSubscription.unsubscribe] on every subscription
* when you want to tear everything down.
*/
fun disconnect() {
synchronized(subscriptionLock) {
pendingSubscribeSlotsQueue.addLast(slotsForMessage.toList())
activeSubscriptions.clear()
pendingSubscribes.clear()
reconnect = false
closeSocket()
}
}

private fun sendPendingSubscribes() {
val ws = socket ?: return
val rows: List<Map<String, Any>>
synchronized(subscriptionLock) {
if (pendingSubscribes.isEmpty()) {
return
}
rows = pendingSubscribes.values.toList()
pendingSubscribes.clear()
}
ws.send(mapOf("type" to "subscribe", "data" to rows).toJson())
}

Expand Down Expand Up @@ -208,53 +229,73 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
queries: Set<String> = emptySet(),
callback: (RealtimeResponseEvent<T>) -> Unit,
): RealtimeSubscription {
// Allocate a new slot index atomically
val slot = subscriptionsCounter.incrementAndGet()

// Store slot-centric data: channels, queries, and callback belong to the slot.
// We only touch activeSubscriptions here, but keep the pattern consistent
// and future-proof by guarding multi-map writes with a shared lock.
val subscriptionId: String
synchronized(subscriptionLock) {
activeSubscriptions[slot] = RealtimeCallback(
subscriptionId = generateUniqueSubscriptionIdLocked()
activeSubscriptions[subscriptionId] = RealtimeCallback(
channels.toSet(),
queries,
payloadType,
callback as (RealtimeResponseEvent<*>) -> Unit
)
}
// Reuse active socket when possible; otherwise create it.
synchronized(subscriptionLock) {
enqueuePendingSubscribeLocked(subscriptionId)
if (socket != null) {
sendSubscribeMessage()
sendPendingSubscribes()
} else {
createSocket()
}
}

return RealtimeSubscription {
// Unsubscribe must update all three maps atomically so that
// no reader observes a half-updated state.
val unsubscribeFn: () -> Unit = {
val removed: Boolean
synchronized(subscriptionLock) {
removed = activeSubscriptions.remove(subscriptionId) != null
pendingSubscribes.remove(subscriptionId)
}
if (removed) {
sendUnsubscribeMessage(listOf(subscriptionId))
}
}

val updateFn: (RealtimeSubscriptionUpdate) -> Unit = { changes ->
synchronized(subscriptionLock) {
val subscriptionId = slotToSubscriptionId[slot]
activeSubscriptions.remove(slot)
slotToSubscriptionId.remove(slot)
subscriptionId?.let { subscriptionIdToSlot.remove(it) }
val current = activeSubscriptions[subscriptionId]
if (current != null) {
val nextChannels = changes.channels?.map { channelToString(it) }?.toSet()
?: current.channels
val nextQueries = changes.queries?.toSet() ?: current.queries
activeSubscriptions[subscriptionId] = RealtimeCallback(
nextChannels,
nextQueries,
current.payloadClass,
current.callback
)
enqueuePendingSubscribeLocked(subscriptionId)
}
}
if (socket != null) {
sendPendingSubscribes()
} else {
createSocket()
}
}

val closeFn: () -> Unit = {
unsubscribeFn()
synchronized(subscriptionLock) {
if (activeSubscriptions.isEmpty()) {
reconnect = false
closeSocket()
} else if (socket != null) {
sendSubscribeMessage()
} else {
this@Realtime.createSocket()
}
}
}
}

// cleanUp is no longer needed - slots are removed directly in subscribe().close()
// Channels are automatically rebuilt from remaining slots in createSocket()
return RealtimeSubscription(
unsubscribe = unsubscribeFn,
update = updateFn,
close = closeFn
)
}

private inner class {{ spec.title | caseUcfirst }}WebSocketListener(
private val generation: Int
Expand Down Expand Up @@ -289,51 +330,18 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
}

private fun handleResponseConnected(message: RealtimeResponse) {
val messageData = message.data?.jsonCast<Map<String, Any>>() ?: return
val subscriptions = messageData["subscriptions"] as? Map<*, *>

if (subscriptions != null) {
// Store subscription ID mappings from backend.
// Try direct slot first, then slot+1 for zero-based server maps.
synchronized(subscriptionLock) {
subscriptions.forEach { (slotStr, subscriptionId) ->
val slot = (slotStr as? String)?.toIntOrNull()
if (slot != null && subscriptionId is String) {
val targetSlot = when {
activeSubscriptions.containsKey(slot) -> slot
activeSubscriptions.containsKey(slot + 1) -> slot + 1
else -> slot
}
slotToSubscriptionId[targetSlot] = subscriptionId
subscriptionIdToSlot[subscriptionId] = targetSlot
}
}
}
}
if (message.data == null) return

sendSubscribeMessage()
synchronized(subscriptionLock) {
activeSubscriptions.keys.forEach { enqueuePendingSubscribeLocked(it) }
}
sendPendingSubscribes()
}

private fun handleResponseAction(message: RealtimeResponse) {
val actionData = message.data?.jsonCast<Map<String, Any>>() ?: return
if (actionData["to"] != "subscribe") {
return
}
val subscriptions = actionData["subscriptions"] as? List<*> ?: return

synchronized(subscriptionLock) {
if (pendingSubscribeSlotsQueue.isEmpty()) {
return
}
val pendingSlots = pendingSubscribeSlotsQueue.removeFirst()
subscriptions.forEachIndexed { index, item ->
val slot = pendingSlots.getOrNull(index) ?: return@forEachIndexed
val row = item as? Map<*, *> ?: return@forEachIndexed
val subscriptionId = row["subscriptionId"] as? String ?: return@forEachIndexed
slotToSubscriptionId[slot] = subscriptionId
subscriptionIdToSlot[subscriptionId] = slot
}
}
// The SDK generates subscriptionIds client-side and sends them on every
// subscribe/unsubscribe, so subscribe/unsubscribe acks carry no state
// the SDK needs to reconcile.
}

private fun handleResponseError(message: RealtimeResponse) {
Expand All @@ -356,19 +364,12 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
return
}

// Use backend-provided subscription IDs for O(1) dispatch
subscriptions.forEach { subscriptionId ->
// O(1) lookup using subscriptionId
val slot = subscriptionIdToSlot[subscriptionId]
if (slot != null) {
val subscription = activeSubscriptions[slot]
if (subscription != null) {
val typedEvent = event.copy(
payload = rawPayload.jsonCast(subscription.payloadClass)
)
subscription.callback(typedEvent)
}
}
val subscription = activeSubscriptions[subscriptionId] ?: return@forEach
val typedEvent = event.copy(
payload = rawPayload.jsonCast(subscription.payloadClass)
)
subscription.callback(typedEvent)
}
}

Expand Down
Loading
Loading