From 965ee530e68c6b0e37b0423cadbbd4cbf7e7a0a0 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 18 Sep 2025 17:21:23 +0200 Subject: [PATCH 1/4] bump ExHLS and bump version --- README.md | 2 +- mix.exs | 4 ++-- mix.lock | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9a83bc3..48db0a6 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ In future, the support for MPEG-DASH is planned as well Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`. ```elixir - {:membrane_http_adaptive_stream_plugin, "~> 0.20.0"} + {:membrane_http_adaptive_stream_plugin, "~> 0.20.1"} ``` ## Usage Example diff --git a/mix.exs b/mix.exs index 25b4b6f..2e53979 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do use Mix.Project - @version "0.20.0" + @version "0.20.1" @github_url "https://github.com/membraneframework/membrane_http_adaptive_stream_plugin" def project do @@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do {:membrane_aac_plugin, "~> 0.19.0"}, {:membrane_h26x_plugin, "~> 0.10.0"}, {:stream_split, "~> 0.1.7"}, - {:ex_hls, "~> 0.1.0"}, + {:ex_hls, "~> 0.1.1"}, {:bunch, "~> 1.6"}, {:qex, "~> 0.5"}, {:muontrap, "~> 1.6", only: :test}, diff --git a/mix.lock b/mix.lock index 73edc3a..45ebae2 100644 --- a/mix.lock +++ b/mix.lock @@ -13,7 +13,7 @@ "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"}, - "ex_hls": {:hex, :ex_hls, "0.1.0", "618d1b19839bb77b82e9f5a3a0fcd3520643eb0bd894b4a204fb416767ec7fc9", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "91a636f66e6f1e08eb518b724199169d77443d612ceb31834cb1d5ee1dc4fe17"}, + "ex_hls": {:hex, :ex_hls, "0.1.1", "71cfe9999dd7330ff252fc7bd03fb5ccb9d312eed7cafc30f489de6c9db2462d", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "260269c1459ab51557383c3a72e0ed21721db0dd3e38b10987decb336f8e902b"}, "ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"}, "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, From 66ba60d21fd14f0960d4f11be12e2b0974d5a9db Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 22 Sep 2025 14:20:20 +0200 Subject: [PATCH 2/4] Handle case when ex_hls has to add some latency to Live HLS stream --- lib/membrane_http_adaptive_stream/source.ex | 75 +++++++++++++------ .../source/client_genserver.ex | 2 +- mix.exs | 2 +- mix.lock | 2 +- .../integration_test/source_test.exs | 3 +- 5 files changed, 58 insertions(+), 26 deletions(-) diff --git a/lib/membrane_http_adaptive_stream/source.ex b/lib/membrane_http_adaptive_stream/source.ex index a6e1731..f9ed503 100644 --- a/lib/membrane_http_adaptive_stream/source.ex +++ b/lib/membrane_http_adaptive_stream/source.ex @@ -120,7 +120,12 @@ defmodule Membrane.HTTPAdaptiveStream.Source do state = Map.from_struct(opts) |> Map.merge(%{ - # status will be either :initialized, :waiting_on_pads or :streaming + # status will be either: + # - :initialized + # - :waiting_on_client_genserver_ready + # - :client_genserver_ready + # - :waiting_on_pads + # - :streaming status: :initialized, client_genserver: nil, stream: nil, @@ -166,20 +171,14 @@ defmodule Membrane.HTTPAdaptiveStream.Source do @impl true def handle_playing(ctx, state) do - # both start_streaming/1 and generate_new_tracks_notification/1 functions - # call ClientGenServer.get_tracks_info/1 that triggers downloading first - # segments of the HLS stream + :ok = spawn_client_genserver(ctx, state) - state = create_client_gen_server(ctx, state) + self() |> Process.send_after(:client_genserver_ready_timeout, 60_000) - if Map.values(state.pad_refs) != [nil, nil] do - state |> start_streaming() - else - state |> generate_new_tracks_notification() - end + {[], %{state | status: :waiting_on_client_genserver_ready}} end - defp create_client_gen_server(ctx, state) do + defp spawn_client_genserver(ctx, state) do start_link_arg = %{ url: state.url, variant_selection_policy: state.variant_selection_policy, @@ -192,18 +191,10 @@ defmodule Membrane.HTTPAdaptiveStream.Source do {__MODULE__.ClientGenServer, start_link_arg} ) - client_genserver = - receive do - {:client_genserver, client_genserver} -> client_genserver - after - 5_000 -> - raise "Timeout waiting for #{inspect(__MODULE__)}.ClientGenServer initialization" - end - - %{state | client_genserver: client_genserver} + :ok end - defp generate_new_tracks_notification(%{status: :initialized} = state) do + defp generate_new_tracks_notification(%{status: :client_genserver_ready} = state) do tracks_info = ClientGenServer.get_tracks_info(state.client_genserver) new_tracks = @@ -228,7 +219,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do end defp start_streaming(%{status: status} = state) - when status in [:initialized, :waiting_on_pads] do + when status in [:client_genserver_ready, :waiting_on_pads] do actions = get_stream_formats(state) ++ get_redemands(state) state = %{state | status: :streaming} {actions, state} @@ -354,6 +345,46 @@ defmodule Membrane.HTTPAdaptiveStream.Source do {[], state} end + @impl true + def handle_demand(pad_ref, demand, :buffers, _ctx, state) + when state.status == :waiting_on_client_genserver_ready do + Membrane.Logger.debug(""" + Ignoring demand (#{inspect(demand)} buffers) on pad #{inspect(pad_ref)} because this \ + element is still waiting for ExHLS to start downloading multimedia segments. + """) + + {[], state} + end + + @impl true + def handle_info({:client_genserver_ready, client_genserver}, _ctx, state) + when state.status == :waiting_on_client_genserver_ready do + state = %{ + state + | client_genserver: client_genserver, + status: :client_genserver_ready + } + + # both start_streaming/1 and generate_new_tracks_notification/1 functions + # call ClientGenServer.get_tracks_info/1 that triggers downloading first + # segments of the HLS stream + + if Map.values(state.pad_refs) != [nil, nil] do + state |> start_streaming() + else + state |> generate_new_tracks_notification() + end + end + + @impl true + def handle_info(:client_genserver_ready_timeout, _ctx, state) do + if state.status == :waiting_on_client_genserver_ready do + raise "Timeout while waiting for ExHLS to download first media segments." + end + + {[], state} + end + @impl true def handle_info({:chunk, %ExHLS.Chunk{} = chunk}, _ctx, state) do buffer = diff --git a/lib/membrane_http_adaptive_stream/source/client_genserver.ex b/lib/membrane_http_adaptive_stream/source/client_genserver.ex index ecabc33..7ec9bd7 100644 --- a/lib/membrane_http_adaptive_stream/source/client_genserver.ex +++ b/lib/membrane_http_adaptive_stream/source/client_genserver.ex @@ -83,7 +83,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do tracks_info: tracks_info } - send(state.source, {:client_genserver, self()}) + send(state.source, {:client_genserver_ready, self()}) {:noreply, state} end diff --git a/mix.exs b/mix.exs index 2e53979..ab546bb 100644 --- a/mix.exs +++ b/mix.exs @@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do {:membrane_aac_plugin, "~> 0.19.0"}, {:membrane_h26x_plugin, "~> 0.10.0"}, {:stream_split, "~> 0.1.7"}, - {:ex_hls, "~> 0.1.1"}, + {:ex_hls, "~> 0.1.2"}, {:bunch, "~> 1.6"}, {:qex, "~> 0.5"}, {:muontrap, "~> 1.6", only: :test}, diff --git a/mix.lock b/mix.lock index 45ebae2..0de5f47 100644 --- a/mix.lock +++ b/mix.lock @@ -13,7 +13,7 @@ "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"}, - "ex_hls": {:hex, :ex_hls, "0.1.1", "71cfe9999dd7330ff252fc7bd03fb5ccb9d312eed7cafc30f489de6c9db2462d", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "260269c1459ab51557383c3a72e0ed21721db0dd3e38b10987decb336f8e902b"}, + "ex_hls": {:hex, :ex_hls, "0.1.2", "fca2c2a4ddf8459b9a47bf1fd6552c5d74cccf5dc72f56cf87129111c3e2f8ee", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8129921a863918999cda4032cf66e5e4de9fa24faa33cee28a09b34f13438a49"}, "ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"}, "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, diff --git a/test/membrane_http_adaptive_stream/integration_test/source_test.exs b/test/membrane_http_adaptive_stream/integration_test/source_test.exs index 48f657f..511e1c0 100644 --- a/test/membrane_http_adaptive_stream/integration_test/source_test.exs +++ b/test/membrane_http_adaptive_stream/integration_test/source_test.exs @@ -159,12 +159,13 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do describe "Membrane.HTTPAdaptiveStream.Source demuxes audio and video from" do @tag :live @tag :tmp_dir + @tag :x test "Live HLS stream", %{tmp_dir: tmp_dir} do index_m3u8 = Path.join(tmp_dir, "index.m3u8") generate_live_hls(@bbb_33s_mp4_url, index_m3u8) await_until_file_exists(index_m3u8) - Process.sleep(7_000) + # Process.sleep(1_000) audio_result_file = Path.join(tmp_dir, "audio.aac") video_result_file = Path.join(tmp_dir, "video.h264") From 9452a2a03bef5482df58da787e02bd1e53291ab3 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 22 Sep 2025 14:34:38 +0200 Subject: [PATCH 3/4] Remove unnecessary comments --- lib/membrane_http_adaptive_stream/source.ex | 4 ---- .../integration_test/source_test.exs | 2 -- 2 files changed, 6 deletions(-) diff --git a/lib/membrane_http_adaptive_stream/source.ex b/lib/membrane_http_adaptive_stream/source.ex index f9ed503..ac1bb72 100644 --- a/lib/membrane_http_adaptive_stream/source.ex +++ b/lib/membrane_http_adaptive_stream/source.ex @@ -365,10 +365,6 @@ defmodule Membrane.HTTPAdaptiveStream.Source do status: :client_genserver_ready } - # both start_streaming/1 and generate_new_tracks_notification/1 functions - # call ClientGenServer.get_tracks_info/1 that triggers downloading first - # segments of the HLS stream - if Map.values(state.pad_refs) != [nil, nil] do state |> start_streaming() else diff --git a/test/membrane_http_adaptive_stream/integration_test/source_test.exs b/test/membrane_http_adaptive_stream/integration_test/source_test.exs index 511e1c0..0918968 100644 --- a/test/membrane_http_adaptive_stream/integration_test/source_test.exs +++ b/test/membrane_http_adaptive_stream/integration_test/source_test.exs @@ -163,9 +163,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do test "Live HLS stream", %{tmp_dir: tmp_dir} do index_m3u8 = Path.join(tmp_dir, "index.m3u8") generate_live_hls(@bbb_33s_mp4_url, index_m3u8) - await_until_file_exists(index_m3u8) - # Process.sleep(1_000) audio_result_file = Path.join(tmp_dir, "audio.aac") video_result_file = Path.join(tmp_dir, "video.h264") From 3491cc7630f67aef4931c60118eba5b5abf8c32e Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 22 Sep 2025 15:01:57 +0200 Subject: [PATCH 4/4] Make flaky test not flaky --- .../integration_test/source_test.exs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/test/membrane_http_adaptive_stream/integration_test/source_test.exs b/test/membrane_http_adaptive_stream/integration_test/source_test.exs index 0918968..06b1158 100644 --- a/test/membrane_http_adaptive_stream/integration_test/source_test.exs +++ b/test/membrane_http_adaptive_stream/integration_test/source_test.exs @@ -192,9 +192,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do test "Live HLS stream played from the middle", %{tmp_dir: tmp_dir} do index_m3u8 = Path.join(tmp_dir, "index.m3u8") generate_live_hls(@bbb_33s_mp4_url, index_m3u8) - - await_until_file_exists(index_m3u8) - Process.sleep(20_000) + :ok = await_until_media_sequence_is_3(index_m3u8) spec = child(:hls_source, %Membrane.HTTPAdaptiveStream.Source{ @@ -253,6 +251,18 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do end end + defp await_until_media_sequence_is_3(index_m3u8) do + with {:ok, content} <- File.read(index_m3u8), + true <- String.contains?(content, "#EXT-X-MEDIA-SEQUENCE:3") do + :ok + else + _error -> + Logger.debug("Waiting for media sequence to be 3...") + Process.sleep(100) + await_until_media_sequence_is_3(index_m3u8) + end + end + defp generate_live_hls(source_mp4, index_m3u8) do start_supervised!( {