-
Notifications
You must be signed in to change notification settings - Fork 11.1k
Expand file tree
/
Copy pathlib.rs
More file actions
227 lines (199 loc) · 7.9 KB
/
lib.rs
File metadata and controls
227 lines (199 loc) · 7.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
//! Prototype MCP server.
#![deny(clippy::print_stdout, clippy::print_stderr)]
use std::io::ErrorKind;
use std::io::Result as IoResult;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_utils_cli::CliConfigOverrides;
use rmcp::model::ClientNotification;
use rmcp::model::ClientRequest;
use rmcp::model::JsonRpcMessage;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
mod codex_tool_config;
mod codex_tool_runner;
mod exec_approval;
pub(crate) mod message_processor;
mod outgoing_message;
mod patch_approval;
use crate::message_processor::MessageProcessor;
use crate::outgoing_message::OutgoingJsonRpcMessage;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
pub use crate::codex_tool_config::CodexToolCallParam;
pub use crate::codex_tool_config::CodexToolCallReplyParam;
pub use crate::exec_approval::ExecApprovalElicitRequestParams;
pub use crate::exec_approval::ExecApprovalResponse;
pub use crate::patch_approval::PatchApprovalElicitRequestParams;
pub use crate::patch_approval::PatchApprovalResponse;
/// Capacity of the bounded channel that carries parsed messages from the
/// stdin reader task to the processor task (the outgoing channel is
/// unbounded). The value is a balance between throughput and memory usage:
/// 128 messages should be plenty for an interactive CLI.
const CHANNEL_CAPACITY: usize = 128;
/// Value passed as the final argument to `codex_core::otel_init::build_provider`.
/// NOTE(review): presumably the fallback used when the loaded config leaves
/// `analytics_enabled` unset; confirm against `otel_init`.
const DEFAULT_ANALYTICS_ENABLED: bool = true;
/// Service name handed to `codex_core::otel_init::build_provider` for this binary.
const OTEL_SERVICE_NAME: &str = "codex_mcp_server";
/// JSON-RPC message type that each line read from stdin is deserialized into
/// before being forwarded to the message processor.
type IncomingMessage = JsonRpcMessage<ClientRequest, Value, ClientNotification>;
pub async fn run_main(
arg0_paths: Arg0DispatchPaths,
cli_config_overrides: CliConfigOverrides,
) -> IoResult<()> {
// Parse CLI overrides once and derive the base Config eagerly so later
// components do not need to work with raw TOML values.
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
std::io::Error::new(
ErrorKind::InvalidInput,
format!("error parsing -c overrides: {e}"),
)
})?;
let config = Config::load_with_cli_overrides(cli_kv_overrides)
.await
.map_err(|e| {
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some(OTEL_SERVICE_NAME),
DEFAULT_ANALYTICS_ENABLED,
)
.map_err(|e| {
std::io::Error::new(
ErrorKind::InvalidData,
format!("error loading otel config: {e}"),
)
})?;
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(EnvFilter::from_default_env());
let otel_logger_layer = otel.as_ref().and_then(|provider| provider.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|provider| provider.tracing_layer());
let _ = tracing_subscriber::registry()
.with(fmt_layer)
.with(otel_logger_layer)
.with(otel_tracing_layer)
.try_init();
// Set up channels.
let (incoming_tx, mut incoming_rx) = mpsc::channel::<IncomingMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
// Task: read from stdin, push to `incoming_tx`.
let stdin_reader_handle = tokio::spawn({
async move {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.unwrap_or_default() {
match serde_json::from_str::<IncomingMessage>(&line) {
Ok(msg) => {
if incoming_tx.send(msg).await.is_err() {
// Receiver gone – nothing left to do.
break;
}
}
Err(e) => error!("Failed to deserialize JSON-RPC message: {e}"),
}
}
debug!("stdin reader finished (EOF)");
}
});
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(
outgoing_message_sender,
arg0_paths,
std::sync::Arc::new(config),
);
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {
JsonRpcMessage::Request(r) => processor.process_request(r).await,
JsonRpcMessage::Response(r) => processor.process_response(r).await,
JsonRpcMessage::Notification(n) => processor.process_notification(n).await,
JsonRpcMessage::Error(e) => processor.process_error(e),
}
}
info!("processor task exited (channel closed)");
}
});
// Task: write outgoing messages to stdout.
let stdout_writer_handle = tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(outgoing_message) = outgoing_rx.recv().await {
let msg: OutgoingJsonRpcMessage = outgoing_message.into();
match serde_json::to_string(&msg) {
Ok(json) => {
if let Err(e) = stdout.write_all(json.as_bytes()).await {
error!("Failed to write to stdout: {e}");
break;
}
if let Err(e) = stdout.write_all(b"\n").await {
error!("Failed to write newline to stdout: {e}");
break;
}
}
Err(e) => error!("Failed to serialize JSON-RPC message: {e}"),
}
}
info!("stdout writer exited (channel closed)");
});
// Wait for all tasks to finish. The typical exit path is the stdin reader
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
// the processor and then to the stdout task.
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use codex_core::config::ConfigBuilder;
    use codex_core::config::types::OtelExporterKind;
    use pretty_assertions::assert_eq;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Pins the compile-time analytics default used by this binary.
    #[test]
    fn mcp_server_defaults_analytics_to_enabled() {
        assert_eq!(DEFAULT_ANALYTICS_ENABLED, true);
    }

    /// With OTLP/gRPC exporters configured for logs, traces and metrics, and
    /// no explicit `analytics_enabled` setting, the provider built with this
    /// binary's defaults exposes all three signals.
    #[tokio::test]
    async fn mcp_server_builds_otel_provider_with_logs_traces_and_metrics() -> anyhow::Result<()> {
        let home_dir = TempDir::new()?;
        let home_path = home_dir.path().to_path_buf();
        let mut cfg = ConfigBuilder::default()
            .codex_home(home_path)
            .build()
            .await?;

        // One exporter definition shared by every signal.
        let grpc_exporter = OtelExporterKind::OtlpGrpc {
            endpoint: "http://localhost:4317".to_string(),
            headers: HashMap::new(),
            tls: None,
        };
        cfg.otel.exporter = grpc_exporter.clone();
        cfg.otel.trace_exporter = grpc_exporter.clone();
        cfg.otel.metrics_exporter = grpc_exporter;
        // Leave analytics unset so the default passed below is what applies.
        cfg.analytics_enabled = None;

        let build_result = codex_core::otel_init::build_provider(
            &cfg,
            "0.0.0-test",
            Some(OTEL_SERVICE_NAME),
            DEFAULT_ANALYTICS_ENABLED,
        );
        let otel_provider = build_result
            .map_err(|err| anyhow::anyhow!(err.to_string()))?
            .expect("otel provider");

        assert!(otel_provider.logger.is_some(), "expected log exporter");
        assert!(
            otel_provider.tracer_provider.is_some(),
            "expected trace exporter"
        );
        assert!(
            otel_provider.metrics().is_some(),
            "expected metrics exporter"
        );

        otel_provider.shutdown();
        Ok(())
    }
}