Mattermost follows (mostly) the slack notification scheme, but it does not work with the blocks api.
I thus hacked a simple custom class. Not sure if this is useful for someone else?
In addition, I had to mod the util.py file in the validation_operators folder to make it work with our server (set verify=False):
util.py (partial):
def send_slack_notification(query, slack_webhook):
session = requests.Session()
try:
response = session.post(url=slack_webhook, json=query, verify=False)
except requests.ConnectionError:
the poor-mans mattermost class:
mattermost_renderer.py
import datetime
from .renderer import Renderer
class MattermostRenderer(Renderer):
def __init__(self):
pass
def render(self, validation_json=None):
timestamp = datetime.datetime.strftime(datetime.datetime.now(), "%x %X")
default_text = "No validation occurred. Please ensure you passed a validation_json."
status = "Failed :x:"
query = {
"text": default_text
}
# TODO improve this nested logic
expectation_suite_name = None
data_asset_name = None
if validation_json:
if "meta" in validation_json:
data_asset_name = validation_json["meta"].get(
"data_asset_name",
"no_name_provided_" + datetime.datetime.utcnow().isoformat().replace(":", "") + "Z"
)
expectation_suite_name = validation_json["meta"].get("expectation_suite_name", "default")
n_checks_succeeded = validation_json["statistics"]["successful_expectations"]
n_checks = validation_json["statistics"]["evaluated_expectations"]
run_id = validation_json["meta"].get("run_id", None)
check_details_text = "*{}* of *{}* expectations were met".format(
n_checks_succeeded, n_checks)
if validation_json["success"]:
status = "Success :tada:"
summary_text = """*Batch Validation Status*: {}
*Data Asset:* `{}`
*Expectation suite name*: `{}`
*Run ID*: `{}`
*Timestamp*: `{}`
*Summary*: {}""".format(
status,
data_asset_name,
expectation_suite_name,
run_id,
timestamp,
check_details_text
)
if "result_reference" in validation_json["meta"]:
query["text"] += "---\n" + "- *Validation Report*: {}".format(validation_json["meta"]["result_reference"])
if "dataset_reference" in validation_json["meta"]:
query["text"] += "- *Validation data asset*: {}".format(validation_json["meta"]["dataset_reference"])
return query
Mattermost follows (mostly) the slack notification scheme, but it does not work with the blocks api.
I thus hacked a simple custom class. Not sure if this is useful for someone else?
In addition, I had to mod the
util.pyfile in thevalidation_operatorsfolder to make it work with our server (setverify=False):util.py (partial):
the poor-mans mattermost class:
mattermost_renderer.py