Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pretty-avocados-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Support passing custom measurements to be run periodically either at application or stack level.
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,19 @@ with_telemetry [Telemetry.Metrics, OtelMetricExporter] do

[
system_monitor_child_spec(opts),
telemetry_poller_child_spec(opts) | exporter_child_specs(opts)
telemetry_poller_child_spec(opts)
| exporter_child_specs(opts)
]
|> Enum.reject(&is_nil/1)
|> Supervisor.init(strategy: :one_for_one)
end

defp system_monitor_child_spec(opts) do
{Electric.Telemetry.SystemMonitor, opts}
end

defp telemetry_poller_child_spec(%{periodic_measurements: []} = _opts), do: nil

defp telemetry_poller_child_spec(opts) do
{:telemetry_poller,
measurements: periodic_measurements(opts),
Expand Down Expand Up @@ -219,28 +223,37 @@ with_telemetry [Telemetry.Metrics, OtelMetricExporter] do

defp memory_by_process_type_metrics(_), do: []

# By default, The telemetry_poller application starts its own poller but we disable that
# and add its default measurements to the list of our custom ones.
#
# This allows for all Electric's default probes to be defined in one place.
@telemetry_poller_probes [
:memory,
:total_run_queue_lengths,
:system_counts
]

defp periodic_measurements(%{periodic_measurements: funcs} = opts) do
Enum.map(funcs, fn
probe when probe in @telemetry_poller_probes -> probe
probe when is_atom(probe) -> {__MODULE__, probe, [opts]}
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> {m, f, [opts | a]}
end)
end

# When custom probes aren't used,
defp periodic_measurements(opts) do
[
# Measurements included with the telemetry_poller application.
#
# By default, The telemetry_poller application starts its own poller but we disable that
# and add its default measurements to the list of our custom ones.
#
# This allows for all periodic measurements to be defined in one place.
:memory,
:total_run_queue_lengths,
:system_counts,

# Our custom measurements:
{__MODULE__, :uptime_event, []},
{__MODULE__, :cpu_utilization, []},
{__MODULE__, :process_memory, [opts]},
{__MODULE__, :get_system_load_average, []},
{__MODULE__, :get_system_memory_usage, []}
]
@telemetry_poller_probes ++
[
{__MODULE__, :uptime_event, [opts]},
{__MODULE__, :cpu_utilization, [opts]},
{__MODULE__, :process_memory, [opts]},
{__MODULE__, :get_system_load_average, [opts]},
{__MODULE__, :get_system_memory_usage, [opts]}
]
end

def uptime_event do
def uptime_event(_) do
:telemetry.execute([:vm, :uptime], %{
total: :erlang.monotonic_time() - :erlang.system_info(:start_time)
})
Expand All @@ -253,7 +266,7 @@ with_telemetry [Telemetry.Metrics, OtelMetricExporter] do
end
end

def cpu_utilization do
def cpu_utilization(_) do
case :cpu_sup.util([:per_cpu]) do
{:error, reason} ->
Logger.debug("Failed to collect CPU utilization: #{inspect(reason)}")
Expand All @@ -276,7 +289,7 @@ with_telemetry [Telemetry.Metrics, OtelMetricExporter] do
end
end

def get_system_load_average do
def get_system_load_average(_) do
case :erlang.system_info(:logical_processors) do
cores when is_number(cores) and cores > 0 ->
# > The load values are proportional to how long time a runnable Unix
Expand Down Expand Up @@ -332,7 +345,7 @@ with_telemetry [Telemetry.Metrics, OtelMetricExporter] do

@required_system_memory_keys ~w[system_total_memory free_memory]a

def get_system_memory_usage() do
def get_system_memory_usage(_) do
system_memory = Map.new(:memsup.get_system_memory_data())

# Sanity-check that all the required keys are present before doing any arithmetic on them
Expand Down
20 changes: 15 additions & 5 deletions packages/sync-service/lib/electric/telemetry/stack_telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ with_telemetry [OtelMetricExporter, Telemetry.Metrics] do
Electric.Telemetry.Sentry.set_tags_context(stack_id: opts.stack_id)

[telemetry_poller_child_spec(opts) | exporter_child_specs(opts)]
|> Enum.reject(&is_nil/1)
|> Supervisor.init(strategy: :one_for_one)
end

defp telemetry_poller_child_spec(%{periodic_measurements: []} = _opts), do: nil

defp telemetry_poller_child_spec(opts) do
{:telemetry_poller,
measurements: periodic_measurements(opts),
Expand Down Expand Up @@ -273,14 +276,21 @@ with_telemetry [OtelMetricExporter, Telemetry.Metrics] do
] ++ prometheus_metrics(opts)
end

defp periodic_measurements(%{periodic_measurements: funcs} = opts) do
Enum.map(funcs, fn
probe when is_atom(probe) -> {__MODULE__, probe, [opts]}
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> {m, f, [opts | a]}
end)
end

defp periodic_measurements(opts) do
[
{__MODULE__, :count_shapes, [opts.stack_id]},
{__MODULE__, :report_retained_wal_size, [opts.stack_id, opts.slot_name]}
{__MODULE__, :count_shapes, [opts]},
{__MODULE__, :report_retained_wal_size, [opts]}
]
end

def count_shapes(stack_id) do
def count_shapes(%{stack_id: stack_id}) do
# Telemetry is started before everything else in the stack, so we need to handle
# the case where the shape cache is not started yet.
case Electric.ShapeCache.count_shapes(stack_id: stack_id) do
Expand Down Expand Up @@ -314,8 +324,8 @@ with_telemetry [OtelMetricExporter, Telemetry.Metrics] do
"""

@doc false
@spec report_retained_wal_size(atom() | binary(), any()) :: :ok
def report_retained_wal_size(stack_id, slot_name) do
@spec report_retained_wal_size(%{stack_id: binary(), slot_name: binary()}) :: :ok
def report_retained_wal_size(%{stack_id: stack_id, slot_name: slot_name}) do
try do
%Postgrex.Result{rows: [[wal_size]]} =
Postgrex.query!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ with_telemetry [Electric.Telemetry.ApplicationTelemetry] do
free_memory: _,
used_memory: _,
resident_memory: _
} = ApplicationTelemetry.get_system_memory_usage()
} = ApplicationTelemetry.get_system_memory_usage(%{})

_ ->
assert %{
Expand All @@ -32,7 +32,7 @@ with_telemetry [Electric.Telemetry.ApplicationTelemetry] do
total_swap: _,
free_swap: _,
used_swap: _
} = ApplicationTelemetry.get_system_memory_usage()
} = ApplicationTelemetry.get_system_memory_usage(%{})
end
end
end
Expand Down