@@ -18,6 +18,7 @@ package client
1818
1919import  (
2020	"bytes" 
21+ 	"context" 
2122	"encoding/json" 
2223	"errors" 
2324	"fmt" 
@@ -38,6 +39,7 @@ import (
3839type  EventClientConfig  struct  {
3940	DestinationURL      string              `env:"EVENT_URL" envDefault:"http://localhost:3000/notify" description:"Notifier service url"` 
4041	NotificationMedium  NotificationMedium  `env:"NOTIFICATION_MEDIUM" envDefault:"rest" description:"notification medium"` 
42+ 	EnableNotifierV2    bool                `env:"ENABLE_NOTIFIER_V2" envDefault:"false" description:"enable notifier v2"` 
4143}
4244type  NotificationMedium  string 
4345
@@ -58,24 +60,25 @@ type EventClient interface {
5860}
5961
6062type  Event  struct  {
61- 	EventTypeId         int                `json:"eventTypeId"` 
62- 	EventName           string             `json:"eventName"` 
63- 	PipelineId          int                `json:"pipelineId"` 
64- 	PipelineType        string             `json:"pipelineType"` 
65- 	CorrelationId       string             `json:"correlationId"` 
66- 	Payload             * Payload           `json:"payload"` 
67- 	EventTime           string             `json:"eventTime"` 
68- 	TeamId              int                `json:"teamId"` 
69- 	AppId               int                `json:"appId"` 
70- 	EnvId               int                `json:"envId"` 
71- 	IsProdEnv           bool               `json:"isProdEnv"` 
72- 	ClusterId           int                `json:"clusterId"` 
73- 	CdWorkflowType      bean.WorkflowType  `json:"cdWorkflowType,omitempty"` 
74- 	CdWorkflowRunnerId  int                `json:"cdWorkflowRunnerId"` 
75- 	CiWorkflowRunnerId  int                `json:"ciWorkflowRunnerId"` 
76- 	CiArtifactId        int                `json:"ciArtifactId"` 
77- 	BaseUrl             string             `json:"baseUrl"` 
78- 	UserId              int                `json:"-"` 
63+ 	EventTypeId          int                `json:"eventTypeId"` 
64+ 	EventName            string             `json:"eventName"` 
65+ 	PipelineId           int                `json:"pipelineId"` 
66+ 	PipelineType         string             `json:"pipelineType"` 
67+ 	CorrelationId        string             `json:"correlationId"` 
68+ 	Payload              * Payload           `json:"payload"` 
69+ 	EventTime            string             `json:"eventTime"` 
70+ 	TeamId               int                `json:"teamId"` 
71+ 	AppId                int                `json:"appId"` 
72+ 	EnvId                int                `json:"envId"` 
73+ 	IsProdEnv            bool               `json:"isProdEnv"` 
74+ 	ClusterId            int                `json:"clusterId"` 
75+ 	CdWorkflowType       bean.WorkflowType  `json:"cdWorkflowType,omitempty"` 
76+ 	CdWorkflowRunnerId   int                `json:"cdWorkflowRunnerId"` 
77+ 	CiWorkflowRunnerId   int                `json:"ciWorkflowRunnerId"` 
78+ 	CiArtifactId         int                `json:"ciArtifactId"` 
79+ 	EnvIdsForCiPipeline  []int              `json:"envIdsForCiPipeline"` 
80+ 	BaseUrl              string             `json:"baseUrl"` 
81+ 	UserId               int                `json:"-"` 
7982}
8083
8184type  Payload  struct  {
@@ -95,22 +98,25 @@ type Payload struct {
9598}
9699
97100type  EventRESTClientImpl  struct  {
98- 	logger                * zap.SugaredLogger 
99- 	client                * http.Client 
100- 	config                * EventClientConfig 
101- 	pubsubClient          * pubsub.PubSubClientServiceImpl 
102- 	ciPipelineRepository  pipelineConfig.CiPipelineRepository 
103- 	pipelineRepository    pipelineConfig.PipelineRepository 
104- 	attributesRepository  repository.AttributesRepository 
105- 	moduleService         module.ModuleService 
101+ 	logger                          * zap.SugaredLogger 
102+ 	client                          * http.Client 
103+ 	config                          * EventClientConfig 
104+ 	pubsubClient                    * pubsub.PubSubClientServiceImpl 
105+ 	ciPipelineRepository            pipelineConfig.CiPipelineRepository 
106+ 	pipelineRepository              pipelineConfig.PipelineRepository 
107+ 	attributesRepository            repository.AttributesRepository 
108+ 	moduleService                   module.ModuleService 
109+ 	notificationSettingsRepository  repository.NotificationSettingsRepository 
106110}
107111
108112func  NewEventRESTClientImpl (logger  * zap.SugaredLogger , client  * http.Client , config  * EventClientConfig , pubsubClient  * pubsub.PubSubClientServiceImpl ,
109113	ciPipelineRepository  pipelineConfig.CiPipelineRepository , pipelineRepository  pipelineConfig.PipelineRepository ,
110- 	attributesRepository  repository.AttributesRepository , moduleService  module.ModuleService ) * EventRESTClientImpl  {
114+ 	attributesRepository  repository.AttributesRepository , moduleService  module.ModuleService ,
115+ 	notificationSettingsRepository  repository.NotificationSettingsRepository ) * EventRESTClientImpl  {
111116	return  & EventRESTClientImpl {logger : logger , client : client , config : config , pubsubClient : pubsubClient ,
112117		ciPipelineRepository : ciPipelineRepository , pipelineRepository : pipelineRepository ,
113- 		attributesRepository : attributesRepository , moduleService : moduleService }
118+ 		attributesRepository : attributesRepository , moduleService : moduleService ,
119+ 		notificationSettingsRepository : notificationSettingsRepository }
114120}
115121
116122func  (impl  * EventRESTClientImpl ) buildFinalPayload (event  Event , cdPipeline  * pipelineConfig.Pipeline , ciPipeline  * pipelineConfig.CiPipeline ) * Payload  {
@@ -235,34 +241,131 @@ func (impl *EventRESTClientImpl) sendEventsOnNats(body []byte) error {
235241// do not call this method if notification module is not installed 
236242func  (impl  * EventRESTClientImpl ) sendEvent (event  Event ) (bool , error ) {
237243	impl .logger .Debugw ("event before send" , "event" , event )
238- 	body , err  :=  json .Marshal (event )
244+ 
245+ 	// Step 1: Create payload and destination URL based on config 
246+ 	bodyBytes , destinationUrl , err  :=  impl .createPayloadAndDestination (event )
239247	if  err  !=  nil  {
240- 		impl .logger .Errorw ("error while marshaling event request " , "err" , err )
241248		return  false , err 
242249	}
250+ 
251+ 	// Step 2: Send via appropriate medium (NATS or REST) 
252+ 	return  impl .deliverEvent (bodyBytes , destinationUrl )
253+ }
254+ 
255+ func  (impl  * EventRESTClientImpl ) createPayloadAndDestination (event  Event ) ([]byte , string , error ) {
256+ 	if  impl .config .EnableNotifierV2  {
257+ 		return  impl .createV2PayloadAndDestination (event )
258+ 	}
259+ 	return  impl .createDefaultPayloadAndDestination (event )
260+ }
261+ 
262+ func  (impl  * EventRESTClientImpl ) createV2PayloadAndDestination (event  Event ) ([]byte , string , error ) {
263+ 	destinationUrl  :=  impl .config .DestinationURL  +  "/v2" 
264+ 
265+ 	// Fetch notification settings 
266+ 	req  :=  repository.GetRulesRequest {
267+ 		TeamId :              event .TeamId ,
268+ 		EnvId :               event .EnvId ,
269+ 		AppId :               event .AppId ,
270+ 		PipelineId :          event .PipelineId ,
271+ 		PipelineType :        event .PipelineType ,
272+ 		IsProdEnv :           & event .IsProdEnv ,
273+ 		ClusterId :           event .ClusterId ,
274+ 		EnvIdsForCiPipeline : event .EnvIdsForCiPipeline ,
275+ 	}
276+ 	notificationSettings , err  :=  impl .notificationSettingsRepository .FindNotificationSettingsWithRules (
277+ 		context .Background (), event .EventTypeId , req ,
278+ 	)
279+ 	if  err  !=  nil  {
280+ 		impl .logger .Errorw ("error while fetching notification settings" , "err" , err )
281+ 		return  nil , "" , err 
282+ 	}
283+ 
284+ 	// Process notification settings into beans 
285+ 	notificationSettingsBean , err  :=  impl .processNotificationSettings (notificationSettings )
286+ 	if  err  !=  nil  {
287+ 		return  nil , "" , err 
288+ 	}
289+ 
290+ 	// Create combined payload 
291+ 	combinedPayload  :=  map [string ]interface {}{
292+ 		"event" :                event ,
293+ 		"notificationSettings" : notificationSettingsBean ,
294+ 	}
295+ 
296+ 	bodyBytes , err  :=  json .Marshal (combinedPayload )
297+ 	if  err  !=  nil  {
298+ 		impl .logger .Errorw ("error while marshaling combined event request" , "err" , err )
299+ 		return  nil , "" , err 
300+ 	}
301+ 
302+ 	return  bodyBytes , destinationUrl , nil 
303+ }
304+ 
305+ func  (impl  * EventRESTClientImpl ) createDefaultPayloadAndDestination (event  Event ) ([]byte , string , error ) {
306+ 	bodyBytes , err  :=  json .Marshal (event )
307+ 	if  err  !=  nil  {
308+ 		impl .logger .Errorw ("error while marshaling event request" , "err" , err )
309+ 		return  nil , "" , err 
310+ 	}
311+ 	return  bodyBytes , impl .config .DestinationURL , nil 
312+ }
313+ 
314+ func  (impl  * EventRESTClientImpl ) processNotificationSettings (notificationSettings  []repository.NotificationSettings ) ([]* repository.NotificationSettingsBean , error ) {
315+ 	notificationSettingsBean  :=  make ([]* repository.NotificationSettingsBean , 0 )
316+ 	for  _ , item  :=  range  notificationSettings  {
317+ 		config  :=  make ([]repository.ConfigEntry , 0 )
318+ 		if  item .Config  !=  ""  {
319+ 			if  err  :=  json .Unmarshal ([]byte (item .Config ), & config ); err  !=  nil  {
320+ 				impl .logger .Errorw ("error while unmarshaling config" , "err" , err )
321+ 				return  nil , err 
322+ 			}
323+ 		}
324+ 		notificationSettingsBean  =  append (notificationSettingsBean , & repository.NotificationSettingsBean {
325+ 			Id :           item .Id ,
326+ 			TeamId :       item .TeamId ,
327+ 			AppId :        item .AppId ,
328+ 			EnvId :        item .EnvId ,
329+ 			PipelineId :   item .PipelineId ,
330+ 			PipelineType : item .PipelineType ,
331+ 			EventTypeId :  item .EventTypeId ,
332+ 			Config :       config ,
333+ 			ViewId :       item .ViewId ,
334+ 		})
335+ 	}
336+ 	return  notificationSettingsBean , nil 
337+ }
338+ 
339+ func  (impl  * EventRESTClientImpl ) deliverEvent (bodyBytes  []byte , destinationUrl  string ) (bool , error ) {
243340	if  impl .config .NotificationMedium  ==  PUB_SUB  {
244- 		err  =  impl .sendEventsOnNats (body )
245- 		if  err  !=  nil  {
246- 			impl .logger .Errorw ("error while publishing event  " , "err" , err )
341+ 		if  err  :=  impl .sendEventsOnNats (bodyBytes ); err  !=  nil  {
342+ 			impl .logger .Errorw ("error while publishing event" , "err" , err )
247343			return  false , err 
248344		}
249345		return  true , nil 
250346	}
251- 	 var   reqBody   =  [] byte ( body ) 
252- 	req , err  :=  http .NewRequest (http .MethodPost , impl . config . DestinationURL , bytes .NewBuffer (reqBody ))
347+ 
348+ 	req , err  :=  http .NewRequest (http .MethodPost , destinationUrl , bytes .NewBuffer (bodyBytes ))
253349	if  err  !=  nil  {
254- 		impl .logger .Errorw ("error while writing event " , "err" , err )
350+ 		impl .logger .Errorw ("error while creating HTTP request " , "err" , err )
255351		return  false , err 
256352	}
257353	req .Header .Set ("Content-Type" , "application/json" )
354+ 
258355	resp , err  :=  impl .client .Do (req )
259356	if  err  !=  nil  {
260- 		impl .logger .Errorw ("error while UpdateJiraTransition request  " , "err" , err )
357+ 		impl .logger .Errorw ("error while sending HTTP request " , "err" , err )
261358		return  false , err 
262359	}
263360	defer  resp .Body .Close ()
264- 	impl .logger .Debugw ("event completed" , "event resp" , resp )
265- 	return  true , err 
361+ 
362+ 	if  resp .StatusCode  >=  300  {
363+ 		impl .logger .Errorw ("unexpected response from notifier" , "status" , resp .StatusCode )
364+ 		return  false , fmt .Errorf ("unexpected response code: %d" , resp .StatusCode )
365+ 	}
366+ 
367+ 	impl .logger .Debugw ("event successfully delivered" , "status" , resp .StatusCode )
368+ 	return  true , nil 
266369}
267370
268371func  (impl  * EventRESTClientImpl ) WriteNatsEvent (topic  string , payload  interface {}) error  {
0 commit comments