Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
efa78cc
Initial commit - without tests - possibly flawed concurrency
bjornss-scaleaq Mar 25, 2026
4c7ec84
Pruned / Renamed RTP Stats
bjornss-scaleaq Mar 26, 2026
680d4fe
Reformulated SourceStats in the form of more Generic interface - avoi…
bjornss-scaleaq Mar 26, 2026
204bff2
Merge branch 'main' into feature/expsourcemetrics
bjornss-scaleaq Mar 31, 2026
a5daf4c
Merge branch 'main' into feature/expsourcemetrics
bjornss-scaleaq Apr 7, 2026
26ab2a9
Correct Packet Jitter scale to sec(s) unit - use values for RTSP Stat…
bjornss-scaleaq Apr 8, 2026
96bfe22
Renaming new stats to 'staticsourceStats' emphasizing this is for sta…
bjornss-scaleaq Apr 9, 2026
a280c9a
Merge branch 'main' into feature/expsourcemetrics
bjornss-scaleaq Apr 9, 2026
a07c6bb
Merge branch 'main' into feature/expsourcemetrics
bjornss-scaleaq Jun 5, 2026
13f161c
Updated providers of staticsources from rtp, srt, webrtc and refined …
bjornss-scaleaq Jun 5, 2026
ba9994d
Add test for staticsources stats
bjornss-scaleaq Jun 8, 2026
4d7fc8b
Added staticsource stats test for RTP and WebRTC protocols
bjornss-scaleaq Jun 8, 2026
d8703e7
Added staticsources stats test for handler
bjornss-scaleaq Jun 8, 2026
27d6ad5
Added stats test for staticsource/rtsp
bjornss-scaleaq Jun 8, 2026
7639bbb
Merge branch 'main' into feature/expsourcemetrics
bjornss-scaleaq Jun 8, 2026
2a60910
Removed 90kHZ Jitter scaling - assuming H.264/H.265 to align with oth…
bjornss-scaleaq Jun 16, 2026
4308c3e
Made staticsource rtp stats connection dependent
bjornss-scaleaq Jun 16, 2026
f2f1cfc
Ensured consistent nil-stats before first connect for RTP
bjornss-scaleaq Jun 16, 2026
8cd2415
Staticsources RTSP Fix race condition on client pointer read/write by…
bjornss-scaleaq Jun 16, 2026
29455aa
Merge branch 'bluenviron:main' into feature/expsourcemetrics
bjornss-scaleaq Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,17 @@ func (pa *path) isOnline() bool {
return pa.source != nil
}

func (pa *path) sourceStats() defs.StaticSourceStats {
if pa.source == nil {
return nil
}

if sp, ok := pa.source.(defs.StaticSourceStatsProvider); ok {
return sp.SourceStats()
}
return nil
}

func (pa *path) run() {
defer close(pa.done)
defer pa.wg.Done()
Expand Down Expand Up @@ -701,6 +712,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
return pa.stream.InboundBytes()
}(),
StaticStats: func() defs.StaticSourceStats {
if !pa.isAvailable() {
return nil
}
return pa.sourceStats()
}(),
BytesSent: func() uint64 {
if !pa.isAvailable() {
return 0
Expand Down
9 changes: 9 additions & 0 deletions internal/defs/api_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type APIPath struct {
Online bool `json:"online"`
OnlineTime *time.Time `json:"onlineTime"`
Source *APIPathSource `json:"source"`
StaticStats StaticSourceStats `json:"staticsourceStats,omitempty"`
Tracks []APIPathTrackCodec `json:"tracks" deprecated:"true"`
Tracks2 []APIPathTrack `json:"tracks2"`
Readers []APIPathReader `json:"readers"`
Expand All @@ -85,6 +86,14 @@ type APIPath struct {
BytesSent uint64 `json:"bytesSent" deprecated:"true"`
}

/*
// APIPathRTPSourceStats contains RTP protocol staticsources inbound stats
type APIPathRTPSourceStats struct {
InboundRTPPackets uint64 `json:"inboundRTPPackets"`
InboundRTPPacketsLost uint64 `json:"inboundRTPPacketsLost"`
InboundRTPPacketsJitter float64 `json:"inboundRTPPacketsJitter"`
}*/

// APIPathList is a list of paths.
type APIPathList struct {
ItemCount int `json:"itemCount"`
Expand Down
54 changes: 54 additions & 0 deletions internal/defs/source_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package defs

// StaticSourceStats is a marker interface for all static-source stats types.
// It allows multiple concrete stats implementations (RTSP, SRT, etc.).
// Dynamic sources are not covered by this interface.
type StaticSourceStats interface {
isStaticSourceStats()
}

// StaticSourceStatsProvider is an OPTIONAL capability.
// Only static sources that can provide stats will implement this.
type StaticSourceStatsProvider interface {
SourceStats() StaticSourceStats
}

// BaseSourceStats holds packet statistics common to all packet-based sources.
// Packet loss is always reported; Jitter is a pointer so that protocols that
// cannot provide it (e.g. SRT) leave it nil and it is omitted from the API.
type BaseSourceStats struct {
PacketsReceived uint64 `json:"inboundRTPPackets"`
PacketsLost uint64 `json:"inboundRTPPacketsLost"`
Jitter *float64 `json:"inboundRTPPacketsJitter,omitempty"`
}

// RTSPSourceStats - RTSP-specific stats (API-safe, no gortsplib leakage)
type RTSPSourceStats struct {
BaseSourceStats
PacketsInError uint64 `json:"inboundRTPPacketsInError"`
}

func (*RTSPSourceStats) isStaticSourceStats() {}

// RTPSourceStats - raw RTP (udp+rtp) source stats.
// Jitter is not computed for raw RTP and is left nil.
type RTPSourceStats struct {
BaseSourceStats
}

func (*RTPSourceStats) isStaticSourceStats() {}

// WebRTCSourceStats - WebRTC (WHEP) source stats.
type WebRTCSourceStats struct {
BaseSourceStats
}

func (*WebRTCSourceStats) isStaticSourceStats() {}

// SRTSourceStats - SRT source stats.
// SRT exposes receive packet loss but no RTP-style jitter, so Jitter is nil.
type SRTSourceStats struct {
BaseSourceStats
}

func (*SRTSourceStats) isStaticSourceStats() {}
118 changes: 118 additions & 0 deletions internal/defs/source_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package defs

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/require"
)

// compile-time assertions that every concrete type satisfies the marker.
var (
_ StaticSourceStats = (*RTSPSourceStats)(nil)
_ StaticSourceStats = (*RTPSourceStats)(nil)
_ StaticSourceStats = (*WebRTCSourceStats)(nil)
_ StaticSourceStats = (*SRTSourceStats)(nil)
)

func TestBaseSourceStatsJitterOmittedWhenNil(t *testing.T) {
stats := &SRTSourceStats{
BaseSourceStats: BaseSourceStats{
PacketsReceived: 10,
PacketsLost: 2,
// Jitter nil
},
}

byts, err := json.Marshal(stats)
require.NoError(t, err)

var m map[string]any
require.NoError(t, json.Unmarshal(byts, &m))

_, ok := m["inboundRTPPacketsJitter"]
require.False(t, ok, "jitter key must be omitted when nil")

require.Equal(t, float64(10), m["inboundRTPPackets"])
require.Equal(t, float64(2), m["inboundRTPPacketsLost"])
}

func TestBaseSourceStatsJitterPresentWhenSet(t *testing.T) {
jitter := 0.0125
stats := &WebRTCSourceStats{
BaseSourceStats: BaseSourceStats{
PacketsReceived: 100,
PacketsLost: 5,
Jitter: &jitter,
},
}

byts, err := json.Marshal(stats)
require.NoError(t, err)

var m map[string]any
require.NoError(t, json.Unmarshal(byts, &m))

v, ok := m["inboundRTPPacketsJitter"]
require.True(t, ok, "jitter key must be present when set")
require.Equal(t, 0.0125, v)
}

func TestRTSPSourceStatsJSON(t *testing.T) {
jitter := 0.5
stats := &RTSPSourceStats{
BaseSourceStats: BaseSourceStats{
PacketsReceived: 1000,
PacketsLost: 7,
Jitter: &jitter,
},
PacketsInError: 3,
}

byts, err := json.Marshal(stats)
require.NoError(t, err)

var m map[string]any
require.NoError(t, json.Unmarshal(byts, &m))

require.Equal(t, float64(1000), m["inboundRTPPackets"])
require.Equal(t, float64(7), m["inboundRTPPacketsLost"])
require.Equal(t, 0.5, m["inboundRTPPacketsJitter"])
require.Equal(t, float64(3), m["inboundRTPPacketsInError"])
}

func TestRTPSourceStatsJSON(t *testing.T) {
stats := &RTPSourceStats{
BaseSourceStats: BaseSourceStats{
PacketsReceived: 42,
PacketsLost: 1,
// Jitter nil for raw RTP
},
}

byts, err := json.Marshal(stats)
require.NoError(t, err)

var m map[string]any
require.NoError(t, json.Unmarshal(byts, &m))

require.Equal(t, float64(42), m["inboundRTPPackets"])
require.Equal(t, float64(1), m["inboundRTPPacketsLost"])
_, ok := m["inboundRTPPacketsJitter"]
require.False(t, ok, "raw RTP must not report jitter")
}

// StaticStats round-trips through the SourceStats marker interface
// as it does inside APIPath.
func TestStaticSourceStatsViaInterface(t *testing.T) {
var s StaticSourceStats = &SRTSourceStats{
BaseSourceStats: BaseSourceStats{PacketsReceived: 5, PacketsLost: 0},
}

byts, err := json.Marshal(s)
require.NoError(t, err)

var m map[string]any
require.NoError(t, json.Unmarshal(byts, &m))
require.Equal(t, float64(5), m["inboundRTPPackets"])
}
9 changes: 9 additions & 0 deletions internal/staticsources/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type staticSource interface {
logger.Writer
Run(defs.StaticSourceRunParams) error
APISourceDescribe() *defs.APIPathSource
// SourceStats() any
}

type handlerPathManager interface {
Expand Down Expand Up @@ -336,3 +337,11 @@ func (s *Handler) SetNotReady(req defs.PathSourceStaticSetNotReadyReq) {
func (s *Handler) AddReader(req defs.PathAddReaderReq) (*defs.PathAddReaderRes, error) {
return s.PathManager.AddReader(req)
}

// SourceStats - Get sourcestatistics if available
func (s *Handler) SourceStats() defs.StaticSourceStats {
if sp, ok := s.instance.(defs.StaticSourceStatsProvider); ok {
return sp.SourceStats()
}
return nil
}
67 changes: 67 additions & 0 deletions internal/staticsources/handler_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package staticsources

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
)

// fakeStatsSource is a staticSource that also provides stats.
type fakeStatsSource struct {
stats defs.StaticSourceStats
}

func (*fakeStatsSource) Log(logger.Level, string, ...any) {}

func (*fakeStatsSource) Run(defs.StaticSourceRunParams) error { return nil }

func (*fakeStatsSource) APISourceDescribe() *defs.APIPathSource {
return &defs.APIPathSource{}
}

func (s *fakeStatsSource) SourceStats() defs.StaticSourceStats { return s.stats }

var _ defs.StaticSourceStatsProvider = (*fakeStatsSource)(nil)

// fakeNoStatsSource is a staticSource that does NOT provide stats.
type fakeNoStatsSource struct{}

func (*fakeNoStatsSource) Log(logger.Level, string, ...any) {}

func (*fakeNoStatsSource) Run(defs.StaticSourceRunParams) error { return nil }

func (*fakeNoStatsSource) APISourceDescribe() *defs.APIPathSource {
return &defs.APIPathSource{}
}

func TestHandlerSourceStatsProvider(t *testing.T) {
jitter := 0.25
sentinel := &defs.RTSPSourceStats{
BaseSourceStats: defs.BaseSourceStats{
PacketsReceived: 123,
PacketsLost: 4,
Jitter: &jitter,
},
PacketsInError: 1,
}

h := &Handler{instance: &fakeStatsSource{stats: sentinel}}

got := h.SourceStats()
require.Same(t, sentinel, got)
}

func TestHandlerSourceStatsProviderNil(t *testing.T) {
// a provider may legitimately return nil (e.g. not yet connected)
h := &Handler{instance: &fakeStatsSource{stats: nil}}
require.Nil(t, h.SourceStats())
}

func TestHandlerSourceStatsNotProvided(t *testing.T) {
// sources that do not implement the capability must yield nil
h := &Handler{instance: &fakeNoStatsSource{}}
require.Nil(t, h.SourceStats())
}
Loading