Skip to content
This repository was archived by the owner on Aug 21, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/instanceserver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
"scripts": {
"check-errors": "tsc --noemit",
"start": "cross-env APP_ENV=production ts-node --swc src/index.ts",
"start-channel": "cross-env APP_ENV=production INSTANCESERVER_PORT=3032 ts-node --swc src/index.ts",
"start-channel": "cross-env APP_ENV=production INSTANCESERVER_PORT=3032 DEV_CHANNEL=true ts-node --swc src/index.ts",
"dev": "cross-env APP_ENV=development NODE_OPTIONS='--inspect=2995' ts-node --swc src/index.ts",
"dev-channel": " cross-env APP_ENV=development NODE_OPTIONS='--inspect=2996' INSTANCESERVER_PORT=3032 ts-node --swc src/index.ts",
"dev-channel": " cross-env APP_ENV=development NODE_OPTIONS='--inspect=2996' DEV_CHANNEL=true INSTANCESERVER_PORT=3032 ts-node --swc src/index.ts",
"dev-nossl": "cross-env NOSSL=true ts-node --swc src/index.ts",
"test": "exit 0",
"validate": "npm run build && npm run test"
Expand Down
7 changes: 6 additions & 1 deletion packages/instanceserver/src/NetworkFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ export const setupSubdomain = async () => {
}
]

localConfig.mediasoup.webRtcServerOptions.listenInfos.forEach((listenInfo) => {
listenInfo.announcedIp = announcedIp
listenInfo.ip = '0.0.0.0'
})

localConfig.mediasoup.plainTransport.listenIp = {
ip: '0.0.0.0',
announcedIp
Expand Down Expand Up @@ -460,7 +465,7 @@ export async function handleHeartbeat(network: SocketWebRTCServerNetwork, spark:
export async function handleDisconnect(network: SocketWebRTCServerNetwork, spark: Spark, peerID: PeerID): Promise<any> {
const userId = getUserIdFromPeerID(network, peerID) as UserId
const disconnectedClient = network.peers.get(peerID)
if (!disconnectedClient) return logger.warn(`Tried to handle disconnect for peer ${peerID} but was not foudn`)
if (!disconnectedClient) return logger.warn(`Tried to handle disconnect for peer ${peerID} but was not found`)
// On local, new connections can come in before the old sockets are disconnected.
// The new connection will overwrite the socketID for the user's client.
// This will only clear transports if the client's socketId matches the socket that's disconnecting.
Expand Down
41 changes: 23 additions & 18 deletions packages/instanceserver/src/WebRTCFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,27 @@ import {
DataProducer,
DataProducerOptions,
MediaKind,
ProducerOptions,
Router,
RtpCodecCapability,
RtpParameters,
SctpStreamParameters,
Transport,
WebRtcServer,
WebRtcTransport,
Worker
} from 'mediasoup/node/lib/types'
import os from 'os'
import { Spark } from 'primus'

import { PeerID } from '@etherealengine/common/src/interfaces/PeerID'
import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine'
import { DataChannelType } from '@etherealengine/engine/src/networking/classes/Network'
import { MessageTypes } from '@etherealengine/engine/src/networking/enums/MessageTypes'
import { NetworkPeer } from '@etherealengine/engine/src/networking/interfaces/NetworkPeer'
import {
dataChannelRegistry,
MediaStreamAppData,
MediaTagType,
NetworkState
MediaTagType
} from '@etherealengine/engine/src/networking/NetworkState'
import { getState } from '@etherealengine/hyperflux'
import { Application } from '@etherealengine/server-core/declarations'
import config from '@etherealengine/server-core/src/appconfig'
import { localConfig, sctpParameters } from '@etherealengine/server-core/src/config'
import multiLogger from '@etherealengine/server-core/src/ServerLogger'
Expand Down Expand Up @@ -63,6 +59,10 @@ export async function startWebRTC() {
logTags: ['sctp']
})

const webRtcServerOptions = JSON.parse(JSON.stringify(localConfig.mediasoup.webRtcServerOptions))
for (const listenInfo of webRtcServerOptions.listenInfos) listenInfo.port += i
newWorker.appData.webRtcServer = await newWorker.createWebRtcServer(webRtcServerOptions)

newWorker.on('died', (err) => {
logger.fatal(err, 'mediasoup worker died (this should never happen)')
process.exit(1)
Expand All @@ -71,7 +71,7 @@ export async function startWebRTC() {
logger.info('Created Mediasoup worker.')

const mediaCodecs = localConfig.mediasoup.router.mediaCodecs as RtpCodecCapability[]
const newRouter = await newWorker.createRouter({ mediaCodecs })
const newRouter = await newWorker.createRouter({ mediaCodecs, appData: { worker: newWorker } })
routers.instance.push(newRouter)
logger.info('Worker created router.')
workers.push(newWorker)
Expand Down Expand Up @@ -260,6 +260,13 @@ export const handleWebRtcConsumeData = async (
}
}

/**
 * Tears down a single mediasoup data producer.
 *
 * Removes the producer from the network-wide `dataProducers` registry, closes
 * the underlying mediasoup DataProducer, and removes it from the owning peer's
 * per-peer producer map. Called both from the producer's `transportclose`
 * handler and from closeTransport(), so it may run during peer teardown.
 *
 * @param network - the server network holding the producer registries
 *                  (untyped here; presumably SocketWebRTCServerNetwork — TODO confirm)
 * @param dataProducer - the mediasoup DataProducer to close; its appData is
 *                       expected to carry the owning peerID
 */
export async function closeDataProducer(network, dataProducer): Promise<void> {
  network.dataProducers.delete(dataProducer.id)
  logger.info("data producer's transport closed: " + dataProducer.id)
  dataProducer.close()
  // The peer may already have been removed from network.peers (disconnect can
  // race the transportclose event); the previous non-null assertions
  // (`.get(...)!.dataProducers!`) threw a TypeError in that case. Optional
  // chaining makes the cleanup a no-op instead.
  network.peers.get(dataProducer.appData.peerID)?.dataProducers?.delete(dataProducer.id)
}

export async function closeTransport(
network: SocketWebRTCServerNetwork,
transport: WebRTCTransportExtension
Expand All @@ -268,6 +275,10 @@ export async function closeTransport(
// our producer and consumer event handlers will take care of
// calling closeProducer() and closeConsumer() on all the producers
// and consumers associated with this transport
const dataProducers = (transport as any).dataProducers
dataProducers?.forEach(async (dataProducer) => await closeDataProducer(network, dataProducer))
const producers = (transport as any).producers
producers?.forEach(async (producer) => await closeProducer(network, producer))
if (transport && typeof transport.close === 'function') {
await transport.close()
delete network.mediasoupTransports[transport.id]
Expand Down Expand Up @@ -334,7 +345,7 @@ export async function createWebRtcTransport(
network.routers[`${channelType}:${channelId}`] = [] as any
await Promise.all(
network.workers.map(async (worker) => {
const newRouter = await worker.createRouter({ mediaCodecs })
const newRouter = await worker.createRouter({ mediaCodecs, appData: { worker } })
network.routers[`${channelType}:${channelId}`].push(newRouter)
return Promise.resolve()
})
Expand All @@ -356,7 +367,7 @@ export async function createWebRtcTransport(
}

return selectedrouter?.createWebRtcTransport({
listenIps: listenIps,
webRtcServer: (selectedrouter.appData.worker as Worker).appData!.webRtcServer as WebRtcServer,
enableUdp: true,
enableTcp: false,
preferUdp: true,
Expand Down Expand Up @@ -556,22 +567,16 @@ export async function handleWebRtcProduceData(

await Promise.all(
network.routers.instance.map(async (router) => {
if (router.id !== transport.internal.routerId) {
if (router.id !== transport.internal.routerId)
return currentRouter.pipeToRouter({
dataProducerId: dataProducer.id,
router: router
})
}
})
)

// if our associated transport closes, close ourself, too
dataProducer.on('transportclose', () => {
network.dataProducers.delete(dataProducer.id)
logger.info("data producer's transport closed: " + dataProducer.id)
dataProducer.close()
network.peers.get(peerID)!.dataProducers!.delete(dataProducer.id)
})
dataProducer.on('transportclose', () => closeDataProducer(network, dataProducer))
const internalConsumer = await createInternalDataConsumer(network, dataProducer, peerID)
if (internalConsumer) {
if (!network.peers.has(peerID)) {
Expand Down Expand Up @@ -1060,7 +1065,7 @@ export async function handleWebRtcInitializeRouter(
network.routers[`${channelType}:${channelId}`] = []
await Promise.all(
network.workers.map(async (worker) => {
const newRouter = await worker.createRouter({ mediaCodecs })
const newRouter = await worker.createRouter({ mediaCodecs, appData: { worker } })
network.routers[`${channelType}:${channelId}`].push(newRouter)
})
)
Expand Down
39 changes: 37 additions & 2 deletions packages/server-core/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import type { WebRtcTransportOptions } from 'mediasoup/node/lib/WebRtcTransport'
import configFile from './appconfig'
import { SctpParameters } from './types/SctpParameters'

const NUM_RTC_PORTS = process.env.NUM_RTC_PORTS ? parseInt(process.env.NUM_RTC_PORTS) : 10000

export const sctpParameters: SctpParameters = {
OS: 1024,
MIS: 65535,
Expand All @@ -14,6 +16,22 @@ export const sctpParameters: SctpParameters = {
export const config = {
httpPeerStale: 15000,
mediasoup: {
webRtcServerOptions: {
listenInfos: [
{
protocol: 'udp',
ip: configFile.instanceserver.hostname! || '0.0.0.0',
announcedIp: null! as string,
        port: process.env.DEV_CHANNEL === 'true' ? 30000 : 40000
},
{
protocol: 'tcp',
ip: configFile.instanceserver.hostname! || '0.0.0.0',
announcedIp: null! as string,
port: process.env.DEV_CHANNEL === 'true' ? 30000 : 40000
}
]
},
worker: {
rtcMinPort: 40000,
rtcMaxPort: 49999,
Expand Down Expand Up @@ -73,9 +91,26 @@ export const config = {
export const localConfig = {
httpPeerStale: 15000,
mediasoup: {
webRtcServerOptions: {
listenInfos: [
{
protocol: 'udp',
ip: configFile.instanceserver.hostname! || '0.0.0.0',
announcedIp: null! as string,
port: process.env.DEV_CHANNEL === 'true' ? 30000 : configFile.instanceserver.rtc_start_port
},
{
protocol: 'tcp',
ip: configFile.instanceserver.hostname! || '0.0.0.0',
announcedIp: null! as string,
port: process.env.DEV_CHANNEL === 'true' ? 30000 : configFile.instanceserver.rtc_start_port
}
]
},
worker: {
rtcMinPort: configFile.instanceserver.rtc_start_port,
rtcMaxPort: configFile.instanceserver.rtc_end_port,
rtcMinPort: process.env.DEV_CHANNEL === 'true' ? 30000 : configFile.instanceserver.rtc_start_port,
rtcMaxPort:
(process.env.DEV_CHANNEL === 'true' ? 30000 : configFile.instanceserver.rtc_start_port) + NUM_RTC_PORTS - 1,
logLevel: 'info',
logTags: ['info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp']
},
Expand Down
83 changes: 46 additions & 37 deletions scripts/prune_docker_cache.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,57 @@
/* eslint-disable @typescript-eslint/no-var-requires */

import { DeleteObjectsCommand, GetObjectCommand, ListObjectsV2Command } from '@aws-sdk/client-s3'
import cli from 'cli'
import { buffer } from 'node:stream/consumers'

import {
createDefaultStorageProvider,
getStorageProvider
} from '@etherealengine/server-core/src/media/storageprovider/storageprovider'
import { buffer } from 'node:stream/consumers'
import { DeleteObjectsCommand, GetObjectCommand, ListObjectsV2Command } from '@aws-sdk/client-s3'

cli.enable('status');
cli.enable('status')

const options = cli.parse({
bucket: [true, 'Name of Bucket', 'string'],
releaseName: [true, 'Name of release', 'string']
});
bucket: [true, 'Name of Bucket', 'string'],
releaseName: [true, 'Name of release', 'string']
})

cli.main(async () => {
try {
await createDefaultStorageProvider()
const storageProvider = getStorageProvider()
const s3 = storageProvider.provider
const listCommand = new ListObjectsV2Command({
Bucket: options.bucket,
Prefix: 'blobs/',
Delimiter: '/'
})
const result = await s3.send(listCommand)
const manifestCommand = new GetObjectCommand({ Bucket: options.bucket, Key: `manifests/latest_${options.releaseName}` })
const manifestResult = await s3.send(manifestCommand)
const bufferBody = await buffer(manifestResult.Body)
const manifest = JSON.parse(bufferBody.toString())
const layers = manifest.layers
const matches = layers.map(layer => layer.blob)
const layersToRemove = result.Contents.filter(item => matches.indexOf(item.Key.replace('blobs/', '')) < 0).map(item => { return { Key: item.Key }})
const deleteCommand = new DeleteObjectsCommand({
Bucket: options.bucket,
Delete: {
Objects: layersToRemove
}
})
await s3.send(deleteCommand)
console.log('Pruned Docker build cache', options)
process.exit(0)
} catch(err) {
console.log('Error in pruning Docker build cache', options);
console.log(err);
}
});
try {
await createDefaultStorageProvider()
const storageProvider = getStorageProvider()
const s3 = storageProvider.provider
const listCommand = new ListObjectsV2Command({
Bucket: options.bucket,
Prefix: 'blobs/',
Delimiter: '/'
})
const result = await s3.send(listCommand)
const manifestCommand = new GetObjectCommand({
Bucket: options.bucket,
Key: `manifests/latest_${options.releaseName}`
})
const manifestResult = await s3.send(manifestCommand)
const bufferBody = await buffer(manifestResult.Body)
const manifest = JSON.parse(bufferBody.toString())
const layers = manifest.layers
const matches = layers.map((layer) => layer.blob)
const layersToRemove = result.Contents.filter((item) => matches.indexOf(item.Key.replace('blobs/', '')) < 0).map(
(item) => {
return { Key: item.Key }
}
)
const deleteCommand = new DeleteObjectsCommand({
Bucket: options.bucket,
Delete: {
Objects: layersToRemove
}
})
await s3.send(deleteCommand)
console.log('Pruned Docker build cache', options)
process.exit(0)
} catch (err) {
console.log('Error in pruning Docker build cache', options)
console.log(err)
}
})