diff --git a/lib/membrane_http_adaptive_stream/source.ex b/lib/membrane_http_adaptive_stream/source.ex index 458cf20..7ba15e5 100644 --- a/lib/membrane_http_adaptive_stream/source.ex +++ b/lib/membrane_http_adaptive_stream/source.ex @@ -104,6 +104,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do Defaults to `:highest_resolution`. """ + ], + how_much_to_skip: [ + spec: Membrane.Time.t(), + default: Membrane.Time.seconds(0), + description: """ + Specifies how much time should be discarded from each of the tracks. + + Please note that an actual discarded part of the stream might will be at most of that length + because it needs to be aligned with HLS segments distribution. + The source will send an `Membrane.Event.Discontinuity` event with `:duration` field + representing duration of the discarded part of the stream. + """, + inspector: &Membrane.Time.inspect/1 ] @impl true @@ -125,7 +138,8 @@ defmodule Membrane.HTTPAdaptiveStream.Source do client_genserver: nil, new_tracks_notification: nil, audio_output: initial_pad_state, - video_output: initial_pad_state + video_output: initial_pad_state, + initial_discontinuity_event_sent?: false }) {[], state} @@ -134,7 +148,11 @@ defmodule Membrane.HTTPAdaptiveStream.Source do @impl true def handle_setup(_ctx, state) do {:ok, client_genserver} = - ClientGenServer.start_link(state.url, state.variant_selection_policy) + ClientGenServer.start_link( + state.url, + state.variant_selection_policy, + state.how_much_to_skip + ) {[], %{state | client_genserver: client_genserver}} end @@ -234,6 +252,19 @@ defmodule Membrane.HTTPAdaptiveStream.Source do end) end + defp get_discontinuity_events(%{initial_discontinuity_event_sent?: false} = state) do + how_much_truly_skipped = ClientGenServer.how_much_truly_skipped(state.client_genserver) + + get_pads(state) + |> Enum.flat_map(fn pad_ref -> + [event: {pad_ref, %Membrane.Event.Discontinuity{duration: how_much_truly_skipped}}] + end) + end + + defp get_discontinuity_events(_state) do + [] + end + defp get_redemands(state) do get_pads(state) |> Enum.flat_map(fn pad_ref -> [redemand: pad_ref] end) @@ -390,7 +421,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source do case buffer_or_eos do %Buffer{} = buffer -> state = state |> put_in([pad_name, :oldest_buffer_dts], buffer.dts) - {[buffer: {pad_ref, buffer}], state} + + {get_discontinuity_events(state) ++ [buffer: {pad_ref, buffer}], + %{state | initial_discontinuity_event_sent?: true}} :end_of_stream -> {[end_of_stream: pad_ref], state} @@ -409,7 +442,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do _any when pad_ref == nil or eos_received? -> 0 - # todo: maybe we should handle rollovers + # maybe we should handle rollovers {:value, %Buffer{dts: newest_dts}} when newest_dts - oldest_dts >= state.buffered_stream_time -> 0 diff --git a/lib/membrane_http_adaptive_stream/source/client_genserver.ex b/lib/membrane_http_adaptive_stream/source/client_genserver.ex index 44c2314..f25a56a 100644 --- a/lib/membrane_http_adaptive_stream/source/client_genserver.ex +++ b/lib/membrane_http_adaptive_stream/source/client_genserver.ex @@ -8,13 +8,15 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do @spec start_link( String.t(), - Membrane.HTTPAdaptiveStream.Source.variant_selection_policy() + Membrane.HTTPAdaptiveStream.Source.variant_selection_policy(), + Membrane.Time.t() ) :: GenServer.on_start() - def start_link(url, variant_selection_policy) do + def start_link(url, variant_selection_policy, how_much_to_skip) do GenServer.start_link(__MODULE__, url: url, - variant_selection_policy: variant_selection_policy + variant_selection_policy: variant_selection_policy, + how_much_to_skip: how_much_to_skip ) end @@ -36,12 +38,23 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do GenServer.call(client_genserver, :get_tracks_info) end + @spec how_much_truly_skipped(pid()) :: Membrane.Time.t() + def how_much_truly_skipped(client_genserver) do + GenServer.call(client_genserver, :how_much_truly_skipped) + end + @impl true - def init(url: url, variant_selection_policy: variant_selection_policy) do + def init( + url: url, + variant_selection_policy: variant_selection_policy, + how_much_to_skip: how_much_to_skip + ) do state = %{ url: url, variant_selection_policy: variant_selection_policy, - client: nil + how_much_to_skip: how_much_to_skip, + client: nil, + how_much_truly_skipped: nil } {:ok, state, {:continue, :setup}} @@ -49,8 +62,10 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do @impl true def handle_continue(:setup, state) do + how_much_to_skip_ms = Membrane.Time.as_milliseconds(state.how_much_to_skip, :round) + state = - %{state | client: Client.new(state.url)} + %{state | client: Client.new(state.url, how_much_to_skip_ms)} |> choose_variant() {:noreply, state} @@ -91,6 +106,13 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do @impl true def handle_cast({:request_audio_chunk, pid}, state) do {chunk, client} = Client.read_audio_chunk(state.client) + + state = + put_in( + state.how_much_truly_skipped, + Client.get_how_much_truly_skipped_ms(client) |> Membrane.Time.milliseconds() + ) + send(pid, {:audio_chunk, chunk}) {:noreply, %{state | client: client}} end @@ -98,6 +120,13 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do @impl true def handle_cast({:request_video_chunk, pid}, state) do {chunk, client} = Client.read_video_chunk(state.client) + + state = + put_in( + state.how_much_truly_skipped, + Client.get_how_much_truly_skipped_ms(client) |> Membrane.Time.milliseconds() + ) + send(pid, {:video_chunk, chunk}) {:noreply, %{state | client: client}} end @@ -107,4 +136,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do {:ok, tracks_info, client} = Client.get_tracks_info(state.client) {:reply, tracks_info, %{state | client: client}} end + + @impl true + def handle_call(:how_much_truly_skipped, _from, state) do + {:reply, state.how_much_truly_skipped, state} + end end diff --git a/mix.exs b/mix.exs index b309300..b803742 100644 --- a/mix.exs +++ b/mix.exs @@ -65,16 +65,14 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do [ {:membrane_core, "~> 1.2"}, {:membrane_tee_plugin, "~> 0.12.0"}, - {:membrane_mp4_plugin, "~> 0.35.0"}, + {:membrane_mp4_plugin, "~> 0.36.0"}, {:membrane_aac_plugin, "~> 0.19.0"}, {:membrane_h26x_plugin, "~> 0.10.0"}, - {:ex_hls, - github: "membraneframework-labs/ex_hls", ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"}, + {:ex_hls, github: "membraneframework-labs/ex_hls"}, {:bunch, "~> 1.6"}, {:qex, "~> 0.5"}, {:membrane_hackney_plugin, "~> 0.11.0", only: :test}, {:membrane_transcoder_plugin, "~> 0.3.2", only: :test}, - # {:membrane_transcoder_plugin, path: "../membrane_transcoder_plugin", only: :test}, {:membrane_realtimer_plugin, "~> 0.10.1", only: :test}, {:membrane_portaudio_plugin, "~> 0.19.2", only: :test}, {:membrane_sdl_plugin, "~> 0.18.5", only: :test}, diff --git a/mix.lock b/mix.lock index 81efbb8..7e0cfda 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"}, - "ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "88847ba9a547366fd3f4868d49b72bd1e68ae412", [ref: "88847ba9a547366fd3f4868d49b72bd1e68ae412"]}, + "ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "703b405a0949556d3a5044e75198f1d23e40ec80", []}, "ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"}, "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, @@ -43,7 +43,7 @@ "membrane_mp3_lame_plugin": {:hex, :membrane_mp3_lame_plugin, "0.18.3", "441af1b5115c0d9cfc56023a25de42ee88d42ed9c4f4093bee344fe07d950e10", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}], "hexpm", "f25b985d7f80a7c721d1f127dc62c57ea4797d5f617e334766ed29f391187a6b"}, "membrane_mp3_mad_plugin": {:hex, :membrane_mp3_mad_plugin, "0.18.4", "30003dfbacfa92b51d5124107f9c502ee3f48d8dbc310d304d0164a819f8b8a8", [:mix], [{:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "dc78f5ca1bc52380add89a2d0cce9706e97d00abcfbdd46ad82afe44b745a43b"}, "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"}, - "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.3", "80228f4332eeef4fce4d90184a82bd5869d184f78438419660da7dc91871a238", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "c6d8b20e49540329f246e9a9c69adae330d424802fdfa1e6485d76a5257e6169"}, + "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.36.0", "ef4fc6fc72cc439974f2ae0ce579f3dbf8674f02ab00f7f524a6600b786a9ca8", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "84f55a42c69cb557b73d6272f958812f607abaaa6a3473f301d22393f2a62808"}, "membrane_mpegaudio_format": {:hex, :membrane_mpegaudio_format, "0.3.0", "d4fee77fad9f953171c52acd6d53b6646cfc7fbb827c63caa7c6a1efeb86450a", [:mix], [], "hexpm", "dec903efd0086133402b44515d04301790832b4f39995747b0e712c8f966d50d"}, "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, "membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.20.5", "aa344bb9931c8e22b2286778cce0658e0d4aa071a503c18c55e1b161e17ab337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "94fd4447b6576780afc6144dbb0520b43bd399c86a10bf5df1fa878a91798cf6"}, @@ -64,7 +64,7 @@ "mimerl": {:hex, :mimerl, "1.4.0", "3882a5ca67fbbe7117ba8947f27643557adec38fa2307490c4c4207624cb213b", [:rebar3], [], "hexpm", "13af15f9f68c65884ecca3a3891d50a7b57d82152792f3e19d88650aa126b144"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"}, - "mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "8c036fca6558a4339033a5a8697ebf147728f36b", [branch: "backport-v1.0.3"]}, + "mpeg_ts": {:git, "https://github.com/kim-company/kim_mpeg_ts.git", "ebde017bf639a9d57d068f029f2e43562ae78746", []}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, diff --git a/test/membrane_http_adaptive_stream/integration_test/fixtures/source/mpeg_ts/skipped_audio.aac b/test/membrane_http_adaptive_stream/integration_test/fixtures/source/mpeg_ts/skipped_audio.aac new file mode 100644 index 0000000..117a31a Binary files /dev/null and b/test/membrane_http_adaptive_stream/integration_test/fixtures/source/mpeg_ts/skipped_audio.aac differ diff --git a/test/membrane_http_adaptive_stream/integration_test/fixtures/source/mpeg_ts/skipped_video.h264 b/test/membrane_http_adaptive_stream/integration_test/fixtures/source/mpeg_ts/skipped_video.h264 new file mode 100644 index 0000000..8eaa7c7 Binary files /dev/null and b/test/membrane_http_adaptive_stream/integration_test/fixtures/source/mpeg_ts/skipped_video.h264 differ diff --git a/test/membrane_http_adaptive_stream/integration_test/source_test.exs b/test/membrane_http_adaptive_stream/integration_test/source_test.exs index c3fb4b4..a379d82 100644 --- a/test/membrane_http_adaptive_stream/integration_test/source_test.exs +++ b/test/membrane_http_adaptive_stream/integration_test/source_test.exs @@ -16,6 +16,8 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do @fmp4_audio_ref_file Path.join(@ref_files_dir, "fmp4/audio.aac") @mpeg_ts_video_ref_file Path.join(@ref_files_dir, "mpeg_ts/video.h264") @mpeg_ts_audio_ref_file Path.join(@ref_files_dir, "mpeg_ts/audio.aac") + @skipped_mpeg_ts_video_ref_file Path.join(@ref_files_dir, "mpeg_ts/skipped_video.h264") + @skipped_mpeg_ts_audio_ref_file Path.join(@ref_files_dir, "mpeg_ts/skipped_audio.aac") describe "Membrane.HTTPAdaptiveStream.Source demuxes audio and video from HLS stream" do @tag :tmp_dir @@ -50,12 +52,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do spec = hls_to_file_pipeline_spec( @mpegts_url, - %Membrane.Transcoder{ - assumed_input_stream_format: %Membrane.AAC{ - encapsulation: :ADTS - }, - output_stream_format: Membrane.AAC - }, + %Membrane.AAC.Parser{out_encapsulation: :ADTS}, audio_result_file, video_result_file ) @@ -70,6 +67,37 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do assert_track(audio_result_file, @mpeg_ts_audio_ref_file, 40_000) assert_track(video_result_file, @mpeg_ts_video_ref_file, 70_000) end + + @tag :tmp_dir + test "(MPEG-TS) with how_much_to_skip option", %{tmp_dir: tmp_dir} do + audio_result_file = Path.join(tmp_dir, "audio.aac") + video_result_file = Path.join(tmp_dir, "video.h264") + how_much_to_skip = Membrane.Time.seconds(11) + + spec = + hls_to_file_pipeline_spec( + @mpegts_url, + %Membrane.AAC.Parser{out_encapsulation: :ADTS}, + audio_result_file, + video_result_file, + how_much_to_skip + ) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + Process.sleep(10_000) + desired_skipped_duration = Membrane.Time.seconds(10) + + assert_received {:event_observed, + %Membrane.Event.Discontinuity{duration: ^desired_skipped_duration}} + + Testing.Pipeline.terminate(pipeline) + + # reference files created locally with a quite good internet connection have + # - 78_732 bytes for audio + # - 136_754 bytes for video + assert_track(audio_result_file, @skipped_mpeg_ts_audio_ref_file, 40_000) + assert_track(video_result_file, @skipped_mpeg_ts_video_ref_file, 70_000) + end end describe "Membrane.HTTPAdaptiveStream.Source sends :new_tracks notification" do @@ -157,11 +185,21 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do Testing.Pipeline.terminate(pipeline) end - defp hls_to_file_pipeline_spec(url, audio_transcoder, audio_result_file, video_result_file) do + @default_how_much_to_skip Membrane.Time.seconds(0) + defp hls_to_file_pipeline_spec( + url, + audio_processor, + audio_result_file, + video_result_file, + how_much_to_skip \\ @default_how_much_to_skip + ) do + parent = self() + [ child(:hls_source, %Membrane.HTTPAdaptiveStream.Source{ url: url, - variant_selection_policy: :lowest_resolution + variant_selection_policy: :lowest_resolution, + how_much_to_skip: how_much_to_skip }) |> via_out(:video_output) |> child(%Membrane.Transcoder{ @@ -173,8 +211,9 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do }), get_child(:hls_source) |> via_out(:audio_output) - |> child(audio_transcoder) + |> child(audio_processor) |> child(Membrane.Realtimer) + |> child(%Membrane.Debug.Filter{handle_event: &send(parent, {:event_observed, &1})}) |> child(%Membrane.File.Sink{ location: audio_result_file })