Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions lib/membrane_http_adaptive_stream/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do

Defaults to `:highest_resolution`.
"""
],
how_much_to_skip: [
spec: Membrane.Time.t(),
default: Membrane.Time.seconds(0),
description: """
Specifies how much time should be discarded from each of the tracks.

Please note that an actual discarded part of the stream might will be at most of that length
because it needs to be aligned with HLS segments distribution.
The source will send an `Membrane.Event.Discontinuity` event with `:duration` field
representing duration of the discarded part of the stream.
""",
inspector: &Membrane.Time.inspect/1
]

@impl true
Expand All @@ -125,7 +138,8 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
client_genserver: nil,
new_tracks_notification: nil,
audio_output: initial_pad_state,
video_output: initial_pad_state
video_output: initial_pad_state,
initial_discontinuity_event_sent?: false
})

{[], state}
Expand All @@ -134,7 +148,11 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
@impl true
def handle_setup(_ctx, state) do
{:ok, client_genserver} =
ClientGenServer.start_link(state.url, state.variant_selection_policy)
ClientGenServer.start_link(
state.url,
state.variant_selection_policy,
state.how_much_to_skip
)

{[], %{state | client_genserver: client_genserver}}
end
Expand Down Expand Up @@ -234,6 +252,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
end)
end

defp get_discontinuity_events(%{initial_discontinuity_event_sent?: false} = state) do
how_much_truly_skipped = ClientGenServer.how_much_truly_skipped(state.client_genserver)

get_pads(state)
|> Enum.flat_map(fn pad_ref ->
[event: {pad_ref, %Membrane.Event.Discontinuity{duration: how_much_truly_skipped}}]
end)
end

defp get_discontinuity_events(_state) do
[]
end

defp get_redemands(state) do
get_pads(state)
|> Enum.flat_map(fn pad_ref -> [redemand: pad_ref] end)
Expand Down Expand Up @@ -390,7 +421,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
case buffer_or_eos do
%Buffer{} = buffer ->
state = state |> put_in([pad_name, :oldest_buffer_dts], buffer.dts)
{[buffer: {pad_ref, buffer}], state}

{get_discontinuity_events(state) ++ [buffer: {pad_ref, buffer}],
%{state | initial_discontinuity_event_sent?: true}}

:end_of_stream ->
{[end_of_stream: pad_ref], state}
Expand All @@ -409,7 +442,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
_any when pad_ref == nil or eos_received? ->
0

# todo: maybe we should handle rollovers
# maybe we should handle rollovers
{:value, %Buffer{dts: newest_dts}}
when newest_dts - oldest_dts >= state.buffered_stream_time ->
0
Expand Down
46 changes: 40 additions & 6 deletions lib/membrane_http_adaptive_stream/source/client_genserver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do

@spec start_link(
String.t(),
Membrane.HTTPAdaptiveStream.Source.variant_selection_policy()
Membrane.HTTPAdaptiveStream.Source.variant_selection_policy(),
Membrane.Time.t()
) ::
GenServer.on_start()
def start_link(url, variant_selection_policy) do
def start_link(url, variant_selection_policy, how_much_to_skip) do
GenServer.start_link(__MODULE__,
url: url,
variant_selection_policy: variant_selection_policy
variant_selection_policy: variant_selection_policy,
how_much_to_skip: how_much_to_skip
)
end

Expand All @@ -36,21 +38,34 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
GenServer.call(client_genserver, :get_tracks_info)
end

@spec how_much_truly_skipped(pid()) :: Membrane.Time.t()
def how_much_truly_skipped(client_genserver) do
GenServer.call(client_genserver, :how_much_truly_skipped)
end

@impl true
def init(url: url, variant_selection_policy: variant_selection_policy) do
def init(
url: url,
variant_selection_policy: variant_selection_policy,
how_much_to_skip: how_much_to_skip
) do
state = %{
url: url,
variant_selection_policy: variant_selection_policy,
client: nil
how_much_to_skip: how_much_to_skip,
client: nil,
how_much_truly_skipped: nil
}

{:ok, state, {:continue, :setup}}
end

@impl true
def handle_continue(:setup, state) do
how_much_to_skip_ms = Membrane.Time.as_milliseconds(state.how_much_to_skip, :round)

state =
%{state | client: Client.new(state.url)}
%{state | client: Client.new(state.url, how_much_to_skip_ms)}
|> choose_variant()

{:noreply, state}
Expand Down Expand Up @@ -91,13 +106,27 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
@impl true
def handle_cast({:request_audio_chunk, pid}, state) do
{chunk, client} = Client.read_audio_chunk(state.client)

state =
put_in(
state.how_much_truly_skipped,
Client.get_how_much_truly_skipped_ms(client) |> Membrane.Time.milliseconds()
)

send(pid, {:audio_chunk, chunk})
{:noreply, %{state | client: client}}
end

@impl true
def handle_cast({:request_video_chunk, pid}, state) do
{chunk, client} = Client.read_video_chunk(state.client)

state =
put_in(
state.how_much_truly_skipped,
Client.get_how_much_truly_skipped_ms(client) |> Membrane.Time.milliseconds()
)

send(pid, {:video_chunk, chunk})
{:noreply, %{state | client: client}}
end
Expand All @@ -107,4 +136,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
{:ok, tracks_info, client} = Client.get_tracks_info(state.client)
{:reply, tracks_info, %{state | client: client}}
end

@impl true
def handle_call(:how_much_truly_skipped, _from, state) do
{:reply, state.how_much_truly_skipped, state}
end
end
6 changes: 2 additions & 4 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,14 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
[
{:membrane_core, "~> 1.2"},
{:membrane_tee_plugin, "~> 0.12.0"},
{:membrane_mp4_plugin, "~> 0.35.0"},
{:membrane_mp4_plugin, "~> 0.36.0"},
{:membrane_aac_plugin, "~> 0.19.0"},
{:membrane_h26x_plugin, "~> 0.10.0"},
{:ex_hls,
github: "membraneframework-labs/ex_hls", ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"},
{:ex_hls, github: "membraneframework-labs/ex_hls"},
{:bunch, "~> 1.6"},
{:qex, "~> 0.5"},
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},
{:membrane_transcoder_plugin, "~> 0.3.2", only: :test},
# {:membrane_transcoder_plugin, path: "../membrane_transcoder_plugin", only: :test},
{:membrane_realtimer_plugin, "~> 0.10.1", only: :test},
{:membrane_portaudio_plugin, "~> 0.19.2", only: :test},
{:membrane_sdl_plugin, "~> 0.18.5", only: :test},
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "88847ba9a547366fd3f4868d49b72bd1e68ae412", [ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"]},
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "703b405a0949556d3a5044e75198f1d23e40ec80", []},
"ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"},
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},
Expand Down Expand Up @@ -43,7 +43,7 @@
"membrane_mp3_lame_plugin": {:hex, :membrane_mp3_lame_plugin, "0.18.3", "441af1b5115c0d9cfc56023a25de42ee88d42ed9c4f4093bee344fe07d950e10", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}], "hexpm", "f25b985d7f80a7c721d1f127dc62c57ea4797d5f617e334766ed29f391187a6b"},
"membrane_mp3_mad_plugin": {:hex, :membrane_mp3_mad_plugin, "0.18.4", "30003dfbacfa92b51d5124107f9c502ee3f48d8dbc310d304d0164a819f8b8a8", [:mix], [{:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "dc78f5ca1bc52380add89a2d0cce9706e97d00abcfbdd46ad82afe44b745a43b"},
"membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"},
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.3", "80228f4332eeef4fce4d90184a82bd5869d184f78438419660da7dc91871a238", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "c6d8b20e49540329f246e9a9c69adae330d424802fdfa1e6485d76a5257e6169"},
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.36.0", "ef4fc6fc72cc439974f2ae0ce579f3dbf8674f02ab00f7f524a6600b786a9ca8", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "84f55a42c69cb557b73d6272f958812f607abaaa6a3473f301d22393f2a62808"},
"membrane_mpegaudio_format": {:hex, :membrane_mpegaudio_format, "0.3.0", "d4fee77fad9f953171c52acd6d53b6646cfc7fbb827c63caa7c6a1efeb86450a", [:mix], [], "hexpm", "dec903efd0086133402b44515d04301790832b4f39995747b0e712c8f966d50d"},
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
"membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.20.5", "aa344bb9931c8e22b2286778cce0658e0d4aa071a503c18c55e1b161e17ab337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "94fd4447b6576780afc6144dbb0520b43bd399c86a10bf5df1fa878a91798cf6"},
Expand All @@ -64,7 +64,7 @@
"mimerl": {:hex, :mimerl, "1.4.0", "3882a5ca67fbbe7117ba8947f27643557adec38fa2307490c4c4207624cb213b", [:rebar3], [], "hexpm", "13af15f9f68c65884ecca3a3891d50a7b57d82152792f3e19d88650aa126b144"},
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
"mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"},
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "8c036fca6558a4339033a5a8697ebf147728f36b", [branch: "backport-v1.0.3"]},
"mpeg_ts": {:git, "https://github.com/kim-company/kim_mpeg_ts.git", "ebde017bf639a9d57d068f029f2e43562ae78746", []},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
Expand Down
Binary file not shown.
Binary file not shown.
Loading