diff --git a/.buildkite/docker-image.yml b/.buildkite/docker-image.yml index 5ac9bc415c..ba8fc9477f 100644 --- a/.buildkite/docker-image.yml +++ b/.buildkite/docker-image.yml @@ -18,6 +18,7 @@ steps: - export ELECTRIC_VERSION=$(git describe --abbrev=7 --tags --always --first-parent --match '@core/sync-service@*' | sed -En 's|^@core/sync-service@||p') - export SHORT_COMMIT_SHA=$(git rev-parse --short HEAD) - docker buildx build --platform linux/arm64/v8,linux/amd64 --push + --build-context electric-telemetry=../electric-telemetry --build-arg ELECTRIC_VERSION=$${ELECTRIC_VERSION} -t $${ELECTRIC_IMAGE_NAME}:canary -t $${ELECTRIC_CANARY_IMAGE_NAME}:$${SHORT_COMMIT_SHA} @@ -31,6 +32,7 @@ steps: - cd ./packages/sync-service - export ELECTRIC_VERSION=$(jq '.version' -r package.json) - docker buildx build --platform linux/arm64/v8,linux/amd64 --push + --build-context electric-telemetry=../electric-telemetry --build-arg ELECTRIC_VERSION=$${ELECTRIC_VERSION} -t $${ELECTRIC_IMAGE_NAME}:$${ELECTRIC_VERSION} -t $${ELECTRIC_IMAGE_NAME}:latest diff --git a/.changeset/eleven-pens-sell.md b/.changeset/eleven-pens-sell.md new file mode 100644 index 0000000000..508061b57f --- /dev/null +++ b/.changeset/eleven-pens-sell.md @@ -0,0 +1,6 @@ +--- +'@core/electric-telemetry': patch +'@core/sync-service': patch +--- + +Extract telemetry code from Electric into a separate package, for easier modification and sharing of the telemetry code between Electric and Cloud. 
diff --git a/.github/workflows/autoformat.yml b/.github/workflows/autoformat.yml index 49a1c0bb8d..12dab56707 100644 --- a/.github/workflows/autoformat.yml +++ b/.github/workflows/autoformat.yml @@ -48,7 +48,7 @@ jobs: elixir_formatting: strategy: matrix: - package: [sync-service, elixir-client] + package: [sync-service, elixir-client, electric-telemetry] name: Elixir formatting and linting runs-on: ubuntu-latest defaults: diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index e85825f265..dbd4482b34 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -62,6 +62,7 @@ jobs: --push \ --cache-from ${{ env.REGISTRY }}/electric:canary-builder \ --cache-from ${{ env.REGISTRY }}/electric:pr-${{ github.event.issue.number }}-builder \ + --build-context electric-telemetry=../electric-telemetry \ --build-arg BUILDKIT_INLINE_CACHE=1 \ --tag ${{ env.REGISTRY }}/electric:pr-${{ github.event.issue.number }}-builder \ --target builder \ diff --git a/.github/workflows/docker_image_smoketest.yml b/.github/workflows/docker_image_smoketest.yml index 48d97010c1..dc94859bfe 100644 --- a/.github/workflows/docker_image_smoketest.yml +++ b/.github/workflows/docker_image_smoketest.yml @@ -36,6 +36,8 @@ jobs: - uses: docker/build-push-action@v6 with: context: packages/sync-service + build-contexts: | + electric-telemetry=packages/electric-telemetry push: false load: true tags: 'electric-test-image' diff --git a/.github/workflows/electric_telemetry_tests.yml b/.github/workflows/electric_telemetry_tests.yml new file mode 100644 index 0000000000..5f129c9c99 --- /dev/null +++ b/.github/workflows/electric_telemetry_tests.yml @@ -0,0 +1,86 @@ +name: Electric Telemetry CI + +on: + push: + branches: ['main'] + paths-ignore: + - 'website/**' + - '**/README.md' + pull_request: + paths-ignore: + - 'website/**' + - '**/README.md' + +permissions: + contents: read + +jobs: + build_and_test: + name: Build and test electric-telemetry 
+ runs-on: ubuntu-latest + defaults: + run: + working-directory: packages/electric-telemetry + env: + MIX_ENV: test + MIX_OS_DEPS_COMPILE_PARTITION_COUNT: 4 + steps: + - uses: actions/checkout@v4 + + - uses: erlef/setup-beam@v1 + with: + version-type: strict + version-file: '.tool-versions' + + - name: Cache electric-telemetry dependencies + uses: actions/cache@v4 + with: + path: packages/electric-telemetry/deps + key: ${{ runner.os }}-electric-telemetry-deps-${{ hashFiles('packages/electric-telemetry/mix.lock') }} + restore-keys: | + ${{ runner.os }}-electric-telemetry-deps-${{ hashFiles('packages/electric-telemetry/mix.lock') }} + ${{ runner.os }}-electric-telemetry-deps- + + - name: Cache compiled code + uses: actions/cache@v4 + with: + path: | + packages/electric-telemetry/_build/*/lib + !packages/electric-telemetry/_build/*/lib/electric_telemetry + key: "${{ runner.os }}-electric-telemetry-build-${{ env.MIX_ENV }}-${{ hashFiles('packages/electric-telemetry/mix.lock') }}" + restore-keys: | + ${{ runner.os }}-electric-telemetry-build-${{ env.MIX_ENV }}-${{ hashFiles('packages/electric-telemetry/mix.lock') }} + ${{ runner.os }}-electric-telemetry-build-${{ env.MIX_ENV }}- + ${{ runner.os }}-electric-telemetry-build- + + - name: Install dependencies + run: mix do deps.get + deps.compile + + - name: Compile + # don't bail on compile warnings - let's get the results of the tests + # and let the formatting task check for compilation warnings + run: mix compile + + - name: Run tests + run: mix coveralls.json + + - name: Upload coverage reports to CodeCov + uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d + with: + token: ${{ secrets.CODECOV_TOKEN }} + flags: elixir,unit-tests,electric-telemetry + files: ./cover/excoveralls.json + + - name: Upload test results to CodeCov + uses: codecov/test-results-action@f2dba722c67b86c6caa034178c6e4d35335f6706 + if: ${{ !cancelled() }} + env: + DUMMY_COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}-dummy
+ with: + token: ${{ secrets.CODECOV_TOKEN }} + fail_ci_if_error: true + flags: elixir,unit-tests,electric-telemetry + files: ./junit/test-junit-report.xml + # Upload all PR test results to single branch - requires overriding branch and commit + override_branch: ${{ github.event_name == 'pull_request' && 'codecov/pr-test-aggregation' || '' }} + override_commit: ${{ github.event_name == 'pull_request' && env.DUMMY_COMMIT_SHA || '' }} diff --git a/.github/workflows/elixir_tests.yml b/.github/workflows/elixir_tests.yml index b42b92776d..b9b4832a81 100644 --- a/.github/workflows/elixir_tests.yml +++ b/.github/workflows/elixir_tests.yml @@ -95,26 +95,16 @@ jobs: ${{ runner.os }}-sync-service-build- - name: Install dependencies - run: | - mix deps.get - mix deps.compile - MIX_TARGET=application mix deps.compile + run: mix do deps.get + deps.compile - name: Compile # don't bail on compile warnings - let's get the results of the tests # and let the formatting task check for compilation warnings - run: | - mix compile - MIX_TARGET=application mix compile + run: mix compile - name: Run tests run: mix coveralls.json --include slow --cover --export-coverage test - - name: Run telemetry tests - env: - MIX_TARGET: application - run: mix coveralls.json --only telemetry_target --import-cover cover - - name: Upload coverage reports to CodeCov uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d with: diff --git a/packages/electric-telemetry/.dockerignore b/packages/electric-telemetry/.dockerignore new file mode 100644 index 0000000000..d983cc0f3c --- /dev/null +++ b/packages/electric-telemetry/.dockerignore @@ -0,0 +1,3 @@ +_build/ +deps/ +test/ diff --git a/packages/electric-telemetry/.formatter.exs b/packages/electric-telemetry/.formatter.exs new file mode 100644 index 0000000000..d2cda26edd --- /dev/null +++ b/packages/electric-telemetry/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", 
"{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/packages/electric-telemetry/.gitignore b/packages/electric-telemetry/.gitignore new file mode 100644 index 0000000000..5b8c75f91d --- /dev/null +++ b/packages/electric-telemetry/.gitignore @@ -0,0 +1,24 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Temporary files, for example, from tests. +/tmp/ + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +electric_telemetry-*.tar + diff --git a/packages/electric-telemetry/README.md b/packages/electric-telemetry/README.md new file mode 100644 index 0000000000..101d956721 --- /dev/null +++ b/packages/electric-telemetry/README.md @@ -0,0 +1,41 @@ +# ElectricTelemetry + +Library used by Electric to gather telemetry and export it to a number of supported destinations. Originally extracted from [electric-sql/electric](https://github.com/electric-sql/electric). + +## Installation + +Install it by adding `electric_telemetry` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:electric_telemetry, github: "electric-sql/electric", sparse: "packages/electric-telemetry"} + ] +end +``` + +## Configuration + +Runtime configuration is available for the underlying otel_metric_exporter lib: + +```elixir +config :otel_metric_exporter, + otlp_endpoint: "...", + otlp_headers: %{...} + resource: %{...} +``` + +The configuration options for `ElectricTelemetry.ApplicationTelemetry` or `ElectricTelemetry.StackTelemetry` must be passed as a keyword list function argument. See `ElectricTelemetry.Opts` for the supported options. 
+ +## Overview + +At a high level, the library includes these modules: + +- `ElectricTelemetry` includes basic utilities such as validating user options. + +- `ElectricTelemetry.ApplicationTelemetry` defines metrics and periodic measurements that apply to BEAM as a whole. + +- `ElectricTelemetry.StackTelemetry` defines metrics that are specific to the notion of an + Electric stack: shape stats, replication client stats, etc. No builtin measurements are defined here. + +- Reporter modules. These are enabled individually and are used for exporting metrics to the corresponding destination. diff --git a/packages/electric-telemetry/lib/electric/telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry.ex new file mode 100644 index 0000000000..71d9e5bcd1 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry.ex @@ -0,0 +1,47 @@ +defmodule ElectricTelemetry do + def scheduler_ids do + num_schedulers = :erlang.system_info(:schedulers) + Enum.map(1..num_schedulers, &:"normal_#{&1}") ++ [:cpu, :io] + end + + def keep_for_stack(metrics, stack_id) do + Enum.map(metrics, fn + {key, metric} -> {key, filter_metric(metric, stack_id)} + metric when is_map(metric) -> filter_metric(metric, stack_id) + end) + end + + defp filter_metric(metric, stack_id) do + Map.update!(metric, :keep, fn + nil -> fn metadata -> metadata[:stack_id] == stack_id end + fun -> fn metadata -> fun.(metadata) && metadata[:stack_id] == stack_id end + end) + end + + @opts_schema NimbleOptions.new!(ElectricTelemetry.Opts.schema()) + + def validate_options(user_opts) do + with {:ok, validated_opts} <- NimbleOptions.validate(user_opts, @opts_schema) do + config = + Map.new(validated_opts, fn + {k, kwlist} when k in [:reporters, :intervals_and_thresholds] -> {k, Map.new(kwlist)} + kv -> kv + end) + + {:ok, config} + end + end + + def export_enabled?(%{reporters: reporters}) do + truthy?( + reporters.statsd_host || + reporters.call_home_url || + reporters.otel_metrics?
|| + reporters.prometheus? + ) + end + + defp truthy?(false), do: false + defp truthy?(nil), do: false + defp truthy?(_), do: true +end diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex new file mode 100644 index 0000000000..8a379790c8 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -0,0 +1,438 @@ +defmodule ElectricTelemetry.ApplicationTelemetry do + @moduledoc """ + Collects and exports application level telemetry such as CPU, memory and BEAM metrics. + + See also StackTelemetry for stack specific telemetry. + """ + use Supervisor + + import Telemetry.Metrics + + alias ElectricTelemetry.Reporters + + require Logger + + @behaviour ElectricTelemetry.Poller + + def start_link(opts) do + with {:ok, opts} <- ElectricTelemetry.validate_options(opts) do + if ElectricTelemetry.export_enabled?(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + else + # Avoid starting the telemetry supervisor and its telemetry_poller child if we're not + # intending to export periodic measurements metrics anywhere. 
+ :ignore + end + end + end + + @impl Supervisor + def init(opts) do + children = + [ + {ElectricTelemetry.SystemMonitor, opts}, + ElectricTelemetry.Poller.child_spec(opts, callback_module: __MODULE__) + | exporter_child_specs(opts) + ] + |> Enum.reject(&is_nil/1) + + Supervisor.init(children, strategy: :one_for_one) + end + + defp exporter_child_specs(opts) do + metrics = metrics(opts) + + [ + Reporters.CallHomeReporter.child_spec( + opts, + metrics: Reporters.CallHomeReporter.application_metrics() + ), + Reporters.Otel.child_spec(opts, metrics: metrics), + Reporters.Prometheus.child_spec(opts, metrics: metrics), + Reporters.Statsd.child_spec(opts, metrics: Reporters.Statsd.application_metrics()) + ] + end + + @impl ElectricTelemetry.Poller + def builtin_periodic_measurements(telemetry_opts) do + [ + # Measurements included with the telemetry_poller application. + # + # By default, The telemetry_poller application starts its own poller but we disable that + # and add its default measurements to the list of our custom ones. + # + # This allows for all periodic measurements to be defined in one place. 
+ :memory, + :persistent_term, + :system_counts, + :total_run_queue_lengths + ] ++ + Enum.map( + [ + :uptime_event, + :cpu_utilization, + :scheduler_utilization, + :run_queue_lengths, + :garbage_collection, + :reductions, + :process_memory, + :get_system_load_average, + :get_system_memory_usage + ], + &{__MODULE__, &1, [telemetry_opts]} + ) + end + + def metrics(telemetry_opts) do + [ + last_value("process.memory.total", tags: [:process_type], unit: :byte), + last_value("system.cpu.core_count"), + last_value("system.cpu.utilization.total"), + last_value("system.load_percent.avg1"), + last_value("system.load_percent.avg5"), + last_value("system.load_percent.avg15"), + last_value("system.memory_percent.free_memory"), + last_value("system.memory_percent.available_memory"), + last_value("system.memory_percent.used_memory"), + last_value("vm.garbage_collection.total_runs"), + last_value("vm.garbage_collection.total_bytes_reclaimed", unit: :byte), + last_value("vm.memory.atom", unit: :byte), + last_value("vm.memory.atom_used", unit: :byte), + last_value("vm.memory.binary", unit: :byte), + last_value("vm.memory.code", unit: :byte), + last_value("vm.memory.ets", unit: :byte), + last_value("vm.memory.processes", unit: :byte), + last_value("vm.memory.processes_used", unit: :byte), + last_value("vm.memory.system", unit: :byte), + last_value("vm.memory.total", unit: :byte), + sum("vm.monitor.long_message_queue.length", tags: [:process_type]), + distribution("vm.monitor.long_schedule.timeout", + tags: [:process_type], + unit: :millisecond + ), + distribution("vm.monitor.long_gc.timeout", tags: [:process_type], unit: :millisecond), + last_value("vm.persistent_term.count"), + last_value("vm.persistent_term.memory", unit: :byte), + last_value("vm.reductions.total"), + last_value("vm.reductions.delta"), + last_value("vm.run_queue_lengths.total"), + last_value("vm.run_queue_lengths.total_plus_io"), + last_value("vm.scheduler_utilization.total"), + 
last_value("vm.scheduler_utilization.weighted"), + last_value("vm.system_counts.atom_count"), + last_value("vm.system_counts.port_count"), + last_value("vm.system_counts.process_count"), + last_value("vm.total_run_queue_lengths.total"), + last_value("vm.total_run_queue_lengths.cpu"), + last_value("vm.total_run_queue_lengths.io"), + last_value("vm.uptime.total", + unit: :second, + measurement: &:erlang.convert_time_unit(&1.total, :native, :second) + ) + ] ++ + cpu_utilization_metrics() ++ + scheduler_utilization_metrics() ++ + run_queue_lengths_metrics() ++ + additional_metrics(telemetry_opts) + end + + def cpu_utilization_metrics do + 1..:erlang.system_info(:logical_processors) + |> Enum.map(&last_value("system.cpu.utilization.core_#{&1 - 1}")) + end + + def scheduler_utilization_metrics do + num_schedulers = :erlang.system_info(:schedulers) + schedulers_range = 1..num_schedulers + + num_dirty_cpu_schedulers = :erlang.system_info(:dirty_cpu_schedulers) + + dirty_cpu_schedulers_range = + (num_schedulers + 1)..(num_schedulers + num_dirty_cpu_schedulers) + + Enum.map(schedulers_range, &last_value("vm.scheduler_utilization.normal_#{&1}")) ++ + Enum.map(dirty_cpu_schedulers_range, &last_value("vm.scheduler_utilization.cpu_#{&1}")) + end + + def run_queue_lengths_metrics do + Enum.map(ElectricTelemetry.scheduler_ids(), &last_value("vm.run_queue_lengths.#{&1}")) + end + + def additional_metrics(%{additional_metrics: metrics}), do: metrics + def additional_metrics(_), do: [] + + ### + + def uptime_event(_) do + :telemetry.execute([:vm, :uptime], %{ + total: :erlang.monotonic_time() - :erlang.system_info(:start_time) + }) + end + + def process_memory(%{intervals_and_thresholds: %{top_process_count: process_count}}) do + for %{type: type, memory: memory} <- + ElectricTelemetry.Processes.top_memory_by_type(process_count) do + :telemetry.execute([:process, :memory], %{total: memory}, %{process_type: to_string(type)}) + end + end + + def cpu_utilization(_) do + case 
:cpu_sup.util([:per_cpu]) do + {:error, reason} -> + Logger.debug("Failed to collect CPU utilization: #{inspect(reason)}") + + data -> + {per_core_utilization, bare_values} = + for {cpu_index, busy, _free, _misc} <- data do + {{:"core_#{cpu_index}", busy}, busy} + end + |> Enum.unzip() + + utilization = + per_core_utilization + |> Map.new() + |> Map.put(:total, mean(bare_values)) + + :telemetry.execute([:system, :cpu, :utilization], utilization) + + :telemetry.execute([:system, :cpu], %{core_count: length(bare_values)}) + end + end + + # The Erlang docs do not specify a recommended value to use between two successive samples + # of scheduler utilization. + @scheduler_wall_time_measurement_duration 100 + + def scheduler_utilization(_) do + # Perform the measurement in a task to ensure that the `scheduler_wall_time` flag does + # not remain enabled in case of unforeseen errors. + t = + Task.async(fn -> + :erlang.system_flag(:scheduler_wall_time, true) + s1 = :scheduler.get_sample() + Process.sleep(@scheduler_wall_time_measurement_duration) + s2 = :scheduler.get_sample() + {s1, s2} + end) + + {s1, s2} = Task.await(t) + + schedulers = :scheduler.utilization(s1, s2) + + utilization = + Map.new(schedulers, fn + # Scheduler utilization of a normal scheduler with number scheduler_id + {:normal, scheduler_id, util, _percent} -> {:"normal_#{scheduler_id}", util * 100} + # Scheduler utilization of a dirty-cpu scheduler with number scheduler_id + {:cpu, scheduler_id, util, _percent} -> {:"cpu_#{scheduler_id}", util * 100} + # Total utilization of all normal and dirty-cpu schedulers + {:total, util, _percent} -> {:total, util * 100} + # Total utilization of all normal and dirty-cpu schedulers, weighted against maximum amount of available CPU time + {:weighted, util, _percent} -> {:weighted, util * 100} + end) + + :telemetry.execute([:vm, :scheduler_utilization], utilization) + end + + def run_queue_lengths(_) do + scheduler_ids = ElectricTelemetry.scheduler_ids() + + 
run_queue_lengths = :erlang.statistics(:run_queue_lengths_all) + + measurements = + Enum.zip(scheduler_ids, run_queue_lengths) + |> Map.new() + |> Map.put(:total, :erlang.statistics(:total_run_queue_lengths)) + |> Map.put(:total_plus_io, :erlang.statistics(:total_run_queue_lengths_all)) + + :telemetry.execute([:vm, :run_queue_lengths], measurements) + end + + def garbage_collection(_) do + word_size = :erlang.system_info(:wordsize) + {num_gc_runs, num_words_reclaimed, 0} = :erlang.statistics(:garbage_collection) + + :telemetry.execute([:vm, :garbage_collection], %{ + total_runs: num_gc_runs, + total_bytes_reclaimed: num_words_reclaimed * word_size + }) + end + + def reductions(_) do + {total_reductions, reductions_since_last_call} = :erlang.statistics(:reductions) + + :telemetry.execute([:vm, :reductions], %{ + total: total_reductions, + delta: reductions_since_last_call + }) + end + + def get_system_load_average(_) do + case :erlang.system_info(:logical_processors) do + cores when is_number(cores) and cores > 0 -> + # > The load values are proportional to how long time a runnable Unix + # > process has to spend in the run queue before it is scheduled. + # > Accordingly, higher values mean more system load. The returned value + # > divided by 256 produces the figure displayed by rup and top. + # + # I'm going one step further and dividing by the number of CPUs so in a 4 + # core system, a load of 4.0 (in top) will show as 100%. + # Since load can go above num cores, we can go to 200%, 300% but + # I think this makes sense. + # + # Certainly the formula in the erlang docs: + # + # > the following simple mathematical transformation can produce the load + # > value as a percentage: + # > + # > PercentLoad = 100 * (1 - D/(D + Load)) + # > + # > D determines which load value should be associated with which + # > percentage. Choosing D = 50 means that 128 is 60% load, 256 is 80%, 512 + # > is 90%, and so on. + # + # Makes little sense.
Setting `D` as they say and plugging in a avg1 value + # of 128 does not give 60% so I'm not sure how to square what they say with + # the numbers... + # + # e.g. my machine currently has a cpu util (:cpu_sup.util()) of 4% and an + # avg1() of 550 ish across 24 cores (so doing very little) but that formula + # would give a `PercentLoad` of ~92%. + # + # My version would give value of 550 / 256 / 24 = 9% + [:avg1, :avg5, :avg15] + |> Enum.reduce(%{}, fn probe, acc -> + case apply(:cpu_sup, probe, []) do + {:error, reason} -> + Logger.debug("Failed to collect system load #{probe}: #{inspect(reason)}") + acc + + value -> + Map.put(acc, probe, 100 * (value / 256 / cores)) + end + end) + |> case do + x when x == %{} -> :ok + map -> :telemetry.execute([:system, :load_percent], map) + end + + _ -> + Logger.debug("Failed to collect system load average: no cores reported") + end + end + + @required_system_memory_keys ~w[system_total_memory free_memory]a + + def get_system_memory_usage(_) do + system_memory = Map.new(:memsup.get_system_memory_data()) + + # Sanity-check that all the required keys are present before doing any arithmetic on them + missing_system_memory_keys = + Enum.reject(@required_system_memory_keys, &Map.has_key?(system_memory, &1)) + + mem_stats = + cond do + missing_system_memory_keys != [] -> + Logger.warning( + "Error gathering system memory stats: " <> + "missing data points #{Enum.join(missing_system_memory_keys, ", ")}" + ) + + %{} + + system_memory.system_total_memory == 0 -> + Logger.warning("Error gathering system memory stats: zero total memory reported") + %{} + + true -> + total = system_memory.system_total_memory + + used = total - system_memory.free_memory + + mem_stats = + system_memory + |> Map.take(~w[available_memory free_memory buffered_memory cached_memory]a) + |> Map.put(:used_memory, used) + |> Map.merge(resident_memory(system_memory)) + + mem_percent_stats = Map.new(mem_stats, fn {k, v} -> {k, 100 * v / total} end) + + mem_stats = 
Map.put(mem_stats, :total_memory, total) + + :telemetry.execute([:system, :memory], mem_stats) + :telemetry.execute([:system, :memory_percent], mem_percent_stats) + + mem_stats + end + + Map.merge(mem_stats, swap_stats(:os.type(), system_memory)) + end + + defp resident_memory(%{available_memory: available_memory}) do + %{resident_memory: available_memory} + end + + defp resident_memory(%{ + free_memory: free, + buffered_memory: buffered, + cached_memory: cached, + system_total_memory: total + }) do + %{resident_memory: total - (free + buffered + cached)} + end + + @resident_memory_keys ~w[available_memory free_memory buffered_memory cached_memory]a + defp resident_memory(system_memory) do + missing_keys = + @resident_memory_keys + |> Enum.reject(&Map.has_key?(system_memory, &1)) + + Logger.warning( + "Error gathering resident memory stats: " <> + "missing data points #{Enum.join(missing_keys, ", ")}" + ) + + %{} + end + + defp swap_stats({:unix, :darwin}, _system_memory) do + # On macOS, swap stats are not available + %{} + end + + defp swap_stats(_os_type, %{total_swap: total, free_swap: free}) do + used = total - free + + swap_stats = %{total_swap: total, free_swap: free, used_swap: used} + + swap_percent_stats = + if total > 0 do + %{free_swap: 100 * free / total, used_swap: 100 * used / total} + else + %{free_swap: 0, used_swap: 0} + end + + :telemetry.execute([:system, :swap], swap_stats) + :telemetry.execute([:system, :swap_percent], swap_percent_stats) + + swap_stats + end + + @required_swap_keys ~w[total_swap free_swap]a + defp swap_stats(_os_type, system_memory) do + missing_swap_keys = Enum.reject(@required_swap_keys, &Map.has_key?(system_memory, &1)) + + Logger.warning( + "Error gathering system swap stats: " <> + "missing data points #{Enum.join(missing_swap_keys, ", ")}" + ) + + %{} + end + + defp mean([]), do: nil + + defp mean(list) when is_list(list) do + Enum.sum(list) / Enum.count(list) + end +end diff --git 
a/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex b/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex new file mode 100644 index 0000000000..e55434d331 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex @@ -0,0 +1,275 @@ +defmodule ElectricTelemetry.CallHomeReporter do + @moduledoc """ + Reporter that collects runtime telemetry information and sends it to a configured + home server once in a while. The information is aggregated over a period of time, + with percentile values calculated for the metrics that have them. + """ + + use GenServer + + require Logger + + alias Telemetry.Metrics + alias ElectricTelemetry.Measurement + + @type metric :: Telemetry.Metrics.t() + @type report_format :: keyword(metric() | report_format()) + + def start_link(opts) do + name = Keyword.get(opts, :name, __MODULE__) + metrics = Keyword.fetch!(opts, :metrics) + static_info = Keyword.get(opts, :static_info, %{}) + first_report_in = cast_time_to_ms(Keyword.fetch!(opts, :first_report_in)) + reporting_period = cast_time_to_ms(Keyword.fetch!(opts, :reporting_period)) + reporter_fn = Keyword.get(opts, :reporter_fn, &report_home/2) + stack_id = Keyword.get(opts, :stack_id) + telemetry_url = Keyword.fetch!(opts, :call_home_url) + + init_opts = %{ + metrics: metrics, + first_report_in: first_report_in, + reporting_period: reporting_period, + name: name, + static_info: static_info, + reporter_fn: reporter_fn, + stack_id: stack_id, + telemetry_url: telemetry_url + } + + GenServer.start_link(__MODULE__, init_opts, name: name) + end + + def report_home(telemetry_url, results) do + # Isolate the request in a separate task to avoid blocking and + # to not receive any messages from the HTTP pool internals + Task.start(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end) + :ok + end + + def print_stats(name \\ __MODULE__) do + GenServer.call(name, :print_stats) + end + + defp 
cast_time_to_ms({time, :minute}), do: time * 60 * 1000 + defp cast_time_to_ms({time, :second}), do: time * 1000 + + @impl GenServer + def init(opts) do + %{ + metrics: metrics, + first_report_in: first_report_in, + reporting_period: reporting_period, + name: name, + static_info: static_info, + reporter_fn: reporter_fn, + stack_id: stack_id + } = opts + + # We need to trap exits here so that `terminate/2` callback has more chances to run + # and send data before crash/shutdown + Process.flag(:trap_exit, true) + Process.set_label({:call_home_reporter, name}) + + if stack_id do + Logger.metadata(stack_id: stack_id) + end + + Logger.notice( + "Starting telemetry reporter. Electric will send anonymous usage data to #{opts.telemetry_url}. " <> + "You can configure this with `ELECTRIC_USAGE_REPORTING` environment variable, " <> + "see https://electric-sql.com/docs/reference/telemetry for more information." + ) + + metrics = save_target_path_to_options(metrics) + + groups = Enum.group_by(metrics, & &1.event_name) + + measurement_ctx = Measurement.init(name) + + # Attach a listener per event + handler_ids = + for {event, metrics} <- groups do + id = {__MODULE__, event, self()} + :telemetry.attach(id, event, &__MODULE__.handle_event/4, {metrics, measurement_ctx}) + id + end + + # Save some information about the metrics to use when building an output object + summary_types = + metrics + |> Enum.flat_map(fn + %Metrics.Summary{unit: :unique} = m -> [{get_result_path(m), :count_unique}] + %Metrics.Summary{} = m -> [{get_result_path(m), :summary}] + _ -> [] + end) + |> Map.new() + + all_paths = Enum.map(metrics, &get_result_path/1) + + clearable_paths = + metrics + |> Enum.reject(&Keyword.get(&1.reporter_options, :persist_between_sends, false)) + |> Enum.map(&get_result_path/1) + + Process.send_after(self(), :report, first_report_in) + + {:ok, + %{ + telemetry_url: opts.telemetry_url, + measurement_ctx: measurement_ctx, + handler_ids: handler_ids, + summary_types: summary_types, 
+ all_paths: all_paths, + reporting_period: reporting_period, + static_info: static_info, + clearable_paths: clearable_paths, + reporter_fn: reporter_fn, + last_reported: DateTime.utc_now() + }} + end + + @impl GenServer + def terminate(_, state) do + for id <- state.handler_ids do + :telemetry.detach(id) + end + + # On shutdown try to push all the data we still can. + state.reporter_fn.(state.telemetry_url, build_report(state)) + end + + @impl GenServer + def handle_call(:print_stats, _from, state) do + {:reply, build_stats(state), state} + end + + @impl GenServer + def handle_info(:report, state) do + full_report = build_report(state) + + state = + try do + :ok = state.reporter_fn.(state.telemetry_url, full_report) + clear_stats(%{state | last_reported: full_report.timestamp}) + rescue + e -> + Logger.warning( + "Reporter function failed while trying to send telemetry data.\nError: #{Exception.format(:error, e, __STACKTRACE__)}" + ) + + state + end + + # If we've failed to send the results for more than 24 hours, then drop current stats + # to save memory + state = + if DateTime.diff(DateTime.utc_now(), state.last_reported, :hour) >= 24 do + clear_stats(%{state | last_reported: DateTime.utc_now()}) + else + state + end + + Process.send_after(self(), :report, state.reporting_period) + {:noreply, state} + end + + defp build_report(state) do + %{ + last_reported: state.last_reported, + timestamp: DateTime.utc_now(), + report_version: 2, + data: build_stats(state) + } + end + + defp build_stats(state) do + state.all_paths + |> Enum.map(fn path -> + default = + case state.summary_types[path] do + :summary -> %{min: 0, max: 0, mean: 0} + _ -> 0 + end + + {path, Measurement.calc_metric(state.measurement_ctx, path, default)} + end) + |> Enum.reduce(%{}, fn {path, val}, acc -> + path = path |> Tuple.to_list() |> Enum.map(&Access.key(&1, %{})) + put_in(acc, path, val) + end) + |> deep_merge(state.static_info) + end + + defp clear_stats(state) do + for key <- 
state.clearable_paths do + Measurement.clear_metric(state.measurement_ctx, key) + end + + state + end + + def handle_event(_event_name, measurements, metadata, {metrics, measurement_ctx}) do + for %{reporter_options: opts} = metric <- metrics, keep?(metric, metadata) do + path = Keyword.fetch!(opts, :result_path) + measurement = extract_measurement(metric, measurements, metadata) + + case metric do + %Metrics.Counter{} -> + Measurement.handle_counter(measurement_ctx, path) + + %Metrics.Sum{} -> + Measurement.handle_sum(measurement_ctx, path, measurement) + + %Metrics.LastValue{} -> + Measurement.handle_last_value(measurement_ctx, path, measurement) + + %Metrics.Summary{unit: :unique} -> + value = metadata[Keyword.fetch!(opts, :count_unique)] + Measurement.handle_unique_count(measurement_ctx, path, value) + + %Metrics.Summary{} -> + Measurement.handle_summary(measurement_ctx, path, measurement) + end + end + end + + defp keep?(%{keep: nil}, _metadata), do: true + defp keep?(metric, metadata), do: metric.keep.(metadata) + + defp extract_measurement(metric, measurements, metadata) do + case metric.measurement do + fun when is_function(fun, 2) -> fun.(measurements, metadata) + fun when is_function(fun, 1) -> fun.(measurements) + key -> measurements[key] + end + end + + @spec save_target_path_to_options(report_format()) :: [metric()] + defp save_target_path_to_options(report, prefix \\ []) when is_list(report) do + Enum.flat_map(report, fn + {k, v} when is_list(v) -> + save_target_path_to_options(v, prefix ++ [k]) + + {k, v} -> + if v.tags != [], + do: raise("Call home reporter doesn't support splitting metrics by tags") + + [ + Map.update!( + v, + :reporter_options, + &Keyword.put(&1, :result_path, List.to_tuple(prefix ++ [k])) + ) + ] + end) + end + + defp get_result_path(%{reporter_options: opts}), do: Keyword.fetch!(opts, :result_path) + + def deep_merge(left, right) do + Map.merge(left, right, fn + _, %{} = l, %{} = r -> deep_merge(l, r) + _, _, r -> r + end) + end 
+end diff --git a/packages/sync-service/lib/electric/telemetry/measurement.ex b/packages/electric-telemetry/lib/electric/telemetry/measurement.ex similarity index 99% rename from packages/sync-service/lib/electric/telemetry/measurement.ex rename to packages/electric-telemetry/lib/electric/telemetry/measurement.ex index fa913763df..a343f7c7d1 100644 --- a/packages/sync-service/lib/electric/telemetry/measurement.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/measurement.ex @@ -1,4 +1,4 @@ -defmodule Electric.Telemetry.Measurement do +defmodule ElectricTelemetry.Measurement do @moduledoc """ Module for handling telemetry measurements using ETS for storage. Supports counters, sums, last values, unique counts, and summaries. diff --git a/packages/electric-telemetry/lib/electric/telemetry/opts.ex b/packages/electric-telemetry/lib/electric/telemetry/opts.ex new file mode 100644 index 0000000000..163ca769da --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/opts.ex @@ -0,0 +1,78 @@ +defmodule ElectricTelemetry.Opts do + def schema do + [ + instance_id: [type: :string, required: true], + installation_id: [type: :string], + stack_id: [type: :string], + version: [type: :string, required: true], + reporters: [ + type: :keyword_list, + default: [], + keys: [ + statsd_host: [type: {:or, [:string, nil]}, default: nil], + call_home_url: [type: {:or, [:string, {:struct, URI}, nil]}, default: nil], + otel_metrics?: [type: :boolean, default: false], + prometheus?: [type: :boolean, default: false], + otel_resource_attributes: [type: :map, default: %{}] + ] + ], + intervals_and_thresholds: [ + type: :keyword_list, + default: [], + keys: [ + system_metrics_poll_interval: [type: :integer, default: :timer.seconds(5)], + top_process_count: [type: :integer, default: 5], + # Garbage collection should run almost instantly since each process has its own heap that + # is garbage collected independently of others. 50ms might be too generous. 
+ long_gc_threshold: [type: :integer, default: 50], + # A process generally runs for 1ms at a time. Erlang docs mention that 100ms should be + # expected in a realistic production setting. So we tentatively set it to 150ms. + long_schedule_threshold: [type: :integer, default: 150], + # All processes generally have 0 message waiting in the message queue. If a process starts + # lagging behind and reaches 10 pending messages, something's going seriously wrong in the + # VM. + # We tentatively set the threshold to 20 to observe in production and adjust. + long_message_queue_enable_threshold: [type: :integer, default: 20], + long_message_queue_disable_threshold: [type: :integer, default: 0] + ] + ], + periodic_measurements: [ + type: + {:list, + {:or, + [ + {:in, + [:builtin, :memory, :persistent_term, :system_counts, :total_run_queue_lengths]}, + :atom, + :mfa, + {:fun, 1} + ]}} + ], + additional_metrics: [ + type: + {:list, + {:or, + [ + {:struct, Telemetry.Metrics.Counter}, + {:struct, Telemetry.Metrics.LastValue}, + {:struct, Telemetry.Metrics.Sum}, + {:struct, Telemetry.Metrics.Summary}, + {:struct, Telemetry.Metrics.Distribution} + ]}} + ], + otel_opts: [ + type: :keyword_list, + keys: [ + otlp_protocol: [type: {:in, [:http_protobuf]}, default: :http_protobuf], + otlp_compression: [type: {:in, [:gzip]}, default: :gzip], + # The otlp_endpoint option is actually required but we rely on OtelMetricExporter + # fetching it from the app env if it's not passed explicitly. 
+ otlp_endpoint: [type: :string], + otlp_headers: [type: :map, default: %{}], + export_period: [type: :integer, default: :timer.seconds(30)], + resource: [type: :map, default: %{}] + ] + ] + ] + end +end diff --git a/packages/electric-telemetry/lib/electric/telemetry/poller.ex b/packages/electric-telemetry/lib/electric/telemetry/poller.ex new file mode 100644 index 0000000000..12607ac9d1 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/poller.ex @@ -0,0 +1,43 @@ +defmodule ElectricTelemetry.Poller do + @callback builtin_periodic_measurements(map) :: list() + + def child_spec(telemetry_opts, poller_opts) do + {callback_module, opts} = Keyword.pop(poller_opts, :callback_module) + + case periodic_measurements(telemetry_opts, callback_module) do + [] -> + nil + + measurements -> + opts = + Keyword.merge( + [ + measurements: measurements, + period: telemetry_opts.intervals_and_thresholds.system_metrics_poll_interval, + init_delay: :timer.seconds(5) + ], + opts + ) + + {:telemetry_poller, opts} + end + end + + def periodic_measurements(%{periodic_measurements: measurements} = telemetry_opts, module) do + Enum.flat_map(measurements, fn + :builtin -> module.builtin_periodic_measurements(telemetry_opts) + # These are implemented by telemetry_poller + f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> [f] + # Bare function names are assumed to be referring to functions defined in the caller module + f when is_atom(f) -> {module, f, [telemetry_opts]} + f when is_function(f, 1) -> {__MODULE__, :user_measurement, [f, telemetry_opts]} + {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [{m, f, a ++ [telemetry_opts]}] + end) + end + + def periodic_measurements(telemetry_opts, module), + do: module.builtin_periodic_measurements(telemetry_opts) + + # Helper function to enable telemetry_poller to call a user-provided anonymous function + def user_measurement(f, telemetry_opts), do: f.(telemetry_opts) +end diff --git 
a/packages/sync-service/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex similarity index 97% rename from packages/sync-service/lib/electric/telemetry/processes.ex rename to packages/electric-telemetry/lib/electric/telemetry/processes.ex index 5821ba5dc4..01bfe44639 100644 --- a/packages/sync-service/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -1,4 +1,4 @@ -defmodule Electric.Telemetry.Processes do +defmodule ElectricTelemetry.Processes do @default_count 5 def proc_type(pid), do: proc_type(pid, info(pid)) diff --git a/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex b/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex new file mode 100644 index 0000000000..0ac7d19199 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex @@ -0,0 +1,116 @@ +defmodule ElectricTelemetry.Reporters.CallHomeReporter do + import Telemetry.Metrics + + def child_spec(telemetry_opts, reporter_opts) do + if call_home_url = get_in(telemetry_opts, [:reporters, :call_home_url]) do + start_opts = + Keyword.merge( + [ + static_info: static_info(telemetry_opts), + call_home_url: call_home_url, + first_report_in: {2, :minute}, + reporting_period: {30, :minute} + ], + reporter_opts + ) + + {ElectricTelemetry.CallHomeReporter, start_opts} + end + end + + # IMPORTANT: these metrics are validated on the receiver side, so if you change them, + # make sure you also change the receiver + def application_metrics do + [ + resources: [ + uptime: + last_value("vm.uptime.total", + unit: :second, + measurement: &:erlang.convert_time_unit(&1.total, :native, :second) + ), + used_memory: summary("vm.memory.total", unit: :byte), + run_queue_total: summary("vm.total_run_queue_lengths.total"), + run_queue_cpu: summary("vm.total_run_queue_lengths.cpu"), + run_queue_io: 
summary("vm.total_run_queue_lengths.io") + ], + system: [ + load_avg1: last_value("system.load_percent.avg1"), + load_avg5: last_value("system.load_percent.avg5"), + load_avg15: last_value("system.load_percent.avg15"), + memory_free: last_value("system.memory.free_memory"), + memory_used: last_value("system.memory.used_memory"), + memory_free_percent: last_value("system.memory_percent.free_memory"), + memory_used_percent: last_value("system.memory_percent.used_memory"), + swap_free: last_value("system.swap.free"), + swap_used: last_value("system.swap.used"), + swap_free_percent: last_value("system.swap_percent.free"), + swap_used_percent: last_value("system.swap_percent.used") + ] + ] + end + + # IMPORTANT: these metrics are validated on the receiver side, so if you change them, + # make sure you also change the receiver + def stack_metrics(stack_id) do + [ + environment: + [ + pg_version: + last_value("electric.postgres.info_looked_up.pg_version", + reporter_options: [persist_between_sends: true] + ) + ] + |> ElectricTelemetry.keep_for_stack(stack_id), + usage: + [ + inbound_bytes: + sum("electric.postgres.replication.transaction_received.bytes", unit: :byte), + inbound_transactions: sum("electric.postgres.replication.transaction_received.count"), + inbound_operations: + sum("electric.postgres.replication.transaction_received.operations"), + stored_bytes: sum("electric.storage.transaction_stored.bytes", unit: :byte), + stored_transactions: sum("electric.storage.transaction_stored.count"), + stored_operations: sum("electric.storage.transaction_stored.operations"), + total_used_storage_kb: last_value("electric.storage.used", unit: {:byte, :kilobyte}), + total_shapes: last_value("electric.shapes.total_shapes.count"), + active_shapes: + summary("electric.plug.serve_shape.monotonic_time", + unit: :unique, + reporter_options: [count_unique: :shape_handle], + keep: &(&1.status < 300) + ), + unique_clients: + summary("electric.plug.serve_shape.monotonic_time", + unit: 
:unique, + reporter_options: [count_unique: :client_ip], + keep: &(&1.status < 300) + ), + sync_requests: + counter("electric.plug.serve_shape.monotonic_time", keep: &(&1[:live] != true)), + live_requests: counter("electric.plug.serve_shape.monotonic_time", keep: & &1[:live]), + served_bytes: sum("electric.plug.serve_shape.bytes", unit: :byte), + wal_size: summary("electric.postgres.replication.wal_size", unit: :byte) + ] + |> ElectricTelemetry.keep_for_stack(stack_id) + ] + end + + def static_info(telemetry_opts) do + {total_mem, _, _} = :memsup.get_memory_data() + processors = :erlang.system_info(:logical_processors) + {os_family, os_name} = :os.type() + arch = :erlang.system_info(:system_architecture) + + %{ + electric_version: telemetry_opts.version, + environment: %{ + os: %{family: os_family, name: os_name}, + arch: to_string(arch), + cores: processors, + ram: total_mem, + electric_instance_id: Map.fetch!(telemetry_opts, :instance_id), + electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default") + } + } + end +end diff --git a/packages/electric-telemetry/lib/electric/telemetry/reporters/otel.ex b/packages/electric-telemetry/lib/electric/telemetry/reporters/otel.ex new file mode 100644 index 0000000000..543a4c2b0a --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/reporters/otel.ex @@ -0,0 +1,9 @@ +defmodule ElectricTelemetry.Reporters.Otel do + def child_spec(telemetry_opts, reporter_opts) do + if get_in(telemetry_opts, [:reporters, :otel_metrics?]) do + otel_opts = Map.get(telemetry_opts, :otel_opts, []) + start_opts = otel_opts ++ reporter_opts + {OtelMetricExporter, start_opts} + end + end +end diff --git a/packages/electric-telemetry/lib/electric/telemetry/reporters/prometheus.ex b/packages/electric-telemetry/lib/electric/telemetry/reporters/prometheus.ex new file mode 100644 index 0000000000..ff95c3478c --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/reporters/prometheus.ex @@ -0,0 
+1,21 @@ +defmodule ElectricTelemetry.Reporters.Prometheus do + def child_spec(telemetry_opts, reporter_opts) do + if get_in(telemetry_opts, [:reporters, :prometheus?]) do + {TelemetryMetricsPrometheus.Core, add_buckets_to_distribution_metrics(reporter_opts)} + end + end + + defp add_buckets_to_distribution_metrics(opts) do + Keyword.update!(opts, :metrics, fn metrics -> + Enum.map(metrics, fn + %Telemetry.Metrics.Distribution{} = metric -> add_buckets_to_metric(metric) + metric -> metric + end) + end) + end + + @buckets [0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1] + defp add_buckets_to_metric(metric) do + Map.update!(metric, :reporter_options, &Keyword.put_new(&1, :buckets, @buckets)) + end +end diff --git a/packages/electric-telemetry/lib/electric/telemetry/reporters/statsd.ex b/packages/electric-telemetry/lib/electric/telemetry/reporters/statsd.ex new file mode 100644 index 0000000000..78702a4f69 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/reporters/statsd.ex @@ -0,0 +1,70 @@ +defmodule ElectricTelemetry.Reporters.Statsd do + import Telemetry.Metrics + + def child_spec(telemetry_opts, reporter_opts) do + if host = get_in(telemetry_opts, [:reporters, :statsd_host]) do + start_opts = + Keyword.merge( + [ + host: host, + formatter: :datadog, + global_tags: [instance_id: telemetry_opts.instance_id] + ], + reporter_opts + ) + + {TelemetryMetricsStatsd, start_opts} + end + end + + def application_metrics do + [ + last_value("vm.memory.total", unit: :byte), + last_value("vm.memory.processes_used", unit: :byte), + last_value("vm.memory.binary", unit: :byte), + last_value("vm.memory.ets", unit: :byte), + last_value("vm.total_run_queue_lengths.total"), + last_value("vm.total_run_queue_lengths.cpu"), + last_value("vm.total_run_queue_lengths.io"), + last_value("system.load_percent.avg1"), + last_value("system.load_percent.avg5"), + last_value("system.load_percent.avg15"), + last_value("system.memory.free_memory"), + 
last_value("system.memory.used_memory"), + last_value("system.swap.free"), + last_value("system.swap.used") + ] + |> add_instance_id_tag() + end + + def stack_metrics(stack_id) do + [ + summary("plug.router_dispatch.stop.duration", + tags: [:route], + unit: {:native, :millisecond} + ), + summary("plug.router_dispatch.exception.duration", + tags: [:route], + unit: {:native, :millisecond} + ), + summary("electric.shape_cache.create_snapshot_task.stop.duration", + unit: {:native, :millisecond} + ), + summary("electric.storage.make_new_snapshot.stop.duration", + unit: {:native, :millisecond} + ), + summary("electric.querying.stream_initial_data.stop.duration", + unit: {:native, :millisecond} + ), + last_value("electric.connection.consumers_ready.duration", unit: {:native, :millisecond}), + last_value("electric.connection.consumers_ready.total"), + last_value("electric.connection.consumers_ready.before_recovery") + ] + |> add_instance_id_tag() + |> ElectricTelemetry.keep_for_stack(stack_id) + end + + defp add_instance_id_tag(metrics) do + Enum.map(metrics, fn metric -> Map.update!(metric, :tags, &[:instance_id | &1]) end) + end +end diff --git a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex new file mode 100644 index 0000000000..fe33711c79 --- /dev/null +++ b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex @@ -0,0 +1,113 @@ +defmodule ElectricTelemetry.StackTelemetry do + @moduledoc """ + Collects and exports stack level telemetry such as database and shape metrics. + + If multiple databases are used, each database will have it's own stack and it's own StackTelemetry. + + See also ApplicationTelemetry for application/system level specific telemetry. 
+ """ + use Supervisor + + import Telemetry.Metrics + + alias ElectricTelemetry.Reporters + + require Logger + + @behaviour ElectricTelemetry.Poller + + def start_link(opts) do + with {:ok, opts} <- ElectricTelemetry.validate_options(opts) do + if ElectricTelemetry.export_enabled?(opts) do + Supervisor.start_link(__MODULE__, opts) + else + # Avoid starting the telemetry supervisor and its telemetry_poller child if we're not + # intending to export periodic measurements metrics anywhere. + :ignore + end + end + end + + @impl Supervisor + def init(%{stack_id: stack_id} = opts) do + Process.set_label({:stack_telemetry_supervisor, stack_id}) + Logger.metadata(stack_id: stack_id) + + children = + [ + ElectricTelemetry.Poller.child_spec(opts, + callback_module: __MODULE__, + init_delay: :timer.seconds(3) + ) + | exporter_child_specs(opts) + ] + |> Enum.reject(&is_nil/1) + + Supervisor.init(children, strategy: :one_for_one) + end + + defp exporter_child_specs(%{stack_id: stack_id} = opts) do + metrics = metrics(opts) + + [ + Reporters.CallHomeReporter.child_spec( + opts, + stack_id: stack_id, + name: :"stack_call_home_telemetry_#{stack_id}", + metrics: Reporters.CallHomeReporter.stack_metrics(stack_id) + ), + Reporters.Otel.child_spec(opts, + name: :"stack_otel_telemetry_#{stack_id}", + metrics: metrics + ), + Reporters.Prometheus.child_spec(opts, + name: :"stack_prometheus_telemetry_#{stack_id}", + metrics: metrics + ), + Reporters.Statsd.child_spec(opts, metrics: Reporters.Statsd.stack_metrics(opts.stack_id)) + ] + end + + @impl ElectricTelemetry.Poller + def builtin_periodic_measurements(_), do: [] + + def metrics(telemetry_opts) do + [ + distribution("electric.plug.serve_shape.duration", + unit: {:native, :millisecond}, + keep: fn metadata -> metadata[:live] != true end + ), + distribution("electric.shape_cache.create_snapshot_task.stop.duration", + unit: {:native, :millisecond} + ), + distribution("electric.storage.make_new_snapshot.stop.duration", + unit: {:native, 
:millisecond} + ), + distribution("electric.postgres.replication.transaction_received.receive_lag", + unit: :millisecond + ), + distribution("electric.postgres.replication.transaction_received.operations"), + distribution("electric.storage.transaction_stored.replication_lag", unit: :millisecond), + last_value("electric.postgres.replication.wal_size", unit: :byte), + last_value("electric.storage.used", unit: {:byte, :kilobyte}), + last_value("electric.shapes.total_shapes.count"), + last_value("electric.shapes.active_shapes.count"), + counter("electric.postgres.replication.transaction_received.count"), + sum("electric.postgres.replication.transaction_received.bytes", unit: :byte), + sum("electric.storage.transaction_stored.bytes", unit: :byte), + last_value("electric.shape_monitor.active_reader_count"), + last_value("electric.connection.consumers_ready.duration", + unit: {:native, :millisecond} + ), + last_value("electric.connection.consumers_ready.total"), + last_value("electric.connection.consumers_ready.failed_to_recover"), + last_value("electric.admission_control.acquire.current"), + sum("electric.admission_control.reject.count") + | additional_metrics(telemetry_opts) + ] + |> ElectricTelemetry.keep_for_stack(telemetry_opts.stack_id) + end + + def additional_metrics(%{additional_metrics: metrics}), do: metrics + def additional_metrics(_), do: [] +end diff --git a/packages/sync-service/lib/electric/telemetry/system_monitor.ex b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex similarity index 95% rename from packages/sync-service/lib/electric/telemetry/system_monitor.ex rename to packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex index e46bfb3ebe..ebb0764020 100644 --- a/packages/sync-service/lib/electric/telemetry/system_monitor.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex @@ -1,4 +1,4 @@ -defmodule Electric.Telemetry.SystemMonitor do +defmodule ElectricTelemetry.SystemMonitor do @moduledoc 
""" Application-wide process that initializes Erlang's system monitor and consumes monitoring events. @@ -12,7 +12,7 @@ defmodule Electric.Telemetry.SystemMonitor do use GenServer - import Electric.Telemetry.Processes, only: [proc_type: 1] + import ElectricTelemetry.Processes, only: [proc_type: 1] require Logger @@ -21,7 +21,7 @@ defmodule Electric.Telemetry.SystemMonitor do @vm_monitor_long_message_queue [:vm, :monitor, :long_message_queue] def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) + GenServer.start_link(__MODULE__, opts.intervals_and_thresholds, name: __MODULE__) end def init(opts) do @@ -103,6 +103,7 @@ defmodule Electric.Telemetry.SystemMonitor do def handle_info({:monitor, pid, :long_message_queue, false}, state) do Logger.debug("Long message queue no longer detected for pid #{inspect(pid)}") + {:noreply, %{state | long_message_queue_pids: Map.delete(state.long_message_queue_pids, pid)}} end diff --git a/packages/electric-telemetry/mix.exs b/packages/electric-telemetry/mix.exs new file mode 100644 index 0000000000..6d5c337c83 --- /dev/null +++ b/packages/electric-telemetry/mix.exs @@ -0,0 +1,55 @@ +defmodule ElectricTelemetry.MixProject do + use Mix.Project + + def project do + [ + app: :electric_telemetry, + version: version(), + elixir: "~> 1.17", + start_permanent: Mix.env() == :prod, + deps: deps(), + test_coverage: [tool: ExCoveralls] + ] + end + + def application do + [extra_applications: [:logger, :os_mon, :runtime_tools]] + end + + defp deps do + List.flatten( + [ + {:otel_metric_exporter, "~> 0.4.1"}, + {:req, "~> 0.5"}, + {:telemetry, "~> 1.3"}, + {:telemetry_metrics, "~> 1.1"}, + {:telemetry_metrics_prometheus_core, "~> 1.2"}, + {:telemetry_metrics_statsd, "~> 0.7"}, + {:telemetry_poller, "~> 1.3"} + ], + dev_and_test_deps() + ) + end + + defp dev_and_test_deps do + [ + {:dialyxir, "~> 1.4", only: [:test], runtime: false}, + {:excoveralls, "~> 0.18", only: [:test], runtime: false}, + {:junit_formatter, "~> 
3.4", only: [:test], runtime: false}, + {:ex_doc, ">= 0.0.0", only: :dev, runtime: false} + ] + end + + defp version(default \\ "0.0.0") do + with :error <- version_from_package_json() do + default + end + end + + defp version_from_package_json do + case File.read("./package.json") do + {:ok, binary} -> binary |> :json.decode() |> Map.fetch!("version") + {:error, _} -> :error + end + end +end diff --git a/packages/electric-telemetry/mix.lock b/packages/electric-telemetry/mix.lock new file mode 100644 index 0000000000..25b23fe15d --- /dev/null +++ b/packages/electric-telemetry/mix.lock @@ -0,0 +1,28 @@ +%{ + "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, + "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, + "ex_doc": {:hex, :ex_doc, "0.39.1", "e19d356a1ba1e8f8cfc79ce1c3f83884b6abfcb79329d435d4bbb3e97ccc286e", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "8abf0ed3e3ca87c0847dfc4168ceab5bedfe881692f1b7c45f4a11b232806865"}, + "excoveralls": {:hex, :excoveralls, "0.18.5", 
"e229d0a65982613332ec30f07940038fe451a2e5b29bce2a5022165f0c9b157e", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "523fe8a15603f86d64852aab2abe8ddbd78e68579c8525ae765facc5eae01562"}, + "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, + "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "junit_formatter": {:hex, :junit_formatter, "3.4.0", "d0e8db6c34dab6d3c4154c3b46b21540db1109ae709d6cf99ba7e7a2ce4b1ac2", [:mix], [], "hexpm", "bb36e2ae83f1ced6ab931c4ce51dd3dbef1ef61bb4932412e173b0cfa259dacd"}, + "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, + "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", 
[hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, + "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, + "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.4.2", "2cf96ac9879eb06ebde26fa0856e2cd4d5b5f6127eb9ca587532b89a5f981bfc", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: 
:nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.15", [hex: :protobuf, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "e7d7ca69a3863a2b84badeb9dd275a369754d951c7c5a4cd5f321be868c6d613"}, + "protobuf": {:hex, :protobuf, "0.15.0", "c9fc1e9fc1682b05c601df536d5ff21877b55e2023e0466a3855cc1273b74dcb", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "5d7bb325319db1d668838d2691c31c7b793c34111aec87d5ee467a39dac6e051"}, + "req": {:hex, :req, "0.5.16", "99ba6a36b014458e52a8b9a0543bfa752cb0344b2a9d756651db1281d4ba4450", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "974a7a27982b9b791df84e8f6687d21483795882a7840e8309abdbe08bb06f09"}, + "retry": {:hex, :retry, "0.19.0", "aeb326d87f62295d950f41e1255fe6f43280a1b390d36e280b7c9b00601ccbc2", [:mix], [], "hexpm", "85ef376aa60007e7bff565c366310966ec1bd38078765a0e7f20ec8a220d02ca"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], 
"hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, + "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.2.1", "c9755987d7b959b557084e6990990cb96a50d6482c683fb9622a63837f3cd3d8", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5e2c599da4983c4f88a33e9571f1458bf98b0cf6ba930f1dc3a6e8cf45d5afb6"}, + "telemetry_metrics_statsd": {:hex, :telemetry_metrics_statsd, "0.7.2", "a70cfaf821cb2f3ac2e767988461179add44762d1db752e74dfa0c93449b2857", [:mix], [{:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "933bb8b176b95d5088404b7137d94926b8dea9a74ef2c95d616f2740f1571c13"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, +} diff --git a/packages/electric-telemetry/package.json b/packages/electric-telemetry/package.json new file mode 100644 index 0000000000..8fb6812eba --- /dev/null +++ b/packages/electric-telemetry/package.json @@ -0,0 +1,5 @@ +{ + "name": "@core/electric-telemetry", + "private": true, + "version": "0.1.0" +} diff --git a/packages/electric-telemetry/test/electric/telemetry/application_telemetry_test.exs b/packages/electric-telemetry/test/electric/telemetry/application_telemetry_test.exs new file mode 100644 index 0000000000..44a4905a4d --- /dev/null +++ b/packages/electric-telemetry/test/electric/telemetry/application_telemetry_test.exs @@ -0,0 +1,34 @@ +defmodule 
ElectricTelemetry.ApplicationTelemetryTest do + use ExUnit.Case, async: true + + alias ElectricTelemetry.ApplicationTelemetry + + describe "get_system_memory_usage" do + test "returns calculated memory stats" do + case :os.type() do + {:unix, :darwin} -> + assert %{ + total_memory: _, + available_memory: _, + free_memory: _, + used_memory: _, + resident_memory: _ + } = ApplicationTelemetry.get_system_memory_usage(%{}) + + _ -> + assert %{ + total_memory: _, + available_memory: _, + buffered_memory: _, + cached_memory: _, + free_memory: _, + used_memory: _, + resident_memory: _, + total_swap: _, + free_swap: _, + used_swap: _ + } = ApplicationTelemetry.get_system_memory_usage(%{}) + end + end + end +end diff --git a/packages/sync-service/test/electric/telemetry/measurement_test.exs b/packages/electric-telemetry/test/electric/telemetry/measurement_test.exs similarity index 99% rename from packages/sync-service/test/electric/telemetry/measurement_test.exs rename to packages/electric-telemetry/test/electric/telemetry/measurement_test.exs index e87fb9944b..492e9f486a 100644 --- a/packages/sync-service/test/electric/telemetry/measurement_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/measurement_test.exs @@ -1,7 +1,7 @@ -defmodule Electric.Telemetry.MeasurementTest do +defmodule ElectricTelemetry.MeasurementTest do use ExUnit.Case, async: true - alias Electric.Telemetry.Measurement + alias ElectricTelemetry.Measurement describe "init/1" do test "initializes tables with correct options" do diff --git a/packages/sync-service/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs similarity index 88% rename from packages/sync-service/test/electric/telemetry/processes_test.exs rename to packages/electric-telemetry/test/electric/telemetry/processes_test.exs index 031341005d..8e314d0d8d 100644 --- a/packages/sync-service/test/electric/telemetry/processes_test.exs +++ 
b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -1,8 +1,8 @@ -defmodule Electric.Telemetry.ProcessesTest do +defmodule ElectricTelemetry.ProcessesTest do use ExUnit.Case, async: true describe "top_memory_by_type/[1, 2]" do - import Electric.Telemetry.Processes, only: [top_memory_by_type: 0, top_memory_by_type: 1] + import ElectricTelemetry.Processes, only: [top_memory_by_type: 0, top_memory_by_type: 1] test "handles dead processes" do parent = self() diff --git a/packages/electric-telemetry/test/test_helper.exs b/packages/electric-telemetry/test/test_helper.exs new file mode 100644 index 0000000000..3ddc879f1a --- /dev/null +++ b/packages/electric-telemetry/test/test_helper.exs @@ -0,0 +1,8 @@ +# The process registry is implicitly used by processes in the dev, prod and test environments alike. +# +# Explicitly start the process registry here since the OTP application does not start a +# supervision tree in the test environment. +# Registry.start_link(name: Electric.Application.process_registry(), keys: :unique) + +ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) +ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true) diff --git a/packages/sync-service/.dockerignore b/packages/sync-service/.dockerignore new file mode 100644 index 0000000000..d983cc0f3c --- /dev/null +++ b/packages/sync-service/.dockerignore @@ -0,0 +1,3 @@ +_build/ +deps/ +test/ diff --git a/packages/sync-service/Dockerfile b/packages/sync-service/Dockerfile index 12e3d8b0cd..cc5a1e1e63 100644 --- a/packages/sync-service/Dockerfile +++ b/packages/sync-service/Dockerfile @@ -20,27 +20,29 @@ ARG MIX_ENV=prod ARG ELECTRIC_VERSION ARG MIX_TARGET -WORKDIR /app +WORKDIR /builder/electric + +COPY --from=electric-telemetry / /builder/electric-telemetry -COPY mix.* /app/ +COPY mix.* /builder/electric/ RUN mix deps.get RUN MIX_OS_DEPS_COMPILE_PARTITION_COUNT=4 mix deps.compile # These are ordered by change frequency, with the least 
frequently changing dir first. -COPY rel /app/rel -COPY lib /app/lib/ +COPY rel /builder/electric/rel +COPY lib /builder/electric/lib/ -COPY package.json /app/ -COPY config/config.exs /app/config/ +COPY package.json /builder/electric/ +COPY config/config.exs /builder/electric/config/ RUN mix compile RUN mix sentry.package_source_code -COPY config/runtime.exs /app/config/ +COPY config/runtime.exs /builder/electric/config/ RUN mix release -RUN ls -l /app/_build +RUN ls -l /builder/electric/_build FROM ${RUNNER_IMAGE} AS runner_setup @@ -58,7 +60,7 @@ ENV LANG=en_US.UTF-8 \ MIX_ENV=prod \ MIX_TARGET=application -WORKDIR "/app" +WORKDIR /app RUN chown nobody /app @@ -68,7 +70,7 @@ ARG MIX_TARGET ARG RELEASE_NAME=electric -COPY --from=builder "/app/_build/${MIX_TARGET}_prod/rel/${RELEASE_NAME}" ./ +COPY --from=builder "/builder/electric/_build/${MIX_TARGET}_prod/rel/${RELEASE_NAME}" ./ RUN mv /app/bin/${RELEASE_NAME} /app/bin/entrypoint @@ -77,3 +79,4 @@ HEALTHCHECK --start-period=10s CMD curl --fail http://localhost:${ELECTRIC_PORT- ENTRYPOINT ["/app/bin/entrypoint"] CMD ["start"] + diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 490ba6423c..f06ebb71df 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -37,10 +37,6 @@ if config_env() == :test do config :logger, :default_handler, level: test_log_level end -# Disable the default telemetry_poller process since we start our own in -# `Electric.Telemetry.{ApplicationTelemetry, StackTelemetry}`. 
-config :telemetry_poller, default: false - service_name = env!("ELECTRIC_SERVICE_NAME", :string, "electric") instance_id = env!("ELECTRIC_INSTANCE_ID", :string, Electric.Utils.uuid4()) @@ -212,7 +208,6 @@ config :electric, telemetry_url: call_home_telemetry_url, system_metrics_poll_interval: system_metrics_poll_interval, otel_export_period: otel_export_period, - otel_per_process_metrics?: env!("ELECTRIC_OTEL_PER_PROCESS_METRICS", :boolean, nil), otel_sampling_ratio: env!("ELECTRIC_OTEL_SAMPLING_RATIO", :float, nil), metrics_sampling_ratio: env!("ELECTRIC_METRICS_SAMPLING_RATIO", :float, nil), telemetry_top_process_count: env!("ELECTRIC_TELEMETRY_TOP_PROCESS_COUNT", :integer, nil), @@ -274,42 +269,20 @@ config :electric, ) if Electric.telemetry_enabled?() do + # Disable the default telemetry_poller process since we start our own in + # `ElectricTelemetry.{ApplicationTelemetry, StackTelemetry}`. + config :telemetry_poller, default: false + config :sentry, environment_name: config_env(), client: Electric.Telemetry.SentryReqHTTPClient - sentry_dsn = env!("SENTRY_DSN", :string, nil) - - if !is_nil(sentry_dsn) do - config :sentry, - dsn: sentry_dsn + if sentry_dsn = env!("SENTRY_DSN", :string, nil) do + config :sentry, dsn: sentry_dsn end otlp_endpoint = env!("ELECTRIC_OTLP_ENDPOINT", :string, nil) - otel_debug = env!("ELECTRIC_OTEL_DEBUG", :boolean, false) - - otel_batch_processor = - if otlp_endpoint do - {:otel_batch_processor, %{}} - end - - otel_simple_processor = - if otel_debug do - # In this mode, each span is printed to stdout as soon as it ends, without batching. 
- {:otel_simple_processor, %{exporter: {:otel_exporter_stdout, []}}} - end - - config :opentelemetry, - resource_detectors: [ - :otel_resource_env_var, - :otel_resource_app_env, - Electric.Telemetry.OpenTelemetry.ResourceDetector - ], - resource: %{ - service: %{name: service_name, version: Electric.version()}, - instance: %{id: instance_id} - }, - processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1) + otel_debug? = env!("ELECTRIC_OTEL_DEBUG", :boolean, false) if otlp_endpoint do # Shortcut config for Honeycomb.io: @@ -319,7 +292,7 @@ if Electric.telemetry_enabled?() do honeycomb_api_key = env!("ELECTRIC_HNY_API_KEY", :string, nil) honeycomb_dataset = env!("ELECTRIC_HNY_DATASET", :string, nil) - headers = + otlp_headers = Enum.reject( [ {"x-honeycomb-team", honeycomb_api_key}, @@ -328,22 +301,27 @@ if Electric.telemetry_enabled?() do fn {_, val} -> is_nil(val) end ) - config :opentelemetry_exporter, - otlp_protocol: :http_protobuf, - otlp_endpoint: otlp_endpoint, - otlp_headers: headers, - otlp_compression: :gzip + resource = %{ + service: %{name: service_name, version: Electric.version()}, + instance: %{id: instance_id} + } + # We must populate otel_metric_exporter's app env to configure its LogHandler + # and provide base config for the metrics export. config :otel_metric_exporter, - otlp_protocol: :http_protobuf, otlp_endpoint: otlp_endpoint, - otlp_headers: Map.new(headers), - otlp_compression: :gzip, - resource: %{ - name: "metrics", - service: %{name: service_name, version: Electric.version()}, - instance: %{id: instance_id} - } + otlp_headers: Map.new(otlp_headers), + # The `name` resource attribute will be inherited by both metric and log events. This is + # an artifact of otel_metric_exporter's implementation. With some more effort, we could + # allow setting different names for the two types of events. 
+ resource: Map.put(resource, :name, "metrics") + + Electric.Telemetry.OpenTelemetry.Config.configure( + otlp_endpoint: otlp_endpoint, + otlp_headers: otlp_headers, + otel_resource: resource, + otel_debug?: otel_debug? + ) config :electric, :logger, [ {:handler, :otel_log_handler, OtelMetricExporter.LogHandler, diff --git a/packages/sync-service/lib/electric.ex b/packages/sync-service/lib/electric.ex index 0c7097c53b..cdabb2e61c 100644 --- a/packages/sync-service/lib/electric.ex +++ b/packages/sync-service/lib/electric.ex @@ -151,9 +151,7 @@ defmodule Electric do end @telemetry_enabled? Mix.target() == Electric.MixProject.telemetry_target() - def telemetry_enabled? do - @telemetry_enabled? - end + def telemetry_enabled?, do: @telemetry_enabled? def stack_events_registry do Electric.StackEventsRegistry diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index e3d39e7d96..c49f7ca9d6 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -233,8 +233,8 @@ defmodule Electric.Application do end defp application_telemetry(config) do - if Code.ensure_loaded?(Electric.Telemetry.ApplicationTelemetry) do - [{Electric.Telemetry.ApplicationTelemetry, Keyword.fetch!(config, :telemetry_opts)}] + if Code.ensure_loaded?(ElectricTelemetry.ApplicationTelemetry) do + [{ElectricTelemetry.ApplicationTelemetry, Keyword.fetch!(config, :telemetry_opts)}] else [] end @@ -334,20 +334,34 @@ defmodule Electric.Application do [ instance_id: Keyword.fetch!(opts, :instance_id), installation_id: Keyword.fetch!(opts, :installation_id), - system_metrics_poll_interval: get_env(opts, :system_metrics_poll_interval), - statsd_host: get_env(opts, :telemetry_statsd_host), - prometheus?: not is_nil(get_env(opts, :prometheus_port)), - call_home_telemetry?: get_env(opts, :call_home_telemetry?), - otel_metrics?: not is_nil(Application.get_env(:otel_metric_exporter, 
:otlp_endpoint)), - otel_export_period: get_env(opts, :otel_export_period), - otel_per_process_metrics?: get_env(opts, :otel_per_process_metrics?), - top_process_count: get_env(opts, :telemetry_top_process_count), - long_gc_threshold: get_env(opts, :telemetry_long_gc_threshold), - long_schedule_threshold: get_env(opts, :telemetry_long_schedule_threshold), - long_message_queue_enable_threshold: - get_env(opts, :telemetry_long_message_queue_enable_threshold), - long_message_queue_disable_threshold: - get_env(opts, :telemetry_long_message_queue_disable_threshold) + version: Electric.version(), + intervals_and_thresholds: + get_opts(opts, + system_metrics_poll_interval: :system_metrics_poll_interval, + long_gc_threshold: :telemetry_long_gc_threshold, + long_schedule_threshold: :telemetry_long_schedule_threshold, + long_message_queue_enable_threshold: :telemetry_long_message_queue_enable_threshold, + long_message_queue_disable_threshold: :telemetry_long_message_queue_disable_threshold, + top_process_count: :telemetry_top_process_count + ), + reporters: [ + statsd_host: get_env(opts, :telemetry_statsd_host), + prometheus?: not is_nil(get_env(opts, :prometheus_port)), + call_home_url: + if(get_env(opts, :call_home_telemetry?), do: get_env(opts, :telemetry_url)), + otel_metrics?: not is_nil(Application.get_env(:otel_metric_exporter, :otlp_endpoint)) + ], + otel_opts: get_opts(opts, export_period: :otel_export_period) ] end + + defp get_opts(opts, mapping) do + mapping + |> Enum.map(fn {from, to} -> {from, get_opt(opts, to)} end) + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + end + + defp get_opt(opts, key) do + Keyword.get_lazy(opts, key, fn -> Application.get_env(:electric, key) end) + end end diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 5e38c528c7..92decbd15e 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -79,24 +79,8 @@ defmodule 
Electric.Config do call_home_telemetry?: @build_env == :prod, telemetry_statsd_host: nil, telemetry_url: URI.new!("https://checkpoint.electric-sql.com"), - system_metrics_poll_interval: :timer.seconds(5), - otel_export_period: :timer.seconds(30), - otel_per_process_metrics?: false, otel_sampling_ratio: 0.01, metrics_sampling_ratio: 1, - telemetry_top_process_count: 5, - # Garbage collection should run almost instantly since each process has its own heap that - # is garbage collected independently of others. 50ms might be too generous. - telemetry_long_gc_threshold: 50, - # A process generally runs for 1ms at a time. Erlang docs mention that 100ms should be - # expected in a realistic production setting. So we tentatively set it to 150ms. - telemetry_long_schedule_threshold: 500, - # All processes generally have 0 message waiting in the message queue. If a process starts - # lagging behind and reaches 10 pending messages, something's going seriously wrong in the - # VM. - # We tentatively set the threshold to 20 to observe in production and adjust. 
- telemetry_long_message_queue_enable_threshold: 20, - telemetry_long_message_queue_disable_threshold: 0, ## Memory shape_hibernate_after: :timer.seconds(30), ## Performance tweaks diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index f4940818c7..8e95ee7bec 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -339,22 +339,8 @@ defmodule Electric.StackSupervisor do registry_partitions = Keyword.get(config.tweaks, :registry_partitions, System.schedulers_online()) - telemetry_children = - if Code.ensure_loaded?(Electric.Telemetry.StackTelemetry) do - [ - {Electric.Telemetry.StackTelemetry, - config.telemetry_opts ++ - [ - stack_id: stack_id, - slot_name: config.replication_opts[:slot_name] - ]} - ] - else - [] - end - children = - telemetry_children ++ + telemetry_children(config) ++ [ {Electric.ProcessRegistry, partitions: registry_partitions, stack_id: stack_id}, {Electric.StackConfig, @@ -392,4 +378,101 @@ defmodule Electric.StackSupervisor do Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) end + + defp telemetry_children(%{stack_telemetry: stack_telemetry}), do: [stack_telemetry] + + defp telemetry_children(config) do + if Code.ensure_loaded?(ElectricTelemetry.StackTelemetry) do + telemetry_opts = + config.telemetry_opts + |> Keyword.put(:stack_id, config.stack_id) + # Use user-provided periodic measurements or default ones otherwise + |> Keyword.update( + :periodic_measurements, + default_periodic_measurements(config), + & &1 + ) + + [{ElectricTelemetry.StackTelemetry, telemetry_opts}] + else + [] + end + end + + defp default_periodic_measurements(%{stack_id: stack_id} = config) do + [ + {__MODULE__, :count_shapes, [stack_id]}, + {__MODULE__, :count_active_shapes, [stack_id]}, + {__MODULE__, :report_retained_wal_size, [stack_id, config.replication_opts[:slot_name]]} + ] + end 
+ + def count_shapes(stack_id, _telemetry_opts) do + # Telemetry is started before everything else in the stack, so we need to handle + # the case where the shape cache is not started yet. + case Electric.ShapeCache.count_shapes(stack_id) do + :error -> + :ok + + num_shapes -> + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :shapes, :total_shapes], + %{count: num_shapes}, + %{stack_id: stack_id} + ) + end + end + + def count_active_shapes(stack_id, _telemetry_opts) do + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :shapes, :active_shapes], + %{count: Electric.Shapes.ConsumerRegistry.active_consumer_count(stack_id)}, + %{stack_id: stack_id} + ) + end + + @retained_wal_size_query """ + SELECT + pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::int8 + FROM + pg_replication_slots + WHERE + slot_name = $1 + """ + + @doc false + @spec report_retained_wal_size(Electric.stack_id(), binary(), map()) :: :ok + def report_retained_wal_size(stack_id, slot_name, _telemetry_opts) do + try do + %Postgrex.Result{rows: [[wal_size]]} = + Postgrex.query!( + Electric.Connection.Manager.admin_pool(stack_id), + @retained_wal_size_query, + [slot_name], + timeout: 3_000, + deadline: 3_000 + ) + + # The query above can return `-1` which I'm assuming means "up-to-date". 
+ # This is a confusing stat if we're measuring in bytes, so normalise to + # [0, :infinity) + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :postgres, :replication], + %{wal_size: max(0, wal_size)}, + %{stack_id: stack_id} + ) + catch + :exit, {:noproc, _} -> + :ok + + # catch all errors to not log them as errors, those are reporing issues at best + type, reason -> + Logger.warning( + "Failed to query retained WAL size\nError: #{Exception.format(type, reason)}", + stack_id: stack_id, + slot_name: slot_name + ) + end + end end diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry.ex index e30ebaa867..13023a4753 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry.ex @@ -1,16 +1,16 @@ defmodule Electric.Telemetry do require Logger - @log_level Application.compile_env(:electric, [Electric.Telemetry, :log_level], false) + @log_level Application.compile_env(:electric, :telemetry_log_level, false) defmacro __using__(_opts) do quote do - import Electric.Telemetry + import Electric.Telemetry, only: [with_telemetry: 2] end end # uses the availability of the given dependencies to optionally compile - # the provided block when MIX_TARGET is `:application`. + # the provided block when telemetry is enabled in the application config defmacro with_telemetry(dependencies, do: block, else: else_block) do include_with_telemetry(__CALLER__, __ENV__, dependencies, block, else_block) end @@ -24,12 +24,12 @@ defmodule Electric.Telemetry do telemetry_code_available? = Enum.all?(modules, &Code.ensure_loaded?/1) if Electric.telemetry_enabled?() && telemetry_code_available? 
do - if @log_level, - do: - Logger.log( - @log_level, - "Enabling telemetry in #{caller.module || Path.relative_to(caller.file, Path.expand("..", __DIR__))}" - ) + if @log_level do + Logger.log( + @log_level, + "Enabling telemetry in #{caller.module || Path.relative_to(caller.file, Path.expand("..", __DIR__))}" + ) + end quote(do: unquote(block)) else diff --git a/packages/sync-service/lib/electric/telemetry/application_telemetry.ex b/packages/sync-service/lib/electric/telemetry/application_telemetry.ex deleted file mode 100644 index f0aedaafe7..0000000000 --- a/packages/sync-service/lib/electric/telemetry/application_telemetry.ex +++ /dev/null @@ -1,555 +0,0 @@ -use Electric.Telemetry - -with_telemetry [Telemetry.Metrics, OtelMetricExporter] do - defmodule Electric.Telemetry.ApplicationTelemetry do - @moduledoc """ - Collects and exports application level telemetry such as CPU, memory and BEAM metrics. - - See also StackTelemetry for stack specific telemetry. - """ - use Supervisor - - import Telemetry.Metrics - - require Logger - - @opts_schema NimbleOptions.new!(Electric.Telemetry.Opts.schema()) - - def start_link(opts) do - with {:ok, opts} <- NimbleOptions.validate(opts, @opts_schema) do - if telemetry_export_enabled?(Map.new(opts)) do - Supervisor.start_link(__MODULE__, Map.new(opts), name: __MODULE__) - else - # Avoid starting the telemetry supervisor and its telemetry_poller child if we're not - # intending to export periodic measurements metrics anywhere. 
- :ignore - end - end - end - - def init(opts) do - Process.set_label(:application_telemetry_supervisor) - - [ - system_monitor_child_spec(opts), - telemetry_poller_child_spec(opts) | exporter_child_specs(opts) - ] - |> Supervisor.init(strategy: :one_for_one) - end - - defp system_monitor_child_spec(opts) do - {Electric.Telemetry.SystemMonitor, opts} - end - - defp telemetry_poller_child_spec(opts) do - {:telemetry_poller, - measurements: periodic_measurements(opts), - period: opts.system_metrics_poll_interval, - init_delay: :timer.seconds(5)} - end - - defp telemetry_export_enabled?(opts) do - exporter_child_specs(opts) != [] - end - - defp exporter_child_specs(opts) do - [ - statsd_reporter_child_spec(opts), - prometheus_reporter_child_spec(opts), - call_home_reporter_child_spec(opts), - otel_reporter_child_spec(opts) - ] - |> Enum.reject(&is_nil/1) - end - - defp otel_reporter_child_spec(%{otel_metrics?: true} = opts) do - {OtelMetricExporter, - metrics: otel_metrics(opts), - export_period: opts.otel_export_period, - resource: - Map.merge( - %{instance: %{installation_id: Map.get(opts, :installation_id, "electric_default")}}, - opts.otel_resource_attributes - )} - end - - defp otel_reporter_child_spec(_), do: nil - - defp call_home_reporter_child_spec(%{call_home_telemetry?: true} = opts) do - {Electric.Telemetry.CallHomeReporter, - static_info: static_info(opts), - metrics: call_home_metrics(), - first_report_in: {2, :minute}, - reporting_period: {30, :minute}} - end - - defp call_home_reporter_child_spec(_), do: nil - - defp static_info(opts) do - {total_mem, _, _} = :memsup.get_memory_data() - processors = :erlang.system_info(:logical_processors) - {os_family, os_name} = :os.type() - arch = :erlang.system_info(:system_architecture) - - %{ - electric_version: to_string(Electric.version()), - environment: %{ - os: %{family: os_family, name: os_name}, - arch: to_string(arch), - cores: processors, - ram: total_mem, - electric_instance_id: Map.fetch!(opts, 
:instance_id), - electric_installation_id: Map.get(opts, :installation_id, "electric_default") - } - } - end - - # IMPORTANT: these metrics are validated on the receiver side, so if you change them, - # make sure you also change the receiver - def call_home_metrics() do - [ - resources: [ - uptime: - last_value("vm.uptime.total", - unit: :second, - measurement: &:erlang.convert_time_unit(&1.total, :native, :second) - ), - used_memory: summary("vm.memory.total", unit: :byte), - run_queue_total: summary("vm.total_run_queue_lengths.total"), - run_queue_cpu: summary("vm.total_run_queue_lengths.cpu"), - run_queue_io: summary("vm.total_run_queue_lengths.io") - ], - system: [ - load_avg1: last_value("system.load_percent.avg1"), - load_avg5: last_value("system.load_percent.avg5"), - load_avg15: last_value("system.load_percent.avg15"), - memory_free: last_value("system.memory.free_memory"), - memory_used: last_value("system.memory.used_memory"), - memory_free_percent: last_value("system.memory_percent.free_memory"), - memory_used_percent: last_value("system.memory_percent.used_memory"), - swap_free: last_value("system.swap.free"), - swap_used: last_value("system.swap.used"), - swap_free_percent: last_value("system.swap_percent.free"), - swap_used_percent: last_value("system.swap_percent.used") - ] - ] - end - - defp statsd_reporter_child_spec(%{statsd_host: host} = opts) when host != nil do - {TelemetryMetricsStatsd, - host: host, - formatter: :datadog, - global_tags: [instance_id: opts.instance_id], - metrics: statsd_metrics()} - end - - defp statsd_reporter_child_spec(_), do: nil - - defp prometheus_reporter_child_spec(%{prometheus?: true}) do - {TelemetryMetricsPrometheus.Core, metrics: prometheus_metrics()} - end - - defp prometheus_reporter_child_spec(_), do: nil - - defp statsd_metrics() do - [ - last_value("vm.memory.total", unit: :byte), - last_value("vm.memory.processes_used", unit: :byte), - last_value("vm.memory.binary", unit: :byte), - 
last_value("vm.memory.ets", unit: :byte), - last_value("vm.total_run_queue_lengths.total"), - last_value("vm.total_run_queue_lengths.cpu"), - last_value("vm.total_run_queue_lengths.io"), - last_value("system.load_percent.avg1"), - last_value("system.load_percent.avg5"), - last_value("system.load_percent.avg15"), - last_value("system.memory.free_memory"), - last_value("system.memory.used_memory"), - last_value("system.swap.free"), - last_value("system.swap.used") - ] - |> Enum.map(&%{&1 | tags: [:instance_id | &1.tags]}) - end - - defp prometheus_metrics do - num_schedulers = :erlang.system_info(:schedulers) - schedulers_range = 1..num_schedulers - - num_dirty_cpu_schedulers = :erlang.system_info(:dirty_cpu_schedulers) - - dirty_cpu_schedulers_range = - (num_schedulers + 1)..(num_schedulers + num_dirty_cpu_schedulers) - - [ - last_value("process.memory.total", tags: [:process_type], unit: :byte), - last_value("system.cpu.core_count"), - last_value("system.cpu.utilization.total"), - last_value("vm.garbage_collection.total_runs"), - last_value("vm.garbage_collection.total_bytes_reclaimed", unit: :byte), - last_value("vm.memory.atom", unit: :byte), - last_value("vm.memory.atom_used", unit: :byte), - last_value("vm.memory.binary", unit: :byte), - last_value("vm.memory.code", unit: :byte), - last_value("vm.memory.ets", unit: :byte), - last_value("vm.memory.processes", unit: :byte), - last_value("vm.memory.processes_used", unit: :byte), - last_value("vm.memory.system", unit: :byte), - last_value("vm.memory.total", unit: :byte), - last_value("vm.reductions.total"), - last_value("vm.reductions.delta"), - last_value("vm.run_queue_lengths.total"), - last_value("vm.run_queue_lengths.total_plus_io"), - last_value("vm.scheduler_utilization.total"), - last_value("vm.scheduler_utilization.weighted"), - last_value("vm.system_counts.atom_count"), - last_value("vm.system_counts.port_count"), - last_value("vm.system_counts.process_count"), - 
last_value("vm.total_run_queue_lengths.total"), - last_value("vm.total_run_queue_lengths.cpu"), - last_value("vm.total_run_queue_lengths.io"), - last_value("vm.uptime.total", - unit: :second, - measurement: &:erlang.convert_time_unit(&1.total, :native, :second) - ) - ] ++ - Enum.map( - # Add "system.cpu.utilization.core_*" but since there's no wildcard support we - # explicitly add the cores here. - 0..(:erlang.system_info(:logical_processors) - 1), - &last_value("system.cpu.utilization.core_#{&1}") - ) ++ - Enum.map(scheduler_ids(), &last_value("vm.run_queue_lengths.#{&1}")) ++ - Enum.map(schedulers_range, &last_value("vm.scheduler_utilization.normal_#{&1}")) ++ - Enum.map(dirty_cpu_schedulers_range, &last_value("vm.scheduler_utilization.cpu_#{&1}")) - end - - defp otel_metrics(opts) do - [ - last_value("system.load_percent.avg1"), - last_value("system.load_percent.avg5"), - last_value("system.load_percent.avg15"), - last_value("system.memory_percent.free_memory"), - last_value("system.memory_percent.available_memory"), - last_value("system.memory_percent.used_memory"), - sum("vm.monitor.long_message_queue.length", tags: [:process_type]), - distribution("vm.monitor.long_schedule.timeout", - tags: [:process_type], - unit: :millisecond - ), - distribution("vm.monitor.long_gc.timeout", tags: [:process_type], unit: :millisecond) - ] ++ - prometheus_metrics() ++ - memory_by_process_type_metrics(opts) - end - - defp memory_by_process_type_metrics(%{otel_per_process_metrics?: true}) do - [ - last_value("process.memory.total", tags: [:process_type], unit: :byte) - ] - end - - defp memory_by_process_type_metrics(_), do: [] - - defp scheduler_ids do - num_schedulers = :erlang.system_info(:schedulers) - Enum.map(1..num_schedulers, &:"normal_#{&1}") ++ [:cpu, :io] - end - - defp periodic_measurements(opts) do - word_size = :erlang.system_info(:wordsize) - - [ - # Measurements included with the telemetry_poller application. 
- # - # By default, The telemetry_poller application starts its own poller but we disable that - # and add its default measurements to the list of our custom ones. - # - # This allows for all periodic measurements to be defined in one place. - :memory, - :total_run_queue_lengths, - :system_counts, - - # Our custom measurements: - {__MODULE__, :uptime_event, []}, - {__MODULE__, :cpu_utilization, []}, - {__MODULE__, :scheduler_utilization, []}, - {__MODULE__, :run_queue_lengths, [scheduler_ids()]}, - {__MODULE__, :garbage_collection, [word_size]}, - {__MODULE__, :reductions, []}, - {__MODULE__, :process_memory, [opts]}, - {__MODULE__, :get_system_load_average, []}, - {__MODULE__, :get_system_memory_usage, []} - ] - end - - def uptime_event do - :telemetry.execute([:vm, :uptime], %{ - total: :erlang.monotonic_time() - :erlang.system_info(:start_time) - }) - end - - def process_memory(%{top_process_count: process_count}) do - for %{type: type, memory: memory} <- - Electric.Telemetry.Processes.top_memory_by_type(process_count) do - :telemetry.execute([:process, :memory], %{total: memory}, %{process_type: to_string(type)}) - end - end - - def cpu_utilization do - case :cpu_sup.util([:per_cpu]) do - {:error, reason} -> - Logger.debug("Failed to collect CPU utilization: #{inspect(reason)}") - - data -> - {per_core_utilization, bare_values} = - for {cpu_index, busy, _free, _misc} <- data do - {{:"core_#{cpu_index}", busy}, busy} - end - |> Enum.unzip() - - utilization = - per_core_utilization - |> Map.new() - |> Map.put(:total, mean(bare_values)) - - :telemetry.execute([:system, :cpu, :utilization], utilization) - - :telemetry.execute([:system, :cpu], %{core_count: length(bare_values)}) - end - end - - # The Erlang docs do not specify a recommended value to use between two successive samples - # of scheduler utilization. 
- @scheduler_wall_time_measurement_duration 100 - - def scheduler_utilization do - # Perform the measurement in a task to ensure that the `scheduler_wall_time` flag does - # not remain enabled in case of unforeseen errors. - t = - Task.async(fn -> - :erlang.system_flag(:scheduler_wall_time, true) - s1 = :scheduler.get_sample() - Process.sleep(@scheduler_wall_time_measurement_duration) - s2 = :scheduler.get_sample() - {s1, s2} - end) - - {s1, s2} = Task.await(t) - - schedulers = :scheduler.utilization(s1, s2) - - utilization = - Map.new(schedulers, fn - # Scheduler utilization of a normal scheduler with number scheduler_id - {:normal, scheduler_id, util, _percent} -> {:"normal_#{scheduler_id}", util * 100} - # Scheduler utilization of a dirty-cpu scheduler with number scheduler_id - {:cpu, scheduler_id, util, _percent} -> {:"cpu_#{scheduler_id}", util * 100} - # Total utilization of all normal and dirty-cpu schedulers - {:total, util, _percent} -> {:total, util * 100} - # Total utilization of all normal and dirty-cpu schedulers, weighted against maximum amount of available CPU time - {:weighted, util, _percent} -> {:weighted, util * 100} - end) - - :telemetry.execute([:vm, :scheduler_utilization], utilization) - end - - def run_queue_lengths(scheduler_ids) do - run_queue_lengths = :erlang.statistics(:run_queue_lengths_all) - - measurements = - Enum.zip(scheduler_ids, run_queue_lengths) - |> Map.new() - |> Map.put(:total, :erlang.statistics(:total_run_queue_lengths)) - |> Map.put(:total_plus_io, :erlang.statistics(:total_run_queue_lengths_all)) - - :telemetry.execute([:vm, :run_queue_lengths], measurements) - end - - def garbage_collection(word_size) do - {num_gc_runs, num_words_reclaimed, 0} = :erlang.statistics(:garbage_collection) - - :telemetry.execute([:vm, :garbage_collection], %{ - total_runs: num_gc_runs, - total_bytes_reclaimed: num_words_reclaimed * word_size - }) - end - - def reductions do - {total_reductions, reductions_since_last_call} = 
:erlang.statistics(:reductions) - - :telemetry.execute([:vm, :reductions], %{ - total: total_reductions, - delta: reductions_since_last_call - }) - end - - def get_system_load_average do - case :erlang.system_info(:logical_processors) do - cores when is_number(cores) and cores > 0 -> - # > The load values are proportional to how long time a runnable Unix - # > process has to spend in the run queue before it is scheduled. - # > Accordingly, higher values mean more system load. The returned value - # > divided by 256 produces the figure displayed by rup and top. - # - # I'm going one step further and dividing by the number of CPUs so in a 4 - # core system, a load of 4.0 (in top) will show as 100%. - # Since load can go above num cores, we can to 200%, 300% but - # I think this makes sense. - # - # Certainly the formula in the erlang docs: - # - # > the following simple mathematical transformation can produce the load - # > value as a percentage: - # > - # > PercentLoad = 100 * (1 - D/(D + Load)) - # > - # > D determines which load value should be associated with which - # > percentage. Choosing D = 50 means that 128 is 60% load, 256 is 80%, 512 - # > is 90%, and so on. - # - # Makes little sense. Setting `D` as they say and plugging in a avg1 value - # of 128 does not give 60% so I'm not sure how to square what they say with - # the numbers... - # - # e.g. my machine currently has a cpu util (:cpu_sup.util()) of 4% and an - # avg1() of 550 ish across 24 cores (so doing very little) but that formula - # would give a `PercentLoad` of ~92%. 
- # - # My version would give value of 550 / 256 / 24 = 9% - [:avg1, :avg5, :avg15] - |> Enum.reduce(%{}, fn probe, acc -> - case apply(:cpu_sup, probe, []) do - {:error, reason} -> - Logger.debug("Failed to collect system load #{probe}: #{inspect(reason)}") - acc - - value -> - Map.put(acc, probe, 100 * (value / 256 / cores)) - end - end) - |> case do - x when x == %{} -> :ok - map -> :telemetry.execute([:system, :load_percent], map) - end - - _ -> - Logger.debug("Failed to collect system load average: no cores reported") - end - end - - @required_system_memory_keys ~w[system_total_memory free_memory]a - - def get_system_memory_usage() do - system_memory = Map.new(:memsup.get_system_memory_data()) - - # Sanity-check that all the required keys are present before doing any arithmetic on them - missing_system_memory_keys = - Enum.reject(@required_system_memory_keys, &Map.has_key?(system_memory, &1)) - - mem_stats = - cond do - missing_system_memory_keys != [] -> - Logger.warning( - "Error gathering system memory stats: " <> - "missing data points #{Enum.join(missing_system_memory_keys, ", ")}" - ) - - %{} - - system_memory.system_total_memory == 0 -> - Logger.warning("Error gathering system memory stats: zero total memory reported") - %{} - - true -> - total = system_memory.system_total_memory - - used = total - system_memory.free_memory - - mem_stats = - system_memory - |> Map.take(~w[available_memory free_memory buffered_memory cached_memory]a) - |> Map.put(:used_memory, used) - |> Map.merge(resident_memory(system_memory)) - - mem_percent_stats = Map.new(mem_stats, fn {k, v} -> {k, 100 * v / total} end) - - mem_stats = Map.put(mem_stats, :total_memory, total) - - :telemetry.execute([:system, :memory], mem_stats) - :telemetry.execute([:system, :memory_percent], mem_percent_stats) - - mem_stats - end - - Map.merge(mem_stats, swap_stats(:os.type(), system_memory)) - end - - defp resident_memory(%{available_memory: available_memory}) do - %{resident_memory: 
available_memory} - end - - defp resident_memory(%{ - free_memory: free, - buffered_memory: buffered, - cached_memory: cached, - system_total_memory: total - }) do - %{resident_memory: total - (free + buffered + cached)} - end - - @resident_memory_keys ~w[available_memory free_memory buffered_memory cached_memory]a - defp resident_memory(system_memory) do - missing_keys = - @resident_memory_keys - |> Enum.reject(&Map.has_key?(system_memory, &1)) - - Logger.warning( - "Error gathering resident memory stats: " <> - "missing data points #{Enum.join(missing_keys, ", ")}" - ) - - %{} - end - - defp swap_stats({:unix, :darwin}, _system_memory) do - # On macOS, swap stats are not available - %{} - end - - defp swap_stats(_os_type, %{total_swap: total, free_swap: free}) do - used = total - free - - swap_stats = %{total_swap: total, free_swap: free, used_swap: used} - - swap_percent_stats = - if total > 0 do - %{free_swap: 100 * free / total, used_swap: 100 * used / total} - else - %{free_swap: 0, used_swap: 0} - end - - :telemetry.execute([:system, :swap], swap_stats) - :telemetry.execute([:system, :swap_percent], swap_percent_stats) - - swap_stats - end - - @required_swap_keys ~w[total_swap free_swap]a - defp swap_stats(_os_type, system_memory) do - missing_swap_keys = Enum.reject(@required_swap_keys, &Map.has_key?(system_memory, &1)) - - Logger.warning( - "Error gathering system swap stats: " <> - "missing data points #{Enum.join(missing_swap_keys, ", ")}" - ) - - %{} - end - - defp mean([]), do: nil - - defp mean(list) when is_list(list) do - Enum.sum(list) / Enum.count(list) - end - end -end diff --git a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex deleted file mode 100644 index 596ce889bd..0000000000 --- a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex +++ /dev/null @@ -1,282 +0,0 @@ -use Electric.Telemetry - -with_telemetry Telemetry.Metrics do - defmodule 
Electric.Telemetry.CallHomeReporter do - @moduledoc """ - Reporter that collects runtime telemetry information and sends it to a configured - home server once in a while. The information is aggregated over a period of time, - with percentile values calculated for the metrics that have them. - """ - - use GenServer - - require Logger - - alias Telemetry.Metrics - alias Electric.Telemetry.Measurement - - @type metric :: Telemetry.Metrics.t() - @type report_format :: keyword(metric() | report_format()) - - def start_link(opts) do - name = Keyword.get(opts, :name, __MODULE__) - metrics = Keyword.fetch!(opts, :metrics) - static_info = Keyword.get(opts, :static_info, %{}) - first_report_in = cast_time_to_ms(Keyword.fetch!(opts, :first_report_in)) - reporting_period = cast_time_to_ms(Keyword.fetch!(opts, :reporting_period)) - reporter_fn = Keyword.get(opts, :reporter_fn, &report_home/1) - stack_id = Keyword.get(opts, :stack_id) - - GenServer.start_link( - __MODULE__, - %{ - metrics: metrics, - first_report_in: first_report_in, - reporting_period: reporting_period, - name: name, - static_info: static_info, - reporter_fn: reporter_fn, - stack_id: stack_id - }, - name: name - ) - end - - def report_home(results) do - url = telemetry_url() - # Isolate the request in a separate task to avoid blocking and - # to not receive any messages from the HTTP pool internals - Task.start(fn -> Req.post!(url, json: results, retry: :transient) end) - :ok - end - - defp telemetry_url, do: Electric.Config.get_env(:telemetry_url) - - def print_stats(name \\ __MODULE__) do - GenServer.call(name, :print_stats) - end - - defp cast_time_to_ms({time, :minute}), do: time * 60 * 1000 - defp cast_time_to_ms({time, :second}), do: time * 1000 - - @impl GenServer - def init(opts) do - %{ - metrics: metrics, - first_report_in: first_report_in, - reporting_period: reporting_period, - name: name, - static_info: static_info, - reporter_fn: reporter_fn, - stack_id: stack_id - } = opts - - # We need to trap 
exits here so that `terminate/2` callback has more chances to run - # and send data before crash/shutdown - Process.flag(:trap_exit, true) - Process.set_label({:call_home_reporter, name}) - - if stack_id do - Logger.metadata(stack_id: stack_id) - Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - end - - Logger.notice( - "Starting telemetry reporter. Electric will send anonymous usage data to #{telemetry_url()}. " <> - "You can configure this with `ELECTRIC_USAGE_REPORTING` environment variable, " <> - "see https://electric-sql.com/docs/reference/telemetry for more information." - ) - - metrics = save_target_path_to_options(metrics) - - groups = Enum.group_by(metrics, & &1.event_name) - - measurement_ctx = Measurement.init(name) - - # Attach a listener per event - handler_ids = - for {event, metrics} <- groups do - id = {__MODULE__, event, self()} - :telemetry.attach(id, event, &__MODULE__.handle_event/4, {metrics, measurement_ctx}) - id - end - - # Save some information about the metrics to use when building an output object - summary_types = - metrics - |> Enum.flat_map(fn - %Metrics.Summary{unit: :unique} = m -> [{get_result_path(m), :count_unique}] - %Metrics.Summary{} = m -> [{get_result_path(m), :summary}] - _ -> [] - end) - |> Map.new() - - all_paths = Enum.map(metrics, &get_result_path/1) - - clearable_paths = - metrics - |> Enum.reject(&Keyword.get(&1.reporter_options, :persist_between_sends, false)) - |> Enum.map(&get_result_path/1) - - Process.send_after(self(), :report, first_report_in) - - {:ok, - %{ - measurement_ctx: measurement_ctx, - handler_ids: handler_ids, - summary_types: summary_types, - all_paths: all_paths, - reporting_period: reporting_period, - static_info: static_info, - clearable_paths: clearable_paths, - reporter_fn: reporter_fn, - last_reported: DateTime.utc_now() - }} - end - - @impl GenServer - def terminate(_, state) do - for id <- state.handler_ids do - :telemetry.detach(id) - end - - # On shutdown try to push all the 
data we still can. - state.reporter_fn.(build_report(state)) - end - - @impl GenServer - def handle_call(:print_stats, _from, state) do - {:reply, build_stats(state), state} - end - - @impl GenServer - def handle_info(:report, state) do - full_report = build_report(state) - - state = - try do - :ok = state.reporter_fn.(full_report) - clear_stats(%{state | last_reported: full_report.timestamp}) - rescue - e -> - Logger.warning( - "Reporter function failed while trying to send telemetry data.\nError: #{Exception.format(:error, e, __STACKTRACE__)}" - ) - - state - end - - # If we've failed to send the results for more than 24 hours, then drop current stats - # to save memory - state = - if DateTime.diff(DateTime.utc_now(), state.last_reported, :hour) >= 24 do - clear_stats(%{state | last_reported: DateTime.utc_now()}) - else - state - end - - Process.send_after(self(), :report, state.reporting_period) - {:noreply, state} - end - - defp build_report(state) do - %{ - last_reported: state.last_reported, - timestamp: DateTime.utc_now(), - report_version: 2, - data: build_stats(state) - } - end - - defp build_stats(state) do - state.all_paths - |> Enum.map(fn path -> - default = - case state.summary_types[path] do - :summary -> %{min: 0, max: 0, mean: 0} - _ -> 0 - end - - {path, Measurement.calc_metric(state.measurement_ctx, path, default)} - end) - |> Enum.reduce(%{}, fn {path, val}, acc -> - path = path |> Tuple.to_list() |> Enum.map(&Access.key(&1, %{})) - put_in(acc, path, val) - end) - |> deep_merge(state.static_info) - end - - defp clear_stats(state) do - for key <- state.clearable_paths do - Measurement.clear_metric(state.measurement_ctx, key) - end - - state - end - - def handle_event(_event_name, measurements, metadata, {metrics, measurement_ctx}) do - for %{reporter_options: opts} = metric <- metrics, keep?(metric, metadata) do - path = Keyword.fetch!(opts, :result_path) - measurement = extract_measurement(metric, measurements, metadata) - - case metric do - 
%Metrics.Counter{} -> - Measurement.handle_counter(measurement_ctx, path) - - %Metrics.Sum{} -> - Measurement.handle_sum(measurement_ctx, path, measurement) - - %Metrics.LastValue{} -> - Measurement.handle_last_value(measurement_ctx, path, measurement) - - %Metrics.Summary{unit: :unique} -> - value = metadata[Keyword.fetch!(opts, :count_unique)] - Measurement.handle_unique_count(measurement_ctx, path, value) - - %Metrics.Summary{} -> - Measurement.handle_summary(measurement_ctx, path, measurement) - end - end - end - - defp keep?(%{keep: nil}, _metadata), do: true - defp keep?(metric, metadata), do: metric.keep.(metadata) - - defp extract_measurement(metric, measurements, metadata) do - case metric.measurement do - fun when is_function(fun, 2) -> fun.(measurements, metadata) - fun when is_function(fun, 1) -> fun.(measurements) - key -> measurements[key] - end - end - - @spec save_target_path_to_options(report_format()) :: [metric()] - defp save_target_path_to_options(report, prefix \\ []) when is_list(report) do - Enum.flat_map(report, fn - {k, v} when is_list(v) -> - save_target_path_to_options(v, prefix ++ [k]) - - {k, v} -> - if v.tags != [], - do: raise("Call home reporter doesn't support splitting metrics by tags") - - [ - Map.update!( - v, - :reporter_options, - &Keyword.put(&1, :result_path, List.to_tuple(prefix ++ [k])) - ) - ] - end) - end - - defp get_result_path(%{reporter_options: opts}), do: Keyword.fetch!(opts, :result_path) - - def deep_merge(left, right) do - Map.merge(left, right, fn - _, %{} = l, %{} = r -> deep_merge(l, r) - _, _, r -> r - end) - end - end -end diff --git a/packages/sync-service/lib/electric/telemetry/open_telemetry/config.ex b/packages/sync-service/lib/electric/telemetry/open_telemetry/config.ex new file mode 100644 index 0000000000..f4427a5519 --- /dev/null +++ b/packages/sync-service/lib/electric/telemetry/open_telemetry/config.ex @@ -0,0 +1,36 @@ +defmodule Electric.Telemetry.OpenTelemetry.Config do + import Config + + def 
configure(opts) do + otlp_endpoint = Keyword.fetch!(opts, :otlp_endpoint) + otlp_headers = Keyword.fetch!(opts, :otlp_headers) + otel_resource = Keyword.fetch!(opts, :otel_resource) + otel_debug? = Keyword.fetch!(opts, :otel_debug?) + + config :opentelemetry_exporter, + otlp_protocol: :http_protobuf, + otlp_endpoint: otlp_endpoint, + otlp_headers: otlp_headers, + otlp_compression: :gzip + + otel_batch_processor = + if otlp_endpoint do + {:otel_batch_processor, %{}} + end + + otel_simple_processor = + if otel_debug? do + # In this mode, each span is printed to stdout as soon as it ends, without batching. + {:otel_simple_processor, %{exporter: {:otel_exporter_stdout, []}}} + end + + config :opentelemetry, + resource_detectors: [ + :otel_resource_env_var, + :otel_resource_app_env, + Electric.Telemetry.OpenTelemetry.ResourceDetector + ], + resource: otel_resource, + processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1) + end +end diff --git a/packages/sync-service/lib/electric/telemetry/open_telemetry/resource_detector.ex b/packages/sync-service/lib/electric/telemetry/open_telemetry/resource_detector.ex index 09c659e13f..42f0af0faf 100644 --- a/packages/sync-service/lib/electric/telemetry/open_telemetry/resource_detector.ex +++ b/packages/sync-service/lib/electric/telemetry/open_telemetry/resource_detector.ex @@ -2,6 +2,7 @@ if Code.ensure_loaded?(:otel_resource) do defmodule Electric.Telemetry.OpenTelemetry.ResourceDetector do @behaviour :otel_resource_detector + # FIXME(alco): either avoid calling this in Cloud or check whether there is already installation_id set for the resource def get_resource(_config) do {m, f, a} = Electric.Config.get_env(:persistent_kv) kv = apply(m, f, [a]) diff --git a/packages/sync-service/lib/electric/telemetry/opts.ex b/packages/sync-service/lib/electric/telemetry/opts.ex deleted file mode 100644 index a66f2ae00b..0000000000 --- a/packages/sync-service/lib/electric/telemetry/opts.ex +++ /dev/null @@ -1,21 
+0,0 @@ -defmodule Electric.Telemetry.Opts do - def schema do - [ - instance_id: [type: :string], - installation_id: [type: :string], - system_metrics_poll_interval: [type: :integer, default: :timer.seconds(5)], - statsd_host: [type: {:or, [:string, nil]}, default: nil], - prometheus?: [type: :boolean, default: false], - call_home_telemetry?: [type: :boolean, default: false], - otel_metrics?: [type: :boolean, default: false], - otel_export_period: [type: :integer, default: :timer.seconds(30)], - otel_per_process_metrics?: [type: :boolean, default: false], - otel_resource_attributes: [type: :map, default: %{}], - top_process_count: [type: :integer, default: 5], - long_gc_threshold: [type: :integer, default: 500], - long_schedule_threshold: [type: :integer, default: 500], - long_message_queue_enable_threshold: [type: :integer, default: 1000], - long_message_queue_disable_threshold: [type: :integer, default: 100] - ] - end -end diff --git a/packages/sync-service/lib/electric/telemetry/stack_telemetry.ex b/packages/sync-service/lib/electric/telemetry/stack_telemetry.ex deleted file mode 100644 index 12a3dc4be5..0000000000 --- a/packages/sync-service/lib/electric/telemetry/stack_telemetry.ex +++ /dev/null @@ -1,369 +0,0 @@ -use Electric.Telemetry - -with_telemetry [OtelMetricExporter, Telemetry.Metrics] do - defmodule Electric.Telemetry.StackTelemetry do - @moduledoc """ - Collects and exports stack level telemetry such as database and shape metrics. - - If multiple databases are used, each database will have it's own stack and it's own StackTelemetry. - - See also ApplicationTelemetry for application/system level specific telemetry. 
- """ - use Supervisor - - import Telemetry.Metrics - - require Logger - - @opts_schema NimbleOptions.new!( - Electric.Telemetry.Opts.schema() ++ - [ - stack_id: [type: :string, required: true], - slot_name: [type: :string, required: true] - ] - ) - - def start_link(opts) do - with {:ok, opts} <- NimbleOptions.validate(opts, @opts_schema) do - if telemetry_export_enabled?(Map.new(opts)) do - Supervisor.start_link(__MODULE__, Map.new(opts)) - else - # Avoid starting the telemetry supervisor and its telemetry_poller child if we're not - # intending to export periodic measurements metrics anywhere. - :ignore - end - end - end - - def init(opts) do - Process.set_label({:stack_telemetry_supervisor, opts.stack_id}) - Logger.metadata(stack_id: opts.stack_id) - Electric.Telemetry.Sentry.set_tags_context(stack_id: opts.stack_id) - - [telemetry_poller_child_spec(opts) | exporter_child_specs(opts)] - |> Supervisor.init(strategy: :one_for_one) - end - - defp telemetry_poller_child_spec(opts) do - {:telemetry_poller, - measurements: periodic_measurements(opts), - period: opts.system_metrics_poll_interval, - init_delay: :timer.seconds(3)} - end - - defp telemetry_export_enabled?(opts) do - exporter_child_specs(opts) != [] - end - - defp exporter_child_specs(opts) do - [ - statsd_reporter_child_spec(opts), - prometheus_reporter_child_spec(opts), - call_home_reporter_child_spec(opts), - otel_reporter_child_spec(opts) - ] - |> Enum.reject(&is_nil/1) - end - - defp otel_reporter_child_spec(%{otel_metrics?: true} = opts) do - {OtelMetricExporter, - name: :"stack_otel_telemetry_#{opts.stack_id}", - metrics: otel_metrics(opts), - export_period: opts.otel_export_period, - resource: - Map.merge( - %{ - stack_id: opts.stack_id, - instance: %{installation_id: Map.get(opts, :installation_id, "electric_default")} - }, - opts.otel_resource_attributes - )} - end - - defp otel_reporter_child_spec(_), do: nil - - defp call_home_reporter_child_spec(%{call_home_telemetry?: true} = opts) do - 
{Electric.Telemetry.CallHomeReporter, - name: :"stack_call_home_telemetry_#{opts.stack_id}", - stack_id: opts.stack_id, - static_info: static_info(opts), - metrics: call_home_metrics(opts), - first_report_in: {2, :minute}, - reporting_period: {30, :minute}} - end - - defp call_home_reporter_child_spec(_), do: nil - - def static_info(opts) do - {total_mem, _, _} = :memsup.get_memory_data() - processors = :erlang.system_info(:logical_processors) - {os_family, os_name} = :os.type() - arch = :erlang.system_info(:system_architecture) - - %{ - electric_version: to_string(Electric.version()), - environment: %{ - os: %{family: os_family, name: os_name}, - arch: to_string(arch), - cores: processors, - ram: total_mem, - electric_instance_id: Map.fetch!(opts, :instance_id), - electric_installation_id: Map.fetch!(opts, :installation_id), - stack_id: opts.stack_id - } - } - end - - # IMPORTANT: these metrics are validated on the receiver side, so if you change them, - # make sure you also change the receiver - def call_home_metrics(opts) do - for_stack = for_stack(opts) - - [ - environment: [ - pg_version: - last_value("electric.postgres.info_looked_up.pg_version", - reporter_options: [persist_between_sends: true], - keep: for_stack - ) - ], - usage: [ - inbound_bytes: - sum("electric.postgres.replication.transaction_received.bytes", - unit: :byte, - keep: for_stack - ), - inbound_transactions: - sum("electric.postgres.replication.transaction_received.count", keep: for_stack), - inbound_operations: - sum("electric.postgres.replication.transaction_received.operations", keep: for_stack), - stored_bytes: - sum("electric.storage.transaction_stored.bytes", unit: :byte, keep: for_stack), - stored_transactions: sum("electric.storage.transaction_stored.count", keep: for_stack), - stored_operations: - sum("electric.storage.transaction_stored.operations", keep: for_stack), - total_used_storage_kb: - last_value("electric.storage.used", unit: {:byte, :kilobyte}, keep: for_stack), - 
total_shapes: last_value("electric.shapes.total_shapes.count", keep: for_stack), - active_shapes: - summary("electric.plug.serve_shape.monotonic_time", - unit: :unique, - reporter_options: [count_unique: :shape_handle], - keep: &(&1.status < 300 && for_stack.(&1)) - ), - unique_clients: - summary("electric.plug.serve_shape.monotonic_time", - unit: :unique, - reporter_options: [count_unique: :client_ip], - keep: &(&1.status < 300 && for_stack.(&1)) - ), - sync_requests: - counter("electric.plug.serve_shape.monotonic_time", - keep: &(&1[:live] != true && for_stack.(&1)) - ), - live_requests: - counter("electric.plug.serve_shape.monotonic_time", - keep: &(&1[:live] && for_stack.(&1)) - ), - served_bytes: sum("electric.plug.serve_shape.bytes", unit: :byte, keep: for_stack), - wal_size: - summary("electric.postgres.replication.wal_size", unit: :byte, keep: for_stack) - ] - ] - end - - defp statsd_reporter_child_spec(%{statsd_host: host} = opts) when host != nil do - {TelemetryMetricsStatsd, - host: host, - formatter: :datadog, - global_tags: [instance_id: opts.instance_id], - metrics: statsd_metrics(opts)} - end - - defp statsd_reporter_child_spec(_), do: nil - - defp prometheus_reporter_child_spec(%{prometheus?: true} = opts) do - {TelemetryMetricsPrometheus.Core, - name: :"stack_prometheus_telemetry_#{opts.stack_id}", metrics: prometheus_metrics(opts)} - end - - defp prometheus_reporter_child_spec(_), do: nil - - defp statsd_metrics(opts) do - [ - summary("plug.router_dispatch.stop.duration", - tags: [:route], - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - summary("plug.router_dispatch.exception.duration", - tags: [:route], - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - summary("electric.shape_cache.create_snapshot_task.stop.duration", - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - summary("electric.storage.make_new_snapshot.stop.duration", - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - 
summary("electric.querying.stream_initial_data.stop.duration", - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - last_value("electric.connection.consumers_ready.duration", - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - last_value("electric.connection.consumers_ready.total", keep: for_stack(opts)), - last_value("electric.connection.consumers_ready.before_recovery", keep: for_stack(opts)) - ] - |> Enum.map(&%{&1 | tags: [:instance_id | &1.tags]}) - end - - defp prometheus_metrics(opts) do - [ - last_value("electric.postgres.replication.wal_size", unit: :byte, keep: for_stack(opts)), - last_value("electric.storage.used", unit: {:byte, :kilobyte}, keep: for_stack(opts)), - last_value("electric.shapes.total_shapes.count", keep: for_stack(opts)), - last_value("electric.shapes.active_shapes.count", keep: for_stack(opts)), - counter("electric.postgres.replication.transaction_received.count", - keep: for_stack(opts) - ), - sum("electric.postgres.replication.transaction_received.bytes", - unit: :byte, - keep: for_stack(opts) - ), - sum("electric.storage.transaction_stored.bytes", unit: :byte, keep: for_stack(opts)), - last_value("electric.shape_monitor.active_reader_count", keep: for_stack(opts)), - last_value("electric.connection.consumers_ready.duration", - unit: {:native, :millisecond}, - keep: for_stack(opts) - ), - last_value("electric.connection.consumers_ready.total", keep: for_stack(opts)), - last_value("electric.connection.consumers_ready.failed_to_recover", - keep: for_stack(opts) - ), - last_value("electric.admission_control.acquire.current", keep: for_stack(opts)), - sum("electric.admission_control.reject.count", keep: for_stack(opts)) - ] - end - - defp otel_metrics(opts) do - for_stack = for_stack(opts) - - [ - distribution("electric.plug.serve_shape.duration", - unit: {:native, :millisecond}, - keep: &(&1[:live] != true && for_stack.(&1)) - ), - distribution("electric.shape_cache.create_snapshot_task.stop.duration", - unit: 
{:native, :millisecond}, - keep: for_stack - ), - distribution("electric.storage.make_new_snapshot.stop.duration", - unit: {:native, :millisecond}, - keep: for_stack - ), - distribution("electric.postgres.replication.transaction_received.receive_lag", - unit: :millisecond, - keep: for_stack - ), - distribution("electric.postgres.replication.transaction_received.operations", - keep: for_stack - ), - distribution("electric.storage.transaction_stored.replication_lag", - unit: :millisecond, - keep: for_stack - ) - ] ++ prometheus_metrics(opts) - end - - defp periodic_measurements(opts) do - [ - {__MODULE__, :count_shapes, [opts.stack_id]}, - {__MODULE__, :count_active_shapes, [opts.stack_id]}, - {__MODULE__, :report_retained_wal_size, [opts.stack_id, opts.slot_name]} - ] - end - - def count_shapes(stack_id) do - # Telemetry is started before everything else in the stack, so we need to handle - # the case where the shape cache is not started yet. - case Electric.ShapeCache.count_shapes(stack_id) do - :error -> - :ok - - num_shapes -> - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :shapes, :total_shapes], - %{count: num_shapes}, - %{stack_id: stack_id} - ) - end - end - - def count_active_shapes(stack_id) do - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :shapes, :active_shapes], - %{count: Electric.Shapes.ConsumerRegistry.active_consumer_count(stack_id)}, - %{stack_id: stack_id} - ) - end - - def for_stack(opts) do - stack_id = opts.stack_id - - fn metadata -> - metadata[:stack_id] == stack_id - end - end - - @retained_wal_size_query """ - SELECT - pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::int8 - FROM - pg_replication_slots - WHERE - slot_name = $1 - """ - - @doc false - @spec report_retained_wal_size(atom() | binary(), any()) :: :ok - def report_retained_wal_size(stack_id, slot_name) do - try do - %Postgrex.Result{rows: [[wal_size]]} = - Postgrex.query!( - Electric.Connection.Manager.admin_pool(stack_id), - 
@retained_wal_size_query, - [slot_name], - timeout: 3_000, - deadline: 3_000 - ) - - # The query above can return `-1` which I'm assuming means "up-to-date". - # This is a confusing stat if we're measuring in bytes, so normalise to - # [0, :infinity) - - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :postgres, :replication], - %{wal_size: max(0, wal_size)}, - %{stack_id: stack_id} - ) - catch - :exit, {:noproc, _} -> - :ok - - # catch all errors to not log them as errors, those are reporing issues at best - type, reason -> - Logger.warning( - "Failed to query retained WAL size\nError: #{Exception.format(type, reason)}", - stack_id: stack_id, - slot_name: slot_name - ) - end - end - end -end diff --git a/packages/sync-service/mix.exs b/packages/sync-service/mix.exs index 70d1b0672b..889c837ed0 100644 --- a/packages/sync-service/mix.exs +++ b/packages/sync-service/mix.exs @@ -10,6 +10,20 @@ defmodule Electric.MixProject do # make the metrics-enabled target available to the rest of the app def telemetry_target, do: @telemetry_target + # Necessary boilerplate because opentelemetry doesn't mention opentelemetry_exporter in + # its list of dependency but actually expects it to be started before itself at runtime. 
+ if Mix.target() == @telemetry_target do + @extra_telemetry_applications [:opentelemetry_exporter] + + @telemetry_applications_in_release [ + opentelemetry_exporter: :permanent, + opentelemetry: :temporary + ] + else + @extra_telemetry_applications [] + @telemetry_applications_in_release [] + end + def project do [ app: :electric, @@ -32,11 +46,10 @@ defmodule Electric.MixProject do ], releases: [ electric: [ - applications: [electric: :permanent] ++ telemetry_applications_in_release(), + applications: @telemetry_applications_in_release, include_executables_for: [:unix] ] ], - default_release: :electric, test_coverage: [ tool: ExCoveralls, ignore_modules: [ @@ -61,9 +74,7 @@ defmodule Electric.MixProject do def application do [ - extra_applications: [:logger, :os_mon, :runtime_tools], - # Using a compile-time flag to select the application module or lack thereof allows - # using this app as a dependency with this additional flag + extra_applications: [:logger, :runtime_tools] ++ @extra_telemetry_applications, mod: {Electric.Application, []} ] end @@ -103,9 +114,6 @@ defmodule Electric.MixProject do {:remote_ip, "~> 1.2"}, {:req, "~> 0.5"}, {:stream_split, "~> 0.1"}, - {:telemetry_poller, "~> 1.2"}, - # tls_certificate_check is required by otel_exporter_otlp - {:tls_certificate_check, "~> 1.27"}, {:tz, "~> 0.28"} ], dev_and_test_deps(), @@ -124,32 +132,15 @@ defmodule Electric.MixProject do ] end - defp telemetry_applications_in_release do - if Mix.target() == @telemetry_target do - # This order of application is important to ensure proper startup sequence of - # application dependencies, namely, inets. 
- [ - opentelemetry_exporter: :permanent, - opentelemetry: :temporary - ] - else - [] - end - end - - defp telemetry_deps() do + defp telemetry_deps do [ - {:sentry, "~> 11.0"}, + {:electric_telemetry, path: "../electric-telemetry"}, {:opentelemetry, "~> 1.6"}, - {:opentelemetry_exporter, "~> 1.8"}, - {:otel_metric_exporter, "~> 0.4.1"}, - # For debugging the otel_metric_exporter check it out locally and uncomment the line below - # {:otel_metric_exporter, path: "../../../elixir-otel-metric-exporter"}, - {:telemetry_metrics_prometheus_core, "~> 1.1"}, - {:telemetry_metrics_statsd, "~> 0.7"}, + {:opentelemetry_exporter, "~> 1.10.0"}, # Pin protobuf to v0.13.x because starting with v0.14.0 it includes modules that conflict # with those of Protox (which itself is brought in by pg_query_ex). - {:protobuf, "~> 0.13.0", optional: true, override: true} + {:protobuf, "~> 0.13.0", override: true}, + {:sentry, "~> 11.0"} ] |> Enum.map(fn {package, version} when is_binary(version) -> @@ -164,10 +155,10 @@ defmodule Electric.MixProject do end defp telemetry_dep_opts(source_opts) do - Keyword.merge(source_opts, targets: @telemetry_target, optional: true) + Keyword.merge(source_opts, targets: @telemetry_target) end - defp aliases() do + defp aliases do [ start_dev: "cmd --cd dev docker compose up -d", stop_dev: "cmd --cd dev docker compose down -v", diff --git a/packages/sync-service/mix.lock b/packages/sync-service/mix.lock index 903afc47f7..7641be4a7b 100644 --- a/packages/sync-service/mix.lock +++ b/packages/sync-service/mix.lock @@ -37,7 +37,7 @@ "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.10.0", "972e142392dbfa679ec959914664adefea38399e4f56ceba5c473e1cabdbad79", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.7.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.5.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", 
[hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "33a116ed7304cb91783f779dec02478f887c87988077bfd72840f760b8d4b952"}, "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"}, - "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.4.1", "2d4f62dbc0e342d9f18862107894eea17404a6593b94f3cb5a02b22e6361c65a", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.15", [hex: :protobuf, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "35027b082f7f8481b18f344abd38a28887757460b5fbef65a9188049ae1f65bd"}, + "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.4.2", "2cf96ac9879eb06ebde26fa0856e2cd4d5b5f6127eb9ca587532b89a5f981bfc", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.15", [hex: :protobuf, repo: "hexpm", optional: false]}, {:retry, "~> 
0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "e7d7ca69a3863a2b84badeb9dd275a369754d951c7c5a4cd5f321be868c6d613"}, "pg_query_ex": {:hex, :pg_query_ex, "0.9.0", "8e34bd2d0e0eb9e8d621c4697032fad4bfba46826950d3b46904a80ab589b43a", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:protox, "~> 2.0", [hex: :protox, repo: "hexpm", optional: false]}], "hexpm", "a3fada1704fa9e2bc11ff846ad545ef9a1d34f46d86206063c37128960f4f5f5"}, "plug": {:hex, :plug, "1.18.1", "5067f26f7745b7e31bc3368bc1a2b818b9779faa959b49c934c17730efc911cf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "57a57db70df2b422b564437d2d33cf8d33cd16339c1edb190cd11b1a3a546cc2"}, "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, diff --git a/packages/sync-service/test/electric/telemetry/application_telemetry_test.exs b/packages/sync-service/test/electric/telemetry/application_telemetry_test.exs deleted file mode 100644 index ac5fab5853..0000000000 --- a/packages/sync-service/test/electric/telemetry/application_telemetry_test.exs +++ /dev/null @@ -1,40 +0,0 @@ -use Electric.Telemetry - -with_telemetry [Electric.Telemetry.ApplicationTelemetry] do - defmodule Electric.TelemetryTest do - use ExUnit.Case, async: true - - alias Electric.Telemetry.ApplicationTelemetry - - @moduletag :telemetry_target - - describe "get_system_memory_usage" do - test "returns calculated memory stats" do - case :os.type() do - 
{:unix, :darwin} -> - assert %{ - total_memory: _, - available_memory: _, - free_memory: _, - used_memory: _, - resident_memory: _ - } = ApplicationTelemetry.get_system_memory_usage() - - _ -> - assert %{ - total_memory: _, - available_memory: _, - buffered_memory: _, - cached_memory: _, - free_memory: _, - used_memory: _, - resident_memory: _, - total_swap: _, - free_swap: _, - used_swap: _ - } = ApplicationTelemetry.get_system_memory_usage() - end - end - end - end -end diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index 18f0f91dd9..375392b593 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -5,7 +5,7 @@ # Registry.start_link(name: Electric.Application.process_registry(), keys: :unique) ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) -ExUnit.start(assert_receive_timeout: 400, exclude: [:slow, :telemetry_target], capture_log: true) +ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true) # Repatch in async tests has lazy recompilation issues, so as a temporary fix # we force recompilation in the setup. The issue is tracked here: