Skip to content

Commit b1744df

Browse files
committed
feat(sync-service): Terminate shape consumer processes after timeout
Closes #3253
1 parent 6bb011b commit b1744df

File tree

9 files changed

+359
-38
lines changed

9 files changed

+359
-38
lines changed

.changeset/eighty-pandas-ring.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Reduce memory usage by terminating consumer processes after the hibernation timeout

integration-tests/scripts/electric_dev.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
set -e
44

5-
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
5+
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
66

77
cd "$SCRIPT_DIR"/../../packages/sync-service
88

9-
ELECTRIC_STORAGE_DIR="$SCRIPT_DIR/../_storage" ELECTRIC_REPLICATION_STREAM_ID=integration iex -r "$SCRIPT_DIR/../test_utils/*.exs" -S mix
9+
ELECTRIC_STORAGE_DIR="$SCRIPT_DIR/../_storage" ELECTRIC_REPLICATION_STREAM_ID=integration iex -r "$SCRIPT_DIR/../test_utils/*.exs" "$@" -S mix

integration-tests/tests/_macros.luxinc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@
186186

187187
[macro start_electric_script shell_name port env]
188188
[shell $shell_name]
189-
!ELECTRIC_PORT=$port $env ../scripts/electric_dev.sh
189+
!ELECTRIC_PORT=$port $env ../scripts/electric_dev.sh --no-color
190190
[endmacro]
191191

192192
[macro stop_electric]
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
[doc Verify that consumer processes suspend and resume correctly]
2+
3+
[include _macros.luxinc]
4+
5+
[global pg_container_name=db-connection-scaledown__pg]
6+
[global database_url=postgresql://electric_test:password@localhost:$pg_host_port/electric?sslmode=disable]
7+
8+
###
9+
10+
# Start Postgres and create an additional role that will be used by Electric in this test
11+
[invoke setup_pg_with_shell_name \
12+
"pg" \
13+
"-e INIT_DB_SQL=\"\
14+
CREATE ROLE electric_test LOGIN PASSWORD 'password' REPLICATION;\
15+
GRANT CREATE ON DATABASE electric TO electric_test\"" \
16+
"" \
17+
"-v $(realpath ../scripts/init_db.sh):/docker-entrypoint-initdb.d/initdb-init_db.sh"]
18+
19+
# Create a table for subsequent shape requests
20+
[invoke start_psql]
21+
22+
[shell psql]
23+
!create table items(val text);
24+
??CREATE
25+
26+
!insert into items values ('1'), ('2');
27+
??INSERT
28+
29+
!alter table items owner to electric_test;
30+
??ALTER TABLE
31+
32+
# Start Electric and wait for it to finish initialization.
33+
# Set a very small hibernation timeout so consumers will shut down quickly
34+
[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms"]
35+
[shell electric]
36+
[timeout 10]
37+
??[debug] Replication client started streaming
38+
39+
# Start a live shape request
40+
[shell client]
41+
[invoke curl_shape "http://localhost:3000/v1/shape?table=items&offset=-1"]
42+
43+
??HTTP/1.1 200 OK
44+
45+
?electric-handle: ([\d-]+)
46+
[global handle=$1]
47+
48+
??electric-offset: 0_0
49+
50+
??[{"headers":{"operation":"insert","relation":["public","items"]},"key":"\"public\".\"items\"/\"1\"","value":{"val":"1"}},\
51+
{"headers":{"operation":"insert","relation":["public","items"]},"key":"\"public\".\"items\"/\"2\"","value":{"val":"2"}},\
52+
{"headers":{"control":"snapshot-end"
53+
54+
[invoke curl_shape "http://localhost:3000/v1/shape?table=items&handle=$handle&offset=0_0&live"]
55+
56+
# Check that the consumer process for the given handle has suspended itself
57+
[shell electric]
58+
[sleep 1]
59+
60+
??[debug] Suspending consumer $handle
61+
!{:active_consumer_count, Electric.Shapes.ConsumerRegistry.active_consumer_count("single_stack")}
62+
?{:active_consumer_count, 0}
63+
64+
# Verify that writes to the handle re-create the consumer process
65+
[shell psql]
66+
!insert into items values ('3');
67+
??INSERT
68+
69+
70+
[shell electric]
71+
??[info] Started consumer for existing handle $handle
72+
!{:active_consumer_count, Electric.Shapes.ConsumerRegistry.active_consumer_count("single_stack")}
73+
?{:active_consumer_count, 1}
74+
75+
[shell client]
76+
[timeout 1]
77+
78+
[invoke curl_shape "http://localhost:3000/v1/shape?table=items&handle=$handle&offset=0_0&live"]
79+
80+
??HTTP/1.1 200
81+
82+
??transfer-encoding: chunked
83+
?electric-offset: ([0-9]+)_0
84+
[local global_last_seen_lsn=$1]
85+
86+
??"headers":{"last":true,"lsn":"$global_last_seen_lsn","op_position":0,"operation":"insert","relation":["public","items"],"txids":
87+
??"key":"\"public\".\"items\"/\"3\"","value":{"val":"3"}
88+
??"headers":{"control":"up-to-date","global_last_seen_lsn":"$global_last_seen_lsn"}
89+
90+
91+
[cleanup]
92+
[invoke teardown]

packages/sync-service/lib/electric/shape_cache/shape_cleaner.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ defmodule Electric.ShapeCache.ShapeCleaner do
1515
@type stack_id() :: Electric.stack_id()
1616

1717
@shutdown_cleanup {:shutdown, :cleanup}
18+
@shutdown_suspend {:shutdown, :suspend}
1819

1920
# Public API
2021
def consumer_cleanup_reason, do: @shutdown_cleanup
22+
def consumer_suspend_reason, do: @shutdown_suspend
2123

2224
@spec remove_shapes(stack_id(), [shape_handle()], term()) :: :ok | {:error, term()}
2325
def remove_shapes(stack_id, shape_handles, reason \\ @shutdown_cleanup)
@@ -76,6 +78,8 @@ defmodule Electric.ShapeCache.ShapeCleaner do
7678
end)
7779
end
7880

81+
@type reason() :: {:shutdown, :cleanup} | {:shutdown, :suspend} | term()
82+
@spec handle_writer_termination(stack_id(), shape_handle(), reason()) :: :removed | :ok
7983
def handle_writer_termination(stack_id, shape_handle, @shutdown_cleanup) do
8084
Logger.info("Removing shape #{inspect(shape_handle)}")
8185

@@ -84,6 +88,13 @@ defmodule Electric.ShapeCache.ShapeCleaner do
8488
:removed
8589
end
8690

91+
def handle_writer_termination(stack_id, shape_handle, @shutdown_suspend) do
92+
# Deregister the consumer without removing it from the rest of the system.
93+
# The next time a txn matching this consumer comes in, it will be re-started
94+
# by the consumer registry, as per any other lazily loaded consumer.
95+
Electric.Shapes.ConsumerRegistry.remove_consumer(shape_handle, stack_id)
96+
end
97+
8798
def handle_writer_termination(_stack_id, _shape_handle, reason)
8899
when reason in [:normal, :killed, :shutdown] or
89100
(is_tuple(reason) and elem(reason, 0) == :shutdown) do

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,23 @@ defmodule Electric.Shapes.Consumer do
362362
end
363363

364364
def handle_info(:timeout, state) do
365-
state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)}
365+
# we can only suspend (terminate) the consumer process if
366+
# 1. we're not waiting for snapshot information
367+
# 2. we are not part of a subquery dependency tree, that is, both of:
368+
# a. we have no dependent shapes
369+
# b. we don't have a materializer subscribed
370+
can_suspend? =
371+
state.snapshot_started and Enum.empty?(state.shape.shape_dependencies_handles) and
372+
not state.materializer_subscribed?
373+
374+
if can_suspend? do
375+
Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end)
376+
{:stop, ShapeCleaner.consumer_suspend_reason(), state}
377+
else
378+
state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)}
366379

367-
{:noreply, state, :hibernate}
380+
{:noreply, state, :hibernate}
381+
end
368382
end
369383

370384
@impl GenServer

packages/sync-service/lib/electric/shapes/consumer_registry.ex

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ defmodule Electric.Shapes.ConsumerRegistry do
1717
stack_id: stack_id()
1818
}
1919

20+
@consumer_suspend_reason Electric.ShapeCache.ShapeCleaner.consumer_suspend_reason()
21+
2022
def name(stack_id, shape_handle) when is_stack_id(stack_id) and is_shape_handle(shape_handle) do
2123
{:via, __MODULE__, {stack_id, shape_handle}}
2224
end
@@ -67,22 +69,36 @@ defmodule Electric.Shapes.ConsumerRegistry do
6769
end
6870

6971
@spec publish([shape_handle()], term(), t()) :: :ok
72+
def publish([], _event, _registry_state) do
73+
:ok
74+
end
75+
7076
def publish(shape_handles, event, registry_state) do
7177
%{table: table} = registry_state
7278

7379
shape_handles
74-
|> Enum.flat_map(fn handle ->
75-
(consumer_pid(handle, table) || start_consumer!(handle, registry_state))
76-
|> List.wrap()
80+
|> Enum.map(fn handle ->
81+
{handle, consumer_pid(handle, table) || start_consumer!(handle, registry_state)}
7782
end)
7883
|> broadcast(event)
84+
|> publish(event, registry_state)
7985
end
8086

8187
@spec remove_consumer(shape_handle(), t()) :: :ok
8288
def remove_consumer(shape_handle, %__MODULE__{table: table}) do
89+
do_remove_consumer(shape_handle, table)
90+
end
91+
92+
@spec remove_consumer(shape_handle(), stack_id()) :: :ok
93+
def remove_consumer(shape_handle, stack_id) when is_stack_id(stack_id) do
94+
do_remove_consumer(shape_handle, ets_name(stack_id))
95+
end
96+
97+
@spec do_remove_consumer(shape_handle(), :ets.table()) :: :ok
98+
defp do_remove_consumer(shape_handle, table) when is_atom(table) or is_reference(table) do
8399
:ets.delete(table, shape_handle)
84100

85-
Logger.debug(fn -> "Stopped and removed consumer #{shape_handle}" end)
101+
Logger.debug(fn -> "Removed consumer #{shape_handle}" end)
86102

87103
:ok
88104
end
@@ -96,26 +112,39 @@ defmodule Electric.Shapes.ConsumerRegistry do
96112
There is no timeout, so if the GenServers neither respond nor die, this
97113
function will block indefinitely.
98114
"""
99-
@spec broadcast([pid()], term()) :: :ok
100-
def broadcast(pids, message) do
115+
@spec broadcast([{shape_handle(), pid()}], term()) :: [shape_handle()]
116+
def broadcast(handle_pids, message) do
101117
# Based on OTP GenServer.call, see:
102118
# https://github.com/erlang/otp/blob/090c308d7c925e154240685174addaa516ea2f69/lib/stdlib/src/gen.erl#L243
103-
pids
104-
|> Enum.map(fn pid ->
119+
handle_pids
120+
|> Enum.map(fn {handle, pid} ->
105121
ref = Process.monitor(pid)
106122
send(pid, {:"$gen_call", {self(), ref}, message})
107-
ref
123+
{handle, ref}
108124
end)
109-
|> Enum.each(fn ref ->
125+
|> Enum.flat_map(fn {handle, ref} ->
110126
receive do
111127
{^ref, _reply} ->
112128
Process.demonitor(ref, [:flush])
113-
:ok
129+
[]
130+
131+
{:DOWN, ^ref, _, _, @consumer_suspend_reason} ->
132+
# Catch the race condition where a consumer is in the act of
133+
# suspending as the txn arrives by retrying those handles (which will
134+
# start a new consumer instance).
135+
[handle]
114136

115137
{:DOWN, ^ref, _, _, _reason} ->
116-
:ok
138+
[]
117139
end
118140
end)
141+
|> tap(fn
142+
[] ->
143+
:ok
144+
145+
suspended_handles ->
146+
Logger.debug(fn -> ["Re-trying suspended shape handles ", inspect(suspended_handles)] end)
147+
end)
119148
end
120149

121150
defp consumer_pid(handle, table) do

0 commit comments

Comments
 (0)