diff --git a/package.json b/package.json index 61b4a5378c..5e70e8e17f 100755 --- a/package.json +++ b/package.json @@ -117,6 +117,7 @@ "@types/cli": "0.11.21", "@types/mocha": "10.0.0", "@types/pino": "^7.0.5", + "@types/primus": "^7.3.6", "@typescript-eslint/eslint-plugin": "5.47.0", "@typescript-eslint/parser": "5.47.0", "concurrently": "7.1.0", diff --git a/packages/client-core/src/common/services/LocationInstanceConnectionService.ts b/packages/client-core/src/common/services/LocationInstanceConnectionService.ts index fa4693ae90..eab64a5227 100755 --- a/packages/client-core/src/common/services/LocationInstanceConnectionService.ts +++ b/packages/client-core/src/common/services/LocationInstanceConnectionService.ts @@ -1,6 +1,6 @@ import { Paginated } from '@feathersjs/feathers' -import { none, State } from '@hookstate/core' -import React, { useEffect } from 'react' +import { none } from '@hookstate/core' +import { useEffect } from 'react' import { Instance } from '@etherealengine/common/src/interfaces/Instance' import { UserId } from '@etherealengine/common/src/interfaces/UserId' @@ -25,8 +25,12 @@ import { import { API } from '../../API' import { leaveNetwork } from '../../transports/SocketWebRTCClientFunctions' -import { SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientNetwork' -import { accessAuthState } from '../../user/services/AuthService' +import { + connectToNetwork, + initializeNetwork, + SocketWebRTCClientNetwork +} from '../../transports/SocketWebRTCClientFunctions' +import { AuthState } from '../../user/services/AuthService' import { NetworkConnectionService } from './NetworkConnectionService' type InstanceState = { @@ -60,7 +64,7 @@ export const LocationInstanceConnectionServiceReceptor = (action) => { matches(action) .when(LocationInstanceConnectionAction.serverProvisioned.matches, (action) => { getMutableState(NetworkState).hostIds.world.set(action.instanceId) - addNetwork(new SocketWebRTCClientNetwork(action.instanceId, NetworkTopics.world)) + addNetwork(initializeNetwork(action.instanceId, NetworkTopics.world)) return s.instances.merge({ [action.instanceId]: { ipAddress: action.ipAddress, @@ -113,7 +117,7 @@ export const LocationInstanceConnectionService = { createPrivateRoom?: boolean ) => { logger.info({ locationId, instanceId, sceneId }, 'Provision World Server') - const token = accessAuthState().authUser.accessToken.value + const token = getState(AuthState).authUser.accessToken if (instanceId != null) { const instance = (await API.instance.client.service('instance').find({ query: { @@ -152,7 +156,7 @@ export const LocationInstanceConnectionService = { }, provisionExistingServer: async (locationId: string, instanceId: string, sceneId: string) => { logger.info({ locationId, instanceId, sceneId }, 'Provision Existing World Server') - const token = accessAuthState().authUser.accessToken.value + const token = getState(AuthState).authUser.accessToken const instance = (await API.instance.client.service('instance').find({ query: { id: instanceId, @@ -194,7 +198,7 @@ export const LocationInstanceConnectionService = { }, provisionExistingServerByRoomCode: async (locationId: string, roomCode: string, sceneId: string) => { logger.info({ locationId, roomCode, sceneId }, 'Provision Existing World Server') - const token = accessAuthState().authUser.accessToken.value + const token = getState(AuthState).authUser.accessToken const instance = (await API.instance.client.service('instance').find({ query: { roomCode, @@ -242,9 +246,8 @@ export const LocationInstanceConnectionService = { if (network.primus) { leaveNetwork(network, false) } - const { ipAddress, port, locationId, roomCode } = - accessLocationInstanceConnectionState().instances.value[instanceId] - await network.initialize({ port, ipAddress, locationId, roomCode }) + const { ipAddress, port, locationId, roomCode } = getState(LocationInstanceState).instances[instanceId] + await connectToNetwork(network, { port, ipAddress, locationId, roomCode }) }, useAPIListeners: () => { useEffect(() => { diff --git a/packages/client-core/src/common/services/MediaInstanceConnectionService.ts b/packages/client-core/src/common/services/MediaInstanceConnectionService.ts index f5f37a1619..541a2cff80 100755 --- a/packages/client-core/src/common/services/MediaInstanceConnectionService.ts +++ b/packages/client-core/src/common/services/MediaInstanceConnectionService.ts @@ -19,10 +19,14 @@ import { import { API } from '../../API' import { accessChatState } from '../../social/services/ChatService' -import { accessLocationState } from '../../social/services/LocationService' +import { accessLocationState, LocationState } from '../../social/services/LocationService' import { endVideoChat, leaveNetwork } from '../../transports/SocketWebRTCClientFunctions' -import { SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientNetwork' -import { accessAuthState } from '../../user/services/AuthService' +import { + connectToNetwork, + initializeNetwork, + SocketWebRTCClientNetwork +} from '../../transports/SocketWebRTCClientFunctions' +import { AuthState } from '../../user/services/AuthService' import { NetworkConnectionService } from './NetworkConnectionService' const logger = multiLogger.child({ component: 'client-core:service:media-instance' }) @@ -60,7 +64,7 @@ export const MediaInstanceConnectionServiceReceptor = (action) => { matches(action) .when(MediaInstanceConnectionAction.serverProvisioned.matches, (action) => { getMutableState(NetworkState).hostIds.media.set(action.instanceId) - addNetwork(new SocketWebRTCClientNetwork(action.instanceId, NetworkTopics.media)) + addNetwork(initializeNetwork(action.instanceId, NetworkTopics.media)) return s.instances[action.instanceId].set({ ipAddress: action.ipAddress, port: action.port, @@ -115,7 +119,7 @@ export const useMediaInstanceConnectionState = () => useState(accessMediaInstanc export const MediaInstanceConnectionService = { provisionServer: async (channelId?: string, createPrivateRoom = false) => { logger.info(`Provision Media Server, channelId: "${channelId}".`) - const token = accessAuthState().authUser.accessToken.value + const token = getState(AuthState).authUser.accessToken const provisionResult = await API.instance.client.service('instance-provision').find({ query: { channelId, @@ -142,9 +146,9 @@ export const MediaInstanceConnectionService = { }, connectToServer: async (instanceId: string, channelId: string) => { dispatchAction(MediaInstanceConnectionAction.serverConnecting({ instanceId })) - const authState = accessAuthState() - const user = authState.user.value - const { ipAddress, port } = accessMediaInstanceConnectionState().instances.value[instanceId] + const authState = getState(AuthState) + const user = authState.user + const { ipAddress, port } = getState(MediaInstanceState).instances[instanceId] const network = Engine.instance.mediaNetwork as SocketWebRTCClientNetwork logger.info({ primus: !!network.primus, network }, 'Connect To Media Server.') @@ -153,23 +157,22 @@ export const MediaInstanceConnectionService = { leaveNetwork(network, false) } - const locationState = accessLocationState() + const locationState = getState(LocationState) const currentLocation = locationState.currentLocation.location dispatchAction( MediaInstanceConnectionAction.enableVideo({ instanceId, enableVideo: - currentLocation?.locationSetting?.videoEnabled?.value === true || + currentLocation?.locationSetting?.videoEnabled === true || !( - currentLocation?.locationSetting?.locationType?.value === 'showroom' && - user.locationAdmins?.find((locationAdmin) => locationAdmin.locationId === currentLocation?.id?.value) == - null + currentLocation?.locationSetting?.locationType === 'showroom' && + user.locationAdmins?.find((locationAdmin) => locationAdmin.locationId === currentLocation?.id) == null ) }) ) - await network.initialize({ port, ipAddress, channelId }) + await connectToNetwork(network, { port, ipAddress, channelId }) }, resetServer: (instanceId: string) => { dispatchAction(MediaInstanceConnectionAction.disconnect({ instanceId })) diff --git a/packages/client-core/src/components/UserMediaWindow/index.tsx b/packages/client-core/src/components/UserMediaWindow/index.tsx index 3825ecab05..280efec74d 100755 --- a/packages/client-core/src/components/UserMediaWindow/index.tsx +++ b/packages/client-core/src/components/UserMediaWindow/index.tsx @@ -10,6 +10,7 @@ import { globalMuteProducer, globalUnmuteProducer, pauseConsumer, + ProducerExtension, resumeConsumer, toggleMicrophonePaused, toggleScreenshareAudioPaused, @@ -38,7 +39,7 @@ import Tooltip from '@etherealengine/ui/src/Tooltip' import { useMediaInstance } from '../../common/services/MediaInstanceConnectionService' import { MediaStreamState } from '../../transports/MediaStreams' import { PeerMediaChannelState, PeerMediaStreamInterface } from '../../transports/PeerMediaChannelState' -import { ConsumerExtension, SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientNetwork' +import { ConsumerExtension, SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientFunctions' import Draggable from './Draggable' import styles from './index.module.scss' @@ -163,7 +164,7 @@ export const useUserMediaWindowHook = ({ peerID, type }: Props) => { audioTrackClones.forEach((track) => track.stop()) if (harkListener) (harkListener as any).stop() } - }, [audioTrackId, harkListener]) + }, [audioTrackId]) useEffect(() => { videoElement.id = `${peerID}_video` diff --git a/packages/client-core/src/components/World/InstanceServerWarnings.tsx b/packages/client-core/src/components/World/InstanceServerWarnings.tsx index fda0db99bf..6d5b5c329c 100644 --- a/packages/client-core/src/components/World/InstanceServerWarnings.tsx +++ b/packages/client-core/src/components/World/InstanceServerWarnings.tsx @@ -8,7 +8,7 @@ import { import { MediaInstanceConnectionService } from '@etherealengine/client-core/src/common/services/MediaInstanceConnectionService' import { ChatService, useChatState } from '@etherealengine/client-core/src/social/services/ChatService' import { useLocationState } from '@etherealengine/client-core/src/social/services/LocationService' -import { SocketWebRTCClientNetwork } from '@etherealengine/client-core/src/transports/SocketWebRTCClientNetwork' +import { SocketWebRTCClientNetwork } from '@etherealengine/client-core/src/transports/SocketWebRTCClientFunctions' import { matches } from '@etherealengine/engine/src/common/functions/MatchesUtils' import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' import { useEngineState } from '@etherealengine/engine/src/ecs/classes/EngineState' diff --git a/packages/client-core/src/components/World/LoadEngineWithScene.tsx b/packages/client-core/src/components/World/LoadEngineWithScene.tsx index c1e13f96c2..3fff2c06e8 100755 --- a/packages/client-core/src/components/World/LoadEngineWithScene.tsx +++ b/packages/client-core/src/components/World/LoadEngineWithScene.tsx @@ -27,7 +27,7 @@ import { AppLoadingAction, AppLoadingStates, useLoadingState } from '../../commo import { NotificationService } from '../../common/services/NotificationService' import { useRouter } from '../../common/services/RouterService' import { useLocationState } from '../../social/services/LocationService' -import { SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientNetwork' +import { SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientFunctions' import { initClient, loadScene } from './LocationLoadHelper' const logger = multiLogger.child({ component: 'client-core:world' }) diff --git a/packages/client-core/src/components/World/OfflineLocation.tsx b/packages/client-core/src/components/World/OfflineLocation.tsx index 715cd8bbfd..8280b16280 100644 --- a/packages/client-core/src/components/World/OfflineLocation.tsx +++ b/packages/client-core/src/components/World/OfflineLocation.tsx @@ -4,7 +4,7 @@ import { useAuthState } from '@etherealengine/client-core/src/user/services/Auth import { PeerID } from '@etherealengine/common/src/interfaces/PeerID' import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' import { getEngineState } from '@etherealengine/engine/src/ecs/classes/EngineState' -import { Network, NetworkTopics } from '@etherealengine/engine/src/networking/classes/Network' +import { createNetwork, Network, NetworkTopics } from '@etherealengine/engine/src/networking/classes/Network' import { NetworkPeerFunctions } from '@etherealengine/engine/src/networking/functions/NetworkPeerFunctions' import { receiveJoinWorld } from '@etherealengine/engine/src/networking/functions/receiveJoinWorld' import { addNetwork, NetworkState } from '@etherealengine/engine/src/networking/NetworkState' @@ -26,7 +26,7 @@ export const OfflineLocation = () => { const networkState = getMutableState(NetworkState) networkState.hostIds.world.set(userId) - addNetwork(new Network(userId, NetworkTopics.world)) + addNetwork(createNetwork(userId, NetworkTopics.world)) addOutgoingTopicIfNecessary(NetworkTopics.world) NetworkPeerFunctions.createPeer( diff --git a/packages/client-core/src/media/PeerMedia.tsx b/packages/client-core/src/media/PeerMedia.tsx index ad83ffec44..61740fb495 100644 --- a/packages/client-core/src/media/PeerMedia.tsx +++ b/packages/client-core/src/media/PeerMedia.tsx @@ -17,7 +17,7 @@ import { ConsumerExtension, ProducerExtension, SocketWebRTCClientNetwork -} from '../transports/SocketWebRTCClientNetwork' +} from '../transports/SocketWebRTCClientFunctions' import { AuthState } from '../user/services/AuthService' import { NetworkUserState } from '../user/services/NetworkUserService' import { MediaState } from './services/MediaStreamService' diff --git a/packages/client-core/src/media/services/MediaStreamService.ts b/packages/client-core/src/media/services/MediaStreamService.ts index 75b0c6ee3b..aae33c41e4 100755 --- a/packages/client-core/src/media/services/MediaStreamService.ts +++ b/packages/client-core/src/media/services/MediaStreamService.ts @@ -7,7 +7,7 @@ import { getNearbyUsers } from '@etherealengine/engine/src/networking/functions/ import { defineAction, defineState, dispatchAction, getMutableState, useState } from '@etherealengine/hyperflux' import { MediaStreamState } from '../../transports/MediaStreams' -import { ConsumerExtension, SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientNetwork' +import { ConsumerExtension, SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientFunctions' import { accessNetworkUserState } from '../../user/services/NetworkUserService' //State diff --git a/packages/client-core/src/social/services/LocationService.ts b/packages/client-core/src/social/services/LocationService.ts index 03095e9c64..85b47d621a 100755 --- a/packages/client-core/src/social/services/LocationService.ts +++ b/packages/client-core/src/social/services/LocationService.ts @@ -9,7 +9,7 @@ import { API } from '../../API' import { NotificationService } from '../../common/services/NotificationService' //State -const LocationState = defineState({ +export const LocationState = defineState({ name: 'LocationState', initial: () => ({ locationName: null! as string, diff --git a/packages/client-core/src/social/services/PartyService.ts b/packages/client-core/src/social/services/PartyService.ts index a1806e8b7c..a1851a3139 100755 --- a/packages/client-core/src/social/services/PartyService.ts +++ b/packages/client-core/src/social/services/PartyService.ts @@ -18,7 +18,7 @@ import { } from '../../common/services/MediaInstanceConnectionService' import { NotificationService } from '../../common/services/NotificationService' import { endVideoChat, leaveNetwork } from '../../transports/SocketWebRTCClientFunctions' -import { SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientNetwork' +import { SocketWebRTCClientNetwork } from '../../transports/SocketWebRTCClientFunctions' import { accessAuthState } from '../../user/services/AuthService' import { NetworkUserAction, NetworkUserService } from '../../user/services/NetworkUserService' import { accessChatState, ChatAction, ChatService } from './ChatService' diff --git a/packages/client-core/src/transports/MediaStreams.ts b/packages/client-core/src/transports/MediaStreams.ts index 3bc0d95d8f..b0606490f1 100755 --- a/packages/client-core/src/transports/MediaStreams.ts +++ b/packages/client-core/src/transports/MediaStreams.ts @@ -8,7 +8,7 @@ import { } from '@etherealengine/engine/src/networking/constants/VideoConstants' import { defineAction, defineState, getMutableState } from '@etherealengine/hyperflux' -import { ProducerExtension } from './SocketWebRTCClientNetwork' +import { ProducerExtension } from './SocketWebRTCClientFunctions' const logger = multiLogger.child({ component: 'client-core:MediaStreams' }) diff --git a/packages/client-core/src/transports/PeerMediaChannelState.ts b/packages/client-core/src/transports/PeerMediaChannelState.ts index 9dbb9401d9..7e1dba4a77 100644 --- a/packages/client-core/src/transports/PeerMediaChannelState.ts +++ b/packages/client-core/src/transports/PeerMediaChannelState.ts @@ -1,7 +1,7 @@ import { PeerID } from '@etherealengine/common/src/interfaces/PeerID' import { defineState, getMutableState, none } from '@etherealengine/hyperflux' -import { ConsumerExtension, ProducerExtension } from './SocketWebRTCClientNetwork' +import { ConsumerExtension, ProducerExtension } from './SocketWebRTCClientFunctions' export interface PeerMediaStreamInterface { videoStream: ProducerExtension | ConsumerExtension | null @@ -62,3 +62,8 @@ export const removePeerMediaChannels = (peerID: PeerID) => { const state = getMutableState(PeerMediaChannelState) state[peerID].set(none) } + +export const clearPeerMediaChannels = () => { + console.log('clearPeerMediaChannels') + getMutableState(PeerMediaChannelState).set({}) +} diff --git a/packages/client-core/src/transports/SocketWebRTCClientFunctions.ts b/packages/client-core/src/transports/SocketWebRTCClientFunctions.ts index 80cbbdca32..b30b0783e8 100755 --- a/packages/client-core/src/transports/SocketWebRTCClientFunctions.ts +++ b/packages/client-core/src/transports/SocketWebRTCClientFunctions.ts @@ -1,4 +1,7 @@ +import * as mediasoupClient from 'mediasoup-client' import { + Consumer, + DataProducer, DtlsParameters, MediaKind, Transport as MediaSoupTransport, @@ -6,11 +9,13 @@ import { RtpParameters, SctpStreamParameters } from 'mediasoup-client/lib/types' +import type { EventEmitter } from 'primus' +import Primus from 'primus-client' import { v4 as uuidv4 } from 'uuid' import config from '@etherealengine/common/src/config' import { AuthTask } from '@etherealengine/common/src/interfaces/AuthTask' -import { ChannelType } from '@etherealengine/common/src/interfaces/Channel' +import { Channel, ChannelType } from '@etherealengine/common/src/interfaces/Channel' import { MediaStreamAppData, MediaTagType } from '@etherealengine/common/src/interfaces/MediaStreamConstants' import { PeerID, PeersUpdateType } from '@etherealengine/common/src/interfaces/PeerID' import { UserId } from '@etherealengine/common/src/interfaces/UserId' @@ -18,8 +23,8 @@ import multiLogger from '@etherealengine/common/src/logger' import { getSearchParamFromURL } from '@etherealengine/common/src/utils/getSearchParamFromURL' import { matches } from '@etherealengine/engine/src/common/functions/MatchesUtils' import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' -import { EngineActions } from '@etherealengine/engine/src/ecs/classes/EngineState' -import { NetworkTopics } from '@etherealengine/engine/src/networking/classes/Network' +import { EngineActions, EngineState } from '@etherealengine/engine/src/ecs/classes/EngineState' +import { createNetwork, NetworkTopics, TransportInterface } from '@etherealengine/engine/src/networking/classes/Network' import { PUBLIC_STUN_SERVERS } from '@etherealengine/engine/src/networking/constants/STUNServers' import { CAM_VIDEO_SIMULCAST_ENCODINGS, @@ -37,17 +42,22 @@ import { addActionReceptor, dispatchAction, getMutableState, + getState, none, removeActionReceptor, removeActionsForTopic } from '@etherealengine/hyperflux' -import { Action } from '@etherealengine/hyperflux/functions/ActionFunctions' +import { Action, Topic } from '@etherealengine/hyperflux/functions/ActionFunctions' -import { LocationInstanceConnectionAction } from '../common/services/LocationInstanceConnectionService' import { - accessMediaInstanceConnectionState, + LocationInstanceConnectionAction, + LocationInstanceConnectionService, + LocationInstanceState +} from '../common/services/LocationInstanceConnectionService' +import { MediaInstanceConnectionAction, - MediaInstanceConnectionService + MediaInstanceConnectionService, + MediaInstanceState } from '../common/services/MediaInstanceConnectionService' import { NetworkConnectionService } from '../common/services/NetworkConnectionService' import { MediaState, MediaStreamAction, MediaStreamService } from '../media/services/MediaStreamService' @@ -58,21 +68,300 @@ import { stopLipsyncTracking } from '../media/webcam/WebcamInput' import { ChatState } from '../social/services/ChatService' -import { accessAuthState } from '../user/services/AuthService' +import { LocationState } from '../social/services/LocationService' +import { AuthState } from '../user/services/AuthService' import { MediaStreamService as _MediaStreamService, MediaStreamActions, MediaStreamState } from './MediaStreams' -import { ConsumerExtension, ProducerExtension, SocketWebRTCClientNetwork } from './SocketWebRTCClientNetwork' +import { clearPeerMediaChannels } from './PeerMediaChannelState' import { updateNearbyAvatars } from './UpdateNearbyUsersSystem' const logger = multiLogger.child({ component: 'client-core:SocketWebRTCClientFunctions' }) +export type WebRTCTransportExtension = Omit & { appData: MediaStreamAppData } +export type ProducerExtension = Omit & { appData: MediaStreamAppData } +export type ConsumerExtension = Omit & { appData: MediaStreamAppData; producerPaused: boolean } + +let id = 0 + +// import { encode, decode } from 'msgpackr' + +// Adds support for Promise to Primus client +// Each 'data' listener function needs to be named something unique in order for removeListener to +// not remove all 'data' listener functions +export const promisedRequest = (network: SocketWebRTCClientNetwork, type: any, data = {}) => { + return new Promise((resolve) => { + const responseFunction = (data) => { + if (data.type.toString() === message.type.toString() && message.id === data.id) { + resolve(data.data) + network.primus.removeListener('data', responseFunction) + } + } + Object.defineProperty(responseFunction, 'name', { value: `responseFunction${id}`, writable: true }) + const message = { + type: type, + data: data, + id: id++ + } + network.primus.write(message) + + network.primus.on('data', responseFunction) + }) +} + +const handleFailedConnection = (locationConnectionFailed) => { + if (locationConnectionFailed) { + const currentLocation = getMutableState(LocationState).currentLocation.location + const locationInstanceConnectionState = getMutableState(LocationInstanceState) + const instanceId = getState(NetworkState).hostIds.world ?? '' + if (!locationInstanceConnectionState.instances[instanceId]?.connected?.value) { + dispatchAction(LocationInstanceConnectionAction.disconnect({ instanceId })) + LocationInstanceConnectionService.provisionServer( + currentLocation.id.value, + instanceId || undefined, + currentLocation.sceneId.value + ) + } + } else { + const mediaInstanceConnectionState = getMutableState(MediaInstanceState) + const instanceId = getState(NetworkState).hostIds.media ?? '' + if (!mediaInstanceConnectionState.instances[instanceId]?.connected?.value) { + dispatchAction(MediaInstanceConnectionAction.disconnect({ instanceId })) + const authState = getMutableState(AuthState) + const selfUser = authState.user + const chatState = getMutableState(ChatState) + const channelState = chatState.channels + const channels = channelState.channels.value as Channel[] + const channelEntries = Object.values(channels).filter((channel) => !!channel) as any + const instanceChannel = channelEntries.find((entry) => entry.instanceId === Engine.instance.worldNetwork?.hostId) + if (instanceChannel) { + MediaInstanceConnectionService.provisionServer(instanceChannel?.id!, true) + } else { + const partyChannel = Object.values(chatState.channels.channels.value).find( + (channel) => channel.channelType === 'party' && channel.partyId === selfUser.partyId.value + ) + MediaInstanceConnectionService.provisionServer(partyChannel?.id!, false) + } + } + } + return +} + +// close() { +// } + +export const closeNetwork = (network: SocketWebRTCClientNetwork) => { + logger.info('SocketWebRTCClientNetwork close') + network.recvTransport?.close() + network.sendTransport?.close() + network.recvTransport = null! + network.sendTransport = null! + network.heartbeat && clearInterval(network.heartbeat) + network.primus?.removeAllListeners() + network.primus?.end() + network.primus = null! +} + +export const initializeNetwork = (hostId: UserId, topic: Topic) => { + const mediasoupDevice = new mediasoupClient.Device( + getMutableState(EngineState).isBot.value ? { handlerName: 'Chrome74' } : undefined + ) + + const transport = { + get peers() { + return network.primus ? [network.hostPeerID] : [] + }, + + messageToPeer: (peerId: PeerID, data: any) => { + network.primus?.write(data) + }, + + messageToAll: (data: any) => { + network.primus?.write(data) + }, + + bufferToPeer: (peerID: PeerID, data: any) => { + transport.bufferToAll(data) + }, + + bufferToAll: (data: any) => { + if (network.dataProducer && !network.dataProducer.closed && network.dataProducer.readyState === 'open') + network.dataProducer.send(data) + } + } as TransportInterface + + const network = { + ...createNetwork(hostId, topic), + mediasoupDevice, + transport, + reconnecting: false, + recvTransport: null! as MediaSoupTransport, + sendTransport: null! as MediaSoupTransport, + primus: null! as Primus, + + dataProducer: null! as DataProducer, + heartbeat: null! as NodeJS.Timer, // is there an equivalent browser type for this? + + producers: [] as ProducerExtension[], + consumers: [] as ConsumerExtension[] + } + + return network +} + +export type SocketWebRTCClientNetwork = Awaited> + +export const connectToNetwork = async ( + network: SocketWebRTCClientNetwork, + args: { + ipAddress: string + port: string + locationId?: string | null + channelId?: string | null + roomCode?: string | null + } +) => { + const authState = getState(AuthState) + const token = authState.authUser.accessToken + + const { ipAddress, port, locationId, channelId, roomCode } = args + + const query = { + locationId, + channelId, + roomCode, + token + } as { + locationId?: string + channelId?: string + roomCode?: string + address?: string + port?: string + token: string + } + + if (locationId) delete query.channelId + if (channelId) delete query.locationId + if (!roomCode) delete query.roomCode + + let primus: Primus + + try { + if ( + config.client.localBuild === 'true' || + (config.client.appEnv === 'development' && config.client.localNginx !== 'true') + ) { + const queryString = new URLSearchParams(query).toString() + primus = new Primus(`https://${ipAddress as string}:${port.toString()}?${queryString}`) + } else { + query.address = ipAddress + query.port = port.toString() + const queryString = new URLSearchParams(query).toString() + primus = new Primus(`${config.client.instanceserverUrl}?${queryString}`) + } + } catch (err) { + logger.error(err) + return handleFailedConnection(locationId != null)! + } + + network.primus = primus + + const connectionFailTimeout = setTimeout(() => { + return handleFailedConnection(locationId != null) + }, 3000) + + await new Promise((resolve) => { + primus.on('incoming::open', (event) => { + clearTimeout(connectionFailTimeout) + if (network.reconnecting) { + network.reconnecting = false + network.primus._connected = false + return + } + + if (network.primus._connected) return + network.primus._connected = true + + logger.info('CONNECT to port %o', { port, locationId }) + onConnectToInstance(network) + + // Send heartbeat every second + network.heartbeat = setInterval(() => { + network.primus.write({ type: MessageTypes.Heartbeat.toString() }) + }, 1000) + resolve() + }) + }) +} + +type Primus = EventEmitter & { + buffer: any[] + disconnect: boolean + emitter: any //EventEmitter + offlineHandler: Function + online: boolean + onlineHandler: Function + options: { + pingTimeout: 45000 + queueSize: number + reconnect: any + strategy: string + timeout: number + transport: any + } + readable: boolean + readyState: number + recovery: any + socket: WebSocket + timers: any + transformers: { outgoing: Array; incoming: Array } + transport: any + url: URL + writable: boolean + _connected: boolean + _events: any + _eventsCount: number + + AVOID_WEBSOCKETS: false + NETWORK_EVENTS: Function + ark: any + authorization: false + client: Function + clone: Function + critical: Function + decoder: Function + destroy: Function + emits: Function + encoder: Function + end: Function + heartbeat: Function + id: Function + initialise: Function + merge: Function + open: Function + parse: Function + pathname: '/primus' + plugin: Function + protocol: Function + querystring: Function + querystringify: Function + reserved: Function + send: Function + timeout: Function + transform: Function + transforms: Function + uri: Function + version: '7.3.4' + write: Function + _write: Function +} + export const getChannelTypeIdFromTransport = (network: SocketWebRTCClientNetwork) => { - const channelConnectionState = accessMediaInstanceConnectionState() + const channelConnectionState = getState(MediaInstanceState) const mediaNetwork = Engine.instance.mediaNetwork - const currentChannelInstanceConnection = mediaNetwork && channelConnectionState.instances[mediaNetwork.hostId].ornull + const currentChannelInstanceConnection = mediaNetwork && channelConnectionState.instances[mediaNetwork.hostId] const isWorldConnection = network.topic === NetworkTopics.world return { - channelType: isWorldConnection ? 'instance' : currentChannelInstanceConnection?.channelType.value, - channelId: isWorldConnection ? null : currentChannelInstanceConnection?.channelId.value + channelType: isWorldConnection ? 'instance' : currentChannelInstanceConnection?.channelType, + channelId: isWorldConnection ? null : currentChannelInstanceConnection?.channelId } } @@ -97,13 +386,13 @@ export async function onConnectToInstance(network: SocketWebRTCClientNetwork) { dispatchAction(NetworkConnectionService.actions.mediaInstanceReconnected({})) } - const authState = accessAuthState() - const token = authState.authUser.accessToken.value + const authState = getState(AuthState) + const token = authState.authUser.accessToken const payload = { accessToken: token } const { status } = await new Promise((resolve) => { const interval = setInterval(async () => { - const response = (await network.request(MessageTypes.Authorization.toString(), payload)) as AuthTask + const response = (await promisedRequest(network, MessageTypes.Authorization.toString(), payload)) as AuthTask if (response.status !== 'pending') { clearInterval(interval) resolve(response) @@ -140,7 +429,7 @@ export async function onConnectToInstance(network: SocketWebRTCClientNetwork) { inviteCode: getSearchParamFromURL('inviteCode') } as JoinWorldRequestData - const connectToWorldResponse = await network.request(MessageTypes.JoinWorld.toString(), joinWorldRequest) + const connectToWorldResponse = await promisedRequest(network, MessageTypes.JoinWorld.toString(), joinWorldRequest) if (!connectToWorldResponse || !connectToWorldResponse.routerRtpCapabilities) { dispatchAction(NetworkConnectionService.actions.worldInstanceReconnected({})) @@ -260,8 +549,8 @@ export async function onConnectToMediaInstance(network: SocketWebRTCClientNetwor channelId: string }) { const selfProducerIds = [mediaStreamState.camVideoProducer.value?.id, mediaStreamState.camAudioProducer.value?.id] - const channelConnectionState = accessMediaInstanceConnectionState() - const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId].ornull + const channelConnectionState = getState(MediaInstanceState) + const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId] const consumerMatch = network.consumers?.find( (c) => c?.appData?.peerID === peerID && c?.appData?.mediaTag === mediaTag && c?.producerId === producerId @@ -272,9 +561,9 @@ export async function onConnectToMediaInstance(network: SocketWebRTCClientNetwor selfProducerIds.indexOf(producerId) < 0 && (consumerMatch == null || (consumerMatch.track?.muted && consumerMatch.track?.enabled)) && (channelType === 'instance' - ? currentChannelInstanceConnection.channelType.value === 'instance' - : currentChannelInstanceConnection.channelType.value === channelType && - currentChannelInstanceConnection.channelId.value === channelId) + ? currentChannelInstanceConnection.channelType === 'instance' + : currentChannelInstanceConnection.channelType === channelType && + currentChannelInstanceConnection.channelId === channelId) ) { // that we don't already have consumers for... await subscribeToTrack(network as SocketWebRTCClientNetwork, peerID, mediaTag) @@ -292,12 +581,11 @@ export async function onConnectToMediaInstance(network: SocketWebRTCClientNetwor network.reconnecting = false await onConnectToInstance(network) await updateNearbyAvatars() - const request = network.request const primus = network.primus if (mediaStreamState.videoStream.value) { if (mediaStreamState.camVideoProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect && typeof promisedRequest === 'function') + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.camVideoProducer.value.id }) await mediaStreamState.camVideoProducer.value?.close() @@ -308,8 +596,8 @@ export async function onConnectToMediaInstance(network: SocketWebRTCClientNetwor } if (mediaStreamState.audioStream.value) { if (mediaStreamState.camAudioProducer.value != null) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect && typeof promisedRequest === 'function') + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.camAudioProducer.value.id }) await mediaStreamState.camAudioProducer.value?.close() @@ -321,16 +609,16 @@ export async function onConnectToMediaInstance(network: SocketWebRTCClientNetwor network.primus.removeListener('reconnected', reconnectHandler) network.primus.removeListener('disconnection', disconnectHandler) if (mediaStreamState.screenVideoProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect && typeof promisedRequest === 'function') + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.screenVideoProducer.value.id }) await mediaStreamState.screenVideoProducer.value?.close() MediaStreamService.updateScreenVideoState() } if (mediaStreamState.screenAudioProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect && typeof promisedRequest === 'function') + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.screenAudioProducer.value.id }) await mediaStreamState.screenAudioProducer.value?.close() @@ -416,7 +704,7 @@ export async function createTransport(network: SocketWebRTCClientNetwork, direct // us back the info we need to create a client-side transport let transport: MediaSoupTransport - const { transportOptions } = await network.request(MessageTypes.WebRTCTransportCreate.toString(), { + const { transportOptions } = await promisedRequest(network, MessageTypes.WebRTCTransportCreate.toString(), { direction, sctpCapabilities: network.mediasoupDevice.sctpCapabilities, channelType: channelType, @@ -439,7 +727,7 @@ export async function createTransport(network: SocketWebRTCClientNetwork, direct callback: () => void, errback: (error: Error) => void ) => { - const connectResult = await network.request(MessageTypes.WebRTCTransportConnect.toString(), { + const connectResult = await promisedRequest(network, MessageTypes.WebRTCTransportConnect.toString(), { transportId: transportOptions.id, dtlsParameters }) @@ -489,7 +777,7 @@ export async function createTransport(network: SocketWebRTCClientNetwork, direct // up a server-side producer object, and get back a // producer.id. call callback() on success or errback() on // failure. - const { error, id } = await network.request(MessageTypes.WebRTCSendTrack.toString(), { + const { error, id } = await promisedRequest(network, MessageTypes.WebRTCSendTrack.toString(), { transportId: transportOptions.id, kind, rtpParameters, @@ -518,7 +806,7 @@ export async function createTransport(network: SocketWebRTCClientNetwork, direct errback: (error: Error) => void ) => { const { sctpStreamParameters, label, protocol, appData } = parameters - const { error, id } = await network.request(MessageTypes.WebRTCProduceData.toString(), { + const { error, id } = await promisedRequest(network, MessageTypes.WebRTCProduceData.toString(), { transportId: transport.id, sctpStreamParameters, label, @@ -584,7 +872,7 @@ export async function initSendTransport(network: SocketWebRTCClientNetwork): Pro export async function initRouter(network: SocketWebRTCClientNetwork): Promise { const { channelId, channelType } = getChannelTypeIdFromTransport(network) - await network.request(MessageTypes.InitializeRouter.toString(), { + await promisedRequest(network, MessageTypes.InitializeRouter.toString(), { channelType, channelId }) @@ -633,12 +921,12 @@ export async function configureMediaTransports( } export async function createCamVideoProducer(network: SocketWebRTCClientNetwork): Promise { - const channelConnectionState = accessMediaInstanceConnectionState() - const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId].ornull - const channelType = currentChannelInstanceConnection.channelType.value - const channelId = currentChannelInstanceConnection.channelId.value + const channelConnectionState = getState(MediaInstanceState) + const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId] + const channelType = currentChannelInstanceConnection.channelType + const channelId = currentChannelInstanceConnection.channelId const mediaStreamState = getMutableState(MediaStreamState) - if (mediaStreamState.videoStream.value !== null && currentChannelInstanceConnection.videoEnabled.value) { + if (mediaStreamState.videoStream.value !== null && currentChannelInstanceConnection.videoEnabled) { if (network.sendTransport == null) { await new Promise((resolve) => { const waitForTransportReadyInterval = setInterval(() => { @@ -685,10 +973,10 @@ export async function createCamVideoProducer(network: SocketWebRTCClientNetwork) } export async function createCamAudioProducer(network: SocketWebRTCClientNetwork): Promise { - const channelConnectionState = accessMediaInstanceConnectionState() - const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId].ornull - const channelType = currentChannelInstanceConnection.channelType.value - const channelId = currentChannelInstanceConnection.channelId.value + const channelConnectionState = getState(MediaInstanceState) + const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId] + const channelType = currentChannelInstanceConnection.channelType + const channelId = currentChannelInstanceConnection.channelId const mediaStreamState = getMutableState(MediaStreamState) if (mediaStreamState.audioStream.value !== null) { //To control the producer audio volume, we need to clone the audio track and connect a Gain to it. @@ -755,34 +1043,33 @@ export async function endVideoChat( if (network) { const mediaStreamState = getMutableState(MediaStreamState) try { - const request = network.request const primus = network.primus if (mediaStreamState.camVideoProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect) + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.camVideoProducer.value.id }) await mediaStreamState.camVideoProducer.value?.close() } if (mediaStreamState.camAudioProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect) + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.camAudioProducer.value.id }) await mediaStreamState.camAudioProducer.value?.close() } if (mediaStreamState.screenVideoProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect) + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.screenVideoProducer.value.id }) await mediaStreamState.screenVideoProducer.value?.close() } if (mediaStreamState.screenAudioProducer.value) { - if (!primus.disconnect && typeof request === 'function') - await request(MessageTypes.WebRTCCloseProducer.toString(), { + if (!primus.disconnect) + await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.screenAudioProducer.value.id }) await mediaStreamState.screenAudioProducer.value?.close() @@ -790,10 +1077,9 @@ export async function endVideoChat( if (options?.endConsumers === true) { network.consumers.map(async (c) => { - if (request && typeof request === 'function') - await request(MessageTypes.WebRTCCloseConsumer.toString(), { - consumerId: c.id - }) + await promisedRequest(network, MessageTypes.WebRTCCloseConsumer.toString(), { + consumerId: c.id + }) await c.close() }) } @@ -835,13 +1121,13 @@ export function resetProducer(): void { export async function subscribeToTrack(network: SocketWebRTCClientNetwork, peerID: PeerID, mediaTag: MediaTagType) { const primus = network.primus if (primus?.disconnect) return - const channelConnectionState = accessMediaInstanceConnectionState() - const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId].ornull - const channelType = currentChannelInstanceConnection.channelType.value - const channelId = currentChannelInstanceConnection.channelId.value + const channelConnectionState = getState(MediaInstanceState) + const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId] + const channelType = currentChannelInstanceConnection.channelType + const channelId = currentChannelInstanceConnection.channelId // ask the server to create a server-side consumer object and send us back the info we need to create a client-side consumer - const consumerParameters = await network.request(MessageTypes.WebRTCReceiveTrack.toString(), { + const consumerParameters = await promisedRequest(network, MessageTypes.WebRTCReceiveTrack.toString(), { mediaTag, mediaPeerId: peerID, rtpCapabilities: network.mediasoupDevice.rtpCapabilities, @@ -881,12 +1167,12 @@ export async function subscribeToTrack(network: SocketWebRTCClientNetwork, peerI } export async function unsubscribeFromTrack(network: SocketWebRTCClientNetwork, peerID: PeerID, mediaTag: any) { - const consumer = network.consumers.find((c) => c.appData.peerID === peerID && c.appData.mediaTag === mediaTag) + const consumer = network.consumers.find((c) => c.appData.peerID === peerID && c.appData.mediaTag === mediaTag)! await closeConsumer(network, consumer) } export async function pauseConsumer(network: SocketWebRTCClientNetwork, consumer: ConsumerExtension) { - await network.request(MessageTypes.WebRTCPauseConsumer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCPauseConsumer.toString(), { consumerId: consumer.id }) @@ -895,7 +1181,7 @@ export async function pauseConsumer(network: SocketWebRTCClientNetwork, consumer } export async function resumeConsumer(network: SocketWebRTCClientNetwork, consumer: ConsumerExtension) { - await network.request(MessageTypes.WebRTCResumeConsumer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCResumeConsumer.toString(), { consumerId: consumer.id }) if (consumer && typeof consumer.resume === 'function' && !consumer.closed && !(consumer as any)._closed) @@ -903,7 +1189,7 @@ export async function resumeConsumer(network: SocketWebRTCClientNetwork, consume } export async function pauseProducer(network: SocketWebRTCClientNetwork, producer: ProducerExtension) { - await network.request(MessageTypes.WebRTCPauseProducer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCPauseProducer.toString(), { producerId: producer.id }) @@ -912,7 +1198,7 @@ export async function pauseProducer(network: SocketWebRTCClientNetwork, producer } export async function resumeProducer(network: SocketWebRTCClientNetwork, producer: ProducerExtension) { - await network.request(MessageTypes.WebRTCResumeProducer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCResumeProducer.toString(), { producerId: producer.id }) @@ -921,24 +1207,24 @@ export async function resumeProducer(network: SocketWebRTCClientNetwork, produce } export async function globalMuteProducer(network: SocketWebRTCClientNetwork, producer: { id: any }) { - await network.request(MessageTypes.WebRTCPauseProducer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCPauseProducer.toString(), { producerId: producer.id, globalMute: true }) } export async function globalUnmuteProducer(network: SocketWebRTCClientNetwork, producer: { id: any }) { - await network.request(MessageTypes.WebRTCResumeProducer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCResumeProducer.toString(), { producerId: producer.id }) } -export async function closeConsumer(network: SocketWebRTCClientNetwork, consumer: any) { +export async function closeConsumer(network: SocketWebRTCClientNetwork, consumer: ConsumerExtension) { await consumer?.close() - network.consumers = network.consumers.filter((c: any) => !(c.id === consumer.id)) as any[] + network.consumers = network.consumers.filter((c) => !(c.id === consumer.id)) dispatchAction(MediaStreamAction.setConsumersAction({ consumers: network.consumers })) - await network.request(MessageTypes.WebRTCCloseConsumer.toString(), { + await promisedRequest(network, MessageTypes.WebRTCCloseConsumer.toString(), { consumerId: consumer.id }) } @@ -1050,7 +1336,7 @@ export function leaveNetwork(network: SocketWebRTCClientNetwork, kicked?: boolea // Leaving a network should close all transports from the server side. // This will also destroy all the associated producers and consumers. // All we need to do on the client is null all references. - network.close() + closeNetwork(network) if (network.topic === NetworkTopics.media) { if (mediaStreamState.audioStream.value) { @@ -1069,6 +1355,7 @@ export function leaveNetwork(network: SocketWebRTCClientNetwork, kicked?: boolea mediaStreamState.audioStream.set(null) mediaStreamState.localScreen.set(null) network.consumers = [] + clearPeerMediaChannels() removeNetwork(network) getMutableState(NetworkState).hostIds.media.set(none) dispatchAction(MediaInstanceConnectionAction.disconnect({ instanceId: network.hostId })) @@ -1080,7 +1367,7 @@ export function leaveNetwork(network: SocketWebRTCClientNetwork, kicked?: boolea dispatchAction(EngineActions.connectToWorld({ connectedWorld: false })) // if world has a media server connection if (Engine.instance.mediaNetwork) { - const mediaState = accessMediaInstanceConnectionState().instances[Engine.instance.mediaNetwork.hostId].value + const mediaState = getState(MediaInstanceState).instances[Engine.instance.mediaNetwork.hostId] if (mediaState.channelType === 'instance' && mediaState.connected) { leaveNetwork(Engine.instance.mediaNetwork as SocketWebRTCClientNetwork) } @@ -1117,10 +1404,10 @@ export const startScreenshare = async (network: SocketWebRTCClientNetwork) => { console.log('local screen', mediaStreamState.localScreen.value) - const channelConnectionState = accessMediaInstanceConnectionState() - const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId].ornull - const channelType = currentChannelInstanceConnection.channelType.value - const channelId = currentChannelInstanceConnection.channelId.value + const channelConnectionState = getState(MediaInstanceState) + const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId] + const channelType = currentChannelInstanceConnection.channelType + const channelId = currentChannelInstanceConnection.channelId // create a producer for video mediaStreamState.screenShareVideoPaused.set(false) @@ -1166,7 +1453,7 @@ export const stopScreenshare = async (network: SocketWebRTCClientNetwork) => { await mediaStreamState.screenVideoProducer.value.pause() mediaStreamState.screenShareVideoPaused.set(true) - const { error } = await network.request(MessageTypes.WebRTCCloseProducer.toString(), { + const { error } = await promisedRequest(network, MessageTypes.WebRTCCloseProducer.toString(), { producerId: mediaStreamState.screenVideoProducer.value.id }) @@ -1177,9 +1464,13 @@ export const stopScreenshare = async (network: SocketWebRTCClientNetwork) => { } if (mediaStreamState.screenAudioProducer.value) { - const { error: screenAudioProducerError } = await network.request(MessageTypes.WebRTCCloseProducer.toString(), { - producerId: mediaStreamState.screenAudioProducer.value.id - }) + const { error: screenAudioProducerError } = await promisedRequest( + network, + MessageTypes.WebRTCCloseProducer.toString(), + { + producerId: mediaStreamState.screenAudioProducer.value.id + } + ) if (screenAudioProducerError) logger.error(screenAudioProducerError) await mediaStreamState.screenAudioProducer.value.close() diff --git a/packages/client-core/src/transports/SocketWebRTCClientNetwork.ts b/packages/client-core/src/transports/SocketWebRTCClientNetwork.ts deleted file mode 100755 index b34da64330..0000000000 --- a/packages/client-core/src/transports/SocketWebRTCClientNetwork.ts +++ /dev/null @@ -1,232 +0,0 @@ -import * as mediasoupClient from 'mediasoup-client' -import { Consumer, DataProducer, Transport as MediaSoupTransport, Producer } from 'mediasoup-client/lib/types' -import Primus from 'primus-client' - -import config from '@etherealengine/common/src/config' -import { Channel } from '@etherealengine/common/src/interfaces/Channel' -import { MediaStreamAppData } from '@etherealengine/common/src/interfaces/MediaStreamConstants' -import { UserId } from '@etherealengine/common/src/interfaces/UserId' -import multiLogger from '@etherealengine/common/src/logger' -import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' -import { EngineState } from '@etherealengine/engine/src/ecs/classes/EngineState' -import { Network } from '@etherealengine/engine/src/networking/classes/Network' -import { MessageTypes } from '@etherealengine/engine/src/networking/enums/MessageTypes' -import { NetworkState } from '@etherealengine/engine/src/networking/NetworkState' -import { clearOutgoingActions, dispatchAction, getMutableState, getState } from '@etherealengine/hyperflux' -import { addOutgoingTopicIfNecessary, Topic } from '@etherealengine/hyperflux/functions/ActionFunctions' - -import { - accessLocationInstanceConnectionState, - LocationInstanceConnectionAction, - LocationInstanceConnectionService -} from '../common/services/LocationInstanceConnectionService' -import { - accessMediaInstanceConnectionState, - MediaInstanceConnectionAction, - MediaInstanceConnectionService -} from '../common/services/MediaInstanceConnectionService' -import { accessChatState } from '../social/services/ChatService' -import { accessLocationState } from '../social/services/LocationService' -import { accessAuthState } from '../user/services/AuthService' -import { onConnectToInstance } from './SocketWebRTCClientFunctions' - -const logger = multiLogger.child({ component: 'client-core:SocketWebRTCClientNetwork' }) - -export type WebRTCTransportExtension = Omit & { appData: MediaStreamAppData } -export type ProducerExtension = Omit & { appData: MediaStreamAppData } -export type ConsumerExtension = Omit & { appData: MediaStreamAppData; producerPaused: boolean } - -let id = 0 - -// import { encode, decode } from 'msgpackr' - -// Adds support for Promise to Primus client -// Each 'data' listener function needs to be named something unique in order for removeListener to -// not remove all 'data' listener functions -const promisedRequest = (primus: Primus) => { - return function request(type: any, data = {}): any { - return new Promise((resolve) => { - const responseFunction = (data) => { - if (data.type.toString() === message.type.toString() && message.id === data.id) { - resolve(data.data) - primus.removeListener('data', responseFunction) - } - } - Object.defineProperty(responseFunction, 'name', { value: `responseFunction${id}`, writable: true }) - const message = { - type: type, - data: data, - id: id++ - } - primus.write(message) - - primus.on('data', responseFunction) - }) - } -} - -const handleFailedConnection = (locationConnectionFailed) => { - if (locationConnectionFailed) { - const currentLocation = accessLocationState().currentLocation.location - const locationInstanceConnectionState = accessLocationInstanceConnectionState() - const instanceId = getState(NetworkState).hostIds.world ?? '' - if (!locationInstanceConnectionState.instances[instanceId]?.connected?.value) { - dispatchAction(LocationInstanceConnectionAction.disconnect({ instanceId })) - LocationInstanceConnectionService.provisionServer( - currentLocation.id.value, - instanceId || undefined, - currentLocation.sceneId.value - ) - } - } else { - const mediaInstanceConnectionState = accessMediaInstanceConnectionState() - const instanceId = getState(NetworkState).hostIds.media ?? '' - if (!mediaInstanceConnectionState.instances[instanceId]?.connected?.value) { - dispatchAction(MediaInstanceConnectionAction.disconnect({ instanceId })) - const authState = accessAuthState() - const selfUser = authState.user - const chatState = accessChatState() - const channelState = chatState.channels - const channels = channelState.channels.value as Channel[] - const channelEntries = Object.values(channels).filter((channel) => !!channel) as any - const instanceChannel = channelEntries.find((entry) => entry.instanceId === Engine.instance.worldNetwork?.hostId) - if (instanceChannel) { - MediaInstanceConnectionService.provisionServer(instanceChannel?.id!, true) - } else { - const partyChannel = Object.values(chatState.channels.channels.value).find( - (channel) => channel.channelType === 'party' && channel.partyId === selfUser.partyId.value - ) - MediaInstanceConnectionService.provisionServer(partyChannel?.id!, false) - } - } - } - return -} - -export class SocketWebRTCClientNetwork extends Network { - constructor(hostId: UserId, topic: Topic) { - super(hostId, topic) - addOutgoingTopicIfNecessary(topic) - } - - mediasoupDevice = new mediasoupClient.Device( - getMutableState(EngineState).isBot.value ? { handlerName: 'Chrome74' } : undefined - ) - reconnecting = false - recvTransport: MediaSoupTransport - sendTransport: MediaSoupTransport - primus: Primus = null! - request: ReturnType - - dataProducer: DataProducer - heartbeat: NodeJS.Timer // is there an equivalent browser type for this? - - producers = [] as ProducerExtension[] - consumers = [] as ConsumerExtension[] - - sendActions() { - if (!this.ready) return - const actions = [...Engine.instance.store.actions.outgoing[this.topic].queue] - if (actions.length && this.primus) { - this.primus.write({ type: MessageTypes.ActionData.toString(), /*encode(*/ data: actions }) //) - clearOutgoingActions(this.topic) - } - } - - // This sends message on a data channel (data channel creation is now handled explicitly/default) - sendData(data: ArrayBuffer): void { - if (this.dataProducer && !this.dataProducer.closed && this.dataProducer.readyState === 'open') - this.dataProducer.send(data) - } - - close() { - logger.info('SocketWebRTCClientNetwork close') - this.recvTransport?.close() - this.sendTransport?.close() - this.recvTransport = null! - this.sendTransport = null! - this.heartbeat && clearInterval(this.heartbeat) - this.primus?.removeAllListeners() - this.primus?.end() - this.primus = null! - } - - public async initialize(args: { - ipAddress: string - port: string - locationId?: string | null - channelId?: string | null - roomCode?: string | null - }): Promise { - this.reconnecting = false - if (this.primus) { - return logger.error(new Error('Network already initialized')) - } - logger.info('Initialising transport with args %o', args) - const { ipAddress, port, locationId, channelId, roomCode } = args - - const authState = accessAuthState() - const token = authState.authUser.accessToken.value - - const query = { - locationId, - channelId, - roomCode, - token - } as { - locationId?: string - channelId?: string - roomCode?: string - address?: string - port?: string - token: string - } - - if (locationId) delete query.channelId - if (channelId) delete query.locationId - if (!roomCode) delete query.roomCode - - try { - if ( - config.client.localBuild === 'true' || - (config.client.appEnv === 'development' && config.client.localNginx !== 'true') - ) { - const queryString = new URLSearchParams(query).toString() - this.primus = new Primus(`https://${ipAddress as string}:${port.toString()}?${queryString}`) - } else { - query.address = ipAddress - query.port = port.toString() - const queryString = new URLSearchParams(query).toString() - this.primus = new Primus(`${config.client.instanceserverUrl}?${queryString}`) - } - } catch (err) { - logger.error(err) - return handleFailedConnection(locationId != null) - } - this.request = promisedRequest(this.primus) - - const connectionFailTimeout = setTimeout(() => { - return handleFailedConnection(locationId != null) - }, 3000) - - this.primus.on('incoming::open', (event) => { - clearTimeout(connectionFailTimeout) - if (this.reconnecting) { - this.reconnecting = false - ;(this.primus as any)._connected = false - return - } - - if ((this.primus as any)._connected) return - ;(this.primus as any)._connected = true - - logger.info('CONNECT to port %o', { port, locationId }) - onConnectToInstance(this) - - // Send heartbeat every second - this.heartbeat = setInterval(() => { - this.primus.write({ type: MessageTypes.Heartbeat.toString() }) - }, 1000) - }) - } -} diff --git a/packages/client-core/src/transports/UpdateNearbyUsersSystem.ts b/packages/client-core/src/transports/UpdateNearbyUsersSystem.ts index 910fb997bd..eb49f6c622 100755 --- a/packages/client-core/src/transports/UpdateNearbyUsersSystem.ts +++ b/packages/client-core/src/transports/UpdateNearbyUsersSystem.ts @@ -1,35 +1,38 @@ import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' import { MessageTypes } from '@etherealengine/engine/src/networking/enums/MessageTypes' -import { dispatchAction } from '@etherealengine/hyperflux' +import { dispatchAction, getState } from '@etherealengine/hyperflux' -import { accessMediaInstanceConnectionState } from '../common/services/MediaInstanceConnectionService' -import { accessMediaStreamState, MediaStreamService } from '../media/services/MediaStreamService' +import { MediaInstanceState } from '../common/services/MediaInstanceConnectionService' +import { MediaState, MediaStreamService } from '../media/services/MediaStreamService' import { NetworkUserService } from '../user/services/NetworkUserService' import { MediaStreamActions } from './MediaStreams' +import { promisedRequest, SocketWebRTCClientNetwork } from './SocketWebRTCClientFunctions' export const updateNearbyAvatars = () => { - const network = Engine.instance.mediaNetwork + const network = Engine.instance.mediaNetwork as SocketWebRTCClientNetwork MediaStreamService.updateNearbyLayerUsers() - const mediaState = accessMediaStreamState() + if (!network) return + + const mediaState = getState(MediaState) NetworkUserService.getLayerUsers(true) - const channelConnectionState = accessMediaInstanceConnectionState() - const currentChannelInstanceConnection = network && channelConnectionState.instances[network.hostId]?.ornull - if (!currentChannelInstanceConnection?.value) return - - network?.request(MessageTypes.WebRTCRequestCurrentProducers.toString(), { - userIds: mediaState.nearbyLayerUsers.value || [], - channelType: currentChannelInstanceConnection.channelType.value, - channelId: currentChannelInstanceConnection.channelId.value - }) + const channelConnectionState = getState(MediaInstanceState) + const currentChannelInstanceConnection = channelConnectionState.instances[network.hostId] + if (!currentChannelInstanceConnection) return - if (!mediaState.nearbyLayerUsers.length) return + const nearbyUserIds = mediaState.nearbyLayerUsers + + promisedRequest(network, MessageTypes.WebRTCRequestCurrentProducers.toString(), { + userIds: nearbyUserIds || [], + channelType: currentChannelInstanceConnection.channelType, + channelId: currentChannelInstanceConnection.channelId + }) - const nearbyUserIds = mediaState.nearbyLayerUsers.value + if (!nearbyUserIds.length) return - network?.consumers.forEach((consumer) => { + network.consumers.forEach((consumer) => { if (!nearbyUserIds.includes(network.peers.get(consumer.appData.peerID)?.userId!)) { dispatchAction(MediaStreamActions.closeConsumer({ consumer })) } diff --git a/packages/editor/src/components/realtime/WorldInstanceConnection.tsx b/packages/editor/src/components/realtime/WorldInstanceConnection.tsx index 62401c5112..058fd05f0e 100644 --- a/packages/editor/src/components/realtime/WorldInstanceConnection.tsx +++ b/packages/editor/src/components/realtime/WorldInstanceConnection.tsx @@ -9,7 +9,7 @@ import { } from '@etherealengine/client-core/src/common/services/LocationInstanceConnectionService' import { LoadingCircle } from '@etherealengine/client-core/src/components/LoadingCircle' import { leaveNetwork } from '@etherealengine/client-core/src/transports/SocketWebRTCClientFunctions' -import { SocketWebRTCClientNetwork } from '@etherealengine/client-core/src/transports/SocketWebRTCClientNetwork' +import { SocketWebRTCClientNetwork } from '@etherealengine/client-core/src/transports/SocketWebRTCClientFunctions' import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' import { addActionReceptor, dispatchAction, removeActionReceptor } from '@etherealengine/hyperflux' diff --git a/packages/engine/src/networking/classes/Network.ts b/packages/engine/src/networking/classes/Network.ts index 642e57eb53..42d890a196 100755 --- a/packages/engine/src/networking/classes/Network.ts +++ b/packages/engine/src/networking/classes/Network.ts @@ -1,6 +1,6 @@ import { PeerID } from '@etherealengine/common/src/interfaces/PeerID' import { UserId } from '@etherealengine/common/src/interfaces/UserId' -import { Topic } from '@etherealengine/hyperflux/functions/ActionFunctions' +import { addOutgoingTopicIfNecessary, Topic } from '@etherealengine/hyperflux/functions/ActionFunctions' import { RingBuffer } from '../../common/classes/RingBuffer' import { Engine } from '../../ecs/classes/Engine' @@ -14,112 +14,101 @@ export const NetworkTopics = { media: 'media' as Topic } -/** Interface for the Transport. */ -export class Network { - /** - * Initialize the transport. - * @param address Address of this transport. - * @param port Port of this transport. - * @param instance Whether this is a connection to an instance server or not (i.e. channel server) - * @param opts Options. - */ - initialize(args: any) {} - - /** - * Send data over transport. - * @param data Data to be sent. - */ - sendData(data: any) {} - - /** - * Send outgoing actions through reliable channel - */ - sendActions() {} - - /** - * Sends a message across the connection and resolves with the reponse - * @param message - */ - async request(message: string, data?: any): Promise {} - - /** - * Closes all the media soup transports - */ - close(instance?: boolean, channel?: boolean) {} - - /** Consumers and producers have separate types on client and server */ - producers = [] as any[] - consumers = [] as any[] - - /** List of data producer nodes. */ - dataProducers = new Map() - - /** List of data consumer nodes. */ - dataConsumers = new Map() - - /** Buffer holding all incoming Messages. */ - incomingMessageQueueUnreliableIDs: RingBuffer = new RingBuffer(100) - - /** Buffer holding all incoming Messages. */ - incomingMessageQueueUnreliable: RingBuffer = new RingBuffer(100) - - /** Connected peers */ - peers = new Map() as Map - - /** Publish to connected peers that peer information has changed */ - updatePeers() {} - - /** Map of numerical user index to user client IDs */ - userIndexToUserID = new Map() - - /** Map of user client IDs to numerical user index */ - userIDToUserIndex = new Map() - - /** Map of numerical peer index to peer IDs */ - peerIndexToPeerID = new Map() - - /** Map of peer IDs to numerical peer index */ - peerIDToPeerIndex = new Map() - - /** - * The index to increment when a new user joins - * NOTE: Must only be updated by the host - */ - userIndexCount = 0 - - /** - * The index to increment when a new peer connects - * NOTE: Must only be updated by the host - */ - peerIndexCount = 0 - - /** - * The UserId of the host - * - will either be a user's UserId, or an instance server's InstanceId - */ - hostId: UserId - - /** - * The PeerID of the current user's instance - */ - peerID: PeerID - - /** - * The network is ready for sending messages and data - */ - ready: boolean - - /** - * Check if this user is hosting the world. - */ - get isHosting() { - return Engine.instance.userId === this.hostId - } - - topic: Topic +export interface TransportInterface { + get peers(): PeerID[] + messageToPeer: (peerId: PeerID, data: any) => void + messageToAll: (data: any) => void + bufferToPeer: (peerId: PeerID, data: any) => void + bufferToAll: (data: any) => void +} - constructor(hostId: UserId, topic: Topic) { - this.hostId = hostId - this.topic = topic +/** Interface for the Transport. */ +export const createNetwork = (hostId: UserId, topic: Topic) => { + addOutgoingTopicIfNecessary(topic) + return { + /** Consumers and producers have separate types on client and server */ + producers: [] as any[], + consumers: [] as any[], + + /** List of data producer nodes. */ + dataProducers: new Map(), + + /** List of data consumer nodes. */ + dataConsumers: new Map(), + + /** Buffer holding all incoming Messages. */ + incomingMessageQueueUnreliableIDs: new RingBuffer(100), + + /** Buffer holding all incoming Messages. */ + incomingMessageQueueUnreliable: new RingBuffer(100), + + /** Connected peers */ + peers: new Map() as Map, + + /** Map of numerical peer index to peer IDs */ + peerIndexToPeerID: new Map(), + + /** Map of peer IDs to numerical peer index */ + peerIDToPeerIndex: new Map(), + + /** + * The index to increment when a new peer connects + * NOTE: Must only be updated by the host + */ + peerIndexCount: 0, + + /** Connected users */ + users: new Map() as Map, + + /** Map of numerical user index to user client IDs */ + userIndexToUserID: new Map(), + + /** Map of user client IDs to numerical user index */ + userIDToUserIndex: new Map(), + + /** Gets the host peer */ + get hostPeerID() { + return this.users.get(this.hostId)?.[0] + }, + + /** + * The index to increment when a new user joins + * NOTE: Must only be updated by the host + */ + userIndexCount: 0, + + /** + * The UserId of the host + * - will either be a user's UserId, or an instance server's InstanceId + */ + hostId, + + /** + * The PeerID of the current user's instance + * @todo non null this + */ + peerID: null! as PeerID, + + /** + * The network is ready for sending messages and data + */ + ready: false, + + /** + * The transport used by this network. + * @todo non null this + */ + transport: null! as TransportInterface, + + /** + * Check if this user is hosting the world. + */ + get isHosting() { + return Engine.instance.userId === this.hostId + }, + + topic } } + +export type Network = ReturnType diff --git a/packages/engine/src/networking/functions/NetworkPeerFunctions.ts b/packages/engine/src/networking/functions/NetworkPeerFunctions.ts index b11f4330fc..bf54076054 100644 --- a/packages/engine/src/networking/functions/NetworkPeerFunctions.ts +++ b/packages/engine/src/networking/functions/NetworkPeerFunctions.ts @@ -38,6 +38,12 @@ function createPeer( userIndex }) + if (!network.users.has(userID)) { + network.users.set(userID, [peerID]) + } else { + network.users.get(userID)!.push(peerID) + } + const worldState = getMutableState(WorldState) worldState.userNames[userID].set(name) } @@ -60,6 +66,11 @@ function destroyPeer(network: Network, peerID: PeerID) { network.peerIDToPeerIndex.delete(peerID) network.peerIndexToPeerID.delete(peerIndex) + const userPeers = network.users.get(userID)! + const peerIndexInUserPeers = userPeers.indexOf(peerID) + userPeers.splice(peerIndexInUserPeers, 1) + if (!userPeers.length) network.users.delete(userID) + /** * if no other connections exist for this user, and this action is occurring on the world network, * we want to remove them from world.users diff --git a/packages/engine/src/networking/functions/validateNetworkObjects.ts b/packages/engine/src/networking/functions/validateNetworkObjects.ts deleted file mode 100644 index b393d3addb..0000000000 --- a/packages/engine/src/networking/functions/validateNetworkObjects.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { Engine } from '../../ecs/classes/Engine' -import { Network } from '../classes/Network' -import { NetworkPeerFunctions } from './NetworkPeerFunctions' - -export async function validateNetworkObjects(network: Network): Promise { - for (const [peerID, client] of network.peers) { - if (client.userId === Engine.instance.userId) continue - // Validate that user has phoned home recently - if (Date.now() - client.lastSeenTs > 30000) { - console.log('Removing client ', peerID, ' due to inactivity') - - NetworkPeerFunctions.destroyPeer(network, peerID) - network.updatePeers() - - console.log('Disconnected Client:', peerID) - if (client?.instanceRecvTransport) { - console.log('Closing instanceRecvTransport') - await client.instanceRecvTransport.close() - console.log('Closed instanceRecvTransport') - } - if (client?.instanceSendTransport) { - console.log('Closing instanceSendTransport') - await client.instanceSendTransport.close() - console.log('Closed instanceSendTransport') - } - if (client?.channelRecvTransport) { - console.log('Closing channelRecvTransport') - await client.channelRecvTransport.close() - console.log('Closed channelRecvTransport') - } - if (client?.channelSendTransport) { - console.log('Closing channelSendTransport') - await client.channelSendTransport.close() - console.log('Closed channelSendTransport') - } - - console.log('Removed transports for', peerID) - } - } -} diff --git a/packages/engine/src/networking/systems/IncomingNetworkSystem.ts b/packages/engine/src/networking/systems/IncomingNetworkSystem.ts index 234560e067..804abea058 100644 --- a/packages/engine/src/networking/systems/IncomingNetworkSystem.ts +++ b/packages/engine/src/networking/systems/IncomingNetworkSystem.ts @@ -1,7 +1,5 @@ import { Engine } from '../../ecs/classes/Engine' import { getEngineState } from '../../ecs/classes/EngineState' -import { Network } from '../classes/Network' -import { validateNetworkObjects } from '../functions/validateNetworkObjects' import { createDataReader } from '../serialization/DataReader' export const applyUnreliableQueueFast = (deserialize: Function) => () => { @@ -23,15 +21,11 @@ export default async function IncomingNetworkSystem() { const deserialize = createDataReader() const applyIncomingNetworkState = applyUnreliableQueueFast(deserialize) - const VALIDATE_NETWORK_INTERVAL = Engine.instance.tickRate * 5 - const engineState = getEngineState() const execute = () => { if (!engineState.isEngineInitialized.value) return applyIncomingNetworkState() - if (Engine.instance.worldNetwork?.isHosting && Engine.instance.fixedTick % VALIDATE_NETWORK_INTERVAL === 0) - validateNetworkObjects(Engine.instance.worldNetwork as Network) } const cleanup = async () => {} diff --git a/packages/engine/src/networking/systems/OutgoingActionSystem.ts b/packages/engine/src/networking/systems/OutgoingActionSystem.ts index d97921a23f..c265544ad4 100644 --- a/packages/engine/src/networking/systems/OutgoingActionSystem.ts +++ b/packages/engine/src/networking/systems/OutgoingActionSystem.ts @@ -1,12 +1,76 @@ -import { getState } from '@etherealengine/hyperflux' +import { PeersUpdateType } from '@etherealengine/common/src/interfaces/PeerID' +import { Action, clearOutgoingActions, getMutableState, getState } from '@etherealengine/hyperflux' import { Engine } from '../../ecs/classes/Engine' +import { Network } from '../classes/Network' +import { MessageTypes } from '../enums/MessageTypes' +import { WorldState } from '../interfaces/WorldState' import { NetworkState } from '../NetworkState' -const sendOutgoingActions = () => { - for (const [instanceId, network] of Object.entries(getState(NetworkState).networks)) { +/** Publish to connected peers that peer information has changed */ +export const updatePeers = (network: Network) => { + const userNames = getMutableState(WorldState).userNames + const peers = Array.from(network.peers.values()).map((peer) => { + return { + peerID: peer.peerID, + peerIndex: peer.peerIndex, + userID: peer.userId, + userIndex: peer.userIndex, + name: userNames[peer.userId].value + } + }) as Array + for (const peerID of network.transport.peers) + network.transport.messageToPeer(peerID, { type: MessageTypes.UpdatePeers.toString(), data: peers }) +} + +export const sendActionsAsPeer = (network: Network) => { + if (!network.ready) return + const actions = [...Engine.instance.store.actions.outgoing[network.topic].queue] + if (!actions.length) return + for (const peerID of network.transport.peers) { + network.transport.messageToPeer(network.hostPeerID, { + type: MessageTypes.ActionData.toString(), + /*encode(*/ data: actions + }) //) + } + clearOutgoingActions(network.topic) +} + +export const sendActionsAsHost = (network: Network) => { + if (!network.ready) return + + const actions = [...Engine.instance.store.actions.outgoing[network.topic].queue] + if (!actions.length) return + + const outgoing = Engine.instance.store.actions.outgoing + + for (const peerID of network.transport.peers) { + const arr: Action[] = [] + for (const a of [...actions]) { + const action = { ...a } + if (outgoing[network.topic].historyUUIDs.has(action.$uuid)) { + const idx = outgoing[network.topic].queue.indexOf(action) + outgoing[network.topic].queue.splice(idx, 1) + } + if (!action.$to) continue + const toUserId = network.peers.get(peerID)?.userId + if (action.$to === 'all' || (action.$to === 'others' && toUserId !== action.$from) || action.$to === toUserId) { + arr.push(action) + } + } + if (arr.length) + network.transport.messageToPeer(peerID, { type: MessageTypes.ActionData.toString(), /*encode(*/ data: arr }) //) + } + + // TODO: refactor this to support multiple connections of the same topic type + clearOutgoingActions(network.topic, Engine.instance.store) +} + +export const sendOutgoingActions = () => { + for (const network of Object.values(getState(NetworkState).networks)) { try { - network.sendActions() + if (Engine.instance.userId === network.hostId) sendActionsAsHost(network as Network) + else sendActionsAsPeer(network as Network) } catch (e) { console.error(e) } diff --git a/packages/engine/src/networking/systems/OutgoingNetworkSystem.ts b/packages/engine/src/networking/systems/OutgoingNetworkSystem.ts index 9a6072d9fd..ad3f8adfbc 100644 --- a/packages/engine/src/networking/systems/OutgoingNetworkSystem.ts +++ b/packages/engine/src/networking/systems/OutgoingNetworkSystem.ts @@ -26,15 +26,16 @@ const serializeAndSend = (serialize: ReturnType) => { : authoritativeNetworkTransformsQuery() if (ents.length > 0) { const userID = Engine.instance.userId - const peerID = Engine.instance.worldNetwork.peerID - const data = serialize(Engine.instance.worldNetwork as Network, userID, peerID, ents) + const network = Engine.instance.worldNetwork as Network + const peerID = network.peerID + const data = serialize(network, userID, peerID, ents) // todo: insert historian logic here if (data.byteLength > 0) { // side effect - network IO // delay until end of frame - Promise.resolve().then(() => Engine.instance.worldNetwork.sendData(data)) + Promise.resolve().then(() => network.transport.bufferToPeer(network.hostPeerID, data)) } } } diff --git a/packages/engine/tests/util/createMockNetwork.ts b/packages/engine/tests/util/createMockNetwork.ts index 23cf000993..c729c91dfe 100644 --- a/packages/engine/tests/util/createMockNetwork.ts +++ b/packages/engine/tests/util/createMockNetwork.ts @@ -1,11 +1,11 @@ import { UserId } from '@etherealengine/common/src/interfaces/UserId' import { getMutableState } from '@etherealengine/hyperflux' -import { Network, NetworkTopics } from '../../src/networking/classes/Network' +import { createNetwork, NetworkTopics } from '../../src/networking/classes/Network' import { addNetwork, NetworkState } from '../../src/networking/NetworkState' export const createMockNetwork = (networkType = NetworkTopics.world) => { if (networkType === NetworkTopics.world) getMutableState(NetworkState).hostIds.world.set(networkType as any as UserId) else getMutableState(NetworkState).hostIds.media.set(networkType as any as UserId) - addNetwork(new Network(networkType as any as UserId, networkType)) + addNetwork(createNetwork(networkType as any as UserId, networkType)) } diff --git a/packages/instanceserver/src/NetworkFunctions.ts b/packages/instanceserver/src/NetworkFunctions.ts index 8c8487cca2..fcaf91ccb7 100755 --- a/packages/instanceserver/src/NetworkFunctions.ts +++ b/packages/instanceserver/src/NetworkFunctions.ts @@ -15,28 +15,30 @@ import { MessageTypes } from '@etherealengine/engine/src/networking/enums/Messag import { NetworkPeerFunctions } from '@etherealengine/engine/src/networking/functions/NetworkPeerFunctions' import { JoinWorldRequestData } from '@etherealengine/engine/src/networking/functions/receiveJoinWorld' import { WorldState } from '@etherealengine/engine/src/networking/interfaces/WorldState' +import { updatePeers } from '@etherealengine/engine/src/networking/systems/OutgoingActionSystem' import { GroupComponent } from '@etherealengine/engine/src/scene/components/GroupComponent' import { TransformComponent } from '@etherealengine/engine/src/transform/components/TransformComponent' -import { dispatchAction, getMutableState } from '@etherealengine/hyperflux' +import { dispatchAction, getMutableState, getState } from '@etherealengine/hyperflux' import { Action } from '@etherealengine/hyperflux/functions/ActionFunctions' import { Application } from '@etherealengine/server-core/declarations' import config from '@etherealengine/server-core/src/appconfig' import { localConfig } from '@etherealengine/server-core/src/config' import multiLogger from '@etherealengine/server-core/src/ServerLogger' +import { ServerState } from '@etherealengine/server-core/src/ServerState' import getLocalServerIp from '@etherealengine/server-core/src/util/get-local-server-ip' -import { SocketWebRTCServerNetwork } from './SocketWebRTCServerNetwork' +import { SocketWebRTCServerNetwork } from './SocketWebRTCServerFunctions' import { closeTransport } from './WebRTCFunctions' const logger = multiLogger.child({ component: 'instanceserver:network' }) const isNameRegex = /instanceserver-([a-zA-Z0-9]{5}-[a-zA-Z0-9]{5})/ -export const setupSubdomain = async (network: SocketWebRTCServerNetwork) => { - const app = network.app +export const setupSubdomain = async () => { + const app = getState(ServerState).app as Application let stringSubdomainNumber: string if (config.kubernetes.enabled) { - await cleanupOldInstanceservers(network) + await cleanupOldInstanceservers(app) app.instanceServer = await app.agonesSDK.getGameServer() // We used to provision subdomains for instanceservers, e.g. 00001.instanceserver.domain.com @@ -44,7 +46,7 @@ export const setupSubdomain = async (network: SocketWebRTCServerNetwork) => { // UDP, so the following was commented out. // const name = app.instanceServer.objectMeta.name // const isIdentifier = isNameRegex.exec(name)! - // stringSubdomainNumber = await getFreeSubdomain(transport, isIdentifier[1], 0) + // stringSubdomainNumber = await getFreeSubdomain(isIdentifier[1], 0) // app.isSubdomainNumber = stringSubdomainNumber // // const Route53 = new AWS.Route53({ ...config.aws.route53.keys }) @@ -79,19 +81,17 @@ export const setupSubdomain = async (network: SocketWebRTCServerNetwork) => { ] } -export async function getFreeSubdomain( - network: SocketWebRTCServerNetwork, - isIdentifier: string, - subdomainNumber: number -): Promise { +export async function getFreeSubdomain(isIdentifier: string, subdomainNumber: number): Promise { + const app = getState(ServerState).app + const stringSubdomainNumber = subdomainNumber.toString().padStart(config.instanceserver.identifierDigits, '0') - const subdomainResult = await network.app.service('instanceserver-subdomain-provision').find({ + const subdomainResult = await app.service('instanceserver-subdomain-provision').find({ query: { is_number: stringSubdomainNumber } }) if ((subdomainResult as any).total === 0) { - await network.app.service('instanceserver-subdomain-provision').create({ + await app.service('instanceserver-subdomain-provision').create({ allocated: true, is_number: stringSubdomainNumber, is_id: isIdentifier @@ -103,19 +103,19 @@ export async function getFreeSubdomain( }, 500) ) - const newSubdomainResult = (await network.app.service('instanceserver-subdomain-provision').find({ + const newSubdomainResult = (await app.service('instanceserver-subdomain-provision').find({ query: { is_number: stringSubdomainNumber } })) as any if (newSubdomainResult.total > 0 && newSubdomainResult.data[0].gs_id === isIdentifier) return stringSubdomainNumber - else return getFreeSubdomain(network, isIdentifier, subdomainNumber + 1) + else return getFreeSubdomain(isIdentifier, subdomainNumber + 1) } else { const subdomain = (subdomainResult as any).data[0] if (subdomain.allocated === true || subdomain.allocated === 1) { - return getFreeSubdomain(network, isIdentifier, subdomainNumber + 1) + return getFreeSubdomain(isIdentifier, subdomainNumber + 1) } - await network.app.service('instanceserver-subdomain-provision').patch(subdomain.id, { + await app.service('instanceserver-subdomain-provision').patch(subdomain.id, { allocated: true, is_id: isIdentifier }) @@ -126,25 +126,25 @@ export async function getFreeSubdomain( }, 500) ) - const newSubdomainResult = (await network.app.service('instanceserver-subdomain-provision').find({ + const newSubdomainResult = (await app.service('instanceserver-subdomain-provision').find({ query: { is_number: stringSubdomainNumber } })) as any if (newSubdomainResult.total > 0 && newSubdomainResult.data[0].gs_id === isIdentifier) return stringSubdomainNumber - else return getFreeSubdomain(network, isIdentifier, subdomainNumber + 1) + else return getFreeSubdomain(isIdentifier, subdomainNumber + 1) } } -export async function cleanupOldInstanceservers(network: SocketWebRTCServerNetwork): Promise { - const instances = await network.app.service('instance').Model.findAndCountAll({ +export async function cleanupOldInstanceservers(app: Application): Promise { + const instances = await app.service('instance').Model.findAndCountAll({ offset: 0, limit: 1000, where: { ended: false } }) - const instanceservers = await network.app.k8AgonesClient.listNamespacedCustomObject( + const instanceservers = await app.k8AgonesClient.listNamespacedCustomObject( 'agones.dev', 'v1', 'default', @@ -161,7 +161,7 @@ export async function cleanupOldInstanceservers(network: SocketWebRTCServerNetwo return is.status.address === ip && inputPort.port.toString() === port }) return match == null - ? network.app.service('instance').patch(instance.id, { + ? app.service('instance').patch(instance.id, { ended: true }) : Promise.resolve() @@ -172,7 +172,7 @@ export async function cleanupOldInstanceservers(network: SocketWebRTCServerNetwo isNameRegex.exec(is.metadata.name) != null ? isNameRegex.exec(is.metadata.name)![1] : null ) - await network.app.service('instanceserver-subdomain-provision').patch( + await app.service('instanceserver-subdomain-provision').patch( null, { allocated: false @@ -224,7 +224,7 @@ export function getUserIdFromPeerID(network: SocketWebRTCServerNetwork, sparkID: return client?.userId } -export const handleConnectingPeer = async (network: SocketWebRTCServerNetwork, spark: any, user: UserInterface) => { +export const handleConnectingPeer = async (network: SocketWebRTCServerNetwork, spark: Spark, user: UserInterface) => { const userId = user.id const avatarDetail = user.avatar const peerID = spark.id as PeerID @@ -250,6 +250,12 @@ export const handleConnectingPeer = async (network: SocketWebRTCServerNetwork, s dataProducers: new Map() // Key => label of data channel }) + if (!network.users.has(userId)) { + network.users.set(userId, [peerID]) + } else { + network.users.get(userId)!.push(peerID) + } + const worldState = getMutableState(WorldState) worldState.userNames[userId].set(user.name) worldState.userAvatarDetails[userId].set({ @@ -262,9 +268,11 @@ export const handleConnectingPeer = async (network: SocketWebRTCServerNetwork, s const spectating = network.peers.get(peerID)!.spectating - network.app.service('message').create( + const app = getState(ServerState).app + + app.service('message').create( { - targetObjectId: network.app.instance.id, + targetObjectId: app.instance.id, targetObjectType: 'instance', text: `${user.name} joined` + (spectating ? ' as spectator' : ''), isNotification: true @@ -279,7 +287,7 @@ export const handleConnectingPeer = async (network: SocketWebRTCServerNetwork, s export async function handleJoinWorld( network: SocketWebRTCServerNetwork, - spark: any, + spark: Spark, data: JoinWorldRequestData, messageId: string, userId: UserId, @@ -291,7 +299,7 @@ export async function handleJoinWorld( const peerID = spark.id as PeerID - network.updatePeers() + updatePeers(network) spark.write({ type: MessageTypes.JoinWorld.toString(), @@ -306,7 +314,9 @@ export async function handleJoinWorld( id: messageId }) - if (data.inviteCode && !network.app.isChannelInstance) await getUserSpawnFromInvite(network, user, data.inviteCode!) + const app = getState(ServerState).app + + if (data.inviteCode && !app.isChannelInstance) await getUserSpawnFromInvite(network, user, data.inviteCode!) } const getUserSpawnFromInvite = async ( @@ -316,7 +326,8 @@ const getUserSpawnFromInvite = async ( iteration = 0 ) => { if (inviteCode) { - const result = (await network.app.service('user').find({ + const app = getState(ServerState).app + const result = (await app.service('user').find({ query: { action: 'invite-code-lookup', inviteCode: inviteCode @@ -391,20 +402,21 @@ export async function handleHeartbeat(network: SocketWebRTCServerNetwork, spark: } export async function handleDisconnect(network: SocketWebRTCServerNetwork, spark: Spark): Promise { - const userId = getUserIdFromPeerID(network, spark.id) as UserId const peerID = spark.id as PeerID + const userId = getUserIdFromPeerID(network, peerID) as UserId const disconnectedClient = network.peers.get(peerID) if (!disconnectedClient) return logger.warn(`Tried to handle disconnect for peer ${peerID} but was not foudn`) // On local, new connections can come in before the old sockets are disconnected. // The new connection will overwrite the socketID for the user's client. // This will only clear transports if the client's socketId matches the socket that's disconnecting. - if (spark.id === disconnectedClient?.peerID) { + if (peerID === disconnectedClient?.peerID) { const state = getMutableState(WorldState) const userName = state.userNames[userId].value - network.app.service('message').create( + const app = getState(ServerState).app + app.service('message').create( { - targetObjectId: network.app.instance.id, + targetObjectId: app.instance.id, targetObjectType: 'instance', text: `${userName} left`, isNotification: true @@ -417,7 +429,7 @@ export async function handleDisconnect(network: SocketWebRTCServerNetwork, spark ) NetworkPeerFunctions.destroyPeer(network, peerID) - network.updatePeers() + updatePeers(network) logger.info(`Disconnecting user ${userId} on spark ${peerID}`) if (disconnectedClient?.instanceRecvTransport) disconnectedClient.instanceRecvTransport.close() if (disconnectedClient?.instanceSendTransport) disconnectedClient.instanceSendTransport.close() @@ -439,7 +451,7 @@ export async function handleLeaveWorld( if (transport.appData.peerID === peerID) closeTransport(network, transport) if (network.peers.has(peerID)) { NetworkPeerFunctions.destroyPeer(network, peerID) - network.updatePeers() + updatePeers(network) } spark.write({ type: MessageTypes.LeaveWorld.toString(), id: messageId }) } diff --git a/packages/instanceserver/src/ServerHostNetworkSystem.ts b/packages/instanceserver/src/ServerHostNetworkSystem.ts new file mode 100644 index 0000000000..965d8d0bab --- /dev/null +++ b/packages/instanceserver/src/ServerHostNetworkSystem.ts @@ -0,0 +1,31 @@ +import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' +import { NetworkPeerFunctions } from '@etherealengine/engine/src/networking/functions/NetworkPeerFunctions' +import { updatePeers } from '@etherealengine/engine/src/networking/systems/OutgoingActionSystem' + +import { SocketWebRTCServerNetwork } from './SocketWebRTCServerFunctions' + +export async function validateNetworkObjects(network: SocketWebRTCServerNetwork): Promise { + for (const [peerID, client] of network.peers) { + if (client.userId === Engine.instance.userId) continue + if (Date.now() - client.lastSeenTs > 30000) { + NetworkPeerFunctions.destroyPeer(network, peerID) + updatePeers(network) + } + } +} + +export default async function ServerHostNetworkSystem() { + const VALIDATE_NETWORK_INTERVAL = Engine.instance.tickRate * 5 + + const execute = () => { + const network = Engine.instance.worldNetwork as SocketWebRTCServerNetwork + if (!network) return + if (Engine.instance.worldNetwork.isHosting && Engine.instance.fixedTick % VALIDATE_NETWORK_INTERVAL === 0) { + validateNetworkObjects(network) + } + } + + const cleanup = async () => {} + + return { execute, cleanup } +} diff --git a/packages/instanceserver/src/SocketFunctions.ts b/packages/instanceserver/src/SocketFunctions.ts index c404c3753f..ea688b4879 100644 --- a/packages/instanceserver/src/SocketFunctions.ts +++ b/packages/instanceserver/src/SocketFunctions.ts @@ -2,6 +2,7 @@ import { AuthError } from '@etherealengine/common/src/enums/AuthError' import { AuthTask } from '@etherealengine/common/src/interfaces/AuthTask' import { UserInterface } from '@etherealengine/common/src/interfaces/User' import { UserId } from '@etherealengine/common/src/interfaces/UserId' +import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' import { EngineActions, getEngineState } from '@etherealengine/engine/src/ecs/classes/EngineState' import { MessageTypes } from '@etherealengine/engine/src/networking/enums/MessageTypes' import { matchActionOnce } from '@etherealengine/engine/src/networking/functions/matchActionOnce' @@ -17,6 +18,7 @@ import { handleJoinWorld, handleLeaveWorld } from './NetworkFunctions' +import { getServerNetwork } from './SocketWebRTCServerFunctions' import { handleWebRtcCloseConsumer, handleWebRtcCloseProducer, @@ -48,6 +50,9 @@ export const setupSocketFunctions = async (app: Application, spark: any) => { **/ if (!getEngineState().joinedWorld.value) await new Promise((resolve) => matchActionOnce(EngineActions.joinedWorld.matches, resolve)) + + const network = getServerNetwork(app) + spark.on('data', async (message) => { if (message.type === MessageTypes.Authorization.toString()) { const data = message.data @@ -113,7 +118,7 @@ export const setupSocketFunctions = async (app: Application, spark: any) => { * @todo Check if the user is banned */ - await handleConnectingPeer(app.network, spark, user) + await handleConnectingPeer(network, spark, user) } catch (e) { console.error(e) authTask.status = 'fail' @@ -124,68 +129,68 @@ export const setupSocketFunctions = async (app: Application, spark: any) => { spark.on('end', () => { console.log('got disconnection') - handleDisconnect(app.network, spark) + handleDisconnect(network, spark) }) spark.on('data', async (message) => { const { type, data, id } = message switch (type) { case MessageTypes.JoinWorld.toString(): - handleJoinWorld(app.network, spark, data, id, userId, user) + handleJoinWorld(network, spark, data, id, userId, user) break case MessageTypes.Heartbeat.toString(): - handleHeartbeat(app.network, spark) + handleHeartbeat(network, spark) break case MessageTypes.ActionData.toString(): - handleIncomingActions(app.network, spark, data) + handleIncomingActions(network, spark, data) break case MessageTypes.LeaveWorld.toString(): - handleLeaveWorld(app.network, spark, data, id) + handleLeaveWorld(network, spark, data, id) break case MessageTypes.WebRTCTransportCreate.toString(): - handleWebRtcTransportCreate(app.network, spark, data, id) + handleWebRtcTransportCreate(network, spark, data, id) break case MessageTypes.WebRTCProduceData.toString(): - handleWebRtcProduceData(app.network, spark, data, id) + handleWebRtcProduceData(network, spark, data, id) break case MessageTypes.WebRTCTransportConnect.toString(): - handleWebRtcTransportConnect(app.network, spark, data, id) + handleWebRtcTransportConnect(network, spark, data, id) break case MessageTypes.WebRTCTransportClose.toString(): - handleWebRtcTransportClose(app.network, spark, data, id) + handleWebRtcTransportClose(network, spark, data, id) break case MessageTypes.WebRTCCloseProducer.toString(): - handleWebRtcCloseProducer(app.network, spark, data, id) + handleWebRtcCloseProducer(network, spark, data, id) break case MessageTypes.WebRTCSendTrack.toString(): - handleWebRtcSendTrack(app.network, spark, data, id) + handleWebRtcSendTrack(network, spark, data, id) break case MessageTypes.WebRTCReceiveTrack.toString(): - handleWebRtcReceiveTrack(app.network, spark, data, id) + handleWebRtcReceiveTrack(network, spark, data, id) break case MessageTypes.WebRTCPauseConsumer.toString(): - handleWebRtcPauseConsumer(app.network, spark, data, id) + handleWebRtcPauseConsumer(network, spark, data, id) break case MessageTypes.WebRTCResumeConsumer.toString(): - handleWebRtcResumeConsumer(app.network, spark, data, id) + handleWebRtcResumeConsumer(network, spark, data, id) break case MessageTypes.WebRTCCloseConsumer.toString(): - handleWebRtcCloseConsumer(app.network, spark, data, id) + handleWebRtcCloseConsumer(network, spark, data, id) break case MessageTypes.WebRTCConsumerSetLayers.toString(): - handleWebRtcConsumerSetLayers(app.network, spark, data, id) + handleWebRtcConsumerSetLayers(network, spark, data, id) break case MessageTypes.WebRTCResumeProducer.toString(): - handleWebRtcResumeProducer(app.network, spark, data, id) + handleWebRtcResumeProducer(network, spark, data, id) break case MessageTypes.WebRTCPauseProducer.toString(): - handleWebRtcPauseProducer(app.network, spark, data, id) + handleWebRtcPauseProducer(network, spark, data, id) break case MessageTypes.WebRTCRequestCurrentProducers.toString(): - handleWebRtcRequestCurrentProducers(app.network, spark, data, id) + handleWebRtcRequestCurrentProducers(network, spark, data, id) break case MessageTypes.InitializeRouter.toString(): - handleWebRtcInitializeRouter(app.network, spark, data, id) + handleWebRtcInitializeRouter(network, spark, data, id) break } }) diff --git a/packages/instanceserver/src/SocketWebRTCServerFunctions.ts b/packages/instanceserver/src/SocketWebRTCServerFunctions.ts new file mode 100755 index 0000000000..378b2237a2 --- /dev/null +++ b/packages/instanceserver/src/SocketWebRTCServerFunctions.ts @@ -0,0 +1,88 @@ +import { Consumer, Producer, TransportInternal, WebRtcTransport } from 'mediasoup/node/lib/types' + +import { MediaStreamAppData } from '@etherealengine/common/src/interfaces/MediaStreamConstants' +import { PeerID } from '@etherealengine/common/src/interfaces/PeerID' +import { UserId } from '@etherealengine/common/src/interfaces/UserId' +import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' +import { createNetwork } from '@etherealengine/engine/src/networking/classes/Network' +import { Topic } from '@etherealengine/hyperflux/functions/ActionFunctions' +import { Application } from '@etherealengine/server-core/declarations' +import multiLogger from '@etherealengine/server-core/src/ServerLogger' + +import { startWebRTC } from './WebRTCFunctions' + +const logger = multiLogger.child({ component: 'instanceserver:webrtc:network' }) + +export type WebRTCTransportExtension = Omit & { + appData: MediaStreamAppData + internal: TransportInternal +} +export type ProducerExtension = Omit & { appData: MediaStreamAppData } +export type ConsumerExtension = Omit & { appData: MediaStreamAppData } + +export const initializeNetwork = async (app: Application, hostId: UserId, topic: Topic) => { + const { workers, routers } = await startWebRTC() + + const outgoingDataTransport = await routers.instance[0].createDirectTransport() + const options = { + ordered: false, + label: 'outgoingProducer', + protocol: 'raw', + appData: { peerID: 'outgoingProducer' } + } + const outgoingDataProducer = await outgoingDataTransport.produceData(options) + + const currentRouter = routers.instance[0] + + await Promise.all( + (routers.instance as any).map(async (router) => { + if (router.id !== currentRouter.id) + return currentRouter.pipeToRouter({ dataProducerId: outgoingDataProducer.id, router: router }) + else return Promise.resolve() + }) + ) + logger.info('Server transport initialized.') + + const transport = { + get peers() { + return Object.keys(app.primus.connections) as PeerID[] + }, + + messageToPeer: (peerId: PeerID, data: any) => { + const spark = app.primus.connections[peerId] + if (spark) spark.write(data) + }, + + messageToAll: (data: any) => { + for (const spark of Object.values(app.primus.connections)) spark.write(data) + }, + + bufferToPeer: (peerID: PeerID, data: any) => { + /** noop */ + }, + + bufferToAll: (data: any) => { + network.outgoingDataProducer.send(Buffer.from(new Uint8Array(data))) + } + } + + const network = { + ...createNetwork(hostId, topic), + workers, + routers, + transport, + outgoingDataTransport, + outgoingDataProducer, + mediasoupTransports: [] as WebRTCTransportExtension[], + transportsConnectPending: [] as Promise[], + producers: [] as ProducerExtension[], + consumers: [] as ConsumerExtension[] + } + + return network +} + +export type SocketWebRTCServerNetwork = Awaited> + +export const getServerNetwork = (app: Application) => + (app.isChannelInstance ? Engine.instance.mediaNetwork : Engine.instance.worldNetwork) as SocketWebRTCServerNetwork diff --git a/packages/instanceserver/src/SocketWebRTCServerNetwork.ts b/packages/instanceserver/src/SocketWebRTCServerNetwork.ts deleted file mode 100755 index 83437330db..0000000000 --- a/packages/instanceserver/src/SocketWebRTCServerNetwork.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { - Consumer, - DataProducer, - Producer, - Router, - Transport, - TransportInternal, - WebRtcTransport, - Worker -} from 'mediasoup/node/lib/types' - -import { MediaStreamAppData } from '@etherealengine/common/src/interfaces/MediaStreamConstants' -import { PeersUpdateType } from '@etherealengine/common/src/interfaces/PeerID' -import { UserId } from '@etherealengine/common/src/interfaces/UserId' -import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine' -import { Network } from '@etherealengine/engine/src/networking/classes/Network' -import { MessageTypes } from '@etherealengine/engine/src/networking/enums/MessageTypes' -import { WorldState } from '@etherealengine/engine/src/networking/interfaces/WorldState' -import { clearOutgoingActions, getMutableState } from '@etherealengine/hyperflux' -import { Action, addOutgoingTopicIfNecessary, Topic } from '@etherealengine/hyperflux/functions/ActionFunctions' -import { Application } from '@etherealengine/server-core/declarations' -import multiLogger from '@etherealengine/server-core/src/ServerLogger' - -import { setupSubdomain } from './NetworkFunctions' -import { startWebRTC } from './WebRTCFunctions' - -const logger = multiLogger.child({ component: 'instanceserver:webrtc:network' }) - -export type WebRTCTransportExtension = Omit & { - appData: MediaStreamAppData - internal: TransportInternal -} -export type ProducerExtension = Omit & { appData: MediaStreamAppData } -export type ConsumerExtension = Omit & { appData: MediaStreamAppData } - -export class SocketWebRTCServerNetwork extends Network { - workers: Worker[] = [] - routers: Record - transport: Transport - app: Application - - outgoingDataTransport: Transport - outgoingDataProducer: DataProducer - request = () => null! - - mediasoupTransports: WebRTCTransportExtension[] = [] - transportsConnectPending: Promise[] = [] - - producers = [] as ProducerExtension[] - consumers = [] as ConsumerExtension[] - - constructor(hostId: UserId, topic: Topic, app: Application) { - super(hostId, topic) - this.app = app - addOutgoingTopicIfNecessary(topic) - } - - public updatePeers = () => { - const userNames = getMutableState(WorldState).userNames - const peers = Array.from(this.peers.values()).map((peer) => { - return { - peerID: peer.peerID, - peerIndex: peer.peerIndex, - userID: peer.userId, - userIndex: peer.userIndex, - name: userNames[peer.userId].value - } - }) as Array - if (peers.length) - this.app.primus.forEach((spark, id, connections) => { - spark.write({ type: MessageTypes.UpdatePeers.toString(), data: peers }) - }) - } - - public sendActions = (): any => { - if (!this.ready) return - - const actions = [...Engine.instance.store.actions.outgoing[this.topic].queue] - if (!actions.length) return - - const outgoing = Engine.instance.store.actions.outgoing - - this.app.primus.forEach((spark, sparkId) => { - const arr: Action[] = [] - for (const a of [...actions]) { - const action = { ...a } - if (outgoing[this.topic].historyUUIDs.has(action.$uuid)) { - const idx = outgoing[this.topic].queue.indexOf(action) - outgoing[this.topic].queue.splice(idx, 1) - } - if (!action.$to) continue - const toUserId = this.peers.get(sparkId)?.userId - if (action.$to === 'all' || (action.$to === 'others' && toUserId !== action.$from) || action.$to === toUserId) { - arr.push(action) - } - } - if (arr.length) spark.write({ type: MessageTypes.ActionData.toString(), /*encode(*/ data: arr }) //) - }) - - // TODO: refactor this to support multiple connections of the same topic type - clearOutgoingActions(this.topic, Engine.instance.store) - } - - public sendReliableData = (message: any): any => { - if (this.app.primus != null) - this.app.primus.forEach((spark) => { - spark.write(MessageTypes.ReliableMessage.toString(), message) - }) - } - - public sendData = (data: Buffer): void => { - if (this.outgoingDataProducer != null) this.outgoingDataProducer.send(Buffer.from(new Uint8Array(data))) - } - - close() { - if (this.transport && typeof this.transport.close === 'function') this.transport.close() - } - - public async initialize(): Promise { - await setupSubdomain(this) - await startWebRTC(this) - - this.outgoingDataTransport = await this.routers.instance[0].createDirectTransport() - const options = { - ordered: false, - label: 'outgoingProducer', - protocol: 'raw', - appData: { peerID: 'outgoingProducer' } - } - this.outgoingDataProducer = await this.outgoingDataTransport.produceData(options) - - const currentRouter = this.routers.instance[0] - - await Promise.all( - (this.routers.instance as any).map(async (router) => { - if (router.id !== currentRouter.id) - return currentRouter.pipeToRouter({ dataProducerId: this.outgoingDataProducer.id, router: router }) - else return Promise.resolve() - }) - ) - logger.info('Server transport initialized.') - } -} diff --git a/packages/instanceserver/src/WebRTCFunctions.ts b/packages/instanceserver/src/WebRTCFunctions.ts index 91fe883e48..951e6c1927 100755 --- a/packages/instanceserver/src/WebRTCFunctions.ts +++ b/packages/instanceserver/src/WebRTCFunctions.ts @@ -9,7 +9,8 @@ import { RtpCodecCapability, RtpParameters, Transport, - WebRtcTransport + WebRtcTransport, + Worker } from 'mediasoup/node/lib/types' import os from 'os' import { Spark } from 'primus' @@ -17,9 +18,11 @@ import { Spark } from 'primus' import { MediaStreamAppData, MediaTagType } from '@etherealengine/common/src/interfaces/MediaStreamConstants' import { PeerID } from '@etherealengine/common/src/interfaces/PeerID' import { MessageTypes } from '@etherealengine/engine/src/networking/enums/MessageTypes' +import { getState } from '@etherealengine/hyperflux' import config from '@etherealengine/server-core/src/appconfig' import { localConfig, sctpParameters } from '@etherealengine/server-core/src/config' import multiLogger from '@etherealengine/server-core/src/ServerLogger' +import { ServerState } from '@etherealengine/server-core/src/ServerState' import { WebRtcTransportParams } from '@etherealengine/server-core/src/types/WebRtcTransportParams' import { getUserIdFromPeerID } from './NetworkFunctions' @@ -28,7 +31,7 @@ import { ProducerExtension, SocketWebRTCServerNetwork, WebRTCTransportExtension -} from './SocketWebRTCServerNetwork' +} from './SocketWebRTCServerFunctions' const logger = multiLogger.child({ component: 'instanceserver:webrtc' }) @@ -41,11 +44,12 @@ const toArrayBuffer = (buf): any => { return ab } -export async function startWebRTC(network: SocketWebRTCServerNetwork): Promise { +export async function startWebRTC() { logger.info('Starting WebRTC Server.') // Initialize roomstate const cores = os.cpus() - network.routers = { instance: [] } + const routers = { instance: [] } as { instance: Router[] } + const workers = [] as Worker[] for (let i = 0; i < cores.length; i++) { const newWorker = await createWorker({ logLevel: 'debug', @@ -65,17 +69,18 @@ export async function startWebRTC(network: SocketWebRTCServerNetwork): Promise async (producer: ProducerExtension): Promise => { const peerID = spark.id as PeerID - const userId = getUserIdFromPeerID(network, spark.id)! + const userId = getUserIdFromPeerID(network, peerID)! const selfClient = network.peers.get(peerID)! if (selfClient?.peerID != null) { for (const [, client] of network.peers) { @@ -110,7 +115,7 @@ export const sendCurrentProducers = async ( channelId?: string ): Promise => { const peerID = spark.id as PeerID - const selfUserId = getUserIdFromPeerID(network, spark.id)! + const selfUserId = getUserIdFromPeerID(network, peerID)! const selfClient = network.peers.get(peerID)! if (selfClient?.peerID) { for (const [peerID, client] of network.peers) { @@ -147,7 +152,7 @@ export const handleConsumeDataEvent = (network: SocketWebRTCServerNetwork, spark: Spark) => async (dataProducer: DataProducer): Promise => { const peerID = spark.id as PeerID - const userId = getUserIdFromPeerID(network, spark.id)! + const userId = getUserIdFromPeerID(network, peerID)! logger.info('Data Consumer being created on server by client: ' + userId) if (!network.peers.has(peerID)) { return false @@ -327,7 +332,7 @@ export async function createInternalDataConsumer( network.incomingMessageQueueUnreliableIDs.add(peerID) // forward data to clients in world immediately // TODO: need to include the userId (or index), so consumers can validate - network.sendData(message) + network.transport.bufferToAll(message) }) return consumer } catch (err) { @@ -392,14 +397,15 @@ export async function handleWebRtcTransportCreate( const { id, iceParameters, iceCandidates, dtlsParameters } = newTransport if (config.kubernetes.enabled) { - const serverResult = await network.app.k8AgonesClient.listNamespacedCustomObject( + const app = getState(ServerState).app + const serverResult = await app.k8AgonesClient.listNamespacedCustomObject( 'agones.dev', 'v1', 'default', 'gameservers' ) const thisGs = (serverResult?.body! as any).items.find( - (server) => server.metadata.name === network.app.instanceServer.objectMeta.name + (server) => server.metadata.name === app.instanceServer.objectMeta.name ) for (let [index, candidate] of iceCandidates.entries()) { @@ -446,9 +452,9 @@ export async function handleWebRtcProduceData( try { console.log('webRTCProduceData') const peerID = spark.id as PeerID - const userId = getUserIdFromPeerID(network, spark.id) + const userId = getUserIdFromPeerID(network, peerID) if (!userId) { - logger.info('userId could not be found for sparkID ' + spark.id) + logger.info('userId could not be found for sparkID ' + peerID) return } if (!data.label) { @@ -639,7 +645,7 @@ export async function handleWebRtcSendTrack( messageId: string ) { const peerID = spark.id as PeerID - const userId = getUserIdFromPeerID(network, spark.id) + const userId = getUserIdFromPeerID(network, peerID) const { transportId, kind, rtpParameters, paused = false, appData } = data const transport = network.mediasoupTransports[transportId] diff --git a/packages/instanceserver/src/WorldHostModule.ts b/packages/instanceserver/src/WorldHostModule.ts new file mode 100644 index 0000000000..45922fac3a --- /dev/null +++ b/packages/instanceserver/src/WorldHostModule.ts @@ -0,0 +1,13 @@ +import { SystemUpdateType } from '@etherealengine/engine/src/ecs/functions/SystemUpdateType' + +import ServerHostNetworkSystem from './ServerHostNetworkSystem' + +export function WorldHostModule() { + return [ + { + uuid: 'ee.instanceserver.ServerHostNetworkSystem', + type: SystemUpdateType.FIXED, + systemLoader: () => Promise.resolve({ default: ServerHostNetworkSystem }) + } + ] +} diff --git a/packages/instanceserver/src/channels.ts b/packages/instanceserver/src/channels.ts index 4de9e1045a..db521cd2a6 100755 --- a/packages/instanceserver/src/channels.ts +++ b/packages/instanceserver/src/channels.ts @@ -30,8 +30,9 @@ import { getProjectsList } from '@etherealengine/server-core/src/projects/projec import multiLogger from '@etherealengine/server-core/src/ServerLogger' import getLocalServerIp from '@etherealengine/server-core/src/util/get-local-server-ip' -import { authorizeUserToJoinServer } from './NetworkFunctions' -import { SocketWebRTCServerNetwork } from './SocketWebRTCServerNetwork' +import { authorizeUserToJoinServer, setupSubdomain } from './NetworkFunctions' +import { getServerNetwork, initializeNetwork, SocketWebRTCServerNetwork } from './SocketWebRTCServerFunctions' +import { WorldHostModule } from './WorldHostModule' const logger = multiLogger.child({ component: 'instanceserver:channels' }) @@ -229,11 +230,8 @@ const loadEngine = async (app: Application, sceneId: string) => { Engine.instance.userId = hostId const topic = app.isChannelInstance ? NetworkTopics.media : NetworkTopics.world - const network = new SocketWebRTCServerNetwork(hostId, topic, app) - app.network = network - const initPromise = network.initialize() - - addNetwork(network) + await setupSubdomain() + const networkPromise = initializeNetwork(app, hostId, topic) const projects = await getProjectsList() if (app.isChannelInstance) { @@ -253,7 +251,8 @@ const loadEngine = async (app: Application, sceneId: string) => { ...TransformModule(), ...SceneCommonModule(), ...AvatarCommonModule(), - ...RealtimeNetworkingModule(false, true) + ...RealtimeNetworkingModule(false, true), + ...WorldHostModule() ]) await loadEngineInjection(projects) dispatchAction(EngineActions.initializeEngine({ initialised: true })) @@ -267,7 +266,11 @@ const loadEngine = async (app: Application, sceneId: string) => { logger.info('Scene loaded!') } - await initPromise + + const network = await networkPromise + + addNetwork(network) + network.ready = true NetworkPeerFunctions.createPeer( @@ -408,7 +411,7 @@ const shutdownServer = async (app: Application, instanceId: string) => { // todo: this could be more elegant const getActiveUsersCount = (app: Application, userToIgnore: UserInterface) => { - const activeClients = app.network.peers + const activeClients = getServerNetwork(app).peers const activeUsers = [...activeClients].filter( ([id, client]) => client.userId !== Engine.instance.userId && client.userId !== userToIgnore.id ) @@ -489,9 +492,11 @@ const handleUserDisconnect = async ( await new Promise((resolve) => setTimeout(resolve, config.instanceserver.shutdownDelayMs)) + const network = getServerNetwork(app) + // check if there are no peers connected (1 being the server, // 0 if the serer was just starting when someone connected and disconnected) - if (app.network.peers.size <= 1) { + if (network.peers.size <= 1) { logger.info('Shutting down instance server as there are no users present.') await shutdownServer(app, instanceId) } diff --git a/packages/server-core/declarations.ts b/packages/server-core/declarations.ts index 7493773cb3..4cb7673f4a 100755 --- a/packages/server-core/declarations.ts +++ b/packages/server-core/declarations.ts @@ -7,8 +7,6 @@ import * as k8s from '@kubernetes/client-node' import { ServiceTypes } from '@etherealengine/common/declarations' -import { SocketWebRTCServerNetwork } from '../instanceserver/src/SocketWebRTCServerNetwork' - export const ServerMode = { API: 'API' as const, Instance: 'Instance' as const, @@ -17,6 +15,11 @@ export const ServerMode = { export type ServerTypeMode = typeof ServerMode[keyof typeof ServerMode] +export type PrimusType = Primus & { + forEach(cb: (spark: Primus.Spark, id: string, connections: { [id: string]: Primus.Spark }) => boolean | void): Primus + use(name: string, fn: (req: any, res: any, next: any) => void, level?: number): Primus +} + export type Application = ExpressFeathers & { // Common k8AgonesClient: k8s.CustomObjectsApi @@ -25,8 +28,7 @@ export type Application = ExpressFeathers & { k8BatchClient: k8s.BatchV1Api agonesSDK: any sync: any - primus: Primus - network: SocketWebRTCServerNetwork + primus: PrimusType seed: () => Application // function serverMode: ServerTypeMode diff --git a/packages/server-core/src/ServerState.ts b/packages/server-core/src/ServerState.ts new file mode 100644 index 0000000000..ee9cd8b061 --- /dev/null +++ b/packages/server-core/src/ServerState.ts @@ -0,0 +1,10 @@ +import { defineState } from '@etherealengine/hyperflux' + +import { Application } from '../declarations' + +export const ServerState = defineState({ + name: 'ServerState', + initial: { + app: null! as Application + } +}) diff --git a/packages/server-core/src/createApp.ts b/packages/server-core/src/createApp.ts index b0537ef569..215bb680f3 100644 --- a/packages/server-core/src/createApp.ts +++ b/packages/server-core/src/createApp.ts @@ -24,6 +24,7 @@ import config from './appconfig' import { createDefaultStorageProvider, createIPFSStorageProvider } from './media/storageprovider/storageprovider' import sequelize from './sequelize' import { elasticOnlyLogger, logger } from './ServerLogger' +import { ServerState } from './ServerState' import services from './services' import authentication from './user/authentication' import primus from './util/primus' @@ -131,9 +132,9 @@ export const createFeathersExpressApp = ( createIPFSStorageProvider() } + createEngine() + getMutableState(EngineState).publicPath.set(config.client.dist) if (!appConfig.db.forceRefresh) { - createEngine() - getMutableState(EngineState).publicPath.set(config.client.dist) setupEngineActionSystems() initializeNode() } @@ -200,5 +201,7 @@ export const createFeathersExpressApp = ( app.use(errorHandler({ logger })) + getMutableState(ServerState).merge({ app }) + return app } diff --git a/packages/server-core/src/util/primus.ts b/packages/server-core/src/util/primus.ts index c8889c1aba..425b20a5de 100644 --- a/packages/server-core/src/util/primus.ts +++ b/packages/server-core/src/util/primus.ts @@ -3,6 +3,8 @@ import http from 'http' import Primus from 'primus' import Emitter from 'primus-emitter' +import { PrimusType } from '../../declarations' + function configurePrimus(config?: any, configurer?: any) { return function (app) { // Returns the connection object @@ -36,7 +38,7 @@ function configurePrimus(config?: any, configurer?: any) { async setup(this: any, server = http.createServer(), ...rest: any[]) { if (!this.primus) { - const primus = (this.primus = new Primus(server, config)) + const primus = (this.primus = new Primus(server, config)) as PrimusType primus.plugin('emitter', Emitter)