Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,134 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {
}
}

@Test
fun whenSubscribingToAlreadySubscribedChannelShouldNotResubscribeButShouldEmitSubscriptionChangedStatus() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a pretty long name. What about shouldEmitSubscriptionChangedWhenAlreadySubscribed or something similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldEmitSubscriptionChangedWhenAlreadySubscribed misses important part that we want to make sure that there is no resubscribe.
Name is pretty long but I think it serve the purpose of properly describing what the test does.
The test is complex and IMHO it is good that name describe its behaviour.

What do you think?

val channelName = randomChannel()
val channelName02 = randomChannel()
val expectedMessage = "test_${randomValue()}"

val connectedLatch = CountDownLatch(1)
val subscriptionChangedLatch = CountDownLatch(1)
val messagesLatch = CountDownLatch(2)

val connectedEventCount = AtomicInteger(0)
val subscriptionChangedCount = AtomicInteger(0)
val subscribeHttpRequestCount = AtomicInteger(0)

// Track messages received by each listener
val listenerAMessageCount = AtomicInteger(0)
val listenerBMessageCount = AtomicInteger(0)

// Custom logger to count HTTP subscribe requests
val customLogger = object : CustomLogger {
override fun debug(logMessage: LogMessage) {
if (logMessage.type == LogMessageType.NETWORK_REQUEST) {
val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest
if (networkRequestDetails.path.contains("/v2/subscribe/")) {
subscribeHttpRequestCount.incrementAndGet()
}
}
}
}

clientConfig = {
customLoggers = listOf(customLogger)
}

pubnub.addListener(
object : StatusListener {
override fun status(
pubnub: PubNub,
status: PNStatus,
) {
when (status.category) {
PNStatusCategory.PNConnectedCategory -> {
connectedEventCount.incrementAndGet()
connectedLatch.countDown()
}

PNStatusCategory.PNSubscriptionChanged -> {
subscriptionChangedCount.incrementAndGet()
subscriptionChangedLatch.countDown()
}

else -> {}
}
}
}
)

// First subscription
val subscriptionSet1 = pubnub.subscriptionSetOf(setOf(channelName, channelName02))
subscriptionSet1.addListener(
object : EventListener {
override fun message(
pubnub: PubNub,
result: PNMessageResult,
) {
listenerAMessageCount.incrementAndGet()
messagesLatch.countDown()
}
}
)

// Second subscription (SAME channel, different listener)
val subscriptionSet2 = pubnub.subscriptionSetOf(setOf(channelName))
subscriptionSet2.addListener(
object : EventListener {
override fun message(
pubnub: PubNub,
result: PNMessageResult,
) {
listenerBMessageCount.incrementAndGet()
messagesLatch.countDown()
}
}
)

try {
subscriptionSet1.subscribe()
assertTrue("Failed to receive PNConnectedCategory", connectedLatch.await(5, TimeUnit.SECONDS))

subscriptionSet2.subscribe()
assertTrue("Failed to receive PNSubscriptionChanged", subscriptionChangedLatch.await(5, TimeUnit.SECONDS))

// Give a moment for any potential resubscribe to occur
Thread.sleep(500)

// Publish a message - both listeners should receive it
pubnub.publish(channelName, expectedMessage).sync()
assertTrue("Failed to receive messages on both listeners", messagesLatch.await(5, TimeUnit.SECONDS))

// Verify both listeners received the message (this works regardless of the bug)
assertEquals("ListenerA should receive message", 1, listenerAMessageCount.get())
assertEquals("ListenerB should receive message", 1, listenerBMessageCount.get())

assertEquals(
"Should emit PNSubscriptionChanged status but not resubscribe when channels unchanged",
1, // Status emitted but no actual resubscribe (no cancel/new receive)
subscriptionChangedCount.get()
)

assertEquals(
"Should have exactly 1 PNConnectedCategory (initial handshake only)",
1,
connectedEventCount.get()
)

// Should only have 2 HTTP "/subscribe" requests: handshake + subscribe
assertEquals(
"Should have exactly 2 HTTP subscribe requests (handshake + subscribe, NO resubscribe)",
2,
subscribeHttpRequestCount.get()
)
} finally {
// Ensure cleanup happens even if assertions fail
subscriptionSet1.close()
subscriptionSet2.close()
}
}

@Test
fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToListOfTheSameChannels() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ internal interface State<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>

internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransition(): Pair<S, Set<Ei>> = Pair(this, emptySet())

internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransitionWithEffects(
vararg invocations: Ei,
): Pair<S, Set<Ei>> = Pair(this, invocations.toSet())

internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.transitionTo(
state: S,
vararg invocations: Ei,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.pubnub.api.enums.PNStatusCategory
import com.pubnub.api.models.consumer.PNStatus
import com.pubnub.internal.eventengine.State
import com.pubnub.internal.eventengine.noTransition
import com.pubnub.internal.eventengine.noTransitionWithEffects
import com.pubnub.internal.eventengine.transitionTo
import com.pubnub.internal.subscribe.eventengine.effect.SubscribeEffectInvocation
import com.pubnub.internal.subscribe.eventengine.event.SubscribeEvent
Expand Down Expand Up @@ -227,17 +228,32 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
}

is SubscribeEvent.SubscriptionChanged -> {
transitionTo(
Receiving(event.channels, event.channelGroups, subscriptionCursor),
SubscribeEffectInvocation.EmitStatus(
PNStatus(
PNStatusCategory.PNSubscriptionChanged,
currentTimetoken = subscriptionCursor.timetoken,
affectedChannels = event.channels,
affectedChannelGroups = event.channelGroups,
// If channels and channelGroups haven't changed, emit status without resubscribing
if (event.channels == channels && event.channelGroups == channelGroups) {
noTransitionWithEffects(
SubscribeEffectInvocation.EmitStatus(
PNStatus(
PNStatusCategory.PNSubscriptionChanged,
currentTimetoken = subscriptionCursor.timetoken,
affectedChannels = event.channels,
affectedChannelGroups = event.channelGroups,
),
),
),
)
)
} else {
// Channels changed, need to resubscribe
transitionTo(
Receiving(event.channels, event.channelGroups, subscriptionCursor),
SubscribeEffectInvocation.EmitStatus(
PNStatus(
PNStatusCategory.PNSubscriptionChanged,
currentTimetoken = subscriptionCursor.timetoken,
affectedChannels = event.channels,
affectedChannelGroups = event.channelGroups,
),
),
)
}
}

is SubscribeEvent.SubscriptionRestored -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,44 @@ class TransitionFromReceivingStateTest {
}

@Test
fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event() {
fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event_with_different_channels_and_groups() {
// given
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’d avoid adding comments like // given, // when, or // then. Instead, add comments only when they clarify something that is not obvious

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// given, // when, // then is convention that exist in many test in this SDK.
I would leave them for now.
Personally I find // given, // when, // then useful when reading tests.

val newChannels = setOf("Channel2", "Channel3")
val newChannelGroups = setOf("ChannelGroup2")

// when
val (state, invocations) =
transition(
SubscribeState.Receiving(channels, channelGroups, subscriptionCursor),
SubscribeEvent.SubscriptionChanged(newChannels, newChannelGroups),
)

// then
Assertions.assertTrue(state is SubscribeState.Receiving)
state as SubscribeState.Receiving

assertEquals(newChannels, state.channels)
assertEquals(newChannelGroups, state.channelGroups)
assertEquals(subscriptionCursor, state.subscriptionCursor)
assertEquals(
setOf(
SubscribeEffectInvocation.CancelReceiveMessages,
SubscribeEffectInvocation.EmitStatus(
createSubscriptionChangedStatus(
state.subscriptionCursor,
newChannels,
newChannelGroups,
),
),
SubscribeEffectInvocation.ReceiveMessages(newChannels, newChannelGroups, subscriptionCursor),
),
invocations,
)
}

@Test
fun stays_in_RECEIVING_and_emits_status_without_resubscribing_when_SUBSCRIPTION_CHANGED_event_has_same_channels_and_groups() {
// given - channels and channelGroups are the same
// when
val (state, invocations) =
transition(
Expand All @@ -100,17 +136,16 @@ class TransitionFromReceivingStateTest {
assertEquals(channels, state.channels)
assertEquals(channelGroups, state.channelGroups)
assertEquals(subscriptionCursor, state.subscriptionCursor)
// Should only emit status - no cancel or new receive
assertEquals(
setOf(
SubscribeEffectInvocation.CancelReceiveMessages,
SubscribeEffectInvocation.EmitStatus(
createSubscriptionChangedStatus(
state.subscriptionCursor,
channels,
channelGroups,
),
),
SubscribeEffectInvocation.ReceiveMessages(channels, channelGroups, subscriptionCursor),
),
invocations,
)
Expand Down
Loading