Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/distribution-types/src/prioritized_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,18 @@ impl<'a> CompatibleDist<'a> {
} => ResolvedDistRef::Installable(source_dist),
}
}

/// Returns whether the distribution is a source distribution.
///
/// Avoid building source distributions we don't need.
pub fn prefetchable(&self) -> bool {
match *self {
CompatibleDist::SourceDist(_) => false,
CompatibleDist::InstalledDist(_)
| CompatibleDist::CompatibleWheel(_, _)
| CompatibleDist::IncompatibleWheel { .. } => true,
}
}
}

impl WheelCompatibility {
Expand Down
11 changes: 10 additions & 1 deletion crates/distribution-types/src/resolved.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::fmt::{Display, Formatter};

use pep508_rs::PackageName;

Expand Down Expand Up @@ -42,6 +42,15 @@ impl ResolvedDistRef<'_> {
}
}

impl Display for ResolvedDistRef<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Installable(dist) => Display::fmt(dist, f),
Self::Installed(dist) => Display::fmt(dist, f),
}
}
}

impl Name for ResolvedDistRef<'_> {
fn name(&self) -> &PackageName {
match self {
Expand Down
176 changes: 176 additions & 0 deletions crates/uv-resolver/src/resolver/batch_prefetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::cmp::min;

use pubgrub::range::Range;
use rustc_hash::FxHashMap;
use tokio::sync::mpsc::Sender;
use tracing::{debug, trace};

use distribution_types::{DistributionMetadata, ResolvedDistRef};
use pep440_rs::Version;

use crate::candidate_selector::{CandidateDist, CandidateSelector};
use crate::pubgrub::PubGrubPackage;
use crate::resolver::Request;
use crate::{InMemoryIndex, ResolveError, VersionsResponse};

enum BatchPrefetchStrategy {
/// Go through the next versions assuming the existing selection and its constraints
/// remain.
Compatible {
compatible: Range<Version>,
previous: Version,
},
/// We encounter cases (botocore) where the above doesn't work: Say we previously selected
/// a==x.y.z, which depends on b==x.y.z. a==x.y.z is incompatible, but we don't know that
/// yet. We just selected b==x.y.z and want to prefetch, since for all versions of a we try,
/// we have to wait for the matching version of b. The exiting range gives us only one version
/// of b, so the compatible strategy doesn't prefetch any version. Instead, we try the next
/// heuristic where the next version of b will be x.y.(z-1) and so forth.
InOrder { previous: Version },
}

/// Prefetch a large number of versions if we already unsuccessfully tried many versions.
///
/// This is an optimization specifically targeted at cold cache urllib3/boto3/botocore, where we
/// have to fetch the metadata for a lot of versions.
///
/// Note that these all heuristics that could totally prefetch lots of irrelevant versions.
#[derive(Default)]
pub(crate) struct BatchPrefetcher {
tried_versions: FxHashMap<PubGrubPackage, usize>,
last_prefetch: FxHashMap<PubGrubPackage, usize>,
}

impl BatchPrefetcher {
/// Prefetch a large number of versions if we already unsuccessfully tried many versions.
pub(crate) async fn prefetch_batches(
&mut self,
next: &PubGrubPackage,
version: &Version,
current_range: &Range<Version>,
request_sink: &Sender<Request>,
index: &InMemoryIndex,
selector: &CandidateSelector,
) -> anyhow::Result<(), ResolveError> {
let PubGrubPackage::Package(package_name, _, _) = &next else {
return Ok(());
};

let (num_tried, do_prefetch) = self.should_prefetch(next);
if !do_prefetch {
return Ok(());
}
let total_prefetch = min(num_tried, 50);

// This is immediate, we already fetched the version map.
let versions_response = index
.packages
.wait(package_name)
.await
.ok_or(ResolveError::Unregistered)?;

let VersionsResponse::Found(ref version_map) = *versions_response else {
return Ok(());
};

let mut phase = BatchPrefetchStrategy::Compatible {
compatible: current_range.clone(),
previous: version.clone(),
};
let mut prefetch_count = 0;
for _ in 0..total_prefetch {
let candidate = match phase {
BatchPrefetchStrategy::Compatible {
compatible,
previous,
} => {
if let Some(candidate) =
selector.select_no_preference(package_name, &compatible, version_map)
{
let compatible = compatible.intersection(
&Range::singleton(candidate.version().clone()).complement(),
);
phase = BatchPrefetchStrategy::Compatible {
compatible,
previous: candidate.version().clone(),
};
candidate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should probably only do this prefetching when the candidate has a wheel. Otherwise, we risk building tons and tons of source distributions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

} else {
// We exhausted the compatible version, switch to ignoring the existing
// constraints on the package and instead going through versions in order.
phase = BatchPrefetchStrategy::InOrder { previous };
continue;
}
}
BatchPrefetchStrategy::InOrder { previous } => {
let range = if selector.use_highest_version(package_name) {
Range::strictly_lower_than(previous)
} else {
Range::strictly_higher_than(previous)
};
if let Some(candidate) =
selector.select_no_preference(package_name, &range, version_map)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this risks being quadratic, because we iterate over total_prefetch, and then select_no_preference is also linear, right? Is there any way to improve that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did an attempt turning select_candidate into an iterator but then looked at the profiles for urllib3/boto3/botocore and i think it's not worth it.

{
phase = BatchPrefetchStrategy::InOrder {
previous: candidate.version().clone(),
};
candidate
} else {
// Both strategies exhausted their candidates.
break;
}
}
};

let CandidateDist::Compatible(dist) = candidate.dist() else {
continue;
};
// Avoid building a lot of source distributions.
if !dist.prefetchable() {
continue;
}
let dist = dist.for_resolution();

// Emit a request to fetch the metadata for this version.
trace!(
"Prefetching {prefetch_count} ({}) {}",
match phase {
BatchPrefetchStrategy::Compatible { .. } => "compatible",
BatchPrefetchStrategy::InOrder { .. } => "in order",
},
dist
);
prefetch_count += 1;
if index.distributions.register(candidate.package_id()) {
let request = match dist {
ResolvedDistRef::Installable(dist) => Request::Dist(dist.clone()),
ResolvedDistRef::Installed(dist) => Request::Installed(dist.clone()),
};
request_sink.send(request).await?;
}
}

debug!("Prefetching {prefetch_count} {package_name} versions");

self.last_prefetch.insert(next.clone(), num_tried);
Ok(())
}

/// Each time we tried a version for a package, we register that here.
pub(crate) fn version_tried(&mut self, package: PubGrubPackage) {
*self.tried_versions.entry(package).or_default() += 1;
}

/// After 5, 10, 20, 40 tried versions, prefetch that many versions to start early but not
/// too aggressive. Later we schedule the prefetch of 50 versions every 20 versions, this gives
/// us a good buffer until we see prefetch again and is high enough to saturate the task pool.
fn should_prefetch(&self, next: &PubGrubPackage) -> (usize, bool) {
let num_tried = self.tried_versions.get(next).copied().unwrap_or_default();
let previous_prefetch = self.last_prefetch.get(next).copied().unwrap_or_default();
let do_prefetch = (num_tried >= 5 && previous_prefetch < 5)
|| (num_tried >= 10 && previous_prefetch < 10)
|| (num_tried >= 20 && previous_prefetch < 20)
|| (num_tried >= 20 && num_tried - previous_prefetch >= 20);
(num_tried, do_prefetch)
}
}
22 changes: 19 additions & 3 deletions crates/uv-resolver/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::pubgrub::{
};
use crate::python_requirement::PythonRequirement;
use crate::resolution::ResolutionGraph;
use crate::resolver::batch_prefetch::BatchPrefetcher;
pub use crate::resolver::index::InMemoryIndex;
pub use crate::resolver::provider::{
DefaultResolverProvider, PackageVersionsResult, ResolverProvider, VersionsResponse,
Expand All @@ -54,6 +55,7 @@ pub use crate::resolver::reporter::{BuildId, Reporter};
use crate::yanks::AllowedYanks;
use crate::{DependencyMode, Exclusions, Options};

mod batch_prefetch;
mod index;
mod locals;
mod provider;
Expand Down Expand Up @@ -213,14 +215,14 @@ impl<
pub async fn resolve(self) -> Result<ResolutionGraph, ResolveError> {
// A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version
// metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version).
// Channel size is set to the same size as the task buffer for simplicity.
let (request_sink, request_stream) = tokio::sync::mpsc::channel(50);
// Channel size is set large to accommodate batch prefetching.
let (request_sink, request_stream) = tokio::sync::mpsc::channel(300);

// Run the fetcher.
let requests_fut = self.fetch(request_stream).fuse();

// Run the solver.
let resolve_fut = self.solve(request_sink).fuse();
let resolve_fut = self.solve(request_sink).boxed().fuse();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once again making windows happy


// Wait for both to complete.
match tokio::try_join!(requests_fut, resolve_fut) {
Expand Down Expand Up @@ -256,6 +258,7 @@ impl<
request_sink: tokio::sync::mpsc::Sender<Request>,
) -> Result<ResolutionGraph, ResolveError> {
let root = PubGrubPackage::Root(self.project.clone());
let mut prefetcher = BatchPrefetcher::default();

// Keep track of the packages for which we've requested metadata.
let mut pins = FilePins::default();
Expand Down Expand Up @@ -304,6 +307,8 @@ impl<
};
next = highest_priority_pkg;

prefetcher.version_tried(next.clone());

let term_intersection = state
.partial_solution
.term_intersection_for_package(&next)
Expand Down Expand Up @@ -405,6 +410,17 @@ impl<
}
};

prefetcher
.prefetch_batches(
&next,
&version,
term_intersection.unwrap_positive(),
&request_sink,
self.index,
&self.selector,
)
.await?;

self.on_progress(&next, &version);

if added_dependencies
Expand Down