Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ static-files = "0.2"
thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
futures-core = "0.3.31"

[build-dependencies]
cargo_toml = "0.21"
Expand Down
7 changes: 2 additions & 5 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

use std::{io::ErrorKind, sync::Arc};
use std::sync::Arc;

use chrono::{DateTime, Local, NaiveTime, Utc};
use column::Column;
Expand Down Expand Up @@ -259,10 +259,7 @@ async fn create_manifest(
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))?,
.ok_or(IOError::other("Failed to create upper bound for manifest"))?,
)
.and_utc();

Expand Down
19 changes: 15 additions & 4 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ impl FlightService for AirServiceImpl {
}

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let key = extract_session_key(req.metadata())
.map_err(|e| Status::unauthenticated(e.to_string()))?;

let ticket = get_query_from_ticket(&req)?;
let ticket =
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;

info!("query requested to airplane: {:?}", ticket);

Expand Down Expand Up @@ -217,10 +219,19 @@ impl FlightService for AirServiceImpl {
})?;
let time = Instant::now();

let (records, _) = execute(query, &stream_name)
let (records, _) = execute(query, &stream_name, false)
.await
.map_err(|err| Status::internal(err.to_string()))?;

let records = match records {
actix_web::Either::Left(rbs) => rbs,
actix_web::Either::Right(_) => {
return Err(Status::failed_precondition(
"Expected batch results, got stream",
))
}
};

/*
* INFO: No returning the schema with the data.
* kept it in case it needs to be sent in the future.
Expand All @@ -246,7 +257,7 @@ impl FlightService for AirServiceImpl {
.observe(time);

// Airplane takes off 🛫
out
out.map_err(|e| *e)
}

async fn do_put(
Expand Down
Loading
Loading