Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions crates/shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,8 @@ tokio = { workspace = true, features = ["full"], optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs.workspace = true

[target.'cfg(unix)'.dependencies]
command-fds = "0.3.0"

[target.'cfg(windows)'.dependencies]
mio = { version = "1.0", features = ["os-ext", "os-poll"] }
os_pipe.workspace = true
windows-sys = { version = "0.52.0", features = [
"Win32_Foundation",
"Win32_System_WindowsProgramming",
Expand Down
121 changes: 79 additions & 42 deletions crates/shim/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ use std::{
io::Read,
os::unix::{fs::FileTypeExt, net::UnixListener},
path::Path,
process::{self, Command, Stdio},
process::{self, Command as StdCommand, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use async_trait::async_trait;
use command_fds::{CommandFdExt, FdMapping};
use containerd_shim_protos::{
api::DeleteResponse,
protobuf::{well_known_types::any::Any, Message, MessageField},
Expand All @@ -50,7 +49,7 @@ use nix::{
};
use oci_spec::runtime::Features;
use signal_hook_tokio::Signals;
use tokio::{io::AsyncWriteExt, sync::Notify};
use tokio::{io::AsyncWriteExt, process::Command, sync::Notify};
use which::which;

const DEFAULT_BINARY_NAME: &str = "runc";
Expand All @@ -61,7 +60,7 @@ use crate::{
error::{Error, Result},
logger, parse_sockaddr, reap, socket_address,
util::{asyncify, read_file_to_str, write_str_to_file},
Config, Flags, StartOpts, SOCKET_FD, TTRPC_ADDRESS,
Config, Flags, StartOpts, TTRPC_ADDRESS,
};

pub mod monitor;
Expand Down Expand Up @@ -142,7 +141,10 @@ pub fn run_info() -> Result<RuntimeInfo> {
let binary_path = which(binary_name).unwrap();

// get features
let output = Command::new(binary_path).arg("features").output().unwrap();
let output = StdCommand::new(binary_path)
.arg("features")
.output()
.unwrap();

let features: Features = serde_json::from_str(&String::from_utf8_lossy(&output.stdout))?;

Expand Down Expand Up @@ -215,6 +217,12 @@ where
Ok(())
}
_ => {
if flags.socket.is_empty() {
return Err(Error::InvalidArgument(String::from(
"Shim socket cannot be empty",
)));
}

if !config.no_setup_logger {
logger::init(
flags.debug,
Expand All @@ -228,11 +236,15 @@ where
let task = Box::new(shim.create_task_service(publisher).await)
as Box<dyn containerd_shim_protos::shim_async::Task + Send + Sync>;
let task_service = create_task(Arc::from(task));
let mut server = Server::new().register_service(task_service);
server = server.add_listener(SOCKET_FD)?;
server = server.set_domain_unix();
let Some(mut server) = create_server_with_retry(&flags).await? else {
signal_server_started();
return Ok(());
};
server = server.register_service(task_service);
server.start().await?;

signal_server_started();

info!("Shim successfully started, waiting for exit signal...");
tokio::spawn(async move {
handle_signals(signals).await;
Expand Down Expand Up @@ -296,61 +308,86 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) ->
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
let address = socket_address(&opts.address, &opts.namespace, grouping);

// Create socket and prepare listener.
// We'll use `add_listener` when creating TTRPC server.
let listener = match start_listener(&address).await {
Ok(l) => l,
Err(e) => {
if let Error::IoError {
err: ref io_err, ..
} = e
{
if io_err.kind() != std::io::ErrorKind::AddrInUse {
return Err(e);
};
}
if let Ok(()) = wait_socket_working(&address, 5, 200).await {
write_str_to_file("address", &address).await?;
return Ok(address);
}
remove_socket(&address).await?;
start_listener(&address).await?
}
};
// Activation pattern comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.

// tokio::process::Command do not have method `fd_mappings`,
// and the `spawn()` is also not an async method,
// so we use the std::process::Command here
let mut command = Command::new(cmd);

command
.current_dir(cwd)
.stdout(Stdio::null())
.stdout(Stdio::piped())
.stdin(Stdio::null())
.stderr(Stdio::null())
.envs(vars)
.args([
"-namespace",
&opts.namespace,
"-id",
&opts.id,
"-address",
&opts.address,
])
.fd_mappings(vec![FdMapping {
parent_fd: listener.into(),
child_fd: SOCKET_FD,
}])?;
"-socket",
&address,
]);

if opts.debug {
command.arg("-debug");
}
command.envs(vars);

let _child = command.spawn().map_err(io_error!(e, "spawn shim"))?;
let mut child = command.spawn().map_err(io_error!(e, "spawn shim"))?;

#[cfg(target_os = "linux")]
crate::cgroup::set_cgroup_and_oom_score(_child.id())?;
crate::cgroup::set_cgroup_and_oom_score(child.id().unwrap())?;

let mut reader = child.stdout.take().unwrap();
tokio::io::copy(&mut reader, &mut tokio::io::stderr())
.await
.unwrap();

Ok(address)
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
async fn create_server(flags: &args::Flags) -> Result<Server> {
use std::os::fd::IntoRawFd;
let listener = start_listener(&flags.socket).await?;
let mut server = Server::new();
server = server.add_listener(listener.into_raw_fd())?;
server = server.set_domain_unix();
Ok(server)
}

async fn create_server_with_retry(flags: &args::Flags) -> Result<Option<Server>> {
// Really try to create a server.
let server = match create_server(flags).await {
Ok(server) => server,
Err(Error::IoError { err, .. }) if err.kind() == std::io::ErrorKind::AddrInUse => {
// If the address is already in use then make sure it is up and running and return the address
// This allows for running a single shim per container scenarios
if let Ok(()) = wait_socket_working(&flags.socket, 5, 200).await {
write_str_to_file("address", &flags.socket).await?;
return Ok(None);
}
remove_socket(&flags.socket).await?;
create_server(flags).await?
}
Err(e) => return Err(e),
};

Ok(Some(server))
}

fn signal_server_started() {
use libc::{dup2, STDERR_FILENO, STDOUT_FILENO};

unsafe {
if dup2(STDERR_FILENO, STDOUT_FILENO) < 0 {
panic!("Error closing pipe: {}", std::io::Error::last_os_error())
}
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
fn setup_signals_tokio(config: &Config) -> Signals {
if config.no_reaper {
Expand Down
5 changes: 0 additions & 5 deletions crates/shim/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ pub enum Error {
#[error("Failed to setup logger: {0}")]
Setup(#[from] log::SetLoggerError),

/// Unable to pass fd to child process (we rely on `command_fds` crate for this).
#[cfg(unix)]
#[error("Failed to pass socket fd to child: {0}")]
FdMap(#[from] command_fds::FdMappingCollision),

#[cfg(unix)]
#[error("Nix error: {0}")]
Nix(#[from] nix::Error),
Expand Down
11 changes: 1 addition & 10 deletions crates/shim/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

use std::{fs::File, path::PathBuf};
#[cfg(unix)]
use std::{
os::unix::{io::RawFd, net::UnixListener},
path::Path,
};
use std::{os::unix::net::UnixListener, path::Path};

pub use containerd_shim_protos as protos;
#[cfg(unix)]
Expand Down Expand Up @@ -151,12 +148,6 @@ pub struct StartOpts {
pub debug: bool,
}

/// The shim process communicates with the containerd server through a communication channel
/// created by containerd. One endpoint of the communication channel is passed to shim process
/// through a file descriptor during forking, which is the fourth(3) file descriptor.
#[cfg(unix)]
const SOCKET_FD: RawFd = 3;

#[cfg(target_os = "linux")]
pub const SOCKET_ROOT: &str = "/run/containerd";

Expand Down
Loading
Loading