Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .devcontainer/docker-compose.override.yml
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
version: "3.8"
4 changes: 1 addition & 3 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.8"

services:
dev:
build:
Expand All @@ -16,7 +14,7 @@ services:
- db-tenant:/databases/db-tenant/:ro
environment:
- DATABASE_URL=mariadb://root@database-root:3306/development
- DATABASE_URL_TENANT_ONE=mariadb://root@database-tenant:3306/test_econ
- DATABASE_URL_TENANT_ONE=mariadb://root@database-tenant:3306/dev_econ
- DATABASE_SKIP_DEFAULT_POOL_CREATION=true
- DATABASE_CHOOSE_POOL=root
- OPAMSOLVERTIMEOUT=180
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ jobs:
permissions:
packages: read
env:
EMAIL_RATE_LIMIT: 3600
MATCHER_MAX_CAPACITY: 80
MYSQL_DATABASE: test_econ
MYSQL_ROOT_PASSWORD: password
services:
Expand Down
12 changes: 6 additions & 6 deletions pool.opam.locked
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ depends: [
"http" {= "6.2.1"}
"httpaf" {= "0.7.1"}
"httpaf-lwt-unix" {= "0.7.1"}
"hxd" {= "0.3.6"}
"hxd" {= "0.4.0"}
"integers" {= "0.7.0"}
"ipaddr" {= "5.6.1"}
"ipaddr-sexp" {= "5.6.1"}
"ipaddr" {= "5.6.2"}
"ipaddr-sexp" {= "5.6.2"}
"jane-street-headers" {= "v0.17.0" & with-test}
"jst-config" {= "v0.17.0" & with-test}
"jwto" {= "0.4.0"}
Expand All @@ -98,7 +98,7 @@ depends: [
"lwt-dllist" {= "1.1.0"}
"lwt_ppx" {= "5.9.1"}
"lwt_ssl" {= "1.2.0"}
"macaddr" {= "5.6.1"}
"macaddr" {= "5.6.2"}
"magic-mime" {= "1.3.1"}
"mariadb" {= "1.3.0"}
"markup" {= "1.0.3"}
Expand Down Expand Up @@ -183,8 +183,8 @@ depends: [
"stdune" {= "3.21.1"}
"stringext" {= "1.6.0"}
"time_now" {= "v0.17.0" & with-test}
"tls" {= "2.0.3"}
"tls-lwt" {= "2.0.3"}
"tls" {= "2.0.4"}
"tls-lwt" {= "2.0.4"}
"top-closure" {= "3.21.1"}
"topkg" {= "1.1.1"}
"tsort" {= "2.2.0"}
Expand Down
1 change: 1 addition & 0 deletions pool/app/contact/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(name contact)
(libraries
changelog
email
i18n
guard
pool_common
Expand Down
2 changes: 2 additions & 0 deletions pool/app/contact/repo/repo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,5 @@ module InactivityNotification = struct
Database.exec pool request (Entity.id contact)
;;
end

let increment_smtp_bounce = Email.Contact.increment_smtp_bounce
5 changes: 5 additions & 0 deletions pool/app/email/email.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ let find_active_token pool address =

module Service = Email_service
module Guard = Entity_guard

module Contact = struct
let increment_smtp_bounce = Repo_sql.Contact.increment_smtp_bounce
let reset_smtp_bounce = Repo_sql.Contact.reset_smtp_bounce
end
67 changes: 61 additions & 6 deletions pool/app/email/email.mli
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,27 @@ module SmtpAuth : sig
include Pool_model.Base.BooleanSig
end

module SystemAccount : sig
include Pool_model.Base.BooleanSig
end

module InternalRegex : sig
include Pool_model.Base.StringSig
end

module RateLimit : sig
include Pool_model.Base.IntegerSig

val default : t
val timediff_seconds : int
end

module InvitationCapacity : sig
include Pool_model.Base.IntegerSig

val default : t
end

type t =
{ id : Id.t
; label : Label.t
Expand All @@ -132,10 +153,24 @@ module SmtpAuth : sig
; mechanism : Mechanism.t
; protocol : Protocol.t
; default : Default.t
; system_account : SystemAccount.t
; internal_regex : InternalRegex.t option
; rate_limit : RateLimit.t
; invitation_capacity : InvitationCapacity.t
}

val id : t -> Id.t
val label : t -> Label.t
val server : t -> Server.t
val port : t -> Port.t
val username : t -> Username.t option
val mechanism : t -> Mechanism.t
val protocol : t -> Protocol.t
val default : t -> Default.t
val system_account : t -> SystemAccount.t
val internal_regex : t -> InternalRegex.t option
val rate_limit : t -> RateLimit.t
val invitation_capacity : t -> InvitationCapacity.t

type smtp = t

Expand All @@ -158,12 +193,20 @@ module SmtpAuth : sig
; mechanism : Mechanism.t
; protocol : Protocol.t
; default : Default.t
; system_account : SystemAccount.t
; internal_regex : InternalRegex.t option
; rate_limit : RateLimit.t
; invitation_capacity : InvitationCapacity.t
}

val to_entity : t -> smtp

val create
: ?id:Id.t
-> ?rate_limit:RateLimit.t
-> ?invitation_capacity:InvitationCapacity.t
-> ?system_account:SystemAccount.t
-> ?internal_regex:InternalRegex.t
-> Label.t
-> Server.t
-> Port.t
Expand All @@ -181,14 +224,17 @@ module SmtpAuth : sig
val find_default : Database.Label.t -> (t, Pool_message.Error.t) Lwt_result.t
val find_default_opt : Database.Label.t -> t option Lwt.t
val find_all : Database.Label.t -> t list Lwt.t
val find_for_experiment : Database.Label.t -> t list Lwt.t
val find_by : Query.t -> Database.Label.t -> (t list * Query.t) Lwt.t
val defalut_is_set : Database.Label.t -> bool Lwt.t
val count_invitations_sent_since : Database.Label.t -> Id.t option -> int -> int Lwt.t
val column_label : Query.Column.t
val column_smtp_server : Query.Column.t
val column_smtp_username : Query.Column.t
val column_smtp_mechanism : Query.Column.t
val column_smtp_protocol : Query.Column.t
val column_smtp_default_account : Query.Column.t
val column_smtp_system_account : Query.Column.t
val filterable_by : Query.Filter.human option
val default_query : Query.t
val searchable_by : Query.Column.t list
Expand Down Expand Up @@ -247,15 +293,19 @@ module Service : sig
: ?id:Pool_queue.Id.t
-> ?new_email_address:Pool_user.EmailAddress.t
-> ?new_smtp_auth_id:Pool_common.Id.t
-> ?message_template:string
-> ?message_template:Pool_common.MessageTemplateLabel.t
-> ?job_ctx:Pool_queue.job_ctx
-> Database.Label.t
-> Job.t
-> unit Lwt.t

val dispatch_all
: Database.Label.t
-> (Pool_queue.Id.t * Job.t * string option * Pool_queue.job_ctx option) list
-> (Pool_queue.Id.t
* Job.t
* Pool_common.MessageTemplateLabel.t option
* Pool_queue.job_ctx option)
list
-> unit Lwt.t

val lifecycle : Sihl.Container.lifecycle
Expand All @@ -280,6 +330,11 @@ module Guard : sig
end
end

module Contact : sig
val increment_smtp_bounce : Database.Label.t -> Pool_user.EmailAddress.t -> unit Lwt.t
val reset_smtp_bounce : Database.Label.t -> Pool_user.EmailAddress.t -> unit Lwt.t
end

type verification_event =
| Created of Pool_user.EmailAddress.t * Pool_token.t * Pool_user.Id.t
| EmailVerified of unverified t
Expand All @@ -291,7 +346,7 @@ val pp_verification_event : Format.formatter -> verification_event -> unit
type dispatch =
{ job : Service.Job.t
; id : Pool_queue.Id.t option
; message_template : string option
; message_template : Pool_common.MessageTemplateLabel.t option
; job_ctx : Pool_queue.job_ctx option
}

Expand All @@ -300,12 +355,12 @@ val pp_dispatch : Format.formatter -> dispatch -> unit
val yojson_of_dispatch : dispatch -> Yojson.Safe.t
val job : dispatch -> Service.Job.t
val id : dispatch -> Pool_queue.Id.t option
val message_template : dispatch -> string option
val message_template : dispatch -> Pool_common.MessageTemplateLabel.t option
val job_ctx : dispatch -> Pool_queue.job_ctx option

val create_dispatch
: ?id:Pool_queue.Id.t
-> ?message_template:string
-> ?message_template:Pool_common.MessageTemplateLabel.t
-> ?job_ctx:Pool_queue.job_ctx
-> Service.Job.t
-> dispatch
Expand All @@ -326,7 +381,7 @@ val verification_event_name : verification_event -> string

val create_sent
: ?id:Pool_queue.Id.t
-> ?message_template:string
-> ?message_template:Pool_common.MessageTemplateLabel.t
-> ?job_ctx:Pool_queue.job_ctx
-> ?new_email_address:Pool_user.EmailAddress.t
-> ?new_smtp_auth_id:SmtpAuth.Id.t
Expand Down
126 changes: 101 additions & 25 deletions pool/app/email/email_service.ml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,21 @@ let register () =
module Job = struct
include Entity.Job

let is_system_email_template
(database_label : Database.Label.t)
(template : Pool_common.MessageTemplateLabel.t)
: bool Lwt.t
=
let open Utils.Lwt_result.Infix in
Settings.find_system_email_templates database_label
||> (CCList.mem ~eq:Pool_common.MessageTemplateLabel.equal) template
;;

let resolve_system_smtp_auth_id database_label =
let%lwt auth = Repo_sql.Smtp.find_full_system_or_default_opt database_label in
auth |> CCOption.map SmtpAuth.Write.(fun { id; _ } -> id) |> Lwt.return
;;

let encode = yojson_of_t %> Yojson.Safe.to_string

let decode str =
Expand All @@ -361,21 +376,54 @@ module Job = struct
;;

let send =
let open CCFun in
let open Utils.Lwt_result.Infix in
let open Pool_queue in
let handle ?id:_ label ({ email; smtp_auth_id; _ } : t) =
let increment_smtp_bounce = Repo_sql.Contact.increment_smtp_bounce in
let reset_smtp_bounce = Repo_sql.Contact.reset_smtp_bounce in
let is_recipient_not_found msg =
(*
Error Documentation:
* https://learn.microsoft.com/en-us/troubleshoot/exchange/email-delivery/ndr/recipientnotfound-ndr
*)
CCString.find ~sub:"550" msg >= 0
&& CCString.find ~sub:"RESOLVER.ADR.RecipientNotFound" msg >= 0
in
let handle ?id:_ label (job : t) =
let email_data = email job in
let smtp_auth_id = smtp_auth_id job in
Lwt.catch
(fun () -> send ?smtp_auth_id label email ||> CCResult.return)
(Printexc.to_string %> Pool_message.Error.nothandled %> Lwt.return_error)
(fun () ->
let%lwt result =
Smtp.send ?smtp_auth_id label email_data ||> CCResult.return
in
let%lwt () =
reset_smtp_bounce
label
(Pool_user.EmailAddress.of_string email_data.Sihl_email.recipient)
in
Lwt.return result)
(fun exn ->
match Printexc.to_string exn with
| msg when is_recipient_not_found msg ->
let recipient = email_data.Sihl_email.recipient in
Logs.info ~src (fun m ->
m
~tags:(Database.Logger.Tags.create label)
"SMTP 550 bounce for %s — incrementing smtp_bounces_count"
recipient);
let%lwt () =
increment_smtp_bounce label (Pool_user.EmailAddress.of_string recipient)
in
Lwt.return_error (Pool_message.Error.SmtpRecipientNotFound recipient)
| msg -> Lwt.return_error (Pool_message.Error.nothandled msg))
in
Job.create
~max_tries:10
~retry_delay:(Sihl.Time.Span.hours 1)
handle
encode
decode
JobName.SendEmail
Pool_queue.(
Job.create
~max_tries:10
~retry_delay:(Sihl.Time.Span.hours 1)
handle
encode
decode
JobName.SendEmail)
;;
end

Expand All @@ -390,7 +438,16 @@ let dispatch
=
let tags = Database.Logger.Tags.create database_label in
Logs.debug ~src (fun m -> m ~tags "Dispatch email to %s" email.Sihl_email.recipient);
let job = job |> Job.update ?new_email_address ?new_smtp_auth_id in
let%lwt resolved =
match new_smtp_auth_id, Job.smtp_auth_id job, message_template with
| Some id, _, _ -> Lwt.return_some id
| None, Some _, _ | None, None, None -> Lwt.return_none
| None, None, Some template ->
(match%lwt Job.is_system_email_template database_label template with
| true -> Job.resolve_system_smtp_auth_id database_label
| false -> Lwt.return_none)
in
let job = job |> Job.update ?new_email_address ?new_smtp_auth_id:resolved in
Pool_queue.dispatch
?id
?message_template
Expand All @@ -401,18 +458,37 @@ let dispatch
;;

let dispatch_all database_label jobs =
let recipients, jobs =
jobs
|> CCList.fold_left
(fun (recipients, jobs)
(id, ({ Entity.Job.email; _ } as job), message_template, mappings) ->
( email.Sihl_email.recipient :: recipients
, ( id
, job |> Job.intercept_prepare_of_event
, message_template
, CCOption.get_or ~default:(Pool_queue.job_ctx_create []) mappings )
:: jobs ))
([], [])
let system_template_cache = Hashtbl.create 4 in
let resolved_system_smtp_auth_id = ref None in
let%lwt recipients, jobs =
Lwt_list.fold_left_s
(fun (recipients, jobs)
(id, ({ Entity.Job.email; _ } as job), message_template, mappings) ->
let%lwt resolved_new_smtp_auth_id =
match Job.smtp_auth_id job, message_template with
| Some _, _ -> Lwt.return_none
| None, Some template ->
let%lwt is_system_template =
Utils.Lwt_cache.cached system_template_cache template (fun () ->
Job.is_system_email_template database_label template)
in
(match is_system_template with
| true ->
Utils.Lwt_cache.once resolved_system_smtp_auth_id (fun () ->
Job.resolve_system_smtp_auth_id database_label)
| false -> Lwt.return_none)
| None, _ -> Lwt.return_none
in
let job = job |> Job.update ?new_smtp_auth_id:resolved_new_smtp_auth_id in
Lwt.return
( email.Sihl_email.recipient :: recipients
, ( id
, job |> Job.intercept_prepare_of_event
, message_template
, CCOption.get_or ~default:(Pool_queue.job_ctx_create []) mappings )
:: jobs ))
([], [])
jobs
in
Logs.debug ~src (fun m ->
m
Expand Down
Loading
Loading