Skip to content

Commit 545e829

Browse files
committed
codewide: share ResultMetadata by an Arc
ResultMetadata is now passed behind an Arc, enabling shared ownership between PreparedStatement and QueryResult and RowIterator. Thanks to that, if `use_cached_metadata` flag is set on PreparedStatement, no new allocations occur when deserializing ResultMetadata on a request result.
1 parent 993e17a commit 545e829

File tree

7 files changed

+26
-23
lines changed

7 files changed

+26
-23
lines changed

scylla-cql/src/frame/response/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ pub mod event;
55
pub mod result;
66
pub mod supported;
77

8+
use std::sync::Arc;
9+
810
pub use error::Error;
911
pub use supported::Supported;
1012

@@ -66,7 +68,7 @@ impl Response {
6668
features: &ProtocolFeatures,
6769
opcode: ResponseOpcode,
6870
buf_bytes: bytes::Bytes,
69-
cached_metadata: Option<&ResultMetadata>,
71+
cached_metadata: Option<&Arc<ResultMetadata>>,
7072
) -> Result<Response, CqlResponseParseError> {
7173
let buf = &mut &*buf_bytes;
7274
let response = match opcode {

scylla-cql/src/frame/response/result.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::types::deserialize::value::{
1616
use crate::types::deserialize::{DeserializationError, FrameSlice};
1717
use bytes::{Buf, Bytes};
1818
use std::borrow::Cow;
19+
use std::sync::Arc;
1920
use std::{net::IpAddr, result::Result as StdResult, str};
2021
use uuid::Uuid;
2122

@@ -475,7 +476,7 @@ impl Row {
475476

476477
#[derive(Debug)]
477478
pub struct Rows {
478-
pub metadata: ResultMetadata,
479+
pub metadata: Arc<ResultMetadata>,
479480
pub paging_state: Option<Bytes>,
480481
pub rows_count: usize,
481482
pub rows: Vec<Row>,
@@ -862,16 +863,13 @@ pub fn deser_cql_value(
862863

863864
fn deser_rows(
864865
buf_bytes: Bytes,
865-
cached_metadata: Option<&ResultMetadata>,
866+
cached_metadata: Option<&Arc<ResultMetadata>>,
866867
) -> StdResult<Rows, RowsParseError> {
867868
let buf = &mut &*buf_bytes;
868869
let (server_metadata, paging_state) = deser_result_metadata(buf)?;
869870

870871
let metadata = match cached_metadata {
871-
Some(cached) => ResultMetadata {
872-
col_count: cached.col_count,
873-
col_specs: cached.col_specs.clone(),
874-
},
872+
Some(cached) => Arc::clone(cached),
875873
None => {
876874
// No cached_metadata provided. Server is supposed to provide the result metadata.
877875
if server_metadata.col_count != server_metadata.col_specs.len() {
@@ -880,7 +878,7 @@ fn deser_rows(
880878
col_specs_count: server_metadata.col_specs.len(),
881879
});
882880
}
883-
server_metadata
881+
Arc::new(server_metadata)
884882
}
885883
};
886884

@@ -943,7 +941,7 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, SchemaChangeE
943941

944942
pub fn deserialize(
945943
buf_bytes: Bytes,
946-
cached_metadata: Option<&ResultMetadata>,
944+
cached_metadata: Option<&Arc<ResultMetadata>>,
947945
) -> StdResult<Result, CqlResultParseError> {
948946
let buf = &mut &*buf_bytes;
949947
use self::Result::*;

scylla/src/statement/prepared_statement.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub struct PreparedStatement {
100100
#[derive(Debug)]
101101
struct PreparedStatementSharedData {
102102
metadata: PreparedMetadata,
103-
result_metadata: ResultMetadata,
103+
result_metadata: Arc<ResultMetadata>,
104104
statement: String,
105105
}
106106

@@ -123,7 +123,7 @@ impl PreparedStatement {
123123
id: Bytes,
124124
is_lwt: bool,
125125
metadata: PreparedMetadata,
126-
result_metadata: ResultMetadata,
126+
result_metadata: Arc<ResultMetadata>,
127127
statement: String,
128128
page_size: Option<i32>,
129129
config: StatementConfig,
@@ -412,7 +412,7 @@ impl PreparedStatement {
412412
}
413413

414414
/// Access metadata about the result of prepared statement returned by the database
415-
pub(crate) fn get_result_metadata(&self) -> &ResultMetadata {
415+
pub(crate) fn get_result_metadata(&self) -> &Arc<ResultMetadata> {
416416
&self.shared.result_metadata
417417
}
418418

scylla/src/transport/caching_session.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use scylla_cql::types::serialize::batch::BatchValues;
1313
use scylla_cql::types::serialize::row::SerializeRow;
1414
use std::collections::hash_map::RandomState;
1515
use std::hash::BuildHasher;
16+
use std::sync::Arc;
1617

1718
/// Contains just the parts of a prepared statement that were returned
1819
/// from the database. All remaining parts (query string, page size,
@@ -23,7 +24,7 @@ struct RawPreparedStatementData {
2324
id: Bytes,
2425
is_confirmed_lwt: bool,
2526
metadata: PreparedMetadata,
26-
result_metadata: ResultMetadata,
27+
result_metadata: Arc<ResultMetadata>,
2728
partitioner_name: PartitionerName,
2829
}
2930

scylla/src/transport/connection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ impl Connection {
754754
.protocol_features
755755
.prepared_flags_contain_lwt_mark(p.prepared_metadata.flags as u32),
756756
p.prepared_metadata,
757-
p.result_metadata,
757+
Arc::new(p.result_metadata),
758758
query.contents.clone(),
759759
query.get_page_size(),
760760
query.config.clone(),
@@ -1203,7 +1203,7 @@ impl Connection {
12031203
request: &impl SerializableRequest,
12041204
compress: bool,
12051205
tracing: bool,
1206-
cached_metadata: Option<&ResultMetadata>,
1206+
cached_metadata: Option<&Arc<ResultMetadata>>,
12071207
) -> Result<QueryResponse, QueryError> {
12081208
let compression = if compress {
12091209
self.config.compression
@@ -1228,7 +1228,7 @@ impl Connection {
12281228
task_response: TaskResponse,
12291229
compression: Option<Compression>,
12301230
features: &ProtocolFeatures,
1231-
cached_metadata: Option<&ResultMetadata>,
1231+
cached_metadata: Option<&Arc<ResultMetadata>>,
12321232
) -> Result<QueryResponse, QueryError> {
12331233
let body_with_ext = frame::parse_response_body_extensions(
12341234
task_response.params.flags,

scylla/src/transport/iterator.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ mod checked_channel_sender {
429429
errors::QueryError,
430430
frame::response::result::{ResultMetadata, Rows},
431431
};
432-
use std::marker::PhantomData;
432+
use std::{marker::PhantomData, sync::Arc};
433433
use tokio::sync::mpsc;
434434
use uuid::Uuid;
435435

@@ -470,7 +470,7 @@ mod checked_channel_sender {
470470
) {
471471
let empty_page = ReceivedPage {
472472
rows: Rows {
473-
metadata: ResultMetadata::mock_empty(),
473+
metadata: Arc::new(ResultMetadata::mock_empty()),
474474
paging_state: None,
475475
rows_count: 0,
476476
rows: Vec::new(),
@@ -678,7 +678,7 @@ where
678678

679679
match query_response {
680680
Ok(NonErrorQueryResponse {
681-
response: NonErrorResponse::Result(result::Result::Rows(mut rows)),
681+
response: NonErrorResponse::Result(result::Result::Rows(rows)),
682682
tracing_id,
683683
..
684684
}) => {
@@ -689,7 +689,7 @@ where
689689
.load_balancing_policy
690690
.on_query_success(&self.statement_info, elapsed, node);
691691

692-
self.paging_state = rows.paging_state.take();
692+
self.paging_state = rows.paging_state.clone();
693693

694694
request_span.record_rows_fields(&rows);
695695

@@ -853,8 +853,8 @@ where
853853
let result = (self.fetcher)(paging_state).await?;
854854
let response = result.into_non_error_query_response()?;
855855
match response.response {
856-
NonErrorResponse::Result(result::Result::Rows(mut rows)) => {
857-
paging_state = rows.paging_state.take();
856+
NonErrorResponse::Result(result::Result::Rows(rows)) => {
857+
paging_state = rows.paging_state.clone();
858858
let (proof, send_result) = self
859859
.sender
860860
.send(Ok(ReceivedPage {

scylla/src/transport/query_result.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use crate::frame::response::cql_to_rust::{FromRow, FromRowError};
24
use crate::frame::response::result::ColumnSpec;
35
use crate::frame::response::result::Row;
@@ -22,7 +24,7 @@ pub struct QueryResult {
2224
/// Paging state returned from the server
2325
pub paging_state: Option<Bytes>,
2426
/// Metadata returned along with this response.
25-
pub(crate) metadata: Option<ResultMetadata>,
27+
pub(crate) metadata: Option<Arc<ResultMetadata>>,
2628
/// The original size of the serialized rows in request
2729
pub serialized_size: usize,
2830
}

0 commit comments

Comments
 (0)