Skip to content

Commit 253b79a

Browse files
committed
Yield after channel send and move cpu tasks to thread
1 parent 2ab1949 commit 253b79a

File tree

4 files changed

+102
-54
lines changed

4 files changed

+102
-54
lines changed

crates/puffin-client/src/cached_client.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl CachedClient {
104104
/// client.
105105
#[instrument(skip_all)]
106106
pub async fn get_cached_with_callback<
107-
Payload: Serialize + DeserializeOwned + Send,
107+
Payload: Serialize + DeserializeOwned + Send + 'static,
108108
CallBackError,
109109
Callback,
110110
CallbackReturn,
@@ -172,7 +172,7 @@ impl CachedClient {
172172
}
173173
}
174174

175-
async fn read_cache<Payload: Serialize + DeserializeOwned + Send>(
175+
async fn read_cache<Payload: Serialize + DeserializeOwned + Send + 'static>(
176176
cache_entry: &CacheEntry,
177177
) -> Option<DataWithCachePolicy<Payload>> {
178178
let read_span = info_span!("read_cache", file = %cache_entry.path().display());
@@ -185,8 +185,12 @@ impl CachedClient {
185185
"parse_cache",
186186
path = %cache_entry.path().display()
187187
);
188-
let parse_result = parse_span
189-
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached));
188+
let parse_result = tokio::task::spawn_blocking(move || {
189+
parse_span
190+
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached))
191+
})
192+
.await
193+
.expect("Tokio executor failed, was there a panic?");
190194
match parse_result {
191195
Ok(data) => Some(data),
192196
Err(err) => {

crates/puffin-resolver/src/resolver/mod.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::sync::Arc;
55

66
use anyhow::Result;
77
use dashmap::{DashMap, DashSet};
8-
use futures::channel::mpsc::UnboundedReceiver;
8+
use futures::channel::mpsc::{
9+
UnboundedReceiver as MpscUnboundedReceiver, UnboundedSender as MpscUnboundedSender,
10+
};
911
use futures::{FutureExt, StreamExt};
1012
use itertools::Itertools;
1113
use pubgrub::error::PubGrubError;
@@ -239,7 +241,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
239241
#[instrument(skip_all)]
240242
async fn solve(
241243
&self,
242-
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
244+
request_sink: &MpscUnboundedSender<Request>,
243245
) -> Result<ResolutionGraph, ResolveError> {
244246
let root = PubGrubPackage::Root(self.project.clone());
245247

@@ -384,7 +386,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
384386
&self,
385387
package: &PubGrubPackage,
386388
priorities: &mut PubGrubPriorities,
387-
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
389+
request_sink: &MpscUnboundedSender<Request>,
388390
) -> Result<(), ResolveError> {
389391
match package {
390392
PubGrubPackage::Root(_) => {}
@@ -411,14 +413,16 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
411413
}
412414
}
413415
}
416+
// Yield after sending on a channel to allow the subscribers to continue
417+
tokio::task::yield_now().await;
414418
Ok(())
415419
}
416420

417421
/// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch
418422
/// metadata for all of the packages in parallel.
419423
fn pre_visit<'data>(
420424
packages: impl Iterator<Item = (&'data PubGrubPackage, &'data Range<Version>)>,
421-
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
425+
request_sink: &MpscUnboundedSender<Request>,
422426
) -> Result<(), ResolveError> {
423427
// Iterate over the potential packages, and fetch file metadata for any of them. These
424428
// represent our current best guesses for the versions that we _might_ select.
@@ -439,9 +443,9 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
439443
package: &PubGrubPackage,
440444
range: &Range<Version>,
441445
pins: &mut FilePins,
442-
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
446+
request_sink: &MpscUnboundedSender<Request>,
443447
) -> Result<Option<Version>, ResolveError> {
444-
return match package {
448+
let result = match package {
445449
PubGrubPackage::Root(_) => Ok(Some(MIN_VERSION.clone())),
446450

447451
PubGrubPackage::Python(PubGrubPython::Installed) => {
@@ -581,15 +585,19 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
581585
Ok(Some(version))
582586
}
583587
};
588+
// Yield after sending on a channel to allow the subscribers to continue
589+
tokio::task::yield_now().await;
590+
result
584591
}
585592

586593
/// Given a candidate package and version, return its dependencies.
594+
#[instrument(skip_all, fields(%package, %version))]
587595
async fn get_dependencies(
588596
&self,
589597
package: &PubGrubPackage,
590598
version: &Version,
591599
priorities: &mut PubGrubPriorities,
592-
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
600+
request_sink: &MpscUnboundedSender<Request>,
593601
) -> Result<Dependencies, ResolveError> {
594602
match package {
595603
PubGrubPackage::Root(_) => {
@@ -714,7 +722,10 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
714722
}
715723

716724
/// Fetch the metadata for a stream of packages and versions.
717-
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
725+
async fn fetch(
726+
&self,
727+
request_stream: MpscUnboundedReceiver<Request>,
728+
) -> Result<(), ResolveError> {
718729
let mut response_stream = request_stream
719730
.map(|request| self.process_request(request))
720731
.buffer_unordered(50);
@@ -904,10 +915,10 @@ impl Display for Request {
904915
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
905916
match self {
906917
Request::Package(package_name) => {
907-
write!(f, "Package {package_name}")
918+
write!(f, "Versions {package_name}")
908919
}
909920
Request::Dist(dist) => {
910-
write!(f, "Dist {dist}")
921+
write!(f, "Metadata {dist}")
911922
}
912923
Request::Prefetch(package_name, range) => {
913924
write!(f, "Prefetch {package_name} {range}")

crates/puffin-resolver/src/resolver/provider.rs

Lines changed: 72 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use std::future::Future;
2+
use std::ops::Deref;
3+
use std::sync::Arc;
24

35
use anyhow::Result;
46
use chrono::{DateTime, Utc};
57
use futures::FutureExt;
68
use url::Url;
79

8-
use distribution_types::Dist;
10+
use distribution_types::{Dist, IndexUrl};
911
use platform_tags::Tags;
10-
use puffin_client::{FlatIndex, RegistryClient};
12+
use puffin_client::{FlatIndex, RegistryClient, SimpleMetadata};
1113
use puffin_distribution::DistributionDatabase;
1214
use puffin_normalize::PackageName;
1315
use puffin_traits::{BuildContext, NoBinary};
@@ -45,17 +47,29 @@ pub trait ResolverProvider: Send + Sync {
4547
/// The main IO backend for the resolver, which does cached requests network requests using the
4648
/// [`RegistryClient`] and [`DistributionDatabase`].
4749
pub struct DefaultResolverProvider<'a, Context: BuildContext + Send + Sync> {
48-
/// The [`RegistryClient`] used to query the index.
49-
client: &'a RegistryClient,
5050
/// The [`DistributionDatabase`] used to build source distributions.
5151
fetcher: DistributionDatabase<'a, Context>,
52+
inner: Arc<DefaultResolverProviderInner>,
53+
}
54+
55+
pub struct DefaultResolverProviderInner {
56+
/// The [`RegistryClient`] used to query the index.
57+
client: RegistryClient,
5258
/// These are the entries from `--find-links` that act as overrides for index responses.
53-
flat_index: &'a FlatIndex,
54-
tags: &'a Tags,
59+
flat_index: FlatIndex,
60+
tags: Tags,
5561
python_requirement: PythonRequirement,
5662
exclude_newer: Option<DateTime<Utc>>,
5763
allowed_yanks: AllowedYanks,
58-
no_binary: &'a NoBinary,
64+
no_binary: NoBinary,
65+
}
66+
67+
impl<'a, Context: BuildContext + Send + Sync> Deref for DefaultResolverProvider<'a, Context> {
68+
type Target = DefaultResolverProviderInner;
69+
70+
fn deref(&self) -> &Self::Target {
71+
self.inner.as_ref()
72+
}
5973
}
6074

6175
impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Context> {
@@ -72,50 +86,69 @@ impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Contex
7286
no_binary: &'a NoBinary,
7387
) -> Self {
7488
Self {
75-
client,
7689
fetcher,
77-
flat_index,
78-
tags,
79-
python_requirement,
80-
exclude_newer,
81-
allowed_yanks,
82-
no_binary,
90+
inner: Arc::new(DefaultResolverProviderInner {
91+
client: client.clone(),
92+
flat_index: flat_index.clone(),
93+
tags: tags.clone(),
94+
python_requirement,
95+
exclude_newer,
96+
allowed_yanks,
97+
no_binary: no_binary.clone(),
98+
}),
8399
}
84100
}
85101
}
86102

103+
fn simple_to_version_map(
104+
self_arc: Arc<DefaultResolverProviderInner>,
105+
result: Result<(IndexUrl, SimpleMetadata), puffin_client::Error>,
106+
package_name: &PackageName,
107+
) -> VersionMapResponse {
108+
match result {
109+
Ok((index, metadata)) => Ok(VersionMap::from_metadata(
110+
metadata,
111+
&package_name,
112+
&index,
113+
&self_arc.tags,
114+
&self_arc.python_requirement,
115+
&self_arc.allowed_yanks,
116+
self_arc.exclude_newer.as_ref(),
117+
self_arc.flat_index.get(&package_name).cloned(),
118+
&self_arc.no_binary,
119+
)),
120+
Err(err) => match err.into_kind() {
121+
kind @ (puffin_client::ErrorKind::PackageNotFound(_)
122+
| puffin_client::ErrorKind::NoIndex(_)) => {
123+
if let Some(flat_index) = self_arc.flat_index.get(&package_name).cloned() {
124+
Ok(VersionMap::from(flat_index))
125+
} else {
126+
Err(kind.into())
127+
}
128+
}
129+
kind => Err(kind.into()),
130+
},
131+
}
132+
}
133+
87134
impl<'a, Context: BuildContext + Send + Sync> ResolverProvider
88135
for DefaultResolverProvider<'a, Context>
89136
{
90137
fn get_version_map<'io>(
91138
&'io self,
92139
package_name: &'io PackageName,
93140
) -> impl Future<Output = VersionMapResponse> + Send + 'io {
94-
self.client
95-
.simple(package_name)
96-
.map(move |result| match result {
97-
Ok((index, metadata)) => Ok(VersionMap::from_metadata(
98-
metadata,
99-
package_name,
100-
&index,
101-
self.tags,
102-
&self.python_requirement,
103-
&self.allowed_yanks,
104-
self.exclude_newer.as_ref(),
105-
self.flat_index.get(package_name).cloned(),
106-
self.no_binary,
107-
)),
108-
Err(err) => match err.into_kind() {
109-
kind @ (puffin_client::ErrorKind::PackageNotFound(_)
110-
| puffin_client::ErrorKind::NoIndex(_)) => {
111-
if let Some(flat_index) = self.flat_index.get(package_name).cloned() {
112-
Ok(VersionMap::from(flat_index))
113-
} else {
114-
Err(kind.into())
115-
}
116-
}
117-
kind => Err(kind.into()),
118-
},
141+
let package_name_owned = package_name.clone();
142+
let self_arc = self.inner.clone();
143+
self.inner
144+
.client
145+
.simple(&package_name)
146+
.then(|result| async move {
147+
tokio::task::spawn_blocking(move || {
148+
simple_to_version_map(self_arc, result, &package_name_owned)
149+
})
150+
.await
151+
.unwrap()
119152
})
120153
}
121154

crates/puffin-traits/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl Display for BuildKind {
160160
}
161161
}
162162

163-
#[derive(Debug)]
163+
#[derive(Debug, Clone)]
164164
pub enum NoBinary {
165165
/// Allow installation of any wheel.
166166
None,

0 commit comments

Comments
 (0)