Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions lib/ex_hls/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule ExHLS.Client do
:queues,
:timestamp_offsets,
:last_timestamps,
:start_at_ms,
:base_timestamp_ms,
:end_stream_executed?
]

Expand All @@ -44,8 +46,8 @@ defmodule ExHLS.Client do
By default, it uses `DemuxingEngine.MPEGTS` as the demuxing engine implementation.
"""

@spec new(String.t()) :: client()
def new(url) do
@spec new(String.t(), non_neg_integer()) :: client()
def new(url, start_at_ms \\ 0) do
%{status: 200, body: request_body} = Req.get!(url)
multivariant_playlist = request_body |> ExM3U8.deserialize_multivariant_playlist!([])

Expand All @@ -61,6 +63,8 @@ defmodule ExHLS.Client do
queues: %{audio: Qex.new(), video: Qex.new()},
timestamp_offsets: %{audio: nil, video: nil},
last_timestamps: %{audio: nil, video: nil},
start_at_ms: start_at_ms,
base_timestamp_ms: nil,
end_stream_executed?: false
}
end
Expand All @@ -87,14 +91,16 @@ defmodule ExHLS.Client do
defp ensure_media_playlist_loaded(client), do: client

defp read_media_playlist_without_variant(%__MODULE__{media_playlist: nil} = client) do
deserialized_media_playlist =
{deserialized_media_playlist, base_timestamp_ms} =
client.root_playlist_string
|> ExM3U8.deserialize_media_playlist!([])
|> skip_to_start_at(client.start_at_ms)

%{
client
| media_playlist: deserialized_media_playlist,
media_base_url: client.base_url
media_base_url: client.base_url,
base_timestamp_ms: base_timestamp_ms
}
end

Expand All @@ -121,15 +127,17 @@ defmodule ExHLS.Client do

media_playlist = Path.join(client.base_url, chosen_variant.uri) |> Req.get!()

deserialized_media_playlist =
{deserialized_media_playlist, base_timestamp_ms} =
ExM3U8.deserialize_media_playlist!(media_playlist.body, [])
|> skip_to_start_at(client.start_at_ms)

media_base_url = Path.join(client.base_url, Path.dirname(chosen_variant.uri))

%{
client
| media_playlist: deserialized_media_playlist,
media_base_url: media_base_url
media_base_url: media_base_url,
base_timestamp_ms: base_timestamp_ms
}
end

Expand Down Expand Up @@ -168,6 +176,7 @@ defmodule ExHLS.Client do
|> put_in([:last_timestamps, media_type], chunk.dts_ms)
|> put_in([:demuxing_engine], demuxing_engine)

chunk = normalize_timestamps(chunk, client.base_timestamp_ms)
{chunk, client}
else
# returned from the second match
Expand Down Expand Up @@ -316,4 +325,43 @@ defmodule ExHLS.Client do
{:error, _reason} -> :error
end
end

defp skip_to_start_at(media_playlist, start_at_ms) do
{discarded, timeline_with_cumulative_duration} =
Enum.map_reduce(
media_playlist.timeline,
0,
fn
%ExM3U8.Tags.Segment{} = chunk, cumulative_duration_ms ->
chunk_end_ms = cumulative_duration_ms + 1000 * chunk.duration
{{chunk, chunk_end_ms}, chunk_end_ms}

other_tag, cumulative_duration_ms ->
{{other_tag, cumulative_duration_ms}, cumulative_duration_ms}
end
)
|> elem(0)
|> Enum.split_with(fn
{%ExM3U8.Tags.Segment{}, chunk_end_ms} -> chunk_end_ms <= start_at_ms
_other -> false
end)

base_timestamp_ms =
case Enum.at(discarded, -1) do
nil -> 0
{_discarded_timeline, cumulative_duration_ms} -> cumulative_duration_ms
end

timeline = Enum.map(timeline_with_cumulative_duration, &elem(&1, 0))

{put_in(media_playlist.timeline, timeline), base_timestamp_ms}
end

defp normalize_timestamps(chunk, base_timestamp_ms) do
%{
chunk
| pts_ms: round(chunk.pts_ms + base_timestamp_ms),
dts_ms: round(chunk.dts_ms + base_timestamp_ms)
}
end
end
3 changes: 2 additions & 1 deletion lib/ex_hls/demuxing_engine/cmaf.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule ExHLS.DemuxingEngine.CMAF do
@behaviour ExHLS.DemuxingEngine

alias Membrane.MP4.Demuxer.CMAF
alias Membrane.MP4.Demuxer.Sample

@enforce_keys [:demuxer]
defstruct @enforce_keys ++ [tracks_to_chunks: %{}]
Expand Down Expand Up @@ -30,7 +31,7 @@ defmodule ExHLS.DemuxingEngine.CMAF do
chunks
|> Enum.group_by(
fn chunk -> chunk.track_id end,
fn %CMAF.Engine.Sample{} = chunk ->
fn %Sample{} = chunk ->
%ExHLS.Chunk{
payload: chunk.payload,
pts_ms: chunk.pts,
Expand Down
4 changes: 2 additions & 2 deletions lib/ex_hls/demuxing_engine/mpeg_ts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
end
end

@mpegts_clock_rate 90
defp packet_ts_to_millis(ts), do: div(ts, @mpegts_clock_rate)
# value returned by Demuxer is represented in nanoseconds
defp packet_ts_to_millis(ts), do: div(ts, 1_000_000)

@impl true
def end_stream(%__MODULE__{} = demuxing_engine) do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule ExHLS.Mixfile do
{:ex_m3u8, "~> 0.15.3"},
{:req, "~> 0.5.10"},
{:qex, "~> 0.5.1"},
{:membrane_mp4_plugin, "~> 0.35.3"},
{:membrane_mp4_plugin, "~> 0.36.0"},
{:membrane_h26x_plugin, "~> 0.10.2"},
# {:mpeg_ts, github: "kim-company/kim_mpeg_ts"},
{:mpeg_ts, github: "membraneframework-labs/kim_mpeg_ts", branch: "backport-v1.0.3"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"},
"membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.5", "e9fa1ee9cda944259c4d2728c8b279bfe0152a3a6c1af187b07fa8411ca4e25e", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "dd0287a6b6223e47bba30a8952d6ec53db35f6a3e33203b7ad786e995711f098"},
"membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"},
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.3", "80228f4332eeef4fce4d90184a82bd5869d184f78438419660da7dc91871a238", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "c6d8b20e49540329f246e9a9c69adae330d424802fdfa1e6485d76a5257e6169"},
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.36.0", "ef4fc6fc72cc439974f2ae0ce579f3dbf8674f02ab00f7f524a6600b786a9ca8", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "84f55a42c69cb557b73d6272f958812f607abaaa6a3473f301d22393f2a62808"},
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
Expand Down
45 changes: 42 additions & 3 deletions test/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Client.Test do

{video_chunk, client} = client |> Client.read_video_chunk()

assert %{pts_ms: 10_033, dts_ms: 10_000} = video_chunk
assert %{pts_ms: 0, dts_ms: 0} = video_chunk
assert byte_size(video_chunk.payload) == 1048

assert <<0, 0, 0, 1, 9, 240, 0, 0, 0, 1, 103, 100, 0, 31, 172, 217, 128, 80, 5, 187, 1, 16,
Expand All @@ -41,7 +41,7 @@ defmodule Client.Test do

{audio_chunk, _client} = Client.read_audio_chunk(client)

assert %{pts_ms: 10_010, dts_ms: 10_010} = audio_chunk
assert %{pts_ms: 0, dts_ms: 0} = audio_chunk
assert byte_size(audio_chunk.payload) == 6154

assert <<255, 241, 80, 128, 4, 63, 252, 222, 4, 0, 0, 108, 105, 98, 102, 97, 97, 99, 32, 49,
Expand Down Expand Up @@ -111,7 +111,7 @@ defmodule Client.Test do

{video_chunk, _client} = Client.read_video_chunk(client)

assert %{pts_ms: 1480, dts_ms: 1400} = video_chunk
assert %{pts_ms: 0, dts_ms: 0} = video_chunk
assert byte_size(video_chunk.payload) == 822

assert <<0, 0, 0, 1, 9, 240, 0, 0, 0, 1, 6, 5, 255, 255, 167, 220, 69, 233, 189, 230, 217, 72,
Expand Down Expand Up @@ -146,4 +146,43 @@ defmodule Client.Test do
72, 183, 150, 44, 216, 32, 217, 35, 238, 239, 120, 50, 54, 52, 32, 45, 32, 99, 111,
114, 101, 32, 49, 54, 52, 32, 114>> <> _rest = video_chunk.payload
end

test "(MPEGTS) stream with start_at_ms" do
start_at_ms = 40_000
client = Client.new(@mpegts_url, start_at_ms)

variant_720 =
Client.get_variants(client)
|> Map.values()
|> Enum.find(&(&1.resolution == {1280, 720}))

assert variant_720 != nil

client = client |> Client.choose_variant(variant_720.id)
{:ok, tracks_info, client} = Client.get_tracks_info(client)

tracks_info = tracks_info |> Map.values()

assert tracks_info |> length() == 2
assert %RemoteStream{content_format: AAC, type: :bytestream} in tracks_info
assert %RemoteStream{content_format: H264, type: :bytestream} in tracks_info

{video_chunk, client} = client |> Client.read_video_chunk()

assert %{pts_ms: 40_004, dts_ms: 40_004} = video_chunk
assert byte_size(video_chunk.payload) == 135_298

assert <<0, 0, 0, 1, 9, 240, 0, 0, 0, 1, 103, 100, 0, 31, 172, 217, 128, 80, 5, 187, 1, 16, 0,
0, 3, 0, 16, 0, 0, 7, 128, 241, 131, 25, 160, 0, 0, 0, 1, 104, 233, 121, 203, 34,
192, 0, 0, 1, 101, 136>> <> _rest = video_chunk.payload

{audio_chunk, _client} = Client.read_audio_chunk(client)

assert %{pts_ms: 40_004, dts_ms: 40_004} = audio_chunk
assert byte_size(audio_chunk.payload) == 6020

assert <<255, 241, 80, 128, 47, 63, 252, 33, 10, 204, 43, 253, 251, 213, 30, 152, 129, 48, 80,
38, 22, 18, 5, 130, 129, 113, 34, 92, 36, 20, 25, 9, 2, 193, 64, 144, 68, 36, 17, 75,
215, 198, 77, 184, 229, 170, 157, 115, 169, 223>> <> _rest = audio_chunk.payload
end
end