diff --git a/Cargo.lock b/Cargo.lock index 971beeae82336..21b812a7ea3f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2364,6 +2364,7 @@ name = "once-map" version = "0.0.1" dependencies = [ "dashmap", + "futures", "tokio", ] @@ -4998,6 +4999,7 @@ dependencies = [ "cache-key", "chrono", "clap", + "dashmap", "derivative", "distribution-filename", "distribution-types", diff --git a/crates/bench/benches/uv.rs b/crates/bench/benches/uv.rs index 8816dbaaa2add..05c84fa68977f 100644 --- a/crates/bench/benches/uv.rs +++ b/crates/bench/benches/uv.rs @@ -108,7 +108,7 @@ mod resolver { &index, &hashes, &build_context, - &installed_packages, + installed_packages, DistributionDatabase::new(client, &build_context, concurrency.downloads), )?; diff --git a/crates/once-map/Cargo.toml b/crates/once-map/Cargo.toml index 8e6add42aeb7f..9e31e9e441e55 100644 --- a/crates/once-map/Cargo.toml +++ b/crates/once-map/Cargo.toml @@ -15,3 +15,4 @@ workspace = true [dependencies] dashmap = { workspace = true } tokio = { workspace = true } +futures = { workspace = true } diff --git a/crates/once-map/src/lib.rs b/crates/once-map/src/lib.rs index 990626f6e508c..3fcedba4979d8 100644 --- a/crates/once-map/src/lib.rs +++ b/crates/once-map/src/lib.rs @@ -63,6 +63,27 @@ impl OnceMap { } } + /// Wait for the result of a job that is running, in a blocking context. + /// + /// Will hang if [`OnceMap::done`] isn't called for this key. + pub fn wait_blocking(&self, key: &K) -> Option { + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(notify) => { + let notify = notify.clone(); + drop(entry); + futures::executor::block_on(notify.notified()); + + let entry = self.items.get(key).expect("map is append-only"); + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => unreachable!("notify was called"), + } + } + } + } + /// Return the result of a previous job, if any. pub fn get(&self, key: &Q) -> Option where diff --git a/crates/uv-dispatch/src/lib.rs b/crates/uv-dispatch/src/lib.rs index e5a185128b5e7..37d2d74c9c750 100644 --- a/crates/uv-dispatch/src/lib.rs +++ b/crates/uv-dispatch/src/lib.rs @@ -153,7 +153,7 @@ impl<'a> BuildContext for BuildDispatch<'a> { self.index, &HashStrategy::None, self, - &EmptyInstalledPackages, + EmptyInstalledPackages, DistributionDatabase::new(self.client, self, self.concurrency.downloads), )?; let graph = resolver.resolve().await.with_context(|| { diff --git a/crates/uv-installer/src/plan.rs b/crates/uv-installer/src/plan.rs index 85709c89c8061..67c7c2b98b562 100644 --- a/crates/uv-installer/src/plan.rs +++ b/crates/uv-installer/src/plan.rs @@ -68,7 +68,7 @@ impl<'a> Planner<'a> { #[allow(clippy::too_many_arguments)] pub fn build( self, - mut site_packages: SitePackages<'_>, + mut site_packages: SitePackages, reinstall: &Reinstall, no_binary: &NoBinary, hasher: &HashStrategy, diff --git a/crates/uv-installer/src/site_packages.rs b/crates/uv-installer/src/site_packages.rs index be56d9c10d68e..05fe50b63eb8e 100644 --- a/crates/uv-installer/src/site_packages.rs +++ b/crates/uv-installer/src/site_packages.rs @@ -23,9 +23,9 @@ use crate::satisfies::RequirementSatisfaction; /// An index over the packages installed in an environment. /// /// Packages are indexed by both name and (for editable installs) URL. -#[derive(Debug)] -pub struct SitePackages<'a> { - venv: &'a PythonEnvironment, +#[derive(Debug, Clone)] +pub struct SitePackages { + venv: PythonEnvironment, /// The vector of all installed distributions. The `by_name` and `by_url` indices index into /// this vector. The vector may contain `None` values, which represent distributions that were /// removed from the virtual environment. @@ -38,9 +38,9 @@ pub struct SitePackages<'a> { by_url: FxHashMap>, } -impl<'a> SitePackages<'a> { +impl SitePackages { /// Build an index of installed packages from the given Python executable. - pub fn from_executable(venv: &'a PythonEnvironment) -> Result> { + pub fn from_executable(venv: &PythonEnvironment) -> Result { let mut distributions: Vec> = Vec::new(); let mut by_name = FxHashMap::default(); let mut by_url = FxHashMap::default(); @@ -68,7 +68,7 @@ impl<'a> SitePackages<'a> { } Err(err) if err.kind() == std::io::ErrorKind::NotFound => { return Ok(Self { - venv, + venv: venv.clone(), distributions, by_name, by_url, @@ -107,7 +107,7 @@ impl<'a> SitePackages<'a> { } Ok(Self { - venv, + venv: venv.clone(), distributions, by_name, by_url, @@ -439,7 +439,7 @@ pub enum SatisfiesResult { Unsatisfied(String), } -impl IntoIterator for SitePackages<'_> { +impl IntoIterator for SitePackages { type Item = InstalledDist; type IntoIter = Flatten>>; @@ -540,7 +540,7 @@ impl Diagnostic { } } -impl InstalledPackagesProvider for SitePackages<'_> { +impl InstalledPackagesProvider for SitePackages { fn iter(&self) -> impl Iterator { self.iter() } diff --git a/crates/uv-interpreter/src/environment.rs b/crates/uv-interpreter/src/environment.rs index 2811755e572f1..69df98e391542 100644 --- a/crates/uv-interpreter/src/environment.rs +++ b/crates/uv-interpreter/src/environment.rs @@ -1,6 +1,7 @@ use itertools::Either; use std::env; use std::path::{Path, PathBuf}; +use std::sync::Arc; use same_file::is_same_file; @@ -12,7 +13,10 @@ use crate::{find_default_python, find_requested_python, Error, Interpreter, Targ /// A Python environment, consisting of a Python [`Interpreter`] and its associated paths. #[derive(Debug, Clone)] -pub struct PythonEnvironment { +pub struct PythonEnvironment(Arc); + +#[derive(Debug, Clone)] +struct PythonEnvironmentShared { root: PathBuf, interpreter: Interpreter, } @@ -46,10 +50,10 @@ impl PythonEnvironment { interpreter.base_prefix().display() ); - Ok(Self { + Ok(Self(Arc::new(PythonEnvironmentShared { root: venv, interpreter, - }) + }))) } /// Create a [`PythonEnvironment`] for a Python interpreter specifier (e.g., a path or a binary name). @@ -57,57 +61,58 @@ impl PythonEnvironment { let Some(interpreter) = find_requested_python(python, cache)? else { return Err(Error::RequestedPythonNotFound(python.to_string())); }; - Ok(Self { + Ok(Self(Arc::new(PythonEnvironmentShared { root: interpreter.prefix().to_path_buf(), interpreter, - }) + }))) } /// Create a [`PythonEnvironment`] for the default Python interpreter. pub fn from_default_python(cache: &Cache) -> Result { let interpreter = find_default_python(cache)?; - Ok(Self { + Ok(Self(Arc::new(PythonEnvironmentShared { root: interpreter.prefix().to_path_buf(), interpreter, - }) + }))) } /// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and root directory. pub fn from_interpreter(interpreter: Interpreter) -> Self { - Self { + Self(Arc::new(PythonEnvironmentShared { root: interpreter.prefix().to_path_buf(), interpreter, - } + })) } /// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and `--target` directory. #[must_use] pub fn with_target(self, target: Target) -> Self { - Self { - interpreter: self.interpreter.with_target(target), - ..self - } + let inner = Arc::unwrap_or_clone(self.0); + Self(Arc::new(PythonEnvironmentShared { + interpreter: inner.interpreter.with_target(target), + ..inner + })) } /// Returns the root (i.e., `prefix`) of the Python interpreter. pub fn root(&self) -> &Path { - &self.root + &self.0.root } /// Return the [`Interpreter`] for this virtual environment. pub fn interpreter(&self) -> &Interpreter { - &self.interpreter + &self.0.interpreter } /// Return the [`PyVenvConfiguration`] for this virtual environment, as extracted from the /// `pyvenv.cfg` file. pub fn cfg(&self) -> Result { - Ok(PyVenvConfiguration::parse(self.root.join("pyvenv.cfg"))?) + Ok(PyVenvConfiguration::parse(self.0.root.join("pyvenv.cfg"))?) } /// Returns the location of the Python executable. pub fn python_executable(&self) -> &Path { - self.interpreter.sys_executable() + self.0.interpreter.sys_executable() } /// Returns an iterator over the `site-packages` directories inside a virtual environment. @@ -118,11 +123,11 @@ impl PythonEnvironment { /// Some distributions also create symbolic links from `purelib` to `platlib`; in such cases, we /// still deduplicate the entries, returning a single path. pub fn site_packages(&self) -> impl Iterator { - if let Some(target) = self.interpreter.target() { + if let Some(target) = self.0.interpreter.target() { Either::Left(std::iter::once(target.root())) } else { - let purelib = self.interpreter.purelib(); - let platlib = self.interpreter.platlib(); + let purelib = self.0.interpreter.purelib(); + let platlib = self.0.interpreter.platlib(); Either::Right(std::iter::once(purelib).chain( if purelib == platlib || is_same_file(purelib, platlib).unwrap_or(false) { None @@ -135,31 +140,31 @@ impl PythonEnvironment { /// Returns the path to the `bin` directory inside a virtual environment. pub fn scripts(&self) -> &Path { - self.interpreter.scripts() + self.0.interpreter.scripts() } /// Grab a file lock for the virtual environment to prevent concurrent writes across processes. pub fn lock(&self) -> Result { - if let Some(target) = self.interpreter.target() { + if let Some(target) = self.0.interpreter.target() { // If we're installing into a `--target`, use a target-specific lock file. LockedFile::acquire( target.root().join(".lock"), target.root().simplified_display(), ) - } else if self.interpreter.is_virtualenv() { + } else if self.0.interpreter.is_virtualenv() { // If the environment a virtualenv, use a virtualenv-specific lock file. - LockedFile::acquire(self.root.join(".lock"), self.root.simplified_display()) + LockedFile::acquire(self.0.root.join(".lock"), self.0.root.simplified_display()) } else { // Otherwise, use a global lock file. LockedFile::acquire( - env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.root))), - self.root.simplified_display(), + env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.0.root))), + self.0.root.simplified_display(), ) } } /// Return the [`Interpreter`] for this virtual environment. pub fn into_interpreter(self) -> Interpreter { - self.interpreter + Arc::unwrap_or_clone(self.0).interpreter } } diff --git a/crates/uv-requirements/src/lookahead.rs b/crates/uv-requirements/src/lookahead.rs index 188ee02a6e4fe..e016319f0759c 100644 --- a/crates/uv-requirements/src/lookahead.rs +++ b/crates/uv-requirements/src/lookahead.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, sync::Arc}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -197,17 +197,18 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> { // Fetch the metadata for the distribution. let requires_dist = { let id = dist.version_id(); - if let Some(archive) = self - .index - .get_metadata(&id) - .as_deref() - .and_then(|response| { - if let MetadataResponse::Found(archive, ..) = response { - Some(archive) - } else { - None - } - }) + if let Some(archive) = + self.index + .distributions() + .get(&id) + .as_deref() + .and_then(|response| { + if let MetadataResponse::Found(archive, ..) = response { + Some(archive) + } else { + None + } + }) { // If the metadata is already in the index, return it. archive @@ -234,7 +235,8 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> { // Insert the metadata into the index. self.index - .insert_metadata(id, MetadataResponse::Found(archive)); + .distributions() + .done(id, Arc::new(MetadataResponse::Found(archive))); requires_dist .into_iter() diff --git a/crates/uv-requirements/src/source_tree.rs b/crates/uv-requirements/src/source_tree.rs index 0cbf5a9d75433..548b164111321 100644 --- a/crates/uv-requirements/src/source_tree.rs +++ b/crates/uv-requirements/src/source_tree.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{Context, Result}; use futures::stream::FuturesOrdered; @@ -117,17 +118,18 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> { // Fetch the metadata for the distribution. let metadata = { let id = VersionId::from_url(source.url()); - if let Some(archive) = self - .index - .get_metadata(&id) - .as_deref() - .and_then(|response| { - if let MetadataResponse::Found(archive) = response { - Some(archive) - } else { - None - } - }) + if let Some(archive) = + self.index + .distributions() + .get(&id) + .as_deref() + .and_then(|response| { + if let MetadataResponse::Found(archive) = response { + Some(archive) + } else { + None + } + }) { // If the metadata is already in the index, return it. archive.metadata.clone() @@ -138,7 +140,8 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> { // Insert the metadata into the index. self.index - .insert_metadata(id, MetadataResponse::Found(archive.clone())); + .distributions() + .done(id, Arc::new(MetadataResponse::Found(archive.clone()))); archive.metadata } diff --git a/crates/uv-requirements/src/unnamed.rs b/crates/uv-requirements/src/unnamed.rs index 2c22cb8674e87..2379c9f31fdf2 100644 --- a/crates/uv-requirements/src/unnamed.rs +++ b/crates/uv-requirements/src/unnamed.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::path::Path; use std::str::FromStr; +use std::sync::Arc; use anyhow::Result; use configparser::ini::Ini; @@ -254,13 +255,18 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { // Fetch the metadata for the distribution. let name = { let id = VersionId::from_url(source.url()); - if let Some(archive) = index.get_metadata(&id).as_deref().and_then(|response| { - if let MetadataResponse::Found(archive) = response { - Some(archive) - } else { - None - } - }) { + if let Some(archive) = index + .distributions() + .get(&id) + .as_deref() + .and_then(|response| { + if let MetadataResponse::Found(archive) = response { + Some(archive) + } else { + None + } + }) + { // If the metadata is already in the index, return it. archive.metadata.name.clone() } else { @@ -272,7 +278,9 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { let name = archive.metadata.name.clone(); // Insert the metadata into the index. - index.insert_metadata(id, MetadataResponse::Found(archive)); + index + .distributions() + .done(id, Arc::new(MetadataResponse::Found(archive))); name } diff --git a/crates/uv-resolver/Cargo.toml b/crates/uv-resolver/Cargo.toml index 7239d3079527b..5dfbbcb2b7e8b 100644 --- a/crates/uv-resolver/Cargo.toml +++ b/crates/uv-resolver/Cargo.toml @@ -56,6 +56,7 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } url = { workspace = true } +dashmap = { workspace = true } [dev-dependencies] uv-interpreter = { workspace = true } diff --git a/crates/uv-resolver/src/error.rs b/crates/uv-resolver/src/error.rs index 8031f944c4171..5132d61e2d400 100644 --- a/crates/uv-resolver/src/error.rs +++ b/crates/uv-resolver/src/error.rs @@ -1,7 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Formatter; use std::ops::Deref; -use std::rc::Rc; use std::sync::Arc; use indexmap::IndexMap; @@ -9,6 +8,7 @@ use pubgrub::range::Range; use pubgrub::report::{DefaultStringReporter, DerivationTree, External, Reporter}; use rustc_hash::FxHashMap; +use dashmap::{DashMap, DashSet}; use distribution_types::{BuiltDist, IndexLocations, InstalledDist, ParsedUrlError, SourceDist}; use once_map::OnceMap; use pep440_rs::Version; @@ -19,10 +19,7 @@ use crate::candidate_selector::CandidateSelector; use crate::dependency_provider::UvDependencyProvider; use crate::pubgrub::{PubGrubPackage, PubGrubPython, PubGrubReportFormatter}; use crate::python_requirement::PythonRequirement; -use crate::resolver::{ - IncompletePackage, SharedMap, SharedSet, UnavailablePackage, UnavailableReason, - VersionsResponse, -}; +use crate::resolver::{IncompletePackage, UnavailablePackage, UnavailableReason, VersionsResponse}; #[derive(Debug, thiserror::Error)] pub enum ResolveError { @@ -238,8 +235,8 @@ impl NoSolutionError { pub(crate) fn with_available_versions( mut self, python_requirement: &PythonRequirement, - visited: &SharedSet, - package_versions: &OnceMap>, + visited: &DashSet, + package_versions: &OnceMap>, ) -> Self { let mut available_versions = IndexMap::default(); for package in self.derivation_tree.packages() { @@ -263,7 +260,7 @@ impl NoSolutionError { // tree, but were never visited during resolution. We _may_ have metadata for // these packages, but it's non-deterministic, and omitting them ensures that // we represent the state of the resolver at the time of failure. - if visited.borrow().contains(name) { + if visited.contains(name) { if let Some(response) = package_versions.get(name) { if let VersionsResponse::Found(ref version_maps) = *response { for version_map in version_maps { @@ -302,9 +299,8 @@ impl NoSolutionError { #[must_use] pub(crate) fn with_unavailable_packages( mut self, - unavailable_packages: &SharedMap, + unavailable_packages: &DashMap, ) -> Self { - let unavailable_packages = unavailable_packages.borrow(); let mut new = FxHashMap::default(); for package in self.derivation_tree.packages() { if let PubGrubPackage::Package(name, _, _) = package { @@ -321,14 +317,14 @@ impl NoSolutionError { #[must_use] pub(crate) fn with_incomplete_packages( mut self, - incomplete_packages: &SharedMap>, + incomplete_packages: &DashMap>, ) -> Self { let mut new = FxHashMap::default(); - let incomplete_packages = incomplete_packages.borrow(); for package in self.derivation_tree.packages() { if let PubGrubPackage::Package(name, _, _) = package { if let Some(versions) = incomplete_packages.get(name) { - for (version, reason) in versions.borrow().iter() { + for entry in versions.iter() { + let (version, reason) = entry.pair(); new.entry(name.clone()) .or_insert_with(BTreeMap::default) .insert(version.clone(), reason.clone()); diff --git a/crates/uv-resolver/src/preferences.rs b/crates/uv-resolver/src/preferences.rs index d86963126ef45..5f1aa636bfa82 100644 --- a/crates/uv-resolver/src/preferences.rs +++ b/crates/uv-resolver/src/preferences.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::sync::Arc; use rustc_hash::FxHashMap; use tracing::trace; @@ -69,7 +70,7 @@ impl Preference { /// A set of pinned packages that should be preserved during resolution, if possible. #[derive(Debug, Clone)] -pub(crate) struct Preferences(FxHashMap); +pub(crate) struct Preferences(Arc>); impl Preferences { /// Create a map of pinned packages from an iterator of [`Preference`] entries. @@ -81,10 +82,10 @@ impl Preferences { preferences: PreferenceIterator, markers: Option<&MarkerEnvironment>, ) -> Self { - Self( - // TODO(zanieb): We should explicitly ensure that when a package name is seen multiple times - // that the newest or oldest version is preferred dependning on the resolution strategy; - // right now, the order is dependent on the given iterator. + // TODO(zanieb): We should explicitly ensure that when a package name is seen multiple times + // that the newest or oldest version is preferred dependning on the resolution strategy; + // right now, the order is dependent on the given iterator. + let preferences = preferences .into_iter() .filter_map(|preference| { @@ -130,8 +131,9 @@ impl Preferences { } } }) - .collect(), - ) + .collect(); + + Self(Arc::new(preferences)) } /// Return the pinned version for a package, if any. diff --git a/crates/uv-resolver/src/resolution/graph.rs b/crates/uv-resolver/src/resolution/graph.rs index ae0789b83f61d..a3853deba0f50 100644 --- a/crates/uv-resolver/src/resolution/graph.rs +++ b/crates/uv-resolver/src/resolution/graph.rs @@ -1,5 +1,5 @@ use std::hash::BuildHasherDefault; -use std::rc::Rc; +use std::sync::Arc; use pubgrub::range::Range; use pubgrub::solver::{Kind, State}; @@ -45,8 +45,8 @@ impl ResolutionGraph { pub(crate) fn from_state( selection: &SelectedDependencies, pins: &FilePins, - packages: &OnceMap>, - distributions: &OnceMap>, + packages: &OnceMap>, + distributions: &OnceMap>, state: &State, preferences: &Preferences, editables: Editables, @@ -458,7 +458,7 @@ impl ResolutionGraph { VersionOrUrlRef::Url(verbatim_url) => VersionId::from_url(verbatim_url.raw()), }; let res = index - .distributions + .distributions() .get(&version_id) .expect("every package in resolution graph has metadata"); let MetadataResponse::Found(archive, ..) = &*res else { diff --git a/crates/uv-resolver/src/resolver/batch_prefetch.rs b/crates/uv-resolver/src/resolver/batch_prefetch.rs index a5d25937825a1..311fa3b54952b 100644 --- a/crates/uv-resolver/src/resolver/batch_prefetch.rs +++ b/crates/uv-resolver/src/resolver/batch_prefetch.rs @@ -44,7 +44,7 @@ pub(crate) struct BatchPrefetcher { impl BatchPrefetcher { /// Prefetch a large number of versions if we already unsuccessfully tried many versions. - pub(crate) async fn prefetch_batches( + pub(crate) fn prefetch_batches( &mut self, next: &PubGrubPackage, version: &Version, @@ -65,9 +65,8 @@ impl BatchPrefetcher { // This is immediate, we already fetched the version map. let versions_response = index - .packages - .wait(package_name) - .await + .packages() + .wait_blocking(package_name) .ok_or(ResolveError::Unregistered)?; let VersionsResponse::Found(ref version_map) = *versions_response else { @@ -142,9 +141,10 @@ impl BatchPrefetcher { dist ); prefetch_count += 1; - if index.distributions.register(candidate.version_id()) { + + if index.distributions().register(candidate.version_id()) { let request = Request::from(dist); - request_sink.send(request).await?; + request_sink.blocking_send(request)?; } } diff --git a/crates/uv-resolver/src/resolver/index.rs b/crates/uv-resolver/src/resolver/index.rs index 8f06bbd07cdfd..fedcbcffe1d5a 100644 --- a/crates/uv-resolver/src/resolver/index.rs +++ b/crates/uv-resolver/src/resolver/index.rs @@ -1,4 +1,4 @@ -use std::rc::Rc; +use std::sync::Arc; use distribution_types::VersionId; use once_map::OnceMap; @@ -7,34 +7,27 @@ use uv_normalize::PackageName; use crate::resolver::provider::{MetadataResponse, VersionsResponse}; /// In-memory index of package metadata. +#[derive(Default, Clone)] +pub struct InMemoryIndex(Arc); + #[derive(Default)] -pub struct InMemoryIndex { +struct SharedInMemoryIndex { /// A map from package name to the metadata for that package and the index where the metadata /// came from. - pub(crate) packages: OnceMap>, + packages: OnceMap>, /// A map from package ID to metadata for that distribution. - pub(crate) distributions: OnceMap>, + distributions: OnceMap>, } impl InMemoryIndex { - /// Insert a [`VersionsResponse`] into the index. - pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) { - self.packages.done(package_name, Rc::new(response)); - } - - /// Insert a [`Metadata23`] into the index. - pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) { - self.distributions.done(version_id, Rc::new(response)); - } - - /// Get the [`VersionsResponse`] for a given package name, without waiting. - pub fn get_package(&self, package_name: &PackageName) -> Option> { - self.packages.get(package_name) + /// Returns a reference to the package metadata map. + pub fn packages(&self) -> &OnceMap> { + &self.0.packages } - /// Get the [`MetadataResponse`] for a given package ID, without waiting. - pub fn get_metadata(&self, version_id: &VersionId) -> Option> { - self.distributions.get(version_id) + /// Returns a reference to the distribution metadata map. + pub fn distributions(&self) -> &OnceMap> { + &self.0.distributions } } diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index cdb12bd66d91b..87f9f636c44da 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -1,22 +1,23 @@ //! Given a set of requirements, find a set of compatible packages. use std::borrow::Cow; -use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::ops::Deref; -use std::rc::Rc; use std::sync::Arc; +use std::thread; use anyhow::Result; +use dashmap::{DashMap, DashSet}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{Incompatibility, State}; use rustc_hash::{FxHashMap, FxHashSet}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; -use tracing::{debug, enabled, info_span, instrument, trace, warn, Instrument, Level}; +use tracing::{debug, enabled, instrument, trace, warn, Level}; use distribution_types::{ BuiltDist, Dist, DistributionMetadata, IncompatibleDist, IncompatibleSource, IncompatibleWheel, @@ -172,10 +173,14 @@ enum ResolverVersion { Unavailable(Version, UnavailableVersion), } -pub(crate) type SharedMap = Rc>>; -pub(crate) type SharedSet = Rc>>; +pub struct Resolver { + state: ResolverState, + provider: Provider, +} -pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> { +/// State that is shared between the prefetcher and the PubGrub solver during +/// resolution. +struct ResolverState { project: Option, requirements: Vec, constraints: Constraints, @@ -186,25 +191,24 @@ pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: Installed urls: Urls, locals: Locals, dependency_mode: DependencyMode, - hasher: &'a HashStrategy, + hasher: HashStrategy, /// When not set, the resolver is in "universal" mode. - markers: Option<&'a MarkerEnvironment>, - python_requirement: &'a PythonRequirement, + markers: Option, + python_requirement: PythonRequirement, selector: CandidateSelector, - index: &'a InMemoryIndex, - installed_packages: &'a InstalledPackages, + index: InMemoryIndex, + installed_packages: InstalledPackages, /// Incompatibilities for packages that are entirely unavailable. - unavailable_packages: SharedMap, + unavailable_packages: DashMap, /// Incompatibilities for packages that are unavailable at specific versions. - incomplete_packages: SharedMap>, + incomplete_packages: DashMap>, /// The set of all registry-based packages visited during resolution. - visited: SharedSet, + visited: DashSet, reporter: Option>, - provider: Provider, } impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> - Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages> + Resolver, InstalledPackages> { /// Initialize a new resolver using the default backend doing real requests. /// @@ -235,7 +239,7 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> index: &'a InMemoryIndex, hasher: &'a HashStrategy, build_context: &'a Context, - installed_packages: &'a InstalledPackages, + installed_packages: InstalledPackages, database: DistributionDatabase<'a, Context>, ) -> Result { let provider = DefaultResolverProvider::new( @@ -263,26 +267,26 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> } } -impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> - Resolver<'a, Provider, InstalledPackages> +impl + Resolver { /// Initialize a new resolver using a user provided backend. #[allow(clippy::too_many_arguments)] pub fn new_custom_io( manifest: Manifest, options: Options, - hasher: &'a HashStrategy, - markers: Option<&'a MarkerEnvironment>, - python_requirement: &'a PythonRequirement, - index: &'a InMemoryIndex, + hasher: &HashStrategy, + markers: Option<&MarkerEnvironment>, + python_requirement: &PythonRequirement, + index: &InMemoryIndex, provider: Provider, - installed_packages: &'a InstalledPackages, + installed_packages: InstalledPackages, ) -> Result { - Ok(Self { - index, - unavailable_packages: SharedMap::default(), - incomplete_packages: SharedMap::default(), - visited: SharedSet::default(), + let state = ResolverState { + index: index.clone(), + unavailable_packages: DashMap::default(), + incomplete_packages: DashMap::default(), + visited: DashSet::default(), selector: CandidateSelector::for_resolution(options, &manifest, markers), dependency_mode: options.dependency_mode, urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?, @@ -294,43 +298,63 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide preferences: Preferences::from_iter(manifest.preferences, markers), exclusions: manifest.exclusions, editables: Editables::from_requirements(manifest.editables), - hasher, - markers, - python_requirement, + hasher: hasher.clone(), + markers: markers.cloned(), + python_requirement: python_requirement.clone(), reporter: None, - provider, installed_packages, - }) + }; + Ok(Self { state, provider }) } /// Set the [`Reporter`] to use for this installer. #[must_use] pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self { let reporter = Arc::new(reporter); + Self { - reporter: Some(reporter.clone()), + state: ResolverState { + reporter: Some(reporter.clone()), + ..self.state + }, provider: self.provider.with_reporter(Facade { reporter }), - ..self } } /// Resolve a set of requirements into a set of pinned versions. pub async fn resolve(self) -> Result { + let state = Arc::new(self.state); + let provider = Arc::new(self.provider); + // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). // Channel size is set large to accommodate batch prefetching. - let (request_sink, request_stream) = tokio::sync::mpsc::channel(300); + let (request_sink, request_stream) = mpsc::channel(300); // Run the fetcher. - let requests_fut = self.fetch(request_stream).fuse(); - - // Run the solver. - let resolve_fut = self.solve(request_sink).boxed_local().fuse(); + let requests_fut = state.clone().fetch(provider.clone(), request_stream).fuse(); + + // Spawn the PubGrub solver on a dedicated thread. + let solver = state.clone(); + let (tx, rx) = oneshot::channel(); + thread::Builder::new() + .name("uv-resolver".into()) + .spawn(move || { + let result = solver.solve(request_sink); + tx.send(result).unwrap(); + }) + .unwrap(); + + let resolve_fut = async move { + rx.await + .map_err(|_| ResolveError::ChannelClosed) + .and_then(|result| result) + }; // Wait for both to complete. match tokio::try_join!(requests_fut, resolve_fut) { Ok(((), resolution)) => { - self.on_complete(); + state.on_complete(); Ok(resolution) } Err(err) => { @@ -338,15 +362,15 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide Err(if let ResolveError::NoSolution(err) = err { ResolveError::NoSolution( err.with_available_versions( - self.python_requirement, - &self.visited, - &self.index.packages, + &state.python_requirement, + &state.visited, + state.index.packages(), ) - .with_selector(self.selector.clone()) - .with_python_requirement(self.python_requirement) - .with_index_locations(self.provider.index_locations()) - .with_unavailable_packages(&self.unavailable_packages) - .with_incomplete_packages(&self.incomplete_packages), + .with_selector(state.selector.clone()) + .with_python_requirement(&state.python_requirement) + .with_index_locations(provider.index_locations()) + .with_unavailable_packages(&state.unavailable_packages) + .with_incomplete_packages(&state.incomplete_packages), ) } else { err @@ -354,16 +378,18 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } } } +} +impl ResolverState { /// Run the PubGrub solver. #[instrument(skip_all)] - async fn solve( - &self, - request_sink: tokio::sync::mpsc::Sender, + fn solve( + self: Arc, + request_sink: Sender, ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); let mut prefetcher = BatchPrefetcher::default(); - let mut state = ResolverState { + let mut state = SolveState { pubgrub: State::init(root.clone(), MIN_VERSION.clone()), next: root, pins: FilePins::default(), @@ -386,8 +412,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide Self::pre_visit( state.pubgrub.partial_solution.prioritized_packages(), &request_sink, - ) - .await?; + )?; } // Choose a package version. @@ -403,8 +428,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide return ResolutionGraph::from_state( &selection, &state.pins, - &self.index.packages, - &self.index.distributions, + self.index.packages(), + self.index.distributions(), &state.pubgrub, &self.preferences, self.editables.clone(), @@ -421,14 +446,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide .ok_or_else(|| { PubGrubError::Failure("a package was chosen but we don't have a term.".into()) })?; - let decision = self - .choose_version( - &state.next, - term_intersection.unwrap_positive(), - &mut state.pins, - &request_sink, - ) - .await?; + let decision = self.choose_version( + &state.next, + term_intersection.unwrap_positive(), + &mut state.pins, + &request_sink, + )?; // Pick the next compatible version. let version = match decision { @@ -443,7 +466,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Check if the decision was due to the package being unavailable if let PubGrubPackage::Package(ref package_name, _, _) = state.next { - if let Some(entry) = self.unavailable_packages.borrow().get(package_name) { + if let Some(entry) = self.unavailable_packages.get(package_name) { state .pubgrub .add_incompatibility(Incompatibility::custom_term( @@ -512,16 +535,14 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } }; - prefetcher - .prefetch_batches( - &state.next, - &version, - term_intersection.unwrap_positive(), - &request_sink, - self.index, - &self.selector, - ) - .await?; + prefetcher.prefetch_batches( + &state.next, + &version, + term_intersection.unwrap_positive(), + &request_sink, + &self.index, + &self.selector, + )?; self.on_progress(&state.next, &version); @@ -533,10 +554,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide { // Retrieve that package dependencies. let package = &state.next; - let dependencies = match self - .get_dependencies(package, &version, &mut state.priorities, &request_sink) - .await? - { + let dependencies = match self.get_dependencies( + package, + &version, + &mut state.priorities, + &request_sink, + )? { Dependencies::Unavailable(reason) => { state .pubgrub @@ -590,10 +613,10 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`] /// before it is selected, to allow metadata to be fetched in parallel. - async fn visit_package( + fn visit_package( &self, package: &PubGrubPackage, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &Sender, ) -> Result<(), ResolveError> { match package { PubGrubPackage::Root(_) => {} @@ -606,8 +629,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } // Emit a request to fetch the metadata for this package. - if self.index.packages.register(name.clone()) { - request_sink.send(Request::Package(name.clone())).await?; + if self.index.packages().register(name.clone()) { + request_sink.blocking_send(Request::Package(name.clone()))?; } } PubGrubPackage::Package(name, _extra, Some(url)) => { @@ -623,8 +646,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Emit a request to fetch the metadata for this distribution. let dist = Dist::from_url(name.clone(), url.clone())?; - if self.index.distributions.register(dist.version_id()) { - request_sink.send(Request::Dist(dist)).await?; + if self.index.distributions().register(dist.version_id()) { + request_sink.blocking_send(Request::Dist(dist))?; } } } @@ -633,9 +656,9 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch /// metadata for all of the packages in parallel. - async fn pre_visit<'data>( + fn pre_visit<'data>( packages: impl Iterator)>, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &Sender, ) -> Result<(), ResolveError> { // Iterate over the potential packages, and fetch file metadata for any of them. These // represent our current best guesses for the versions that we _might_ select. @@ -643,9 +666,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let PubGrubPackage::Package(package_name, None, None) = package else { continue; }; - request_sink - .send(Request::Prefetch(package_name.clone(), range.clone())) - .await?; + request_sink.blocking_send(Request::Prefetch(package_name.clone(), range.clone()))?; } Ok(()) } @@ -655,12 +676,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// /// Returns [None] when there are no versions in the given range. #[instrument(skip_all, fields(%package))] - async fn choose_version( + fn choose_version( &self, - package: &'a PubGrubPackage, + package: &PubGrubPackage, range: &Range, pins: &mut FilePins, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &Sender, ) -> Result, ResolveError> { match package { PubGrubPackage::Root(_) => Ok(Some(ResolverVersion::Available(MIN_VERSION.clone()))), @@ -718,9 +739,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let dist = PubGrubDistribution::from_url(package_name, url); let response = self .index - .distributions - .wait(&dist.version_id()) - .await + .distributions() + .wait_blocking(&dist.version_id()) .ok_or(ResolveError::Unregistered)?; // If we failed to fetch the metadata for a URL, we can't proceed. @@ -728,26 +748,25 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide MetadataResponse::Found(archive) => &archive.metadata, MetadataResponse::Offline => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::Offline); return Ok(None); } MetadataResponse::InvalidMetadata(err) => { - self.unavailable_packages.borrow_mut().insert( + self.unavailable_packages.insert( package_name.clone(), UnavailablePackage::InvalidMetadata(err.to_string()), ); return Ok(None); } MetadataResponse::InconsistentMetadata(err) => { - self.unavailable_packages.borrow_mut().insert( + self.unavailable_packages.insert( package_name.clone(), UnavailablePackage::InvalidMetadata(err.to_string()), ); return Ok(None); } MetadataResponse::InvalidStructure(err) => { - self.unavailable_packages.borrow_mut().insert( + self.unavailable_packages.insert( package_name.clone(), UnavailablePackage::InvalidStructure(err.to_string()), ); @@ -783,30 +802,25 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Wait for the metadata to be available. let versions_response = self .index - .packages - .wait(package_name) - .instrument(info_span!("package_wait", %package_name)) - .await + .packages() + .wait_blocking(package_name) .ok_or(ResolveError::Unregistered)?; - self.visited.borrow_mut().insert(package_name.clone()); + self.visited.insert(package_name.clone()); let version_maps = match *versions_response { VersionsResponse::Found(ref version_maps) => version_maps.as_slice(), VersionsResponse::NoIndex => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::NoIndex); &[] } VersionsResponse::Offline => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::Offline); &[] } VersionsResponse::NotFound => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::NotFound); &[] } @@ -820,7 +834,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide range, version_maps, &self.preferences, - self.installed_packages, + &self.installed_packages, &self.exclusions, ) else { // Short circuit: we couldn't find _any_ versions for a package. @@ -863,9 +877,9 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Emit a request to fetch the metadata for this version. if matches!(package, PubGrubPackage::Package(_, _, _)) { - if self.index.distributions.register(candidate.version_id()) { + if self.index.distributions().register(candidate.version_id()) { let request = Request::from(dist.for_resolution()); - request_sink.send(request).await?; + request_sink.blocking_send(request)?; } } @@ -876,12 +890,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// Given a candidate package and version, return its dependencies. #[instrument(skip_all, fields(%package, %version))] - async fn get_dependencies( + fn get_dependencies( &self, package: &PubGrubPackage, version: &Version, priorities: &mut PubGrubPriorities, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &Sender, ) -> Result { match package { PubGrubPackage::Root(_) => { @@ -894,7 +908,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide None, &self.urls, &self.locals, - self.markers, + self.markers.as_ref(), ); let mut dependencies = match dependencies { @@ -913,7 +927,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide priorities.insert(package, version); // Emit a request to fetch the metadata for this package. - self.visit_package(package, request_sink).await?; + self.visit_package(package, request_sink)?; } // Add a dependency on each editable. @@ -942,7 +956,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Add any constraints. for constraint in self.constraints.get(&metadata.name).into_iter().flatten() { - if constraint.evaluate_markers(self.markers, &[]) { + if constraint.evaluate_markers(self.markers.as_ref(), &[]) { let PubGrubRequirement { package, version } = PubGrubRequirement::from_constraint( constraint, @@ -974,10 +988,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Wait for the metadata to be available. self.index - .distributions - .wait(&version_id) - .instrument(info_span!("distributions_wait", %version_id)) - .await + .distributions() + .wait_blocking(&version_id) .ok_or(ResolveError::Unregistered)?; } @@ -1000,7 +1012,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide extra.as_ref(), &self.urls, &self.locals, - self.markers, + self.markers.as_ref(), )?; for (dep_package, dep_version) in dependencies.iter() { @@ -1010,7 +1022,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide priorities.insert(dep_package, dep_version); // Emit a request to fetch the metadata for this package. - self.visit_package(dep_package, request_sink).await?; + self.visit_package(dep_package, request_sink)?; } return Ok(Dependencies::Available(dependencies.into())); @@ -1024,11 +1036,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let version_id = dist.version_id(); // If the package does not exist in the registry or locally, we cannot fetch its dependencies - if self - .unavailable_packages - .borrow() - .get(package_name) - .is_some() + if self.unavailable_packages.get(package_name).is_some() && self .installed_packages .get_packages(package_name) @@ -1046,30 +1054,24 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Wait for the metadata to be available. let response = self .index - .distributions - .wait(&version_id) - .instrument(info_span!("distributions_wait", %version_id)) - .await + .distributions() + .wait_blocking(&version_id) .ok_or(ResolveError::Unregistered)?; let metadata = match &*response { MetadataResponse::Found(archive) => &archive.metadata, MetadataResponse::Offline => { self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert(version.clone(), IncompletePackage::Offline); return Ok(Dependencies::Unavailable(UnavailableVersion::Offline)); } MetadataResponse::InvalidMetadata(err) => { warn!("Unable to extract metadata for {package_name}: {err}"); self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert( version.clone(), IncompletePackage::InvalidMetadata(err.to_string()), @@ -1081,10 +1083,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide MetadataResponse::InconsistentMetadata(err) => { warn!("Unable to extract metadata for {package_name}: {err}"); self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert( version.clone(), IncompletePackage::InconsistentMetadata(err.to_string()), @@ -1096,10 +1096,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide MetadataResponse::InvalidStructure(err) => { warn!("Unable to extract metadata for {package_name}: {err}"); self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert( version.clone(), IncompletePackage::InvalidStructure(err.to_string()), @@ -1124,7 +1122,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide extra.as_ref(), &self.urls, &self.locals, - self.markers, + self.markers.as_ref(), )?; for (dep_package, dep_version) in dependencies.iter() { @@ -1134,7 +1132,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide priorities.insert(dep_package, dep_version); // Emit a request to fetch the metadata for this package. - self.visit_package(dep_package, request_sink).await?; + self.visit_package(dep_package, request_sink)?; } Ok(Dependencies::Available(dependencies.into())) @@ -1155,12 +1153,13 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } /// Fetch the metadata for a stream of packages and versions. - async fn fetch( - &self, - request_stream: tokio::sync::mpsc::Receiver, + async fn fetch( + self: Arc, + provider: Arc, + request_stream: Receiver, ) -> Result<(), ResolveError> { let mut response_stream = ReceiverStream::new(request_stream) - .map(|request| self.process_request(request).boxed_local()) + .map(|request| self.process_request(request, &*provider).boxed_local()) // Allow as many futures as possible to start in the background. // Backpressure is provided by at a more granular level by `DistributionDatabase` // and `SourceDispatch`, as well as the bounded request channel. @@ -1170,13 +1169,15 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide match response? { Some(Response::Package(package_name, version_map)) => { trace!("Received package metadata for: {package_name}"); - self.index.packages.done(package_name, Rc::new(version_map)); + self.index + .packages() + .done(package_name, Arc::new(version_map)); } Some(Response::Installed { dist, metadata }) => { trace!("Received installed distribution metadata for: {dist}"); - self.index.distributions.done( + self.index.distributions().done( dist.version_id(), - Rc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), + Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), ); } Some(Response::Dist { @@ -1194,8 +1195,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide _ => {} } self.index - .distributions - .done(dist.version_id(), Rc::new(metadata)); + .distributions() + .done(dist.version_id(), Arc::new(metadata)); } Some(Response::Dist { dist: Dist::Source(dist), @@ -1212,8 +1213,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide _ => {} } self.index - .distributions - .done(dist.version_id(), Rc::new(metadata)); + .distributions() + .done(dist.version_id(), Arc::new(metadata)); } None => {} } @@ -1223,12 +1224,15 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } #[instrument(skip_all, fields(%request))] - async fn process_request(&self, request: Request) -> Result, ResolveError> { + async fn process_request( + &self, + request: Request, + provider: &Provider, + ) -> Result, ResolveError> { match request { // Fetch package metadata from the registry. Request::Package(package_name) => { - let package_versions = self - .provider + let package_versions = provider .get_package_versions(&package_name) .boxed_local() .await @@ -1239,8 +1243,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Fetch distribution metadata from the distribution database. Request::Dist(dist) => { - let metadata = self - .provider + let metadata = provider .get_or_build_wheel_metadata(&dist) .boxed_local() .await @@ -1274,7 +1277,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Wait for the package metadata to become available. let versions_response = self .index - .packages + .packages() .wait(&package_name) .await .ok_or(ResolveError::Unregistered)?; @@ -1284,21 +1287,18 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Short-circuit if we did not find any versions for the package VersionsResponse::NoIndex => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::NoIndex); return Ok(None); } VersionsResponse::Offline => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::Offline); return Ok(None); } VersionsResponse::NotFound => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::NotFound); return Ok(None); @@ -1312,7 +1312,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide &range, version_map, &self.preferences, - self.installed_packages, + &self.installed_packages, &self.exclusions, ) else { return Ok(None); @@ -1324,13 +1324,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide }; // Emit a request to fetch the metadata for this version. - if self.index.distributions.register(candidate.version_id()) { + if self.index.distributions().register(candidate.version_id()) { let dist = dist.for_resolution().to_owned(); let response = match dist { ResolvedDist::Installable(dist) => { - let metadata = self - .provider + let metadata = provider .get_or_build_wheel_metadata(&dist) .boxed_local() .await @@ -1394,7 +1393,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// State that is used during unit propagation in the resolver. #[derive(Clone)] -struct ResolverState { +struct SolveState { /// The internal state used by the resolver. /// /// Note that not all parts of this state are strictly internal. For diff --git a/crates/uv-resolver/tests/resolver.rs b/crates/uv-resolver/tests/resolver.rs index 5eca33aa0e055..874400fdd75aa 100644 --- a/crates/uv-resolver/tests/resolver.rs +++ b/crates/uv-resolver/tests/resolver.rs @@ -145,7 +145,7 @@ async fn resolve( &index, &hashes, &build_context, - &installed_packages, + installed_packages, DistributionDatabase::new(&client, &build_context, concurrency.downloads), )?; Ok(resolver.resolve().await?) diff --git a/crates/uv-types/src/traits.rs b/crates/uv-types/src/traits.rs index d85a5ec17407e..e71d9fb058182 100644 --- a/crates/uv-types/src/traits.rs +++ b/crates/uv-types/src/traits.rs @@ -128,12 +128,13 @@ pub trait SourceBuildTrait { } /// A wrapper for [`uv_installer::SitePackages`] -pub trait InstalledPackagesProvider { +pub trait InstalledPackagesProvider: Clone + Send + Sync + 'static { fn iter(&self) -> impl Iterator; fn get_packages(&self, name: &PackageName) -> Vec<&InstalledDist>; } /// An [`InstalledPackagesProvider`] with no packages in it. +#[derive(Clone)] pub struct EmptyInstalledPackages; impl InstalledPackagesProvider for EmptyInstalledPackages { diff --git a/crates/uv/src/commands/pip/compile.rs b/crates/uv/src/commands/pip/compile.rs index 2080543027e98..1a6e266c87aa4 100644 --- a/crates/uv/src/commands/pip/compile.rs +++ b/crates/uv/src/commands/pip/compile.rs @@ -545,7 +545,7 @@ pub(crate) async fn pip_compile( &top_level_index, &hasher, &build_dispatch, - &EmptyInstalledPackages, + EmptyInstalledPackages, DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )? .with_reporter(ResolverReporter::from(printer)); diff --git a/crates/uv/src/commands/pip/editables.rs b/crates/uv/src/commands/pip/editables.rs index efaade03bc5a9..ad1c610458e9b 100644 --- a/crates/uv/src/commands/pip/editables.rs +++ b/crates/uv/src/commands/pip/editables.rs @@ -43,7 +43,7 @@ impl ResolvedEditables { #[allow(clippy::too_many_arguments)] pub(crate) async fn resolve( editables: Vec, - site_packages: &SitePackages<'_>, + site_packages: &SitePackages, reinstall: &Reinstall, hasher: &HashStrategy, interpreter: &Interpreter, diff --git a/crates/uv/src/commands/pip/install.rs b/crates/uv/src/commands/pip/install.rs index 7539e7c72b23c..ae1d540a20e5d 100644 --- a/crates/uv/src/commands/pip/install.rs +++ b/crates/uv/src/commands/pip/install.rs @@ -427,7 +427,7 @@ pub(crate) async fn pip_install( project, &editables, &hasher, - &site_packages, + site_packages.clone(), &reinstall, &upgrade, &interpreter, @@ -574,7 +574,7 @@ async fn resolve( project: Option, editables: &[ResolvedEditable], hasher: &HashStrategy, - site_packages: &SitePackages<'_>, + site_packages: SitePackages, reinstall: &Reinstall, upgrade: &Upgrade, interpreter: &Interpreter, @@ -733,7 +733,7 @@ async fn resolve( async fn install( resolution: &Resolution, editables: &[ResolvedEditable], - site_packages: SitePackages<'_>, + site_packages: SitePackages, reinstall: &Reinstall, no_binary: &NoBinary, link_mode: LinkMode, diff --git a/crates/uv/src/commands/pip/sync.rs b/crates/uv/src/commands/pip/sync.rs index 2525466cd8c27..c2370928078f9 100644 --- a/crates/uv/src/commands/pip/sync.rs +++ b/crates/uv/src/commands/pip/sync.rs @@ -381,7 +381,7 @@ pub(crate) async fn pip_sync( &hasher, &build_dispatch, // TODO(zanieb): We should consider support for installed packages in pip sync - &EmptyInstalledPackages, + EmptyInstalledPackages, DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )? .with_reporter(reporter); diff --git a/crates/uv/src/commands/project/lock.rs b/crates/uv/src/commands/project/lock.rs index 081d9e620330e..52da445e26dab 100644 --- a/crates/uv/src/commands/project/lock.rs +++ b/crates/uv/src/commands/project/lock.rs @@ -111,7 +111,7 @@ pub(crate) async fn lock( // Resolve the requirements. let resolution = project::resolve( spec, - &EmptyInstalledPackages, + EmptyInstalledPackages, &hasher, &interpreter, tags, diff --git a/crates/uv/src/commands/project/mod.rs b/crates/uv/src/commands/project/mod.rs index 23b5441a4bc04..35bb047520688 100644 --- a/crates/uv/src/commands/project/mod.rs +++ b/crates/uv/src/commands/project/mod.rs @@ -115,7 +115,7 @@ pub(crate) fn init( #[allow(clippy::too_many_arguments)] pub(crate) async fn resolve( spec: RequirementsSpecification, - installed_packages: &InstalledPackages, + installed_packages: InstalledPackages, hasher: &HashStrategy, interpreter: &Interpreter, tags: &Tags, @@ -242,7 +242,7 @@ pub(crate) async fn resolve( #[allow(clippy::too_many_arguments)] pub(crate) async fn install( resolution: &Resolution, - site_packages: SitePackages<'_>, + site_packages: SitePackages, no_binary: &NoBinary, link_mode: LinkMode, index_urls: &IndexLocations, diff --git a/crates/uv/src/commands/project/run.rs b/crates/uv/src/commands/project/run.rs index 9ead77768d18b..312b67a87bdad 100644 --- a/crates/uv/src/commands/project/run.rs +++ b/crates/uv/src/commands/project/run.rs @@ -292,7 +292,7 @@ async fn update_environment( // Resolve the requirements. let resolution = match project::resolve( spec, - &site_packages, + site_packages.clone(), &hasher, &interpreter, tags,