Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Added

For backward compatibility reasons, if pack metadata file doesn't contain that attribute, it's
assumed it only works with Python 2. (new feature) #4474
* Add metrics instrumentation to the ``st2notifier`` service. For the available / exposed metrics,
please refer to https://docs.stackstorm.com/reference/metrics.html. (improvement) #4536

Changed
~~~~~~~
Expand Down
40 changes: 28 additions & 12 deletions st2actions/st2actions/notifier/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from __future__ import absolute_import

from datetime import datetime
import json

Expand Down Expand Up @@ -45,6 +46,8 @@
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE
from st2common.services.keyvalues import KeyValueLookup
from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE
from st2common.metrics.base import CounterWithTimer
from st2common.metrics.base import Timer

__all__ = [
'Notifier',
Expand Down Expand Up @@ -73,6 +76,7 @@ def __init__(self, connection, queues, trigger_dispatcher=None):
pack=ACTION_TRIGGER_TYPE['pack'],
name=ACTION_TRIGGER_TYPE['name'])

@CounterWithTimer(key='notifier.action.executions')
def process(self, execution_db):
execution_id = str(execution_db.id)
extra = {'execution': execution_db}
Expand All @@ -86,12 +90,18 @@ def process(self, execution_db):
# action execution will be applied by the workflow engine. A policy may affect the
# final state of the action execution thereby impacting the state of the workflow.
if not workflow_service.is_action_execution_under_workflow_context(execution_db):
policy_service.apply_post_run_policies(liveaction_db)
with CounterWithTimer(key='notifier.apply_post_run_policies'):
policy_service.apply_post_run_policies(liveaction_db)

if liveaction_db.notify is not None:
self._post_notify_triggers(liveaction_db=liveaction_db, execution_db=execution_db)
if liveaction_db.notify:
with CounterWithTimer(key='notifier.notify_trigger.post'):
self._post_notify_triggers(liveaction_db=liveaction_db,
execution_db=execution_db)

self._post_generic_trigger(liveaction_db=liveaction_db, execution_db=execution_db)
if cfg.CONF.action_sensor.enable:
with CounterWithTimer(key='notifier.generic_trigger.post'):
self._post_generic_trigger(liveaction_db=liveaction_db,
execution_db=execution_db)

def _get_execution_for_liveaction(self, liveaction):
execution = ActionExecution.get(liveaction__id=str(liveaction.id))
Expand Down Expand Up @@ -127,7 +137,7 @@ def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None
notify_subsection=None,
default_message_suffix=None):
routes = (getattr(notify_subsection, 'routes') or
getattr(notify_subsection, 'channels', None))
getattr(notify_subsection, 'channels', [])) or []

execution_id = str(execution_db.id)

Expand All @@ -142,13 +152,15 @@ def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None
)

try:
message = self._transform_message(message=message,
context=jinja_context)
with Timer(key='notifier.transform_message'):
message = self._transform_message(message=message,
context=jinja_context)
except:
LOG.exception('Failed (Jinja) transforming `message`.')

try:
data = self._transform_data(data=data, context=jinja_context)
with Timer(key='notifier.transform_data'):
data = self._transform_data(data=data, context=jinja_context)
except:
LOG.exception('Failed (Jinja) transforming `data`.')

Expand Down Expand Up @@ -187,8 +199,10 @@ def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None
payload['channel'] = route
LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
liveaction_db.id, payload)
self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
trace_context=trace_context)

with CounterWithTimer(key='notifier.notify_trigger.dispatch'):
self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
trace_context=trace_context)
except:
failed_routes.append(route)

Expand Down Expand Up @@ -254,8 +268,10 @@ def _post_generic_trigger(self, liveaction_db=None, execution_db=None):
trace_context = self._get_trace_context(execution_id=execution_id)
LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
ACTION_TRIGGER_TYPE['name'], liveaction_db.id, payload, trace_context)
self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
trace_context=trace_context)

with CounterWithTimer(key='notifier.generic_trigger.dispatch'):
self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
trace_context=trace_context)

def _get_runner_ref(self, action_ref):
"""
Expand Down