Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ In future, the support for MPEG-DASH is planned as well
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.

```elixir
{:membrane_http_adaptive_stream_plugin, "~> 0.20.0"}
{:membrane_http_adaptive_stream_plugin, "~> 0.20.1"}
```

## Usage Example
Expand Down
71 changes: 49 additions & 22 deletions lib/membrane_http_adaptive_stream/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
state =
Map.from_struct(opts)
|> Map.merge(%{
# status will be either :initialized, :waiting_on_pads or :streaming
# status will be either:
# - :initialized
# - :waiting_on_client_genserver_ready
# - :client_genserver_ready
# - :waiting_on_pads
# - :streaming
status: :initialized,
client_genserver: nil,
stream: nil,
Expand Down Expand Up @@ -166,20 +171,14 @@ defmodule Membrane.HTTPAdaptiveStream.Source do

@impl true
def handle_playing(ctx, state) do
# both start_streaming/1 and generate_new_tracks_notification/1 functions
# call ClientGenServer.get_tracks_info/1 that triggers downloading first
# segments of the HLS stream
:ok = spawn_client_genserver(ctx, state)

state = create_client_gen_server(ctx, state)
self() |> Process.send_after(:client_genserver_ready_timeout, 60_000)

if Map.values(state.pad_refs) != [nil, nil] do
state |> start_streaming()
else
state |> generate_new_tracks_notification()
end
{[], %{state | status: :waiting_on_client_genserver_ready}}
end

defp create_client_gen_server(ctx, state) do
defp spawn_client_genserver(ctx, state) do
start_link_arg = %{
url: state.url,
variant_selection_policy: state.variant_selection_policy,
Expand All @@ -192,18 +191,10 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
{__MODULE__.ClientGenServer, start_link_arg}
)

client_genserver =
receive do
{:client_genserver, client_genserver} -> client_genserver
after
5_000 ->
raise "Timeout waiting for #{inspect(__MODULE__)}.ClientGenServer initialization"
end

%{state | client_genserver: client_genserver}
:ok
end

defp generate_new_tracks_notification(%{status: :initialized} = state) do
defp generate_new_tracks_notification(%{status: :client_genserver_ready} = state) do
tracks_info = ClientGenServer.get_tracks_info(state.client_genserver)

new_tracks =
Expand All @@ -228,7 +219,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
end

defp start_streaming(%{status: status} = state)
when status in [:initialized, :waiting_on_pads] do
when status in [:client_genserver_ready, :waiting_on_pads] do
actions = get_stream_formats(state) ++ get_redemands(state)
state = %{state | status: :streaming}
{actions, state}
Expand Down Expand Up @@ -354,6 +345,42 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
{[], state}
end

@impl true
def handle_demand(pad_ref, demand, :buffers, _ctx, state)
when state.status == :waiting_on_client_genserver_ready do
Membrane.Logger.debug("""
Ignoring demand (#{inspect(demand)} buffers) on pad #{inspect(pad_ref)} because this \
element is still waiting for ExHLS to start downloading multimedia segments.
""")

{[], state}
end

@impl true
def handle_info({:client_genserver_ready, client_genserver}, _ctx, state)
when state.status == :waiting_on_client_genserver_ready do
state = %{
state
| client_genserver: client_genserver,
status: :client_genserver_ready
}

if Map.values(state.pad_refs) != [nil, nil] do
state |> start_streaming()
else
state |> generate_new_tracks_notification()
end
end

@impl true
def handle_info(:client_genserver_ready_timeout, _ctx, state) do
if state.status == :waiting_on_client_genserver_ready do
raise "Timeout while waiting for ExHLS to download first media segments."
end

{[], state}
end

@impl true
def handle_info({:chunk, %ExHLS.Chunk{} = chunk}, _ctx, state) do
buffer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
tracks_info: tracks_info
}

send(state.source, {:client_genserver, self()})
send(state.source, {:client_genserver_ready, self()})

{:noreply, state}
end
Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.HTTPAdaptiveStream.MixProject do
use Mix.Project

@version "0.20.0"
@version "0.20.1"
@github_url "https://github.com/membraneframework/membrane_http_adaptive_stream_plugin"

def project do
Expand Down Expand Up @@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
{:membrane_aac_plugin, "~> 0.19.0"},
{:membrane_h26x_plugin, "~> 0.10.0"},
{:stream_split, "~> 0.1.7"},
{:ex_hls, "~> 0.1.0"},
{:ex_hls, "~> 0.1.2"},
{:bunch, "~> 1.6"},
{:qex, "~> 0.5"},
{:muontrap, "~> 1.6", only: :test},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
"ex_hls": {:hex, :ex_hls, "0.1.0", "618d1b19839bb77b82e9f5a3a0fcd3520643eb0bd894b4a204fb416767ec7fc9", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "91a636f66e6f1e08eb518b724199169d77443d612ceb31834cb1d5ee1dc4fe17"},
"ex_hls": {:hex, :ex_hls, "0.1.2", "fca2c2a4ddf8459b9a47bf1fd6552c5d74cccf5dc72f56cf87129111c3e2f8ee", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8129921a863918999cda4032cf66e5e4de9fa24faa33cee28a09b34f13438a49"},
"ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"},
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,11 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
describe "Membrane.HTTPAdaptiveStream.Source demuxes audio and video from" do
@tag :live
@tag :tmp_dir
@tag :x
test "Live HLS stream", %{tmp_dir: tmp_dir} do
index_m3u8 = Path.join(tmp_dir, "index.m3u8")
generate_live_hls(@bbb_33s_mp4_url, index_m3u8)

await_until_file_exists(index_m3u8)
Process.sleep(7_000)

audio_result_file = Path.join(tmp_dir, "audio.aac")
video_result_file = Path.join(tmp_dir, "video.h264")
Expand Down Expand Up @@ -193,9 +192,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
test "Live HLS stream played from the middle", %{tmp_dir: tmp_dir} do
index_m3u8 = Path.join(tmp_dir, "index.m3u8")
generate_live_hls(@bbb_33s_mp4_url, index_m3u8)

await_until_file_exists(index_m3u8)
Process.sleep(20_000)
:ok = await_until_media_sequence_is_3(index_m3u8)

spec =
child(:hls_source, %Membrane.HTTPAdaptiveStream.Source{
Expand Down Expand Up @@ -254,6 +251,18 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
end
end

defp await_until_media_sequence_is_3(index_m3u8) do
with {:ok, content} <- File.read(index_m3u8),
true <- String.contains?(content, "#EXT-X-MEDIA-SEQUENCE:3") do
:ok
else
_error ->
Logger.debug("Waiting for media sequence to be 3...")
Process.sleep(100)
await_until_media_sequence_is_3(index_m3u8)
end
end

defp generate_live_hls(source_mp4, index_m3u8) do
start_supervised!(
{
Expand Down