@@ -2,9 +2,10 @@ package cmd
22
33import (
44 "context"
5- "log/slog "
5+ "fmt "
66 "os"
77 "sync"
8+ "time"
89
910 tea "github.com/charmbracelet/bubbletea"
1011 "github.com/kujtimiihoxha/termai/internal/app"
@@ -13,6 +14,7 @@ import (
1314 "github.com/kujtimiihoxha/termai/internal/db"
1415 "github.com/kujtimiihoxha/termai/internal/llm/agent"
1516 "github.com/kujtimiihoxha/termai/internal/logging"
17+ "github.com/kujtimiihoxha/termai/internal/pubsub"
1618 "github.com/kujtimiihoxha/termai/internal/tui"
1719 zone "github.com/lrstanley/bubblezone"
1820 "github.com/spf13/cobra"
@@ -23,111 +25,229 @@ var rootCmd = &cobra.Command{
2325 Short : "A terminal ai assistant" ,
2426 Long : `A terminal ai assistant` ,
2527 RunE : func (cmd * cobra.Command , args []string ) error {
28+ // If the help flag is set, show the help message
2629 if cmd .Flag ("help" ).Changed {
2730 cmd .Help ()
2831 return nil
2932 }
33+
34+ // Load the config
3035 debug , _ := cmd .Flags ().GetBool ("debug" )
31- err := config .Load (debug )
36+ cwd , _ := cmd .Flags ().GetString ("cwd" )
37+ if cwd != "" {
38+ err := os .Chdir (cwd )
39+ if err != nil {
40+ return fmt .Errorf ("failed to change directory: %v" , err )
41+ }
42+ }
43+ if cwd == "" {
44+ c , err := os .Getwd ()
45+ if err != nil {
46+ return fmt .Errorf ("failed to get current working directory: %v" , err )
47+ }
48+ cwd = c
49+ }
50+ _ , err := config .Load (cwd , debug )
3251 if err != nil {
3352 return err
3453 }
35- cfg := config .Get ()
36- defaultLevel := slog .LevelInfo
37- if cfg .Debug {
38- defaultLevel = slog .LevelDebug
39- }
40- logger := slog .New (slog .NewTextHandler (logging .NewWriter (), & slog.HandlerOptions {
41- Level : defaultLevel ,
42- }))
43- slog .SetDefault (logger )
4454
4555 err = assets .WriteAssets ()
4656 if err != nil {
47- return err
57+ logging . Error ( "Error writing assets: %v" , err )
4858 }
4959
60+ // Connect DB, this will also run migrations
5061 conn , err := db .Connect ()
5162 if err != nil {
5263 return err
5364 }
54- ctx := context .Background ()
65+
66+ // Create main context for the application
67+ ctx , cancel := context .WithCancel (context .Background ())
68+ defer cancel ()
5569
5670 app := app .New (ctx , conn )
57- logging .Info ("Starting termai..." )
71+
72+ // Set up the TUI
5873 zone .NewGlobal ()
59- tui := tea .NewProgram (
74+ program := tea .NewProgram (
6075 tui .New (app ),
6176 tea .WithAltScreen (),
6277 tea .WithMouseCellMotion (),
6378 )
64- logging .Info ("Setting up subscriptions..." )
65- ch , unsub := setupSubscriptions (app )
66- defer unsub ()
6779
80+ // Initialize MCP tools in the background
81+ initMCPTools (ctx , app )
82+
83+ // Setup the subscriptions, this will send services events to the TUI
84+ ch , cancelSubs := setupSubscriptions (app )
85+
86+ // Create a context for the TUI message handler
87+ tuiCtx , tuiCancel := context .WithCancel (ctx )
88+ var tuiWg sync.WaitGroup
89+ tuiWg .Add (1 )
90+
91+ // Set up message handling for the TUI
6892 go func () {
69- // Set this up once
70- agent .GetMcpTools (ctx , app .Permissions )
71- for msg := range ch {
72- tui .Send (msg )
93+ defer tuiWg .Done ()
94+ defer func () {
95+ if r := recover (); r != nil {
96+ logging .Error ("Panic in TUI message handling: %v" , r )
97+ attemptTUIRecovery (program )
98+ }
99+ }()
100+
101+ for {
102+ select {
103+ case <- tuiCtx .Done ():
104+ logging .Info ("TUI message handler shutting down" )
105+ return
106+ case msg , ok := <- ch :
107+ if ! ok {
108+ logging .Info ("TUI message channel closed" )
109+ return
110+ }
111+ program .Send (msg )
112+ }
73113 }
74114 }()
75- if _ , err := tui .Run (); err != nil {
76- return err
115+
116+ // Cleanup function for when the program exits
117+ cleanup := func () {
118+ // Shutdown the app
119+ app .Shutdown ()
120+
121+ // Cancel subscriptions first
122+ cancelSubs ()
123+
124+ // Then cancel TUI message handler
125+ tuiCancel ()
126+
127+ // Wait for TUI message handler to finish
128+ tuiWg .Wait ()
129+
130+ logging .Info ("All goroutines cleaned up" )
131+ }
132+
133+ // Run the TUI
134+ result , err := program .Run ()
135+ cleanup ()
136+
137+ if err != nil {
138+ logging .Error ("TUI error: %v" , err )
139+ return fmt .Errorf ("TUI error: %v" , err )
77140 }
141+
142+ logging .Info ("TUI exited with result: %v" , result )
78143 return nil
79144 },
80145}
81146
82- func setupSubscriptions (app * app.App ) (chan tea.Msg , func ()) {
83- ch := make (chan tea.Msg )
84- wg := sync.WaitGroup {}
85- ctx , cancel := context .WithCancel (app .Context )
86- {
87- sub := logging .Subscribe (ctx )
88- wg .Add (1 )
89- go func () {
90- for ev := range sub {
91- ch <- ev
147+ // attemptTUIRecovery tries to recover the TUI after a panic
148+ func attemptTUIRecovery (program * tea.Program ) {
149+ logging .Info ("Attempting to recover TUI after panic" )
150+
151+ // We could try to restart the TUI or gracefully exit
152+ // For now, we'll just quit the program to avoid further issues
153+ program .Quit ()
154+ }
155+
156+ func initMCPTools (ctx context.Context , app * app.App ) {
157+ go func () {
158+ defer func () {
159+ if r := recover (); r != nil {
160+ logging .Error ("Panic in MCP goroutine: %v" , r )
92161 }
93- wg .Done ()
94162 }()
95- }
96- {
97- sub := app .Sessions .Subscribe (ctx )
98- wg .Add (1 )
99- go func () {
100- for ev := range sub {
101- ch <- ev
163+
164+ // Create a context with timeout for the initial MCP tools fetch
165+ ctxWithTimeout , cancel := context .WithTimeout (ctx , 30 * time .Second )
166+ defer cancel ()
167+
168+ // Set this up once with proper error handling
169+ agent .GetMcpTools (ctxWithTimeout , app .Permissions )
170+ logging .Info ("MCP message handling goroutine exiting" )
171+ }()
172+ }
173+
174+ func setupSubscriber [T any ](
175+ ctx context.Context ,
176+ wg * sync.WaitGroup ,
177+ name string ,
178+ subscriber func (context.Context ) <- chan pubsub.Event [T ],
179+ outputCh chan <- tea.Msg ,
180+ ) {
181+ wg .Add (1 )
182+ go func () {
183+ defer wg .Done ()
184+ defer func () {
185+ if r := recover (); r != nil {
186+ logging .Error ("Panic in %s subscription goroutine: %v" , name , r )
102187 }
103- wg .Done ()
104188 }()
105- }
106- {
107- sub := app .Messages .Subscribe (ctx )
108- wg .Add (1 )
109- go func () {
110- for ev := range sub {
111- ch <- ev
189+
190+ for {
191+ select {
192+ case event , ok := <- subscriber (ctx ):
193+ if ! ok {
194+ logging .Info ("%s subscription channel closed" , name )
195+ return
196+ }
197+
198+ // Convert generic event to tea.Msg if needed
199+ var msg tea.Msg = event
200+
201+ // Non-blocking send with timeout to prevent deadlocks
202+ select {
203+ case outputCh <- msg :
204+ case <- time .After (500 * time .Millisecond ):
205+ logging .Warn ("%s message dropped due to slow consumer" , name )
206+ case <- ctx .Done ():
207+ logging .Info ("%s subscription cancelled" , name )
208+ return
209+ }
210+ case <- ctx .Done ():
211+ logging .Info ("%s subscription cancelled" , name )
212+ return
112213 }
113- wg .Done ()
114- }()
115- }
116- {
117- sub := app .Permissions .Subscribe (ctx )
118- wg .Add (1 )
214+ }
215+ }()
216+ }
217+
218+ func setupSubscriptions (app * app.App ) (chan tea.Msg , func ()) {
219+ ch := make (chan tea.Msg , 100 )
220+ // Add a buffer to prevent blocking
221+ wg := sync.WaitGroup {}
222+ ctx , cancel := context .WithCancel (context .Background ())
223+ // Setup each subscription using the helper
224+ setupSubscriber (ctx , & wg , "logging" , logging .Subscribe , ch )
225+ setupSubscriber (ctx , & wg , "sessions" , app .Sessions .Subscribe , ch )
226+ setupSubscriber (ctx , & wg , "messages" , app .Messages .Subscribe , ch )
227+ setupSubscriber (ctx , & wg , "permissions" , app .Permissions .Subscribe , ch )
228+
229+ // Return channel and a cleanup function
230+ cleanupFunc := func () {
231+ logging .Info ("Cancelling all subscriptions" )
232+ cancel () // Signal all goroutines to stop
233+
234+ // Wait with a timeout for all goroutines to complete
235+ waitCh := make (chan struct {})
119236 go func () {
120- for ev := range sub {
121- ch <- ev
122- }
123- wg .Done ()
237+ wg .Wait ()
238+ close (waitCh )
124239 }()
240+
241+ select {
242+ case <- waitCh :
243+ logging .Info ("All subscription goroutines completed successfully" )
244+ case <- time .After (5 * time .Second ):
245+ logging .Warn ("Timed out waiting for some subscription goroutines to complete" )
246+ }
247+
248+ close (ch ) // Safe to close after all writers are done or timed out
125249 }
126- return ch , func () {
127- cancel ()
128- wg .Wait ()
129- close (ch )
130- }
250+ return ch , cleanupFunc
131251}
132252
133253func Execute () {
@@ -139,5 +259,6 @@ func Execute() {
139259
140260func init () {
141261 rootCmd .Flags ().BoolP ("help" , "h" , false , "Help" )
142- rootCmd .Flags ().BoolP ("debug" , "d" , false , "Help" )
262+ rootCmd .Flags ().BoolP ("debug" , "d" , false , "Debug" )
263+ rootCmd .Flags ().StringP ("cwd" , "c" , "" , "Current working directory" )
143264}
0 commit comments