Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions kube-client/src/api/remote_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
select,
};
use tokio_tungstenite::{
tungstenite::{self as ws},
WebSocketStream,
};
use tokio_tungstenite::tungstenite as ws;

use crate::client::Connection;

use super::AttachParams;

Expand Down Expand Up @@ -112,9 +111,7 @@ pub struct AttachedProcess {
}

impl AttachedProcess {
pub(crate) fn new<S>(stream: WebSocketStream<S>, ap: &AttachParams) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
pub(crate) fn new(connection: Connection, ap: &AttachParams) -> Self
{
// To simplify the implementation, always create a pipe for stdin.
// The caller does not have access to it unless they had requested.
Expand All @@ -140,7 +137,7 @@ impl AttachedProcess {
};

let task = tokio::spawn(start_message_loop(
stream,
connection,
stdin_reader,
stdout_writer,
stderr_writer,
Expand Down Expand Up @@ -259,32 +256,37 @@ impl AttachedProcess {
}
}

// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34
// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57
const STDIN_CHANNEL: u8 = 0;
const STDOUT_CHANNEL: u8 = 1;
const STDERR_CHANNEL: u8 = 2;
// status channel receives `Status` object on exit.
const STATUS_CHANNEL: u8 = 3;
// resize channel is use to send TerminalSize object to change the size of the terminal
const RESIZE_CHANNEL: u8 = 4;
/// Used to signal that a channel has reached EOF. Only works on V5 of the protocol.
const CLOSE_CHANNEL: u8 = 255;

async fn start_message_loop<S>(
stream: WebSocketStream<S>,
async fn start_message_loop(
connection: Connection,
stdin: impl AsyncRead + Unpin,
mut stdout: Option<impl AsyncWrite + Unpin>,
mut stderr: Option<impl AsyncWrite + Unpin>,
status_tx: StatusSender,
mut terminal_size_rx: Option<TerminalSizeReceiver>,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
{
let supports_stream_close = connection.supports_stream_close();
let stream = connection.into_stream();
let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
let (mut server_send, raw_server_recv) = stream.split();
// Work with filtered messages to reduce noise.
let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
let mut have_terminal_size_rx = terminal_size_rx.is_some();

// True until we reach EOF for stdin.
let mut stdin_is_open = true;

loop {
let terminal_size_next = async {
match terminal_size_rx.as_mut() {
Expand Down Expand Up @@ -319,7 +321,7 @@ where
},
}
},
stdin_message = stdin_stream.next() => {
stdin_message = stdin_stream.next(), if stdin_is_open => {
match stdin_message {
Some(Ok(bytes)) => {
if !bytes.is_empty() {
Expand All @@ -337,9 +339,24 @@ where
}
None => {
// Stdin closed (writer half dropped).
// Let the server know and disconnect.
server_send.close().await.map_err(Error::SendClose)?;
break;
// Let the server know we reached the end of stdin.
if supports_stream_close {
// Signal stdin has reached EOF.
// See: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go#L346
let vec = vec![CLOSE_CHANNEL, STDIN_CHANNEL];
server_send
.send(ws::Message::binary(vec))
.await
.map_err(Error::SendStdin)?;
} else {
// Best we can do is trigger the whole websocket to close.
// We may miss out on any remaining stdout data that has not
// been sent yet.
server_send.close().await.map_err(Error::SendClose)?;
}

// Do not check stdin_stream for data in future loops.
stdin_is_open = false;
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ where
.request
.portforward(name, ports)
.map_err(Error::BuildRequest)?;
let stream = self.client.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.client.connect(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}
}
4 changes: 2 additions & 2 deletions kube-client/src/client/kubelet_debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl Client {
let mut req =
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_portforward");
let stream = self.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.connect(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}

/// Stream logs directly from node
Expand Down
42 changes: 29 additions & 13 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ pub struct Client {
default_ns: String,
}

/// Represents a WebSocket connection.
/// Value returned by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
protocol: upgrade::StreamProtocol,
}
Comment thread
clux marked this conversation as resolved.

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
/// Return true if the stream supports graceful close signaling.
pub fn supports_stream_close(&self) -> bool {
self.protocol.supports_stream_close()
}

/// Transform into the raw WebSocketStream.
pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
self.stream
}
}

/// Constructors and low-level api interfaces.
///
/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
Expand Down Expand Up @@ -193,7 +216,7 @@ impl Client {
pub async fn connect(
&self,
request: Request<Vec<u8>>,
) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
) -> Result<Connection> {
use http::header::HeaderValue;
let (mut parts, body) = request.into_parts();
parts
Expand All @@ -211,25 +234,18 @@ impl Client {
http::header::SEC_WEBSOCKET_KEY,
key.parse().expect("valid header value"),
);
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
parts.headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(upgrade::WS_PROTOCOL),
);
upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;

let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
match hyper::upgrade::on(res).await {
Ok(upgraded) => Ok(WebSocketStream::from_raw_socket(
Ok(upgraded) => Ok(Connection {
stream: WebSocketStream::from_raw_socket(
TokioIo::new(upgraded),
ws::protocol::Role::Client,
None,
)
.await),
.await, protocol}),

Err(e) => Err(Error::UpgradeConnection(
UpgradeConnectionError::GetPendingUpgrade(e),
Expand Down
96 changes: 82 additions & 14 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,82 @@
use http::{self, Response, StatusCode};
use http::{self, HeaderValue, Response, StatusCode};
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

use crate::client::Body;
use crate::{client::Body, Error, Result};

// Binary subprotocol v4. See `Client::connect`.
pub const WS_PROTOCOL: &str = "v4.channel.k8s.io";
#[derive(Debug)]
pub enum StreamProtocol {
/// Binary subprotocol v4. See `Client::connect`.
V4,

/// Binary subprotocol v5. See `Client::connect`.
/// v5 supports CLOSE signals.
/// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
V5,
}

impl StreamProtocol {
pub fn as_str(&self) -> &'static str {
match self {
StreamProtocol::V4 => "v4.channel.k8s.io",
StreamProtocol::V5 => "v5.channel.k8s.io"
}
}

fn as_bytes(&self) -> &'static [u8] {
self.as_str().as_bytes()
}

pub fn supports_stream_close(&self) -> bool {
match self {
StreamProtocol::V4 => false,
StreamProtocol::V5 => true
Comment thread
clux marked this conversation as resolved.
Outdated
}
}

/// Add HTTP header SEC_WEBSOCKET_PROTOCOL with a list of supported protocol.
pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
// Protocols we support in our preferred order.
let supported_protocols = vec![
// v5 supports CLOSE signals.
StreamProtocol::V5.as_str(),
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
StreamProtocol::V4.as_str(),
];

let header_value_string = supported_protocols.join(", ");
Comment thread
clux marked this conversation as resolved.

// Note: Multiple headers does not work. Only a single CSV works.
headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
);

Ok(())
}

/// Return the subprotocol of an HTTP response.
fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
let headers = res.headers();

match headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h.as_bytes()) {
Some(protocol) => if protocol == StreamProtocol::V4.as_bytes() {
Some(StreamProtocol::V4)
} else if protocol == StreamProtocol::V5.as_bytes() {
Some(StreamProtocol::V5)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for checking my understanding; the idea here is that we send a comma separated list of protocols we support, and the far end gives us the most recent one they can handle, which we can inspect viasupports_stream_close later for feature selection?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is correct (or at least correct as far as my understanding goes).

I found this doc which explains how the protocol header is intended to operate. https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Sec-WebSocket-Protocol

} else {
None
},
_ => None,
}
}
}

/// Possible errors from upgrading to a WebSocket connection
#[cfg(feature = "ws")]
Expand Down Expand Up @@ -42,7 +113,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down Expand Up @@ -75,14 +146,11 @@ pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeCon
return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
}

// Make sure that the server returned the correct subprotocol.
if !headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h == WS_PROTOCOL)
.unwrap_or(false)
{
return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch);
}
// Make sure that the server returned an expected subprotocol.
let protocol = match StreamProtocol::get_from_response(res) {
Some(p) => p,
None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch)
};

Ok(())
Ok(protocol)
}
36 changes: 35 additions & 1 deletion kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ mod test {
#[cfg(feature = "ws")]
async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
use tokio::io::AsyncWriteExt;

let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
Expand Down Expand Up @@ -348,9 +349,42 @@ mod test {
assert_eq!(out, "1\n2\n3\n");
}

// Verify we read from stdout after stdin is closed.
{
let name = "busybox-kube2";
let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
let ap = AttachParams::default().stdin(true).stderr(false);

// Make a connection so we can determine if the K8s cluster supports stream closing.
let mut req = pods.request.exec(name, command.clone(), &ap)?;
req.extensions_mut().insert("exec");
let stream = pods.client.connect(req).await?;

// This only works is the cluster supports protocol version v5.channel.k8s.io
// Skip for older protocols.
Comment thread
clux marked this conversation as resolved.
if stream.supports_stream_close() {
let mut attached = pods.exec(name, command, &ap).await?;
let mut stdin_writer = attached.stdin().unwrap();
let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());

stdin_writer.write_all(b"this will be ignored\n").await?;
_ = stdin_writer.shutdown().await;

let next_stdout = stdout_stream.next();
let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
assert_eq!(stdout, "test string 2\n");

// AttachedProcess resolves with status object.
let status = attached.take_status().unwrap();
if let Some(status) = status.await {
assert_eq!(status.status, Some("Success".to_owned()));
assert_eq!(status.reason, None);
}
}
}

// Verify we can write to Stdin
{
use tokio::io::AsyncWriteExt;
let mut attached = pods
.exec(
"busybox-kube2",
Expand Down