Skip to content

Commit a9a172a

Browse files
Action runner graceful shutdown
1 parent 35c6797 commit a9a172a

File tree

3 files changed

+246
-0
lines changed

3 files changed

+246
-0
lines changed

st2actions/st2actions/worker.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,24 @@
1717
import sys
1818
import traceback
1919

20+
from tooz.coordination import GroupNotCreated
21+
from oslo_config import cfg
22+
2023
from st2actions.container.base import RunnerContainer
2124
from st2common import log as logging
2225
from st2common.constants import action as action_constants
2326
from st2common.exceptions.actionrunner import ActionRunnerException
2427
from st2common.exceptions.db import StackStormDBObjectNotFoundError
2528
from st2common.models.db.liveaction import LiveActionDB
2629
from st2common.persistence.execution import ActionExecution
30+
from st2common.services import coordination
2731
from st2common.services import executions
2832
from st2common.services import workflows as wf_svc
2933
from st2common.transport.consumers import MessageHandler
3034
from st2common.transport.consumers import ActionsQueueConsumer
3135
from st2common.transport import utils as transport_utils
3236
from st2common.util import action_db as action_utils
37+
from st2common.util import concurrency
3338
from st2common.util import system_info
3439
from st2common.transport import queues
3540

@@ -134,7 +139,32 @@ def process(self, liveaction):
134139

135140
def shutdown(self):
136141
super(ActionExecutionDispatcher, self).shutdown()
142+
143+
if cfg.CONF.actionrunner.graceful_shutdown:
144+
145+
coordinator = coordination.get_coordinator()
146+
member_ids = []
147+
service = "actionrunner"
148+
exit_timeout = cfg.CONF.actionrunner.exit_timeout
149+
sleep_delay = cfg.CONF.actionrunner.sleep_delay
150+
timeout = 0
151+
152+
while timeout < exit_timeout and self._running_liveactions:
153+
try:
154+
member_ids = list(
155+
coordinator.get_members(service.encode("utf-8")).get()
156+
)
157+
except GroupNotCreated:
158+
pass
159+
160+
# Check if there are other runners in service registry
161+
if not member_ids:
162+
break
163+
timeout += sleep_delay
164+
concurrency.sleep(sleep_delay)
165+
137166
# Abandon running executions if incomplete
167+
138168
while self._running_liveactions:
139169
liveaction_id = self._running_liveactions.pop()
140170
try:

st2actions/tests/unit/test_worker.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from st2common.persistence.execution import ActionExecution
2929
from st2common.persistence.liveaction import LiveAction
3030
from st2common.services import executions
31+
from st2common.services import coordination
3132
from st2common.util import date as date_utils
3233
from st2common.bootstrap import runnersregistrar as runners_registrar
3334
from local_runner.local_shell_command_runner import LocalShellCommandRunner
@@ -164,3 +165,196 @@ def test_worker_shutdown(self):
164165
# _run_action but will not result in KeyError because the discard method is used to
165166
# remove the liveaction from _running_liveactions.
166167
runner_thread.wait()
168+
169+
@mock.patch.object(
170+
coordination.NoOpDriver,
171+
"get_members",
172+
mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")),
173+
)
174+
def test_worker_graceful_shutdown_with_multiple_runners(self):
175+
cfg.CONF.set_override(
176+
name="graceful_shutdown", override=True, group="actionrunner"
177+
)
178+
action_worker = actions_worker.get_worker()
179+
temp_file = None
180+
181+
# Create a temporary file that is deleted when the file is closed and then set up an
182+
# action to wait for this file to be deleted. This allows this test to run the action
183+
# over a separate thread, run the shutdown sequence on the main thread, and then let
184+
# the local runner exit gracefully and allow _run_action to finish execution.
185+
with tempfile.NamedTemporaryFile() as fp:
186+
temp_file = fp.name
187+
self.assertIsNotNone(temp_file)
188+
self.assertTrue(os.path.isfile(temp_file))
189+
190+
# Launch the action execution in a separate thread.
191+
params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
192+
liveaction_db = self._get_liveaction_model(
193+
WorkerTestCase.local_action_db, params
194+
)
195+
liveaction_db = LiveAction.add_or_update(liveaction_db)
196+
executions.create_execution_object(liveaction_db)
197+
runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)
198+
199+
# Wait for the worker up to 10s to add the liveaction to _running_liveactions.
200+
for i in range(0, int(10 / 0.1)):
201+
eventlet.sleep(0.1)
202+
if len(action_worker._running_liveactions) > 0:
203+
break
204+
205+
self.assertEqual(len(action_worker._running_liveactions), 1)
206+
207+
# Shut down the worker; with another runner registered it waits for the running action to finish.
208+
shutdown_thread = eventlet.spawn(action_worker.shutdown)
209+
210+
# Make sure the temporary file has been deleted.
211+
self.assertFalse(os.path.isfile(temp_file))
212+
213+
# Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
214+
for i in range(0, int(10 / 0.1)):
215+
eventlet.sleep(0.1)
216+
if len(action_worker._running_liveactions) < 1:
217+
break
218+
liveaction_db = LiveAction.get_by_id(liveaction_db.id)
219+
220+
# Verify that _running_liveactions is empty and the liveaction is succeeded.
221+
self.assertEqual(len(action_worker._running_liveactions), 0)
222+
self.assertEqual(
223+
liveaction_db.status,
224+
action_constants.LIVEACTION_STATUS_SUCCEEDED,
225+
str(liveaction_db),
226+
)
227+
228+
# Wait for the local runner to complete. This will activate the finally block in
229+
# _run_action but will not result in KeyError because the discard method is used to
230+
# remove the liveaction from _running_liveactions.
231+
runner_thread.wait()
232+
shutdown_thread.kill()
233+
234+
def test_worker_graceful_shutdown_with_single_runner(self):
235+
cfg.CONF.set_override(
236+
name="graceful_shutdown", override=True, group="actionrunner"
237+
)
238+
action_worker = actions_worker.get_worker()
239+
temp_file = None
240+
241+
# Create a temporary file that is deleted when the file is closed and then set up an
242+
# action to wait for this file to be deleted. This allows this test to run the action
243+
# over a separate thread, run the shutdown sequence on the main thread, and then let
244+
# the local runner exit gracefully and allow _run_action to finish execution.
245+
with tempfile.NamedTemporaryFile() as fp:
246+
temp_file = fp.name
247+
self.assertIsNotNone(temp_file)
248+
self.assertTrue(os.path.isfile(temp_file))
249+
250+
# Launch the action execution in a separate thread.
251+
params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
252+
liveaction_db = self._get_liveaction_model(
253+
WorkerTestCase.local_action_db, params
254+
)
255+
liveaction_db = LiveAction.add_or_update(liveaction_db)
256+
executions.create_execution_object(liveaction_db)
257+
runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)
258+
259+
# Wait for the worker up to 10s to add the liveaction to _running_liveactions.
260+
for i in range(0, int(10 / 0.1)):
261+
eventlet.sleep(0.1)
262+
if len(action_worker._running_liveactions) > 0:
263+
break
264+
265+
self.assertEqual(len(action_worker._running_liveactions), 1)
266+
267+
# Shutdown the worker to trigger the abandon process.
268+
shutdown_thread = eventlet.spawn(action_worker.shutdown)
269+
270+
# Make sure the temporary file has been deleted.
271+
self.assertFalse(os.path.isfile(temp_file))
272+
273+
# Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
274+
for i in range(0, int(10 / 0.1)):
275+
eventlet.sleep(0.1)
276+
if len(action_worker._running_liveactions) < 1:
277+
break
278+
liveaction_db = LiveAction.get_by_id(liveaction_db.id)
279+
280+
# Verify that _running_liveactions is empty and the liveaction is abandoned.
281+
self.assertEqual(len(action_worker._running_liveactions), 0)
282+
self.assertEqual(
283+
liveaction_db.status,
284+
action_constants.LIVEACTION_STATUS_ABANDONED,
285+
str(liveaction_db),
286+
)
287+
288+
# Wait for the local runner to complete. This will activate the finally block in
289+
# _run_action but will not result in KeyError because the discard method is used to
290+
# remove the liveaction from _running_liveactions.
291+
runner_thread.wait()
292+
shutdown_thread.kill()
293+
294+
@mock.patch.object(
295+
coordination.NoOpDriver,
296+
"get_members",
297+
mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")),
298+
)
299+
def test_worker_graceful_shutdown_exit_timeout(self):
300+
cfg.CONF.set_override(
301+
name="graceful_shutdown", override=True, group="actionrunner"
302+
)
303+
cfg.CONF.set_override(name="exit_timeout", override=5, group="actionrunner")
304+
action_worker = actions_worker.get_worker()
305+
temp_file = None
306+
307+
# Create a temporary file that is deleted when the file is closed and then set up an
308+
# action to wait for this file to be deleted. This allows this test to run the action
309+
# over a separate thread, run the shutdown sequence on the main thread, and then let
310+
# the local runner exit gracefully and allow _run_action to finish execution.
311+
with tempfile.NamedTemporaryFile() as fp:
312+
temp_file = fp.name
313+
self.assertIsNotNone(temp_file)
314+
self.assertTrue(os.path.isfile(temp_file))
315+
316+
# Launch the action execution in a separate thread.
317+
params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
318+
liveaction_db = self._get_liveaction_model(
319+
WorkerTestCase.local_action_db, params
320+
)
321+
liveaction_db = LiveAction.add_or_update(liveaction_db)
322+
executions.create_execution_object(liveaction_db)
323+
runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)
324+
325+
# Wait for the worker up to 10s to add the liveaction to _running_liveactions.
326+
for i in range(0, int(10 / 0.1)):
327+
eventlet.sleep(0.1)
328+
if len(action_worker._running_liveactions) > 0:
329+
break
330+
331+
self.assertEqual(len(action_worker._running_liveactions), 1)
332+
333+
# Shutdown the worker to trigger the abandon process.
334+
shutdown_thread = eventlet.spawn(action_worker.shutdown)
335+
# Continue the execution for 5+ seconds to ensure the timeout occurs.
336+
eventlet.sleep(6)
337+
338+
# Make sure the temporary file has been deleted.
339+
self.assertFalse(os.path.isfile(temp_file))
340+
341+
# Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
342+
for i in range(0, int(10 / 0.1)):
343+
eventlet.sleep(0.1)
344+
if len(action_worker._running_liveactions) < 1:
345+
break
346+
liveaction_db = LiveAction.get_by_id(liveaction_db.id)
347+
348+
# Verify that _running_liveactions is empty and the liveaction is abandoned.
349+
self.assertEqual(len(action_worker._running_liveactions), 0)
350+
self.assertEqual(
351+
liveaction_db.status,
352+
action_constants.LIVEACTION_STATUS_ABANDONED,
353+
str(liveaction_db),
354+
)
355+
356+
# Wait for the local runner to complete. This will activate the finally block in
357+
# _run_action but will not result in KeyError because the discard method is used to
358+
# remove the liveaction from _running_liveactions.
359+
runner_thread.wait()
360+
shutdown_thread.kill()

st2common/st2common/config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,28 @@ def register_opts(ignore_errors=False):
500500
dispatcher_pool_opts, group="actionrunner", ignore_errors=ignore_errors
501501
)
502502

503+
graceful_shutdown_opts = [
504+
cfg.BoolOpt(
505+
"graceful_shutdown",
506+
default=False,
507+
help="This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout.",
508+
),
509+
cfg.IntOpt(
510+
"exit_timeout",
511+
default=300,
512+
help="How long to wait for process (in seconds) to exit after receiving shutdown signal.",
513+
),
514+
cfg.IntOpt(
515+
"sleep_delay",
516+
default=2,
517+
help="Time interval between subsequent queries to check running executions.",
518+
),
519+
]
520+
521+
do_register_opts(
522+
graceful_shutdown_opts, group="actionrunner", ignore_errors=ignore_errors
523+
)
524+
503525
ssh_runner_opts = [
504526
cfg.StrOpt(
505527
"remote_dir",

0 commit comments

Comments
 (0)