From fc9775f2fb577c984c295f30b3677e13cf78a9f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 14 Aug 2024 08:33:26 +0200 Subject: [PATCH 01/12] docs: mention SerializeRow instead of ValueList ValueList is a deprecated API. --- scylla/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs index 07ba8aaef5..3adce0fc04 100644 --- a/scylla/src/lib.rs +++ b/scylla/src/lib.rs @@ -63,7 +63,8 @@ //! # Ok(()) //! # } //! ``` -//! But the driver will accept anything implementing the trait [ValueList](crate::frame::value::ValueList) +//! But the driver will accept anything implementing the trait [SerializeRow] +//! (crate::serialize::row::SerializeRow) //! //! ### Receiving results //! The easiest way to read rows returned by a query is to cast each row to a tuple of values: From f94c3d1bb10c73ca87082dcdff742c0330741536 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 13 Aug 2024 19:39:56 +0200 Subject: [PATCH 02/12] session_test: fix bad row type in tracing tests As the old deserialization framework only validates types when some row is actually converted into the end type (and not if 0 rows were received), the error went unnoticed. The new framework, however, failed with the bad type provided. 
--- scylla/src/transport/session_test.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index a88fa73018..89d427d102 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -1056,7 +1056,7 @@ async fn test_tracing_query_iter(session: &Session, ks: String) { assert!(untraced_row_iter.get_tracing_ids().is_empty()); // The same is true for TypedRowIter - let untraced_typed_row_iter = untraced_row_iter.into_typed::<(i32,)>(); + let untraced_typed_row_iter = untraced_row_iter.into_typed::<(String,)>(); assert!(untraced_typed_row_iter.get_tracing_ids().is_empty()); // A query with tracing enabled has a tracing ids in result @@ -1071,7 +1071,7 @@ async fn test_tracing_query_iter(session: &Session, ks: String) { assert!(!traced_row_iter.get_tracing_ids().is_empty()); // The same is true for TypedRowIter - let traced_typed_row_iter = traced_row_iter.into_typed::<(i32,)>(); + let traced_typed_row_iter = traced_row_iter.into_typed::<(String,)>(); assert!(!traced_typed_row_iter.get_tracing_ids().is_empty()); for tracing_id in traced_typed_row_iter.get_tracing_ids() { @@ -1094,7 +1094,7 @@ async fn test_tracing_execute_iter(session: &Session, ks: String) { assert!(untraced_row_iter.get_tracing_ids().is_empty()); // The same is true for TypedRowIter - let untraced_typed_row_iter = untraced_row_iter.into_typed::<(i32,)>(); + let untraced_typed_row_iter = untraced_row_iter.into_typed::<(String,)>(); assert!(untraced_typed_row_iter.get_tracing_ids().is_empty()); // A prepared statement with tracing enabled has a tracing ids in result @@ -1112,7 +1112,7 @@ async fn test_tracing_execute_iter(session: &Session, ks: String) { assert!(!traced_row_iter.get_tracing_ids().is_empty()); // The same is true for TypedRowIter - let traced_typed_row_iter = traced_row_iter.into_typed::<(i32,)>(); + let traced_typed_row_iter = 
traced_row_iter.into_typed::<(String,)>(); assert!(!traced_typed_row_iter.get_tracing_ids().is_empty()); for tracing_id in traced_typed_row_iter.get_tracing_ids() { From 6e12378380e8351b5829847ed40828b35eb56f19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 13 Aug 2024 11:24:05 +0200 Subject: [PATCH 03/12] scylla_cql: impl Debug for (Typed)RowIterator The impl was missing, and lack of it makes it impossible to e.g. unwrap a `Result`. --- scylla-cql/src/types/deserialize/result.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scylla-cql/src/types/deserialize/result.rs b/scylla-cql/src/types/deserialize/result.rs index 036b909afb..cb18d14727 100644 --- a/scylla-cql/src/types/deserialize/result.rs +++ b/scylla-cql/src/types/deserialize/result.rs @@ -5,6 +5,7 @@ use super::{DeserializationError, FrameSlice, TypeCheckError}; use std::marker::PhantomData; /// Iterates over the whole result, returning rows. +#[derive(Debug)] pub struct RowIterator<'frame> { specs: &'frame [ColumnSpec], remaining: usize, @@ -76,6 +77,7 @@ impl<'frame> Iterator for RowIterator<'frame> { /// A typed version of [RowIterator] which deserializes the rows before /// returning them. +#[derive(Debug)] pub struct TypedRowIterator<'frame, R> { inner: RowIterator<'frame>, _phantom: PhantomData, From 4c19429c32027906efb47fb787b0aa5c6fd13a5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 11 Jul 2024 12:47:43 +0200 Subject: [PATCH 04/12] errors: add TypeCheckError variant to ParseError This was overlooked in a previous PR, where TypeCheckError was introduced. 
--- scylla-cql/src/frame/frame_errors.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scylla-cql/src/frame/frame_errors.rs b/scylla-cql/src/frame/frame_errors.rs index b90eef3a79..0fa4d1e568 100644 --- a/scylla-cql/src/frame/frame_errors.rs +++ b/scylla-cql/src/frame/frame_errors.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use super::TryFromPrimitiveError; use crate::cql_to_rust::CqlTypeError; use crate::frame::value::SerializeValuesError; -use crate::types::deserialize::DeserializationError; +use crate::types::deserialize::{DeserializationError, TypeCheckError}; use crate::types::serialize::SerializationError; use thiserror::Error; @@ -46,6 +46,8 @@ pub enum ParseError { #[error(transparent)] DeserializationError(#[from] DeserializationError), #[error(transparent)] + DeserializationTypeCheckError(#[from] TypeCheckError), + #[error(transparent)] IoError(#[from] std::io::Error), #[error(transparent)] SerializeValuesError(#[from] SerializeValuesError), From a27efa0c98dfbc07ea8f2a7fc9a4d60f5e95b51e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 13 Aug 2024 17:33:06 +0200 Subject: [PATCH 05/12] errors: impl deser errors conversions to QueryError This is a temporary measure, until error refactor gets rid of the awful QueryError::InvalidMessage(String) variant and replaces it with a decent matchable error. 
--- scylla-cql/src/errors.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 1ebf9cfede..fa96b2880c 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -3,6 +3,7 @@ use crate::frame::frame_errors::{CqlResponseParseError, FrameError, ParseError}; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; +use crate::types::deserialize::{DeserializationError, TypeCheckError}; use crate::types::serialize::SerializationError; use crate::Consistency; use bytes::Bytes; @@ -461,6 +462,18 @@ impl From for QueryError { } } +impl From for QueryError { + fn from(value: DeserializationError) -> Self { + Self::InvalidMessage(value.to_string()) + } +} + +impl From for QueryError { + fn from(value: TypeCheckError) -> Self { + Self::InvalidMessage(value.to_string()) + } +} + impl From for QueryError { fn from(parse_error: ParseError) -> QueryError { QueryError::InvalidMessage(format!("Error parsing message: {}", parse_error)) From 12f0423613e5209f53a6454bed7df764127b828d Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Fri, 25 Nov 2022 15:15:22 +0100 Subject: [PATCH 06/12] {frame,transport}: propagate frame contents as Bytes The new deserialization API will need to receive ownership of the serialized frame for two reasons: - To be able to deserialize to types that borrow from the frame (e.g. `text` as &str), - To be able to deserialize to types that share ownership of the bytes (e.g. `blob` -> bytes::Bytes). 
--- scylla-cql/src/frame/response/mod.rs | 7 +++++-- scylla-cql/src/frame/response/result.rs | 8 +++++--- scylla/src/transport/connection.rs | 2 +- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/scylla-cql/src/frame/response/mod.rs b/scylla-cql/src/frame/response/mod.rs index 8e6e7ff335..690e945048 100644 --- a/scylla-cql/src/frame/response/mod.rs +++ b/scylla-cql/src/frame/response/mod.rs @@ -65,9 +65,10 @@ impl Response { pub fn deserialize( features: &ProtocolFeatures, opcode: ResponseOpcode, - buf: &mut &[u8], + buf_bytes: bytes::Bytes, cached_metadata: Option<&ResultMetadata>, ) -> Result { + let buf = &mut &*buf_bytes; let response = match opcode { ResponseOpcode::Error => Response::Error(Error::deserialize(features, buf)?), ResponseOpcode::Ready => Response::Ready, @@ -75,7 +76,9 @@ impl Response { Response::Authenticate(authenticate::Authenticate::deserialize(buf)?) } ResponseOpcode::Supported => Response::Supported(Supported::deserialize(buf)?), - ResponseOpcode::Result => Response::Result(result::deserialize(buf, cached_metadata)?), + ResponseOpcode::Result => { + Response::Result(result::deserialize(buf_bytes, cached_metadata)?) + } ResponseOpcode::Event => Response::Event(event::Event::deserialize(buf)?), ResponseOpcode::AuthChallenge => { Response::AuthChallenge(authenticate::AuthChallenge::deserialize(buf)?) 
diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index b6b51dc7df..f966834085 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -859,9 +859,10 @@ pub fn deser_cql_value( } fn deser_rows( - buf: &mut &[u8], + buf_bytes: Bytes, cached_metadata: Option<&ResultMetadata>, ) -> StdResult { + let buf = &mut &*buf_bytes; let server_metadata = deser_result_metadata(buf)?; let metadata = match cached_metadata { @@ -935,16 +936,17 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult, ) -> StdResult { + let buf = &mut &*buf_bytes; use self::Result::*; Ok( match types::read_int(buf) .map_err(|err| CqlResultParseError::ResultIdParseError(err.into()))? { 0x0001 => Void, - 0x0002 => Rows(deser_rows(buf, cached_metadata)?), + 0x0002 => Rows(deser_rows(buf_bytes.slice_ref(buf), cached_metadata)?), 0x0003 => SetKeyspace(deser_set_keyspace(buf)?), 0x0004 => Prepared(deser_prepared(buf)?), 0x0005 => SchemaChange(deser_schema_change(buf)?), diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 76a226ce54..335320f7b1 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1316,7 +1316,7 @@ impl Connection { let response = Response::deserialize( features, task_response.opcode, - &mut &*body_with_ext.body, + body_with_ext.body, cached_metadata, )?; From 88632eff578ab1511212b2144e2e9e8e2e6fe95f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 14 Aug 2024 11:38:04 +0200 Subject: [PATCH 07/12] deser/value: convenient deserialization of empty collections ScyllaDB does not distinguish empty collections from nulls. That is, INSERTing an empty collection is equivalent to nullifying the corresponding column. 
As pointed out in [#1001](https://github.com/scylladb/scylla-rust-driver/issues/1001), it's a nice QOL feature to be able to deserialize empty CQL collections to empty Rust collections instead of `None::<RustCollection>`. Deserialization logic is modified to enable that. --- scylla-cql/src/types/deserialize/value.rs | 73 +++++++++++++++---- .../src/types/deserialize/value_tests.rs | 58 +++++++++------ 2 files changed, 93 insertions(+), 38 deletions(-) diff --git a/scylla-cql/src/types/deserialize/value.rs b/scylla-cql/src/types/deserialize/value.rs index 074a7c298a..ca6e22b651 100644 --- a/scylla-cql/src/types/deserialize/value.rs +++ b/scylla-cql/src/types/deserialize/value.rs @@ -670,6 +670,15 @@ impl<'frame, T> ListlikeIterator<'frame, T> { phantom_data: std::marker::PhantomData, } } + + fn empty(coll_typ: &'frame ColumnType, elem_typ: &'frame ColumnType) -> Self { + Self { + coll_typ, + elem_typ, + raw_iter: FixedLengthBytesSequenceIterator::empty(), + phantom_data: std::marker::PhantomData, + } + } } impl<'frame, T> DeserializeValue<'frame> for ListlikeIterator<'frame, T> @@ -699,7 +708,19 @@ where typ: &'frame ColumnType, v: Option>, ) -> Result { - let mut v = ensure_not_null_frame_slice::(typ, v)?; + let elem_typ = match typ { + ColumnType::List(elem_typ) | ColumnType::Set(elem_typ) => elem_typ, + _ => { + unreachable!("Typecheck should have prevented this scenario!") + } + }; + + let mut v = if let Some(v) = v { + v + } else { + return Ok(Self::empty(typ, elem_typ)); + }; + let count = types::read_int_length(v.as_slice_mut()).map_err(|err| { mk_deser_err::( typ, @@ -708,12 +729,7 @@ where ), ) })?; - let elem_typ = match typ { - ColumnType::List(elem_typ) | ColumnType::Set(elem_typ) => elem_typ, - _ => { - unreachable!("Typecheck should have prevented this scenario!") - } - }; + Ok(Self::new(typ, elem_typ, count, v)) } } @@ -849,6 +865,21 @@ impl<'frame, K, V> MapIterator<'frame, K, V> { phantom_data_v: std::marker::PhantomData, } } + + fn empty( + coll_typ: &'frame 
ColumnType, + k_typ: &'frame ColumnType, + v_typ: &'frame ColumnType, + ) -> Self { + Self { + coll_typ, + k_typ, + v_typ, + raw_iter: FixedLengthBytesSequenceIterator::empty(), + phantom_data_k: std::marker::PhantomData, + phantom_data_v: std::marker::PhantomData, + } + } } impl<'frame, K, V> DeserializeValue<'frame> for MapIterator<'frame, K, V> @@ -875,7 +906,19 @@ where typ: &'frame ColumnType, v: Option>, ) -> Result { - let mut v = ensure_not_null_frame_slice::(typ, v)?; + let (k_typ, v_typ) = match typ { + ColumnType::Map(k_t, v_t) => (k_t, v_t), + _ => { + unreachable!("Typecheck should have prevented this scenario!") + } + }; + + let mut v = if let Some(v) = v { + v + } else { + return Ok(Self::empty(typ, k_typ, v_typ)); + }; + let count = types::read_int_length(v.as_slice_mut()).map_err(|err| { mk_deser_err::( typ, @@ -884,12 +927,7 @@ where ), ) })?; - let (k_typ, v_typ) = match typ { - ColumnType::Map(k_t, v_t) => (k_t, v_t), - _ => { - unreachable!("Typecheck should have prevented this scenario!") - } - }; + Ok(Self::new(typ, k_typ, v_typ, 2 * count, v)) } } @@ -1275,6 +1313,13 @@ impl<'frame> FixedLengthBytesSequenceIterator<'frame> { remaining: count, } } + + fn empty() -> Self { + Self { + slice: FrameSlice::new_empty(), + remaining: 0, + } + } } impl<'frame> Iterator for FixedLengthBytesSequenceIterator<'frame> { diff --git a/scylla-cql/src/types/deserialize/value_tests.rs b/scylla-cql/src/types/deserialize/value_tests.rs index 9375ce47f6..fd14c5e730 100644 --- a/scylla-cql/src/types/deserialize/value_tests.rs +++ b/scylla-cql/src/types/deserialize/value_tests.rs @@ -424,6 +424,24 @@ fn test_list_and_set() { expected_vec_string.into_iter().collect(), ); + // Null collections are interpreted as empty collections, to retain convenience: + // when an empty collection is sent to the DB, the DB nullifies the column instead. 
+ { + let list_typ = ColumnType::List(Box::new(ColumnType::BigInt)); + let set_typ = ColumnType::Set(Box::new(ColumnType::BigInt)); + type CollTyp = i64; + + fn check<'frame, Collection: DeserializeValue<'frame>>(typ: &'frame ColumnType) { + >::type_check(typ).unwrap(); + >::deserialize(typ, None).unwrap(); + } + + check::>(&list_typ); + check::>(&set_typ); + check::>(&set_typ); + check::>(&set_typ); + } + // ser/de identity assert_ser_de_identity(&list_typ, &vec!["qwik"], &mut Bytes::new()); assert_ser_de_identity(&set_typ, &vec!["qwik"], &mut Bytes::new()); @@ -486,6 +504,22 @@ fn test_map() { ); assert_eq!(decoded_btree_string, expected_string.into_iter().collect()); + // Null collections are interpreted as empty collections, to retain convenience: + // when an empty collection is sent to the DB, the DB nullifies the column instead. + { + let map_typ = ColumnType::Map(Box::new(ColumnType::BigInt), Box::new(ColumnType::Ascii)); + type KeyTyp = i64; + type ValueTyp<'s> = &'s str; + + fn check<'frame, Collection: DeserializeValue<'frame>>(typ: &'frame ColumnType) { + >::type_check(typ).unwrap(); + >::deserialize(typ, None).unwrap(); + } + + check::>(&map_typ); + check::>(&map_typ); + } + // ser/de identity assert_ser_de_identity( &typ, @@ -1218,18 +1252,6 @@ fn test_set_or_list_errors() { ); } - // Got null - { - type RustTyp = Vec; - let ser_typ = ColumnType::List(Box::new(ColumnType::Int)); - - let err = RustTyp::deserialize(&ser_typ, None).unwrap_err(); - let err = get_deser_err(&err); - assert_eq!(err.rust_name, std::any::type_name::()); - assert_eq!(err.cql_type, ser_typ); - assert_matches!(err.kind, BuiltinDeserializationErrorKind::ExpectedNonNull); - } - // Bad element type { assert_type_check_error!( @@ -1316,18 +1338,6 @@ fn test_map_errors() { ); } - // Got null - { - type RustTyp = HashMap; - let ser_typ = ColumnType::Map(Box::new(ColumnType::Int), Box::new(ColumnType::Boolean)); - - let err = RustTyp::deserialize(&ser_typ, None).unwrap_err(); - let err 
= get_deser_err(&err); - assert_eq!(err.rust_name, std::any::type_name::()); - assert_eq!(err.cql_type, ser_typ); - assert_matches!(err.kind, BuiltinDeserializationErrorKind::ExpectedNonNull); - } - // Key type mismatch { let err = deserialize::>( From b9c5256237c0924b1a9f67aecf95b6462ae6b5a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 23 Aug 2024 11:56:49 +0200 Subject: [PATCH 08/12] result: slightly refactor deser_result_metadata This aids readability. --- scylla-cql/src/frame/response/result.rs | 27 +++++++++++-------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index f966834085..e208ca4077 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -635,27 +635,24 @@ fn deser_result_metadata(buf: &mut &[u8]) -> StdResult Date: Fri, 23 Aug 2024 11:57:53 +0200 Subject: [PATCH 09/12] result: extract paging_state from ResultMetadata Although paging state comes from the DB in the CQL protocol entity, it should be considered separate. The main reason for that is that PreparedStatement also receives (and optionally caches) ResultMetadata upon preparation, which is then used for an optimisation (col_specs are not sent in RESULT::Rows response by the DB). However, paging state should not be cached this way, as it's going to change in between execution of the same statement with different paging state given. In the next commit ResultMetadata is going to be shared between PreparedStatement and its QueryResults by an Arc, and for that paging state must be kept separate. 
--- scylla-cql/src/frame/frame_errors.rs | 2 ++ scylla-cql/src/frame/request/query.rs | 8 ++++++++ scylla-cql/src/frame/response/result.rs | 25 +++++++++++++++++-------- scylla/src/transport/connection.rs | 2 +- scylla/src/transport/iterator.rs | 16 ++++++++++------ 5 files changed, 38 insertions(+), 15 deletions(-) diff --git a/scylla-cql/src/frame/frame_errors.rs b/scylla-cql/src/frame/frame_errors.rs index 0fa4d1e568..155491ef91 100644 --- a/scylla-cql/src/frame/frame_errors.rs +++ b/scylla-cql/src/frame/frame_errors.rs @@ -218,6 +218,8 @@ pub enum PreparedParseError { ResultMetadataParseError(ResultMetadataParseError), #[error("Invalid prepared metadata: {0}")] PreparedMetadataParseError(ResultMetadataParseError), + #[error("Non-zero paging state in result metadata: {0:?}")] + NonZeroPagingState(Arc<[u8]>), } /// An error type returned when deserialization diff --git a/scylla-cql/src/frame/request/query.rs b/scylla-cql/src/frame/request/query.rs index 9c755c3db9..2794a2b5d9 100644 --- a/scylla-cql/src/frame/request/query.rs +++ b/scylla-cql/src/frame/request/query.rs @@ -242,6 +242,14 @@ impl PagingStateResponse { Self::NoMorePages => ControlFlow::Break(()), } } + + /// Swaps the paging state response with PagingStateResponse::NoMorePages. + /// + /// Only for use in driver's inner code, as an optimisation. + #[doc(hidden)] + pub fn take(&mut self) -> Self { + std::mem::replace(self, Self::NoMorePages) + } } /// The state of a paged query, i.e. 
where to resume fetching result rows diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index e208ca4077..c1cd4bb7d9 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -431,7 +431,6 @@ pub struct ColumnSpec { #[derive(Debug, Clone)] pub struct ResultMetadata { col_count: usize, - pub paging_state: PagingStateResponse, pub col_specs: Vec, } @@ -440,7 +439,6 @@ impl ResultMetadata { pub fn mock_empty() -> Self { Self { col_count: 0, - paging_state: PagingStateResponse::NoMorePages, col_specs: Vec::new(), } } @@ -479,6 +477,7 @@ impl Row { #[derive(Debug)] pub struct Rows { pub metadata: ResultMetadata, + pub paging_state_response: PagingStateResponse, pub rows_count: usize, pub rows: Vec, /// Original size of the serialized rows. @@ -620,7 +619,9 @@ fn deser_col_specs( Ok(col_specs) } -fn deser_result_metadata(buf: &mut &[u8]) -> StdResult { +fn deser_result_metadata( + buf: &mut &[u8], +) -> StdResult<(ResultMetadata, PagingStateResponse), ResultMetadataParseError> { let flags = types::read_int(buf) .map_err(|err| ResultMetadataParseError::FlagsParseError(err.into()))?; let global_tables_spec = flags & 0x0001 != 0; @@ -649,10 +650,9 @@ fn deser_result_metadata(buf: &mut &[u8]) -> StdResult, ) -> StdResult { let buf = &mut &*buf_bytes; - let server_metadata = deser_result_metadata(buf)?; + let (server_metadata, paging_state_response) = deser_result_metadata(buf)?; let metadata = match cached_metadata { Some(cached) => ResultMetadata { col_count: cached.col_count, - paging_state: server_metadata.paging_state, col_specs: cached.col_specs.clone(), }, None => { @@ -897,6 +896,7 @@ fn deser_rows( Ok(Rows { metadata, + paging_state_response, rows_count, rows, serialized_size: original_size - buf.len(), @@ -917,8 +917,17 @@ fn deser_prepared(buf: &mut &[u8]) -> StdResult { buf.advance(id_len); let prepared_metadata = 
deser_prepared_metadata(buf).map_err(PreparedParseError::PreparedMetadataParseError)?; - let result_metadata = + let (result_metadata, paging_state_response) = deser_result_metadata(buf).map_err(PreparedParseError::ResultMetadataParseError)?; + if let PagingStateResponse::HasMorePages { state } = paging_state_response { + return Err(PreparedParseError::NonZeroPagingState( + state + .as_bytes_slice() + .cloned() + .unwrap_or_else(|| Arc::from([])), + )); + } + Ok(Prepared { id, prepared_metadata, diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 335320f7b1..c78042ff2f 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -270,7 +270,7 @@ impl NonErrorQueryResponse { let (rows, paging_state, col_specs, serialized_size) = match self.response { NonErrorResponse::Result(result::Result::Rows(rs)) => ( Some(rs.rows), - rs.metadata.paging_state, + rs.paging_state_response, rs.metadata.col_specs, rs.serialized_size, ), diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index cb5c26ca87..2b21233712 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -410,7 +410,10 @@ impl RowIterator { mod checked_channel_sender { use scylla_cql::{ errors::QueryError, - frame::response::result::{ResultMetadata, Rows}, + frame::{ + request::query::PagingStateResponse, + response::result::{ResultMetadata, Rows}, + }, }; use std::marker::PhantomData; use tokio::sync::mpsc; @@ -454,6 +457,7 @@ mod checked_channel_sender { let empty_page = ReceivedPage { rows: Rows { metadata: ResultMetadata::mock_empty(), + paging_state_response: PagingStateResponse::NoMorePages, rows_count: 0, rows: Vec::new(), serialized_size: 0, @@ -660,7 +664,7 @@ where match query_response { Ok(NonErrorQueryResponse { - response: NonErrorResponse::Result(result::Result::Rows(rows)), + response: NonErrorResponse::Result(result::Result::Rows(mut rows)), tracing_id, .. 
}) => { @@ -671,9 +675,9 @@ where .load_balancing_policy .on_query_success(&self.statement_info, elapsed, node); - request_span.record_rows_fields(&rows); + let paging_state_response = rows.paging_state_response.take(); - let paging_state_response = rows.metadata.paging_state.clone(); + request_span.record_rows_fields(&rows); let received_page = ReceivedPage { rows, tracing_id }; @@ -840,8 +844,8 @@ where let result = (self.fetcher)(paging_state).await?; let response = result.into_non_error_query_response()?; match response.response { - NonErrorResponse::Result(result::Result::Rows(rows)) => { - let paging_state_response = rows.metadata.paging_state.clone(); + NonErrorResponse::Result(result::Result::Rows(mut rows)) => { + let paging_state_response = rows.paging_state_response.take(); let (proof, send_result) = self .sender From 2fcb5c385d04b4b26ceaecf4c27310e270165b2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 23 Aug 2024 12:31:42 +0200 Subject: [PATCH 10/12] query_result: unpub col_specs field and provide getter This is preparation for next commits, where QueryResult is going to hold Arc<ResultMetadata> instead of Vec<ColumnSpec>. 
--- scylla/src/transport/query_result.rs | 11 ++++++++--- scylla/src/transport/session_test.rs | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs index 49a68f5c73..592bfdace4 100644 --- a/scylla/src/transport/query_result.rs +++ b/scylla/src/transport/query_result.rs @@ -7,7 +7,6 @@ use uuid::Uuid; /// Result of a single query\ /// Contains all rows returned by the database and some more information -#[non_exhaustive] #[derive(Debug)] pub struct QueryResult { /// Rows returned by the database.\ @@ -19,7 +18,7 @@ pub struct QueryResult { /// CQL Tracing uuid - can only be Some if tracing is enabled for this query pub tracing_id: Option, /// Column specification returned from the server - pub col_specs: Vec, + pub(crate) col_specs: Vec, /// The original size of the serialized rows in request pub serialized_size: usize, } @@ -134,9 +133,15 @@ impl QueryResult { Ok(self.single_row()?.into_typed::()?) } + /// Returns column specifications. 
+ #[inline] + pub fn col_specs(&self) -> &[ColumnSpec] { + self.col_specs.as_slice() + } + /// Returns a column specification for a column with given name, or None if not found pub fn get_column_spec<'a>(&'a self, name: &str) -> Option<(usize, &'a ColumnSpec)> { - self.col_specs + self.col_specs() .iter() .enumerate() .find(|(_id, spec)| spec.name == name) diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 89d427d102..412b76a00a 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -2570,7 +2570,7 @@ async fn test_batch_lwts() { let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap(); // Scylla returns 5 columns, but Cassandra returns only 1 - let is_scylla: bool = batch_res.col_specs.len() == 5; + let is_scylla: bool = batch_res.col_specs().len() == 5; if is_scylla { test_batch_lwts_for_scylla(&session, &batch, batch_res).await; From 6201845bf679ade0194d3acf06e805eff2f46c3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 23 Aug 2024 12:36:09 +0200 Subject: [PATCH 11/12] query_result: store full metadata instead of col_specs For now, the metadata is owned. In the next commit, metadata's ownership is made shared by Arc. 
--- scylla-cql/src/frame/response/result.rs | 9 ++++++++ scylla/src/transport/connection.rs | 8 +++---- scylla/src/transport/query_result.rs | 28 ++++++++++++++++++------- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index c1cd4bb7d9..9d6aec229c 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -442,6 +442,15 @@ impl ResultMetadata { col_specs: Vec::new(), } } + + #[inline] + #[doc(hidden)] + pub fn new_for_test(col_count: usize, col_specs: Vec) -> Self { + Self { + col_count, + col_specs, + } + } } #[derive(Debug, Copy, Clone)] diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index c78042ff2f..3e36ea625f 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -267,14 +267,14 @@ impl NonErrorQueryResponse { pub(crate) fn into_query_result_and_paging_state( self, ) -> Result<(QueryResult, PagingStateResponse), QueryError> { - let (rows, paging_state, col_specs, serialized_size) = match self.response { + let (rows, paging_state, metadata, serialized_size) = match self.response { NonErrorResponse::Result(result::Result::Rows(rs)) => ( Some(rs.rows), rs.paging_state_response, - rs.metadata.col_specs, + Some(rs.metadata), rs.serialized_size, ), - NonErrorResponse::Result(_) => (None, PagingStateResponse::NoMorePages, vec![], 0), + NonErrorResponse::Result(_) => (None, PagingStateResponse::NoMorePages, None, 0), _ => { return Err(QueryError::ProtocolError( "Unexpected server response, expected Result or Error", @@ -287,7 +287,7 @@ impl NonErrorQueryResponse { rows, warnings: self.warnings, tracing_id: self.tracing_id, - col_specs, + metadata, serialized_size, }, paging_state, diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs index 592bfdace4..4f1918fb90 100644 --- a/scylla/src/transport/query_result.rs +++ 
b/scylla/src/transport/query_result.rs @@ -2,6 +2,7 @@ use crate::frame::response::cql_to_rust::{FromRow, FromRowError}; use crate::frame::response::result::ColumnSpec; use crate::frame::response::result::Row; use crate::transport::session::{IntoTypedRows, TypedRowIter}; +use scylla_cql::frame::response::result::ResultMetadata; use thiserror::Error; use uuid::Uuid; @@ -17,8 +18,8 @@ pub struct QueryResult { pub warnings: Vec, /// CQL Tracing uuid - can only be Some if tracing is enabled for this query pub tracing_id: Option, - /// Column specification returned from the server - pub(crate) col_specs: Vec, + /// Metadata returned along with this response. + pub(crate) metadata: Option, /// The original size of the serialized rows in request pub serialized_size: usize, } @@ -29,7 +30,7 @@ impl QueryResult { rows: None, warnings: Vec::new(), tracing_id: None, - col_specs: Vec::new(), + metadata: None, serialized_size: 0, } } @@ -136,10 +137,14 @@ impl QueryResult { /// Returns column specifications. #[inline] pub fn col_specs(&self) -> &[ColumnSpec] { - self.col_specs.as_slice() + self.metadata + .as_ref() + .map(|metadata| metadata.col_specs.as_slice()) + .unwrap_or_default() } /// Returns a column specification for a column with given name, or None if not found + #[inline] pub fn get_column_spec<'a>(&'a self, name: &str) -> Option<(usize, &'a ColumnSpec)> { self.col_specs() .iter() @@ -274,12 +279,13 @@ impl From for SingleRowTypedError { mod tests { use super::*; use crate::{ - frame::response::result::{ColumnSpec, ColumnType, CqlValue, Row, TableSpec}, + frame::response::result::{CqlValue, Row}, test_utils::setup_tracing, }; use std::convert::TryInto; use assert_matches::assert_matches; + use scylla_cql::frame::response::result::{ColumnType, TableSpec}; // Returns specified number of rows, each one containing one int32 value. // Values are 0, 1, 2, 3, 4, ... 
@@ -306,8 +312,8 @@ mod tests { rows } - fn make_not_rows_query_result() -> QueryResult { - let table_spec = TableSpec::owned("some_keyspace".to_string(), "some_table".to_string()); + fn make_test_metadata() -> ResultMetadata { + let table_spec = TableSpec::borrowed("some_keyspace", "some_table"); let column_spec = ColumnSpec { table_spec, @@ -315,11 +321,15 @@ mod tests { typ: ColumnType::Int, }; + ResultMetadata::new_for_test(1, vec![column_spec]) + } + + fn make_not_rows_query_result() -> QueryResult { QueryResult { rows: None, warnings: vec![], tracing_id: None, - col_specs: vec![column_spec], + metadata: None, serialized_size: 0, } } @@ -327,12 +337,14 @@ mod tests { fn make_rows_query_result(rows_num: usize) -> QueryResult { let mut res = make_not_rows_query_result(); res.rows = Some(make_rows(rows_num)); + res.metadata = Some(make_test_metadata()); res } fn make_string_rows_query_result(rows_num: usize) -> QueryResult { let mut res = make_not_rows_query_result(); res.rows = Some(make_string_rows(rows_num)); + res.metadata = Some(make_test_metadata()); res } From d47aa812a3f1535620ee548030c2433a81326878 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 23 Aug 2024 12:38:22 +0200 Subject: [PATCH 12/12] codewide: share ResultMetadata by an Arc ResultMetadata is now passed behind an Arc, enabling shared ownership between PreparedStatement and QueryResult and RowIterator. Thanks to that, if `use_cached_metadata` flag is set on PreparedStatement, no new allocations occur when deserializing ResultMetadata on a request result. 
--- scylla-cql/src/frame/response/mod.rs | 4 +++- scylla-cql/src/frame/response/result.rs | 14 ++++++-------- scylla/src/statement/prepared_statement.rs | 6 +++--- scylla/src/transport/caching_session.rs | 3 ++- scylla/src/transport/connection.rs | 6 +++--- scylla/src/transport/iterator.rs | 4 ++-- scylla/src/transport/query_result.rs | 8 +++++--- 7 files changed, 24 insertions(+), 21 deletions(-) diff --git a/scylla-cql/src/frame/response/mod.rs b/scylla-cql/src/frame/response/mod.rs index 690e945048..d084eb71c9 100644 --- a/scylla-cql/src/frame/response/mod.rs +++ b/scylla-cql/src/frame/response/mod.rs @@ -5,6 +5,8 @@ pub mod event; pub mod result; pub mod supported; +use std::sync::Arc; + pub use error::Error; pub use supported::Supported; @@ -66,7 +68,7 @@ impl Response { features: &ProtocolFeatures, opcode: ResponseOpcode, buf_bytes: bytes::Bytes, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> Result { let buf = &mut &*buf_bytes; let response = match opcode { diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index 9d6aec229c..4506d6a680 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -17,6 +17,7 @@ use crate::types::deserialize::value::{ use crate::types::deserialize::{DeserializationError, FrameSlice}; use bytes::{Buf, Bytes}; use std::borrow::Cow; +use std::sync::Arc; use std::{net::IpAddr, result::Result as StdResult, str}; use uuid::Uuid; @@ -485,7 +486,7 @@ impl Row { #[derive(Debug)] pub struct Rows { - pub metadata: ResultMetadata, + pub metadata: Arc, pub paging_state_response: PagingStateResponse, pub rows_count: usize, pub rows: Vec, @@ -866,16 +867,13 @@ pub fn deser_cql_value( fn deser_rows( buf_bytes: Bytes, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> StdResult { let buf = &mut &*buf_bytes; let (server_metadata, paging_state_response) = deser_result_metadata(buf)?; let metadata 
= match cached_metadata { - Some(cached) => ResultMetadata { - col_count: cached.col_count, - col_specs: cached.col_specs.clone(), - }, + Some(cached) => Arc::clone(cached), None => { // No cached_metadata provided. Server is supposed to provide the result metadata. if server_metadata.col_count != server_metadata.col_specs.len() { @@ -884,7 +882,7 @@ fn deser_rows( col_specs_count: server_metadata.col_specs.len(), }); } - server_metadata + Arc::new(server_metadata) } }; @@ -952,7 +950,7 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult, + cached_metadata: Option<&Arc>, ) -> StdResult { let buf = &mut &*buf_bytes; use self::Result::*; diff --git a/scylla/src/statement/prepared_statement.rs b/scylla/src/statement/prepared_statement.rs index 2899c40dd4..809a8fe30c 100644 --- a/scylla/src/statement/prepared_statement.rs +++ b/scylla/src/statement/prepared_statement.rs @@ -102,7 +102,7 @@ pub struct PreparedStatement { #[derive(Debug)] struct PreparedStatementSharedData { metadata: PreparedMetadata, - result_metadata: ResultMetadata, + result_metadata: Arc, statement: String, } @@ -125,7 +125,7 @@ impl PreparedStatement { id: Bytes, is_lwt: bool, metadata: PreparedMetadata, - result_metadata: ResultMetadata, + result_metadata: Arc, statement: String, page_size: PageSize, config: StatementConfig, @@ -417,7 +417,7 @@ impl PreparedStatement { } /// Access metadata about the result of prepared statement returned by the database - pub(crate) fn get_result_metadata(&self) -> &ResultMetadata { + pub(crate) fn get_result_metadata(&self) -> &Arc { &self.shared.result_metadata } diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 0449937956..3fa352cd7d 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -14,6 +14,7 @@ use scylla_cql::types::serialize::batch::BatchValues; use scylla_cql::types::serialize::row::SerializeRow; use std::collections::hash_map::RandomState; use 
std::hash::BuildHasher; +use std::sync::Arc; /// Contains just the parts of a prepared statement that were returned /// from the database. All remaining parts (query string, page size, @@ -24,7 +25,7 @@ struct RawPreparedStatementData { id: Bytes, is_confirmed_lwt: bool, metadata: PreparedMetadata, - result_metadata: ResultMetadata, + result_metadata: Arc, partitioner_name: PartitionerName, } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 3e36ea625f..456f08d386 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -778,7 +778,7 @@ impl Connection { .protocol_features .prepared_flags_contain_lwt_mark(p.prepared_metadata.flags as u32), p.prepared_metadata, - p.result_metadata, + Arc::new(p.result_metadata), query.contents.clone(), query.get_validated_page_size(), query.config.clone(), @@ -1273,7 +1273,7 @@ impl Connection { request: &impl SerializableRequest, compress: bool, tracing: bool, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> Result { let compression = if compress { self.config.compression @@ -1298,7 +1298,7 @@ impl Connection { task_response: TaskResponse, compression: Option, features: &ProtocolFeatures, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> Result { let body_with_ext = frame::parse_response_body_extensions( task_response.params.flags, diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 2b21233712..cb5a8141c8 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -415,7 +415,7 @@ mod checked_channel_sender { response::result::{ResultMetadata, Rows}, }, }; - use std::marker::PhantomData; + use std::{marker::PhantomData, sync::Arc}; use tokio::sync::mpsc; use uuid::Uuid; @@ -456,7 +456,7 @@ mod checked_channel_sender { ) { let empty_page = ReceivedPage { rows: Rows { - metadata: ResultMetadata::mock_empty(), + metadata: 
Arc::new(ResultMetadata::mock_empty()), paging_state_response: PagingStateResponse::NoMorePages, rows_count: 0, rows: Vec::new(), diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs index 4f1918fb90..db446209f7 100644 --- a/scylla/src/transport/query_result.rs +++ b/scylla/src/transport/query_result.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::frame::response::cql_to_rust::{FromRow, FromRowError}; use crate::frame::response::result::ColumnSpec; use crate::frame::response::result::Row; @@ -19,7 +21,7 @@ pub struct QueryResult { /// CQL Tracing uuid - can only be Some if tracing is enabled for this query pub tracing_id: Option<Uuid>, /// Metadata returned along with this response. - pub(crate) metadata: Option<ResultMetadata>, + pub(crate) metadata: Option<Arc<ResultMetadata>>, /// The original size of the serialized rows in request pub serialized_size: usize, } @@ -337,14 +339,14 @@ mod tests { fn make_rows_query_result(rows_num: usize) -> QueryResult { let mut res = make_not_rows_query_result(); res.rows = Some(make_rows(rows_num)); - res.metadata = Some(make_test_metadata()); + res.metadata = Some(Arc::new(make_test_metadata())); res } fn make_string_rows_query_result(rows_num: usize) -> QueryResult { let mut res = make_not_rows_query_result(); res.rows = Some(make_string_rows(rows_num)); - res.metadata = Some(make_test_metadata()); + res.metadata = Some(Arc::new(make_test_metadata())); res }