Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions client/src/components/header.vue
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
<span><b>n</b>.eko</span>
</a>
<ul class="menu">
<li>
<button class="btn" @click="startShareScreen" v-if="!mediaStream">
START SCREEN SHARE
</button>
<button class="btn" @click="stopShareScreen" v-else>
STOP SCREEN SHARE
</button>
</li>
<li>
<i
:class="[{ disabled: !admin }, { locked: isLocked('control') }, 'fas', 'fa-mouse']"
Expand Down Expand Up @@ -207,5 +215,31 @@

return this.$t(`locks.${resource}.` + (this.isLocked(resource) ? `locked` : `unlocked`))
}

//
// Screen Share
//
mediaStream: MediaStream | null = null
mediaRtcpSender: RTCRtpSender | null = null
async startShareScreen() {
// get media stream from user's browser
this.mediaStream = await navigator.mediaDevices
.getDisplayMedia({
video: true,
audio: false,
})
const mediaTrack = this.mediaStream.getVideoTracks()[0];
this.mediaRtcpSender = this.$client.addTrack(mediaTrack, this.mediaStream)
}
async stopShareScreen() {
if (this.mediaStream) {
this.mediaStream.getTracks().forEach(track => track.stop())
this.mediaStream = null
}
if (this.mediaRtcpSender) {
this.$client.removeTrack(this.mediaRtcpSender)
this.mediaRtcpSender = null
}
}
}
</script>
16 changes: 16 additions & 0 deletions client/src/neko/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,22 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
this._peer.setRemoteDescription({ type: 'answer', sdp })
}

public addTrack(track: MediaStreamTrack, ...streams: MediaStream[]): RTCRtpSender {
if (!this._peer) {
throw new Error('peer not connected')
}

return this._peer.addTrack(track, ...streams)
}

public removeTrack(sender: RTCRtpSender) {
if (!this._peer) {
throw new Error('peer not connected')
}

this._peer.removeTrack(sender)
}

private async onMessage(e: MessageEvent) {
const { event, ...payload } = JSON.parse(e.data) as WebSocketMessages

Expand Down
19 changes: 19 additions & 0 deletions server/internal/capture/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package capture

import (
"errors"
"fmt"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"m1k1o/neko/internal/config"
"m1k1o/neko/internal/types"
"m1k1o/neko/internal/types/codec"
)

type CaptureManagerCtx struct {
Expand All @@ -18,6 +20,9 @@ type CaptureManagerCtx struct {
broadcast *BroacastManagerCtx
audio *StreamSinkManagerCtx
video *StreamSinkManagerCtx

// source-sinks
screenshare *StreamSrcSinkManagerCtx
}

func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
Expand All @@ -43,6 +48,15 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
}
return NewVideoPipeline(config.VideoCodec, config.Display, config.VideoPipeline, fps, config.VideoBitrate, config.VideoHWEnc)
}, "video"),

// source-sinks
screenshare: streamSrcSinkNew(config.ScreenshareEnabled, map[string]string{
codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) +
"! rtpvp8depay " +
"! appsink name=appsink",
// TODO: Add support for more codecs.
}, "webcam"),
}
}

Expand Down Expand Up @@ -95,6 +109,7 @@ func (manager *CaptureManagerCtx) Start() {
func (manager *CaptureManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("shutdown")

manager.screenshare.shutdown()
manager.broadcast.shutdown()

manager.audio.shutdown()
Expand All @@ -114,3 +129,7 @@ func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager {
func (manager *CaptureManagerCtx) Video() types.StreamSinkManager {
return manager.video
}

func (manager *CaptureManagerCtx) Screenshare() types.StreamSrcSinkManager {
return manager.screenshare
}
137 changes: 137 additions & 0 deletions server/internal/capture/streamsrcsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package capture

import (
"errors"
"sync"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"m1k1o/neko/internal/capture/gst"
"m1k1o/neko/internal/types"
"m1k1o/neko/internal/types/codec"
)

type StreamSrcSinkManagerCtx struct {
logger zerolog.Logger
sampleChannel chan types.Sample

enabled bool
codecPipeline map[string]string // codec -> pipeline

codec codec.RTPCodec
pipeline *gst.Pipeline
pipelineMu sync.Mutex
pipelineStr string
}

func streamSrcSinkNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcSinkManagerCtx {
logger := log.With().
Str("module", "capture").
Str("submodule", "stream-src-sink").
Str("video_id", video_id).Logger()

return &StreamSrcSinkManagerCtx{
logger: logger,
enabled: enabled,
codecPipeline: codecPipeline,
sampleChannel: make(chan types.Sample),
}
}

func (manager *StreamSrcSinkManagerCtx) shutdown() {
manager.logger.Info().Msgf("shutdown")

manager.Stop()
}

func (manager *StreamSrcSinkManagerCtx) Codec() codec.RTPCodec {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

return manager.codec
}

func (manager *StreamSrcSinkManagerCtx) Start(codec codec.RTPCodec) error {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

if manager.pipeline != nil {
return types.ErrCapturePipelineAlreadyExists
}

if !manager.enabled {
return errors.New("stream-src-sink not enabled")
}

found := false
for codecName, pipeline := range manager.codecPipeline {
if codecName == codec.Name {
manager.pipelineStr = pipeline
manager.codec = codec
found = true
break
}
}

if !found {
return errors.New("no pipeline found for a codec")
}

var err error

manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipelineStr).
Msgf("creating pipeline")

manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
if err != nil {
return err
}

manager.pipeline.AttachAppsrc("appsrc")
manager.pipeline.AttachAppsink("appsink", manager.sampleChannel)
manager.pipeline.Play()

return nil
}

func (manager *StreamSrcSinkManagerCtx) Stop() {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

if manager.pipeline == nil {
return
}

manager.pipeline.Destroy()
manager.pipeline = nil

manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipelineStr).
Msgf("destroying pipeline")
}

func (manager *StreamSrcSinkManagerCtx) Push(bytes []byte) {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

if manager.pipeline == nil {
return
}

manager.pipeline.Push(bytes)
}

func (manager *StreamSrcSinkManagerCtx) Started() bool {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

return manager.pipeline != nil
}

func (manager *StreamSrcSinkManagerCtx) GetSampleChannel() chan types.Sample {
return manager.sampleChannel
}
18 changes: 18 additions & 0 deletions server/internal/config/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Capture struct {
// broadcast
BroadcastPipeline string
BroadcastUrl string

// screenshare
ScreenshareEnabled bool
}

func (Capture) Init(cmd *cobra.Command) error {
Expand Down Expand Up @@ -151,6 +154,15 @@ func (Capture) Init(cmd *cobra.Command) error {
return err
}

//
// screenshare
//

cmd.PersistentFlags().Bool("screenshare.enabled", true, "enable screenshare")
if err := viper.BindPFlag("screenshare.enabled", cmd.PersistentFlags().Lookup("screenshare.enabled")); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -230,4 +242,10 @@ func (s *Capture) Set() {

s.BroadcastPipeline = viper.GetString("broadcast_pipeline")
s.BroadcastUrl = viper.GetString("broadcast_url")

//
// screenshare
//

s.ScreenshareEnabled = viper.GetBool("screenshare.enabled")
}
12 changes: 12 additions & 0 deletions server/internal/types/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,23 @@ type StreamSinkManager interface {
GetSampleChannel() chan Sample
}

type StreamSrcSinkManager interface {
Codec() codec.RTPCodec

Start(codec codec.RTPCodec) error
Stop()

Push(bytes []byte)
Started() bool
GetSampleChannel() chan Sample
}

type CaptureManager interface {
Start()
Shutdown() error

Broadcast() BroadcastManager
Audio() StreamSinkManager
Video() StreamSinkManager
Screenshare() StreamSrcSinkManager
}
Loading