-
Notifications
You must be signed in to change notification settings - Fork 118
Add Event Bus #184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Event Bus #184
Conversation
- replace Subscribe() and Unsubscribe() functions with event bus - update HTTP log streaming/handlers to use new log event bus
WalkthroughThe changes refactor the application's internal event and subscription handling to use an event-driven model based on the Changes
Sequence Diagram(s)sequenceDiagram
participant ConfigWatcher
participant EventBus
participant ProxyManager
participant HTTPServer
ConfigWatcher->>EventBus: Emit ConfigFileChangedEvent
EventBus->>ProxyManager: (debounced) Reload config
ProxyManager->>HTTPServer: Update handler with new ProxyManager
sequenceDiagram
participant Process
participant EventBus
participant SSEClient
Process->>EventBus: Emit ProcessStateChangeEvent
EventBus->>SSEClient: Push updated model status
sequenceDiagram
participant LogProducer
participant EventBus
participant LogClient
LogProducer->>EventBus: Emit LogDataEvent
EventBus->>LogClient: Callback with log data
Possibly related PRs
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (1)
proxy/events.go (1)
19-25: Consider renaming for consistency.The event type
ChatCompletionStatsdoesn't follow the*Eventnaming convention used by the other events (ProcessStateChangeEvent,ConfigFileChangedEvent). Consider renaming it toChatCompletionStatsEventfor consistency.-type ChatCompletionStats struct { +type ChatCompletionStatsEvent struct { TokensGenerated int } -func (e ChatCompletionStats) Type() uint32 { +func (e ChatCompletionStatsEvent) Type() uint32 { return ChatCompletionStatsEventID }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sumis excluded by!**/*.sum
📒 Files selected for processing (9)
go.mod(1 hunks)llama-swap.go(2 hunks)proxy/events.go(1 hunks)proxy/logMonitor.go(4 hunks)proxy/logMonitor_test.go(1 hunks)proxy/process.go(2 hunks)proxy/proxymanager.go(5 hunks)proxy/proxymanager_api.go(2 hunks)proxy/proxymanager_loghandlers.go(2 hunks)
🧰 Additional context used
🧠 Learnings (1)
proxy/proxymanager.go (1)
Learnt from: mostlygeek
PR: mostlygeek/llama-swap#155
File: proxy/process.go:359-372
Timestamp: 2025-06-05T22:57:53.793Z
Learning: In Go, cmdWaitChan in proxy/process.go is a signal channel (chan struct{}) that follows the pattern where waitForCmd() closes the channel when cmd.Wait() returns, and stopCommand() waits on it. Reading from a closed channel returns immediately, so <-cmdWaitChan won't block indefinitely.
🧬 Code Graph Analysis (5)
proxy/process.go (1)
proxy/events.go (1)
ProcessStateChangeEvent(9-13)
proxy/proxymanager_api.go (1)
proxy/events.go (1)
ProcessStateChangeEvent(9-13)
llama-swap.go (3)
proxy/proxymanager.go (2)
ProxyManager(25-41)New(43-95)proxy/config.go (1)
LoadConfig(177-184)proxy/events.go (1)
ConfigFileChangedEvent(27-27)
proxy/events.go (1)
proxy/process.go (1)
ProcessState(20-20)
proxy/logMonitor.go (1)
proxy/proxymanager.go (1)
New(43-95)
🪛 GitHub Actions: Linux CI
proxy/logMonitor_test.go
[error] 42-42: TestLogMonitor failed: Client2 expected 123, got empty value.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: run-tests
🔇 Additional comments (6)
proxy/process.go (1)
16-17: Event emission implementation looks goodThe addition of event emission after successful state transitions is well-placed and follows the event-driven pattern correctly. The event is emitted within the locked section ensuring state consistency, and includes all necessary information for listeners.
Also applies to: 132-132
proxy/proxymanager.go (1)
5-5: Well-implemented shutdown context patternThe addition of a shutdown context follows Go best practices for graceful shutdown coordination. The context is properly initialized during construction and canceled during shutdown, allowing dependent components to react to the shutdown signal.
Also applies to: 37-40, 71-71, 83-84, 274-274
proxy/proxymanager_loghandlers.go (2)
62-65: Excellent use of dual-context pattern for graceful shutdownThe select statement properly handles both client disconnection (via request context) and server shutdown (via shutdown context), ensuring clean termination in both scenarios.
Also applies to: 99-102
55-60: No additional connection validation neededThe
OnLogDatamethod returns aCancelFuncthat’s deferred in each handler and invoked on exit, which unsubscribes the callback before the function returns. This ensures the callback won’t be called after the HTTP connection has closed, so no extra nil checks or connection-state guards are required here. Keep the current implementation as-is.Likely an incorrect or invalid review comment.
proxy/proxymanager_api.go (1)
87-103: LGTM! Clean event-driven refactor.The transition from polling to event-driven updates is well implemented. The immediate flush of initial state followed by event-based updates provides a good user experience. The defensive nil checks and proper cleanup via defer are appreciated.
llama-swap.go (1)
68-88: Add nil check before type assertion.The type assertion on line 69 could panic if
srv.Handleris nil. Consider adding a nil check before the type assertion to prevent potential runtime panics.reloadProxyManager := func() { - if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok { + if srv.Handler != nil { + if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok { config, err = proxy.LoadConfig(*configPath) if err != nil { fmt.Printf("Warning, unable to reload configuration: %v\n", err) @@ -78,7 +79,8 @@ currentPM.Shutdown() srv.Handler = proxy.New(config) fmt.Println("Configuration Reloaded") - } else { + } + } else { config, err = proxy.LoadConfig(*configPath) if err != nil { fmt.Printf("Error, unable to load configuration: %v\n", err)Likely an incorrect or invalid review comment.
proxy/logMonitor.go
Outdated
| func (e LogDataEvent) Type() uint32 { | ||
| return 0x01 | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Event type ID conflicts with ProcessStateChangeEvent.
The LogDataEvent.Type() returns 0x01, which is the same ID used by ProcessStateChangeEvent in proxy/events.go. This will cause event routing issues. Consider moving this event definition to events.go with a unique ID.
Move the LogDataEvent definition to proxy/events.go:
// In proxy/events.go
const LogDataEventID = 0x04
type LogDataEvent struct {
Data []byte
}
func (e LogDataEvent) Type() uint32 {
return LogDataEventID
}Then remove the duplicate definition from this file.
🤖 Prompt for AI Agents
In proxy/logMonitor.go around lines 20 to 22, the LogDataEvent.Type() method
returns an event type ID 0x01, which conflicts with ProcessStateChangeEvent's ID
in proxy/events.go. To fix this, move the LogDataEvent struct and its Type()
method to proxy/events.go, assign it a unique ID such as 0x04, and remove the
LogDataEvent definition from logMonitor.go to avoid duplicate event definitions
and routing conflicts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
proxy/events.go (1)
19-25: Consider expanding ChatCompletionStats for future extensibility.The current implementation is minimal but functional. Consider if additional fields like completion time, model name, or error count might be useful for comprehensive statistics tracking.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
llama-swap.go(2 hunks)proxy/events.go(1 hunks)proxy/proxymanager_api.go(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- proxy/proxymanager_api.go
- llama-swap.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: run-tests
🔇 Additional comments (2)
proxy/events.go (2)
5-7: LGTM! Well-structured event ID constants.The hexadecimal notation and consistent naming convention provide clear, unique identifiers for the event system.
9-17: ProcessState type availability confirmed.
TheProcessStatetype is declared inproxy/process.go(line 20) and is accessible inproxy/events.go. No further action required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
ui/src/contexts/APIProvider.tsx (1)
105-111: Remove obsolete EventSource cleanup.The cleanup still references the old EventSource refs that are no longer used in the refactored code.
useEffect(() => { return () => { - proxyEventSource.current?.close(); - upstreamEventSource.current?.close(); - modelStatusEventSource.current?.close(); + apiEventSource.current?.close(); }; }, []);
🧹 Nitpick comments (2)
proxy/proxymanager_api.go (1)
127-129: Use Go-style comments.-/** - * Send updated models list - */ +// Send updated models listui/src/contexts/APIProvider.tsx (1)
84-84: Remove commented console.log.Remove the commented
//console.log(logData);line.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
proxy/logMonitor_test.go(1 hunks)proxy/proxymanager.go(5 hunks)proxy/proxymanager_api.go(3 hunks)proxy/proxymanager_loghandlers.go(2 hunks)ui/src/contexts/APIProvider.tsx(4 hunks)ui/src/pages/LogViewer.tsx(1 hunks)ui/src/pages/Models.tsx(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- proxy/logMonitor_test.go
- proxy/proxymanager.go
🧰 Additional context used
🧠 Learnings (1)
proxy/proxymanager_loghandlers.go (1)
Learnt from: mostlygeek
PR: mostlygeek/llama-swap#155
File: proxy/process.go:359-372
Timestamp: 2025-06-05T22:57:53.793Z
Learning: In Go, cmdWaitChan in proxy/process.go is a signal channel (chan struct{}) that follows the pattern where waitForCmd() closes the channel when cmd.Wait() returns, and stopCommand() waits on it. Reading from a closed channel returns immediately, so <-cmdWaitChan won't block indefinitely.
🧬 Code Graph Analysis (3)
ui/src/pages/LogViewer.tsx (1)
ui/src/contexts/APIProvider.tsx (1)
useAPI(171-177)
ui/src/contexts/APIProvider.tsx (1)
proxy/proxymanager_api.go (1)
Model(13-16)
proxy/proxymanager_api.go (2)
proxy/proxymanager.go (1)
ProxyManager(25-41)proxy/events.go (2)
ProcessStateChangeEvent(9-13)ConfigFileChangedEvent(27-29)
🪛 Biome (1.9.4)
ui/src/contexts/APIProvider.tsx
[error] 70-70: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 75-75: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
🔇 Additional comments (3)
ui/src/pages/LogViewer.tsx (1)
6-13: LGTM! Clean migration to unified event API.The component correctly uses the new
enableAPIEventsfunction, maintaining the same enable/disable lifecycle pattern while simplifying the API surface.proxy/proxymanager_loghandlers.go (1)
56-79: Well-implemented event-driven log streaming.The callback-based approach with proper context cancellation and cleanup is a solid implementation. The buffered channel with non-blocking sends prevents slow clients from affecting the log producer.
proxy/proxymanager_api.go (1)
77-165: Excellent implementation of unified event streaming.The multiplexed SSE endpoint cleanly consolidates multiple event sources with proper:
- Message envelope structure for type discrimination
- Non-blocking event delivery to prevent slow client impact
- Comprehensive cleanup with deferred event/callback unregistration
- Initial state transmission before streaming updates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
proxy/events.go (1)
28-34: Address the ReloadingState type safety concern.As noted in the previous review, using string constants for
ReloadingStatewould improve type safety and prevent inconsistent values.
🧹 Nitpick comments (1)
proxy/events.go (1)
5-8: Consider usingiotafor event ID constants.Using
iotawould be more idiomatic Go and helps prevent ID conflicts as new events are added.-const ProcessStateChangeEventID = 0x01 -const ChatCompletionStatsEventID = 0x02 -const ConfigFileChangedEventID = 0x03 -const LogDataEventID = 0x04 +const ( + ProcessStateChangeEventID = iota + 1 + ChatCompletionStatsEventID + ConfigFileChangedEventID + LogDataEventID +)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
llama-swap.go(2 hunks)proxy/events.go(1 hunks)proxy/logMonitor.go(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- llama-swap.go
- proxy/logMonitor.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
proxy/events.go (1)
proxy/process.go (1)
ProcessState(20-20)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: run-tests
🔇 Additional comments (1)
proxy/events.go (1)
10-42: Well-structured event types for the event bus system.The event structs are well-designed with consistent patterns:
- Clear, descriptive field names
- Appropriate data types
- Consistent
Type()method implementations- Good separation of concerns for different event categories
This provides a solid foundation for the event-driven architecture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
proxy/events.go (1)
28-33: Excellent improvement addressing type safety concerns.The
ReloadingStateis now properly typed with defined constants, addressing the previous review comment about string-based state management. This prevents typos and ensures consistent state values.
🧹 Nitpick comments (1)
proxy/proxymanager_api.go (1)
100-129: Review message dropping behavior in helper functions.The
sendModelsandsendLogDatafunctions useselectwithdefaultcases that silently drop messages when the buffer is full or context is cancelled. This could lead to data loss during high-frequency events.Consider logging dropped messages or implementing backpressure handling:
default: + // Log dropped message to help with debugging + // Consider implementing backpressure strategy
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
go.mod(1 hunks)llama-swap.go(2 hunks)proxy/events.go(1 hunks)proxy/proxymanager_api.go(2 hunks)ui/src/contexts/APIProvider.tsx(4 hunks)ui/src/pages/Models.tsx(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- go.mod
- ui/src/pages/Models.tsx
- llama-swap.go
- ui/src/contexts/APIProvider.tsx
🧰 Additional context used
🧬 Code Graph Analysis (2)
proxy/proxymanager_api.go (2)
proxy/proxymanager.go (1)
ProxyManager(25-41)proxy/events.go (2)
ProcessStateChangeEvent(10-14)ConfigFileChangedEvent(35-37)
proxy/events.go (1)
proxy/process.go (1)
ProcessState(20-20)
🔇 Additional comments (8)
proxy/proxymanager_api.go (5)
4-11: LGTM: Clean import additions for event-driven functionality.The added imports support the new event-driven architecture and JSON marshaling requirements.
14-17: LGTM: Model struct enhancement aligns with API requirements.The addition of
NameandDescriptionfields provides richer model information to the client.
81-91: Well-designed message envelope pattern for SSE multiplexing.The
messageTypeconstants andmessageEnvelopestruct provide a clean way to discriminate between different event types in the unified SSE stream.
156-169: Robust SSE loop with proper context handling.The main event loop correctly handles client disconnection, server shutdown, and message delivery. The context cancellation logic ensures proper cleanup.
134-149: Verify event handler unsubscribe patternThe use of
defer event.On(... )()relies onOnreturning a cleanup (unsubscribe) function that’s invoked when the surrounding function exits. IfOndoesn’t return such a function, the handlers will never be removed, leading to potential memory leaks and duplicate event handling.Please confirm:
- That
kelindar/event.Onreturns afunc()which, when called, unsubscribes the handler.- That both
pm.proxyLogger.OnLogDataandpm.upstreamLogger.OnLogDatalikewise return unsubscribe functions.If they do not, refactor to explicitly capture and defer the unsubscribe call, for example:
unsubscribe := event.On(func(e ProcessStateChangeEvent) { … }) defer unsubscribe()Locations to review:
- proxy/proxymanager_api.go lines 134–149
proxy/events.go (3)
5-8: LGTM: Well-defined event ID constants.The hexadecimal constant naming and values provide a clear registry of event types.
10-18: LGTM: ProcessStateChangeEvent properly structured.The event struct correctly captures state transitions with process name context and implements the required
Type()method.
43-49: LGTM: LogDataEvent follows consistent pattern.The struct uses
[]bytefor efficient data handling and implements the standardType()method interface.
Summary by CodeRabbit
New Features
Refactor
Chores