Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ rust-version = "1.63"

[features]
json = ["serde_json"]
unstable-v1 = ["bolt-protocol-impl-v2"]
bolt-protocol-impl-v2 = []
unstable-v1 = ["unstable-bolt-protocol-impl-v2", "unstable-streaming-summary"]
unstable-serde-packstream-format = []
unstable-streaming-summary = ["unstable-serde-packstream-format"]
unstable-bolt-protocol-impl-v2 = [
"unstable-serde-packstream-format",
"unstable-streaming-summary",
]

[dependencies]
async-trait = "0.1.0"
Expand Down
3 changes: 2 additions & 1 deletion lib/src/bolt/detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ mod tests {
use serde::de::DeserializeOwned;

use super::*;
use crate::bolt::{packstream::value::bolt, MessageResponse as _};

use crate::{bolt::MessageResponse as _, packstream::bolt};

#[test]
fn parse_empty_record() {
Expand Down
10 changes: 3 additions & 7 deletions lib/src/bolt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@ use serde::{
};

mod detail;
mod packstream;
mod request;
mod summary;

#[cfg(debug_assertions)]
pub use packstream::debug::Dbg;
#[cfg(test)]
pub use packstream::value::{bolt, BoltBytesBuilder};
pub use packstream::{de, from_bytes, ser, to_bytes};
pub use request::{Commit, Discard, Goodbye, Hello, Reset, Rollback, WrapExtra};
pub use summary::{Failure, Streaming, StreamingSummary, Success, Summary};
pub use summary::{Failure, Success, Summary};

use crate::packstream::{self, de, from_bytes, ser, to_bytes};

pub(crate) trait Message: Serialize {
/// Serialize this type into a packstream encoded byte slice.
Expand Down
2 changes: 1 addition & 1 deletion lib/src/bolt/request/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl ExpectedResponse for Commit {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _};
use crate::{bolt::Message as _, packstream::bolt};

#[test]
fn serialize() {
Expand Down
8 changes: 6 additions & 2 deletions lib/src/bolt/request/discard.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{
bolt::{
request::extra::{Extra, WrapExtra},
ExpectedResponse, Streaming, Summary,
ExpectedResponse, Summary,
},
errors::Result,
summary::Streaming,
};
use serde::Serialize;

Expand Down Expand Up @@ -38,7 +39,10 @@ impl ExpectedResponse for Discard {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _, MessageResponse as _};
use crate::{
bolt::{Message as _, MessageResponse as _},
packstream::bolt,
};

#[test]
fn serialize() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/bolt/request/extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Serialize for Extra {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _};
use crate::{bolt::Message as _, packstream::bolt};

#[test]
fn serialize() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/bolt/request/goodbye.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl Serialize for Goodbye {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _};
use crate::{bolt::Message as _, packstream::bolt};

#[test]
fn serialize() {
Expand Down
5 changes: 4 additions & 1 deletion lib/src/bolt/request/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ impl<'a> Serialize for Hello<'a> {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _, MessageResponse as _};
use crate::{
bolt::{Message as _, MessageResponse as _},
packstream::bolt,
};

#[test]
fn serialize() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/bolt/request/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Serialize for Reset {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _};
use crate::{bolt::Message as _, packstream::bolt};

#[test]
fn serialize() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/bolt/request/rollback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl ExpectedResponse for Rollback {
#[cfg(test)]
mod tests {
use super::*;
use crate::bolt::{packstream::value::bolt, Message as _};
use crate::{bolt::Message as _, packstream::bolt};

#[test]
fn serialize() {
Expand Down
254 changes: 2 additions & 252 deletions lib/src/bolt/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,191 +85,11 @@ impl<'de, R: Deserialize<'de>> Deserialize<'de> for Summary<R> {
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum Streaming {
HasMore,
Done(Box<StreamingSummary>),
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Type {
Read,
Write,
ReadWrite,
SchemaOnly,
}

#[derive(Debug, Clone, PartialEq)]
pub struct StreamingSummary {
pub(crate) bookmark: Option<String>,
pub(crate) t_last: Option<i64>,
pub(crate) r#type: Option<Type>,
pub(crate) db: Option<String>,
pub(crate) stats: Option<crate::BoltMap>,
pub(crate) plan: Option<crate::BoltMap>,
pub(crate) profile: Option<crate::BoltMap>,
pub(crate) notifications: Option<Vec<crate::BoltMap>>,
}

impl<'de> Deserialize<'de> for Type {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
struct Visit;

impl<'de> Visitor<'de> for Visit {
type Value = Type;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid type string")
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
match v {
"r" => Ok(Type::Read),
"w" => Ok(Type::Write),
"rw" => Ok(Type::ReadWrite),
"s" => Ok(Type::SchemaOnly),
_ => Err(E::custom(format!("invalid type string: {}", v))),
}
}

fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
where
E: de::Error,
{
self.visit_str(v)
}

fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: de::Error,
{
self.visit_str(&v)
}
}

deserializer.deserialize_str(Visit)
}
}

impl<'de> Deserialize<'de> for Streaming {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
"has_more",
"bookmark",
"t_last",
"type",
"db",
"stats",
"plan",
"profile",
"notifications",
];

struct Visit;

impl<'de> Visitor<'de> for Visit {
type Value = Streaming;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid streaming response")
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
macro_rules! str {
(r#type) => {
"type"
};
($key:ident) => {
stringify!($key)
};
}

macro_rules! set {
($($keys:ident),+ $(,)?) => {

$(
let mut $keys = None;
)+

while let Some(key) = map.next_key()? {
match key {
$(
str!($keys) => {
if $keys.is_some() {
return Err(de::Error::duplicate_field(str!($keys)));
}
$keys = Some(map.next_value()?);
}
)+
_other => {
// return Err(de::Error::unknown_field(other, FIELDS));
map.next_value::<de::IgnoredAny>()?;
}
}
}
};
}

set!(
has_more,
bookmark,
t_last,
r#type,
db,
stats,
plan,
profile,
notifications,
);

let has_more = has_more.unwrap_or(false);

if has_more {
return Ok(Streaming::HasMore);
}

let t_last = t_last.ok_or_else(|| de::Error::missing_field("t_last"))?;
let r#type = r#type.ok_or_else(|| de::Error::missing_field("type"))?;
let db = db.ok_or_else(|| de::Error::missing_field("db"))?;

let full = StreamingSummary {
bookmark,
t_last,
r#type,
db,
stats,
plan,
profile,
notifications,
};

Ok(Streaming::Done(Box::new(full)))
}
}

deserializer.deserialize_struct("Response", FIELDS, Visit)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
bolt::{packstream::value::bolt, MessageResponse as _},
BoltMap, BoltString, BoltType,
};
use crate::packstream::bolt;
use crate::{bolt::MessageResponse as _, BoltMap, BoltString, BoltType};

#[test]
fn parse_hello_success() {
Expand Down Expand Up @@ -316,74 +136,4 @@ mod tests {
"The client is unauthorized due to authentication failure."
);
}

#[test]
fn parse_stream_summary() {
let data = bolt()
.structure(1, 0x70)
.tiny_map(1)
.tiny_string("has_more")
.bool(true)
.build();

let success = Summary::<Streaming>::parse(data).unwrap();

assert!(matches!(
success,
Summary::Success(Success {
metadata: Streaming::HasMore,
})
));
}

#[test]
fn parse_full_summary() {
let data = bolt()
.structure(1, 0x70)
.tiny_map(5)
.tiny_string("bookmark")
.string16("FB:kcwQ9vYF5wN+TCaprZQJITJbQnaQ")
.tiny_string("stats")
.tiny_map(3)
.tiny_string("labels-added")
.tiny_int(1)
.tiny_string("nodes-created")
.tiny_int(2)
.tiny_string("properties-set")
.tiny_int(3)
.tiny_string("type")
.tiny_string("rw")
.tiny_string("t_last")
.tiny_int(42)
.tiny_string("db")
.tiny_string("neo4j")
.build();

let expected = StreamingSummary {
bookmark: Some("FB:kcwQ9vYF5wN+TCaprZQJITJbQnaQ".to_owned()),
t_last: Some(42),
r#type: Some(Type::ReadWrite),
db: Some("neo4j".to_owned()),
stats: Some(BoltMap::from_iter([
(BoltString::from("labels-added"), BoltType::from(1)),
(BoltString::from("nodes-created"), BoltType::from(2)),
(BoltString::from("properties-set"), BoltType::from(3)),
])),
plan: None,
profile: None,
notifications: None,
};

let actual = Summary::<Streaming>::parse(data).unwrap();
let actual = match actual {
Summary::Success(actual) => actual,
_ => panic!("Expected success"),
};
let actual = match actual.metadata {
Streaming::Done(actual) => actual,
_ => panic!("Expected done"),
};

assert_eq!(*actual, expected);
}
}
Loading