Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions lib/membrane_http_adaptive_stream/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do

Defaults to `:highest_resolution`.
"""
],
how_much_to_skip: [
spec: Membrane.Time.t(),
default: Membrane.Time.seconds(0),
description: """
Specifies how much time should be discarded from each of the tracks.

Please note that an actual discarded part of the stream might will be at most of that length
because it needs to be aligned with HLS segments distribution.
The source will send an `Membrane.Event.Discontinuity` event with `:duration` field
representing duration of the discarded part of the stream.
""",
inspector: &Membrane.Time.inspect/1
]

@impl true
Expand All @@ -125,7 +138,8 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
client_genserver: nil,
new_tracks_notification: nil,
audio_output: initial_pad_state,
video_output: initial_pad_state
video_output: initial_pad_state,
initial_discontinuity_event_sent?: false
})

{[], state}
Expand All @@ -134,7 +148,11 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
@impl true
def handle_setup(_ctx, state) do
{:ok, client_genserver} =
ClientGenServer.start_link(state.url, state.variant_selection_policy)
ClientGenServer.start_link(
state.url,
state.variant_selection_policy,
state.how_much_to_skip
)

{[], %{state | client_genserver: client_genserver}}
end
Expand Down Expand Up @@ -234,6 +252,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
end)
end

defp get_discontinuity_events(%{initial_discontinuity_event_sent?: false} = state) do
how_much_skipped = ClientGenServer.how_much_skipped(state.client_genserver)

get_pads(state)
|> Enum.flat_map(fn pad_ref ->
[event: {pad_ref, %Membrane.Event.Discontinuity{duration: how_much_skipped}}]
end)
end

defp get_discontinuity_events(_state) do
[]
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discontinuity events are supposed to be sent up to once on every pad, am I right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's right


defp get_redemands(state) do
get_pads(state)
|> Enum.flat_map(fn pad_ref -> [redemand: pad_ref] end)
Expand Down Expand Up @@ -390,7 +421,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
case buffer_or_eos do
%Buffer{} = buffer ->
state = state |> put_in([pad_name, :oldest_buffer_dts], buffer.dts)
{[buffer: {pad_ref, buffer}], state}

{get_discontinuity_events(state) ++ [buffer: {pad_ref, buffer}],
%{state | initial_discontinuity_event_sent?: true}}

:end_of_stream ->
{[end_of_stream: pad_ref], state}
Expand All @@ -409,7 +442,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
_any when pad_ref == nil or eos_received? ->
0

# todo: maybe we should handle rollovers
# maybe we should handle rollovers
{:value, %Buffer{dts: newest_dts}}
when newest_dts - oldest_dts >= state.buffered_stream_time ->
0
Expand Down
46 changes: 40 additions & 6 deletions lib/membrane_http_adaptive_stream/source/client_genserver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do

@spec start_link(
String.t(),
Membrane.HTTPAdaptiveStream.Source.variant_selection_policy()
Membrane.HTTPAdaptiveStream.Source.variant_selection_policy(),
Membrane.Time.t()
) ::
GenServer.on_start()
def start_link(url, variant_selection_policy) do
def start_link(url, variant_selection_policy, how_much_to_skip) do
GenServer.start_link(__MODULE__,
url: url,
variant_selection_policy: variant_selection_policy
variant_selection_policy: variant_selection_policy,
how_much_to_skip: how_much_to_skip
)
end

Expand All @@ -36,21 +38,34 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
GenServer.call(client_genserver, :get_tracks_info)
end

@spec how_much_skipped(pid()) :: Membrane.Time.t()
def how_much_skipped(client_genserver) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please rename it to sth like how_much_already_skipped to distinguish it more from how_much_to_skip

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think "already" fits there, but I think how_much_truly_skipped sounds OK

GenServer.call(client_genserver, :how_much_skipped)
end

@impl true
def init(url: url, variant_selection_policy: variant_selection_policy) do
def init(
url: url,
variant_selection_policy: variant_selection_policy,
how_much_to_skip: how_much_to_skip
) do
state = %{
url: url,
variant_selection_policy: variant_selection_policy,
client: nil
how_much_to_skip: how_much_to_skip,
client: nil,
how_much_skipped: nil
}

{:ok, state, {:continue, :setup}}
end

@impl true
def handle_continue(:setup, state) do
how_much_to_skip_ms = Membrane.Time.as_milliseconds(state.how_much_to_skip, :round)

state =
%{state | client: Client.new(state.url)}
%{state | client: Client.new(state.url, how_much_to_skip_ms)}
|> choose_variant()

{:noreply, state}
Expand Down Expand Up @@ -91,13 +106,27 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
@impl true
def handle_cast({:request_audio_chunk, pid}, state) do
{chunk, client} = Client.read_audio_chunk(state.client)

state =
put_in(
state.how_much_skipped,
state.client.base_timestamp_ms |> round() |> Membrane.Time.milliseconds()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is base_timestamp_ms a part of public API? IMO it would be better to access it via a public function with docs and spec

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I don't get why we set how_much_skipped to base_timestamp_ms - the name of the second variable doesn't sound like a value describing how much of a stream we have already skipped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a hacky solution (I didn't expect that I would need to send a Discontinuity event with information about how much of the stream was skipped).
how_much_skipped is set to base_timestamp_ms as the client sets base_timestamp_ms to the timestamp of the first non-discarded sample.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, I've added a helper function in ex_hls

)

send(pid, {:audio_chunk, chunk})
{:noreply, %{state | client: client}}
end

@impl true
def handle_cast({:request_video_chunk, pid}, state) do
{chunk, client} = Client.read_video_chunk(state.client)

state =
put_in(
state.how_much_skipped,
state.client.base_timestamp_ms |> round() |> Membrane.Time.milliseconds()
)

send(pid, {:video_chunk, chunk})
{:noreply, %{state | client: client}}
end
Expand All @@ -107,4 +136,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
{:ok, tracks_info, client} = Client.get_tracks_info(state.client)
{:reply, tracks_info, %{state | client: client}}
end

@impl true
def handle_call(:how_much_skipped, _from, state) do
{:reply, state.how_much_skipped, state}
end
end
5 changes: 2 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,15 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
[
{:membrane_core, "~> 1.2"},
{:membrane_tee_plugin, "~> 0.12.0"},
{:membrane_mp4_plugin, "~> 0.35.0"},
{:membrane_mp4_plugin, "~> 0.36.0"},
{:membrane_aac_plugin, "~> 0.19.0"},
{:membrane_h26x_plugin, "~> 0.10.0"},
{:ex_hls,
github: "membraneframework-labs/ex_hls", ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"},
github: "membraneframework-labs/ex_hls", ref: "458bddeff680a50c224e92a66dc1166a4caa3b14"},
{:bunch, "~> 1.6"},
{:qex, "~> 0.5"},
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},
{:membrane_transcoder_plugin, "~> 0.3.2", only: :test},
# {:membrane_transcoder_plugin, path: "../membrane_transcoder_plugin", only: :test},
{:membrane_realtimer_plugin, "~> 0.10.1", only: :test},
{:membrane_portaudio_plugin, "~> 0.19.2", only: :test},
{:membrane_sdl_plugin, "~> 0.18.5", only: :test},
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "88847ba9a547366fd3f4868d49b72bd1e68ae412", [ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"]},
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "458bddeff680a50c224e92a66dc1166a4caa3b14", [ref: "458bddeff680a50c224e92a66dc1166a4caa3b14"]},
"ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"},
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},
Expand Down Expand Up @@ -43,7 +43,7 @@
"membrane_mp3_lame_plugin": {:hex, :membrane_mp3_lame_plugin, "0.18.3", "441af1b5115c0d9cfc56023a25de42ee88d42ed9c4f4093bee344fe07d950e10", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}], "hexpm", "f25b985d7f80a7c721d1f127dc62c57ea4797d5f617e334766ed29f391187a6b"},
"membrane_mp3_mad_plugin": {:hex, :membrane_mp3_mad_plugin, "0.18.4", "30003dfbacfa92b51d5124107f9c502ee3f48d8dbc310d304d0164a819f8b8a8", [:mix], [{:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "dc78f5ca1bc52380add89a2d0cce9706e97d00abcfbdd46ad82afe44b745a43b"},
"membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"},
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.3", "80228f4332eeef4fce4d90184a82bd5869d184f78438419660da7dc91871a238", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "c6d8b20e49540329f246e9a9c69adae330d424802fdfa1e6485d76a5257e6169"},
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.36.0", "ef4fc6fc72cc439974f2ae0ce579f3dbf8674f02ab00f7f524a6600b786a9ca8", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "84f55a42c69cb557b73d6272f958812f607abaaa6a3473f301d22393f2a62808"},
"membrane_mpegaudio_format": {:hex, :membrane_mpegaudio_format, "0.3.0", "d4fee77fad9f953171c52acd6d53b6646cfc7fbb827c63caa7c6a1efeb86450a", [:mix], [], "hexpm", "dec903efd0086133402b44515d04301790832b4f39995747b0e712c8f966d50d"},
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
"membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.20.5", "aa344bb9931c8e22b2286778cce0658e0d4aa071a503c18c55e1b161e17ab337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "94fd4447b6576780afc6144dbb0520b43bd399c86a10bf5df1fa878a91798cf6"},
Expand All @@ -64,7 +64,7 @@
"mimerl": {:hex, :mimerl, "1.4.0", "3882a5ca67fbbe7117ba8947f27643557adec38fa2307490c4c4207624cb213b", [:rebar3], [], "hexpm", "13af15f9f68c65884ecca3a3891d50a7b57d82152792f3e19d88650aa126b144"},
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
"mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"},
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "8c036fca6558a4339033a5a8697ebf147728f36b", [branch: "backport-v1.0.3"]},
"mpeg_ts": {:git, "https://github.com/kim-company/kim_mpeg_ts.git", "ebde017bf639a9d57d068f029f2e43562ae78746", []},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
Expand Down
Binary file not shown.
Binary file not shown.
Loading