@@ -19,9 +19,11 @@ import (
1919 "crypto/rand"
2020 "encoding/hex"
2121 "encoding/json"
22+ errors2 "errors"
2223 "fmt"
2324 "github.com/caarlos0/env"
2425 "github.com/devtron-labs/common-lib/utils/k8s"
26+ "github.com/devtron-labs/devtron/internal/middleware"
2527 "github.com/devtron-labs/devtron/pkg/argoApplication"
2628 "github.com/devtron-labs/devtron/pkg/cluster"
2729 "github.com/devtron-labs/devtron/pkg/cluster/repository"
@@ -31,6 +33,7 @@ import (
3133 "k8s.io/apimachinery/pkg/api/errors"
3234 "log"
3335 "net/http"
36+ "strconv"
3437 "strings"
3538 "sync"
3639 "time"
@@ -44,6 +47,7 @@ import (
4447)
4548
4649const END_OF_TRANSMISSION = "\u0004 "
50+ const ProcessExitedMsg = "Process exited"
4751
4852// PtyHandler is what remotecommand expects from a pty
4953type PtyHandler interface {
@@ -54,11 +58,17 @@ type PtyHandler interface {
5458
5559// TerminalSession implements PtyHandler (using a SockJS connection)
5660type TerminalSession struct {
57- id string
58- bound chan error
59- sockJSSession sockjs.Session
60- sizeChan chan remotecommand.TerminalSize
61- doneChan chan struct {}
61+ id string
62+ bound chan error
63+ sockJSSession sockjs.Session
64+ sizeChan chan remotecommand.TerminalSize
65+ doneChan chan struct {}
66+ context context.Context
67+ contextCancelFunc context.CancelFunc
68+ podName string
69+ namespace string
70+ clusterId string
71+ startedOn time.Time
6272}
6373
6474// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
@@ -166,6 +176,15 @@ func (sm *SessionMap) Set(sessionId string, session TerminalSession) {
166176 sm .Sessions [sessionId ] = session
167177}
168178
179+ func (sm * SessionMap ) SetTerminalSessionStartTime (sessionId string ) {
180+ sm .Lock .Lock ()
181+ defer sm .Lock .Unlock ()
182+ if session , ok := sm .Sessions [sessionId ]; ok {
183+ session .startedOn = time .Now ()
184+ sm .Sessions [sessionId ] = session
185+ }
186+ }
187+
169188// Close shuts down the SockJS connection and sends the status code and reason to the client
170189// Can happen if the process exits or if there is an error starting up the process
171190// For now the status code is unused and reason is shown to the user (unless "")
@@ -178,11 +197,23 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {
178197 if err != nil {
179198 log .Println (err )
180199 }
200+ isErroredConnectionTermination := isConnectionClosedByError (status )
201+ middleware .IncTerminalSessionRequestCounter (SessionTerminated , strconv .FormatBool (isErroredConnectionTermination ))
202+ middleware .RecordTerminalSessionDurationMetrics (terminalSession .podName , terminalSession .namespace , terminalSession .clusterId , time .Since (terminalSession .startedOn ).Seconds ())
203+ close (terminalSession .doneChan )
204+ terminalSession .contextCancelFunc ()
181205 delete (sm .Sessions , sessionId )
182206 }
183207
184208}
185209
210+ func isConnectionClosedByError (status uint32 ) bool {
211+ if status == 2 {
212+ return true
213+ }
214+ return false
215+ }
216+
186217var terminalSessions = SessionMap {Sessions : make (map [string ]TerminalSession )}
187218
188219// handleTerminalSession is Called by net/http for any new /api/sockjs connections
@@ -243,7 +274,7 @@ func CreateAttachHandler(path string) http.Handler {
243274
244275// startProcess is called by handleAttach
245276// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
246- func startProcess (k8sClient kubernetes.Interface , cfg * rest.Config ,
277+ func startProcess (ctx context. Context , k8sClient kubernetes.Interface , cfg * rest.Config ,
247278 cmd []string , ptyHandler PtyHandler , sessionRequest * TerminalSessionRequest ) error {
248279 namespace := sessionRequest .Namespace
249280 podName := sessionRequest .PodName
@@ -262,17 +293,18 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
262293 TerminalSizeQueue : ptyHandler ,
263294 Tty : true ,
264295 }
265-
266- err = execWithStreamOptions (exec , streamOptions )
296+ isErroredConnectionTermination := false
297+ middleware .IncTerminalSessionRequestCounter (SessionInitiating , strconv .FormatBool (isErroredConnectionTermination ))
298+ terminalSessions .SetTerminalSessionStartTime (sessionRequest .SessionId )
299+ err = execWithStreamOptions (ctx , exec , streamOptions )
267300 if err != nil {
268301 return err
269302 }
270-
271303 return nil
272304}
273305
274- func execWithStreamOptions (exec remotecommand.Executor , streamOptions remotecommand.StreamOptions ) error {
275- return exec .Stream ( streamOptions )
306+ func execWithStreamOptions (ctx context. Context , exec remotecommand.Executor , streamOptions remotecommand.StreamOptions ) error {
307+ return exec .StreamWithContext ( ctx , streamOptions )
276308}
277309
278310func getExecutor (k8sClient kubernetes.Interface , cfg * rest.Config , podName , namespace , containerName string , cmd []string , stdin bool , tty bool ) (remotecommand.Executor , error ) {
@@ -344,32 +376,39 @@ var validShells = []string{"bash", "sh", "powershell", "cmd"}
344376// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
345377func WaitForTerminal (k8sClient kubernetes.Interface , cfg * rest.Config , request * TerminalSessionRequest ) {
346378
379+ session := terminalSessions .Get (request .SessionId )
380+ sessionCtx := session .context
381+ timedCtx , _ := context .WithTimeout (sessionCtx , 60 * time .Second )
347382 select {
348- case <- terminalSessions . Get ( request . SessionId ) .bound :
349- close (terminalSessions . Get ( request . SessionId ) .bound )
383+ case <- session .bound :
384+ close (session .bound )
350385
351386 var err error
352387 if isValidShell (validShells , request .Shell ) {
353388 cmd := []string {request .Shell }
354389
355- err = startProcess (k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request )
390+ err = startProcess (sessionCtx , k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request )
356391 } else {
357392 // No Shell given or it was not valid: try some shells until one succeeds or all fail
358393 // FIXME: if the first Shell fails then the first keyboard event is lost
359394 for _ , testShell := range validShells {
360395 cmd := []string {testShell }
361- if err = startProcess (k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request ); err == nil {
396+ if err = startProcess (sessionCtx , k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request ); err == nil || errors2 . Is ( err , context . Canceled ) {
362397 break
363398 }
364399 }
365400 }
366401
367- if err != nil {
402+ if err != nil && ! errors2 . Is ( err , context . Canceled ) {
368403 terminalSessions .Close (request .SessionId , 2 , err .Error ())
369404 return
370405 }
371406
372- terminalSessions .Close (request .SessionId , 1 , "Process exited" )
407+ terminalSessions .Close (request .SessionId , 1 , ProcessExitedMsg )
408+ case <- timedCtx .Done ():
409+ // handle case when connection has not been initiated from FE side within particular time
410+ close (session .bound )
411+ terminalSessions .Close (request .SessionId , 1 , ProcessExitedMsg )
373412 }
374413}
375414
@@ -432,10 +471,17 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR
432471 return statusCode , nil , err
433472 }
434473 req .SessionId = sessionID
474+ sessionCtx , cancelFunc := context .WithCancel (context .Background ())
435475 terminalSessions .Set (sessionID , TerminalSession {
436- id : sessionID ,
437- bound : make (chan error ),
438- sizeChan : make (chan remotecommand.TerminalSize ),
476+ id : sessionID ,
477+ bound : make (chan error ),
478+ sizeChan : make (chan remotecommand.TerminalSize ),
479+ doneChan : make (chan struct {}),
480+ context : sessionCtx ,
481+ contextCancelFunc : cancelFunc ,
482+ podName : req .PodName ,
483+ namespace : req .Namespace ,
484+ clusterId : strconv .Itoa (req .ClusterId ),
439485 })
440486 config , client , err := impl .getClientConfig (req )
441487
@@ -559,7 +605,7 @@ func (impl *TerminalSessionHandlerImpl) RunCmdInRemotePod(req *TerminalSessionRe
559605 buf := & bytes.Buffer {}
560606 errBuf := & bytes.Buffer {}
561607 impl .logger .Debug ("reached execWithStreamOptions method call" )
562- err = execWithStreamOptions (exec , remotecommand.StreamOptions {
608+ err = execWithStreamOptions (context . Background (), exec , remotecommand.StreamOptions {
563609 Stdout : buf ,
564610 Stderr : errBuf ,
565611 })
0 commit comments