Skip to content

Commit 83cdd60

Browse files
authored
Add how_much_to_skip option to the Source element (#116)
* Update `ex_hls` to a version which accepts `how_much_to_skip_ms` as a parameter * Add `how_much_to_skip` option to the source element * Send Discontinuity event on each output pad of the Source with duration field set to the duration of skipped part of the stream (it might differ from the requested `how_much_to_skip` due to the fact that it needs to be aligned with segments)
1 parent 712ad7c commit 83cdd60

File tree

7 files changed

+130
-26
lines changed

7 files changed

+130
-26
lines changed

lib/membrane_http_adaptive_stream/source.ex

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
104104
105105
Defaults to `:highest_resolution`.
106106
"""
107+
],
108+
how_much_to_skip: [
109+
spec: Membrane.Time.t(),
110+
default: Membrane.Time.seconds(0),
111+
description: """
112+
Specifies how much time should be discarded from each of the tracks.
113+
114+
Please note that an actual discarded part of the stream might will be at most of that length
115+
because it needs to be aligned with HLS segments distribution.
116+
The source will send an `Membrane.Event.Discontinuity` event with `:duration` field
117+
representing duration of the discarded part of the stream.
118+
""",
119+
inspector: &Membrane.Time.inspect/1
107120
]
108121

109122
@impl true
@@ -125,7 +138,8 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
125138
client_genserver: nil,
126139
new_tracks_notification: nil,
127140
audio_output: initial_pad_state,
128-
video_output: initial_pad_state
141+
video_output: initial_pad_state,
142+
initial_discontinuity_event_sent?: false
129143
})
130144

131145
{[], state}
@@ -134,7 +148,11 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
134148
@impl true
135149
def handle_setup(_ctx, state) do
136150
{:ok, client_genserver} =
137-
ClientGenServer.start_link(state.url, state.variant_selection_policy)
151+
ClientGenServer.start_link(
152+
state.url,
153+
state.variant_selection_policy,
154+
state.how_much_to_skip
155+
)
138156

139157
{[], %{state | client_genserver: client_genserver}}
140158
end
@@ -234,6 +252,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
234252
end)
235253
end
236254

255+
defp get_discontinuity_events(%{initial_discontinuity_event_sent?: false} = state) do
256+
how_much_truly_skipped = ClientGenServer.how_much_truly_skipped(state.client_genserver)
257+
258+
get_pads(state)
259+
|> Enum.flat_map(fn pad_ref ->
260+
[event: {pad_ref, %Membrane.Event.Discontinuity{duration: how_much_truly_skipped}}]
261+
end)
262+
end
263+
264+
defp get_discontinuity_events(_state) do
265+
[]
266+
end
267+
237268
defp get_redemands(state) do
238269
get_pads(state)
239270
|> Enum.flat_map(fn pad_ref -> [redemand: pad_ref] end)
@@ -390,7 +421,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
390421
case buffer_or_eos do
391422
%Buffer{} = buffer ->
392423
state = state |> put_in([pad_name, :oldest_buffer_dts], buffer.dts)
393-
{[buffer: {pad_ref, buffer}], state}
424+
425+
{get_discontinuity_events(state) ++ [buffer: {pad_ref, buffer}],
426+
%{state | initial_discontinuity_event_sent?: true}}
394427

395428
:end_of_stream ->
396429
{[end_of_stream: pad_ref], state}
@@ -409,7 +442,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
409442
_any when pad_ref == nil or eos_received? ->
410443
0
411444

412-
# todo: maybe we should handle rollovers
445+
# maybe we should handle rollovers
413446
{:value, %Buffer{dts: newest_dts}}
414447
when newest_dts - oldest_dts >= state.buffered_stream_time ->
415448
0

lib/membrane_http_adaptive_stream/source/client_genserver.ex

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
88

99
@spec start_link(
1010
String.t(),
11-
Membrane.HTTPAdaptiveStream.Source.variant_selection_policy()
11+
Membrane.HTTPAdaptiveStream.Source.variant_selection_policy(),
12+
Membrane.Time.t()
1213
) ::
1314
GenServer.on_start()
14-
def start_link(url, variant_selection_policy) do
15+
def start_link(url, variant_selection_policy, how_much_to_skip) do
1516
GenServer.start_link(__MODULE__,
1617
url: url,
17-
variant_selection_policy: variant_selection_policy
18+
variant_selection_policy: variant_selection_policy,
19+
how_much_to_skip: how_much_to_skip
1820
)
1921
end
2022

@@ -36,21 +38,34 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
3638
GenServer.call(client_genserver, :get_tracks_info)
3739
end
3840

41+
@spec how_much_truly_skipped(pid()) :: Membrane.Time.t()
42+
def how_much_truly_skipped(client_genserver) do
43+
GenServer.call(client_genserver, :how_much_truly_skipped)
44+
end
45+
3946
@impl true
40-
def init(url: url, variant_selection_policy: variant_selection_policy) do
47+
def init(
48+
url: url,
49+
variant_selection_policy: variant_selection_policy,
50+
how_much_to_skip: how_much_to_skip
51+
) do
4152
state = %{
4253
url: url,
4354
variant_selection_policy: variant_selection_policy,
44-
client: nil
55+
how_much_to_skip: how_much_to_skip,
56+
client: nil,
57+
how_much_truly_skipped: nil
4558
}
4659

4760
{:ok, state, {:continue, :setup}}
4861
end
4962

5063
@impl true
5164
def handle_continue(:setup, state) do
65+
how_much_to_skip_ms = Membrane.Time.as_milliseconds(state.how_much_to_skip, :round)
66+
5267
state =
53-
%{state | client: Client.new(state.url)}
68+
%{state | client: Client.new(state.url, how_much_to_skip_ms)}
5469
|> choose_variant()
5570

5671
{:noreply, state}
@@ -91,13 +106,27 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
91106
@impl true
92107
def handle_cast({:request_audio_chunk, pid}, state) do
93108
{chunk, client} = Client.read_audio_chunk(state.client)
109+
110+
state =
111+
put_in(
112+
state.how_much_truly_skipped,
113+
Client.get_how_much_truly_skipped_ms(client) |> Membrane.Time.milliseconds()
114+
)
115+
94116
send(pid, {:audio_chunk, chunk})
95117
{:noreply, %{state | client: client}}
96118
end
97119

98120
@impl true
99121
def handle_cast({:request_video_chunk, pid}, state) do
100122
{chunk, client} = Client.read_video_chunk(state.client)
123+
124+
state =
125+
put_in(
126+
state.how_much_truly_skipped,
127+
Client.get_how_much_truly_skipped_ms(client) |> Membrane.Time.milliseconds()
128+
)
129+
101130
send(pid, {:video_chunk, chunk})
102131
{:noreply, %{state | client: client}}
103132
end
@@ -107,4 +136,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
107136
{:ok, tracks_info, client} = Client.get_tracks_info(state.client)
108137
{:reply, tracks_info, %{state | client: client}}
109138
end
139+
140+
@impl true
141+
def handle_call(:how_much_truly_skipped, _from, state) do
142+
{:reply, state.how_much_truly_skipped, state}
143+
end
110144
end

mix.exs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,14 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
6565
[
6666
{:membrane_core, "~> 1.2"},
6767
{:membrane_tee_plugin, "~> 0.12.0"},
68-
{:membrane_mp4_plugin, "~> 0.35.0"},
68+
{:membrane_mp4_plugin, "~> 0.36.0"},
6969
{:membrane_aac_plugin, "~> 0.19.0"},
7070
{:membrane_h26x_plugin, "~> 0.10.0"},
71-
{:ex_hls,
72-
github: "membraneframework-labs/ex_hls", ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"},
71+
{:ex_hls, github: "membraneframework-labs/ex_hls"},
7372
{:bunch, "~> 1.6"},
7473
{:qex, "~> 0.5"},
7574
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},
7675
{:membrane_transcoder_plugin, "~> 0.3.2", only: :test},
77-
# {:membrane_transcoder_plugin, path: "../membrane_transcoder_plugin", only: :test},
7876
{:membrane_realtimer_plugin, "~> 0.10.1", only: :test},
7977
{:membrane_portaudio_plugin, "~> 0.19.2", only: :test},
8078
{:membrane_sdl_plugin, "~> 0.18.5", only: :test},

mix.lock

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
1313
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
1414
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
15-
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "88847ba9a547366fd3f4868d49b72bd1e68ae412", [ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"]},
15+
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "703b405a0949556d3a5044e75198f1d23e40ec80", []},
1616
"ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"},
1717
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
1818
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},
@@ -43,7 +43,7 @@
4343
"membrane_mp3_lame_plugin": {:hex, :membrane_mp3_lame_plugin, "0.18.3", "441af1b5115c0d9cfc56023a25de42ee88d42ed9c4f4093bee344fe07d950e10", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}], "hexpm", "f25b985d7f80a7c721d1f127dc62c57ea4797d5f617e334766ed29f391187a6b"},
4444
"membrane_mp3_mad_plugin": {:hex, :membrane_mp3_mad_plugin, "0.18.4", "30003dfbacfa92b51d5124107f9c502ee3f48d8dbc310d304d0164a819f8b8a8", [:mix], [{:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "dc78f5ca1bc52380add89a2d0cce9706e97d00abcfbdd46ad82afe44b745a43b"},
4545
"membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"},
46-
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.3", "80228f4332eeef4fce4d90184a82bd5869d184f78438419660da7dc91871a238", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "c6d8b20e49540329f246e9a9c69adae330d424802fdfa1e6485d76a5257e6169"},
46+
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.36.0", "ef4fc6fc72cc439974f2ae0ce579f3dbf8674f02ab00f7f524a6600b786a9ca8", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "84f55a42c69cb557b73d6272f958812f607abaaa6a3473f301d22393f2a62808"},
4747
"membrane_mpegaudio_format": {:hex, :membrane_mpegaudio_format, "0.3.0", "d4fee77fad9f953171c52acd6d53b6646cfc7fbb827c63caa7c6a1efeb86450a", [:mix], [], "hexpm", "dec903efd0086133402b44515d04301790832b4f39995747b0e712c8f966d50d"},
4848
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
4949
"membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.20.5", "aa344bb9931c8e22b2286778cce0658e0d4aa071a503c18c55e1b161e17ab337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "94fd4447b6576780afc6144dbb0520b43bd399c86a10bf5df1fa878a91798cf6"},
@@ -64,7 +64,7 @@
6464
"mimerl": {:hex, :mimerl, "1.4.0", "3882a5ca67fbbe7117ba8947f27643557adec38fa2307490c4c4207624cb213b", [:rebar3], [], "hexpm", "13af15f9f68c65884ecca3a3891d50a7b57d82152792f3e19d88650aa126b144"},
6565
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
6666
"mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"},
67-
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "8c036fca6558a4339033a5a8697ebf147728f36b", [branch: "backport-v1.0.3"]},
67+
"mpeg_ts": {:git, "https://github.com/kim-company/kim_mpeg_ts.git", "ebde017bf639a9d57d068f029f2e43562ae78746", []},
6868
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
6969
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
7070
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)