Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ pixi_progress = { workspace = true }
pixi_pypi_spec = { workspace = true }
pixi_record = { workspace = true }
pixi_spec = { workspace = true }
pixi_spec_containers = { workspace = true }
pixi_toml = { workspace = true }
pixi_utils = { workspace = true, default-features = false }
pixi_uv_conversions = { workspace = true }
Expand Down
208 changes: 103 additions & 105 deletions src/global/project/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
},
prefix::{Executable, Prefix},
repodata::Repodata,
reporters::TopLevelProgress,
rlimit::try_increase_rlimit_to_sensible,
};

Expand All @@ -25,7 +26,7 @@ use std::{
fmt::{Debug, Formatter},
path::{Path, PathBuf},
str::FromStr,
sync::{Arc, LazyLock},
sync::Arc,
};

use ahash::HashSet;
Expand All @@ -35,34 +36,35 @@ use fs::tokio as tokio_fs;
use fs_err as fs;
use futures::stream::StreamExt;
use indexmap::{IndexMap, IndexSet};
use indicatif::ProgressBar;
use is_executable::IsExecutable;
use itertools::Itertools;
pub(crate) use manifest::{ExposedType, Manifest, Mapping};
use miette::{Context, IntoDiagnostic, miette};
use miette::{Context, IntoDiagnostic};
use once_cell::sync::OnceCell;
use parsed_manifest::ParsedManifest;
pub(crate) use parsed_manifest::{ExposedName, ParsedEnvironment};
use pixi_build_discovery::EnabledProtocols;
use pixi_command_dispatcher::{
BuildEnvironment, CommandDispatcher, InstallPixiEnvironmentSpec, PixiEnvironmentSpec,
};
use pixi_config::{Config, default_channel_config, pixi_home};
use pixi_consts::consts::{self, CACHED_PACKAGES};
use pixi_consts::consts::{self};
use pixi_manifest::PrioritizedChannel;
use pixi_progress::{await_in_progress, global_multi_progress, wrap_in_progress};
use pixi_progress::global_multi_progress;
use pixi_spec_containers::DependencyMap;
use pixi_utils::{executable_from_path, reqwest::build_reqwest_clients};
use rattler::{
install::{DefaultProgressFormatter, IndicatifReporter, Installer},
package_cache::PackageCache,
};
use rattler_conda_types::{
ChannelConfig, GenericVirtualPackage, MatchSpec, PackageName, Platform, PrefixRecord,
menuinst::MenuMode,
};
use rattler_lock::Matches;
use rattler_repodata_gateway::Gateway;
use rattler_solve::{SolverImpl, SolverTask, resolvo::Solver};
// Removed unused rattler_solve imports
use rattler_virtual_packages::{VirtualPackage, VirtualPackageOverrides};
use reqwest_middleware::ClientWithMiddleware;
use tokio::sync::Semaphore;
use toml_edit::DocumentMut;
use uv_configuration::RAYON_INITIALIZE;

mod environment;
mod manifest;
Expand Down Expand Up @@ -96,6 +98,9 @@ pub struct Project {
repodata_gateway: OnceCell<Gateway>,
/// The concurrent request semaphore
concurrent_downloads_semaphore: OnceCell<Arc<Semaphore>>,
/// The command dispatcher for solving environments
/// This is wrapped in a `OnceCell` to allow for lazy initialization.
command_dispatcher: OnceCell<CommandDispatcher>,
}

impl Debug for Project {
Expand Down Expand Up @@ -274,6 +279,7 @@ impl Project {
client,
repodata_gateway,
concurrent_downloads_semaphore: OnceCell::new(),
command_dispatcher: OnceCell::new(),
}
}

Expand Down Expand Up @@ -500,110 +506,83 @@ impl Project {

let platform = environment.platform.unwrap_or_else(Platform::current);

let (match_specs, dependencies_names) = environment
.dependencies
.specs
.iter()
.map(|(name, spec)| {
if let Some(nameless_spec) = spec
.clone()
.try_into_nameless_match_spec(self.config().global_channel_config())
.into_diagnostic()?
{
Ok((
MatchSpec::from_nameless(nameless_spec, Some(name.clone())),
name.clone(),
))
} else {
Err(miette!("Couldn't convert {spec:?} to nameless match spec."))
}
})
.collect::<miette::Result<(Vec<MatchSpec>, Vec<PackageName>)>>()?;
// Convert dependency specs to binary specs for CommandDispatcher
let mut pixi_specs = DependencyMap::default();
let mut dependencies_names = Vec::new();

let repodata = await_in_progress(
format!(
"Querying repodata for environment: {} ",
env_name.fancy_display()
),
|_| async {
self.repodata_gateway()?
.query(channels, [platform, Platform::NoArch], match_specs.clone())
.recursive(true)
.await
.into_diagnostic()
},
)
.await?;
for (name, spec) in &environment.dependencies.specs {
pixi_specs.insert(name.clone(), spec.clone());
dependencies_names.push(name.clone());
}

// Determine virtual packages of the current platform
let virtual_packages = VirtualPackage::detect(&VirtualPackageOverrides::default())
.into_diagnostic()
.wrap_err_with(|| {
miette::miette!(
"Failed to determine virtual packages for environment {}",
env_name.fancy_display()
)
})?
.iter()
.cloned()
.map(GenericVirtualPackage::from)
.collect();
let command_dispatcher = self.command_dispatcher()?;

// Solve the environment
let cloned_env_name = env_name.clone();
let solved_records = tokio::task::spawn_blocking(move || {
wrap_in_progress(
format!("Solving environment: {}", cloned_env_name.fancy_display()),
move || {
Solver.solve(SolverTask {
specs: match_specs,
virtual_packages,
..SolverTask::from_iter(&repodata)
})
},
)
.into_diagnostic()
})
.await
.into_diagnostic()??;
// Check if the platform matches the current platform (OS)
// We only need to detect virtual packages if the platform is the current one.
// Otherwise, we use an empty list
let virtual_packages = if platform
.only_platform()
.map(|p| p == Platform::current().only_platform().unwrap_or(""))
.unwrap_or(false)
{
VirtualPackage::detect(&VirtualPackageOverrides::default())
.into_diagnostic()
.wrap_err_with(|| {
miette::miette!(
"Failed to determine virtual packages for environment {}",
env_name.fancy_display()
)
})?
.iter()
.cloned()
.map(GenericVirtualPackage::from)
.collect()
} else {
vec![]
};

try_increase_rlimit_to_sensible();
// Create solve spec
let channels = channels
.into_iter()
.map(|channel| channel.base_url.clone())
.collect::<Vec<_>>();

let solve_spec = PixiEnvironmentSpec {
name: Some(env_name.to_string()),
dependencies: pixi_specs,
build_environment: BuildEnvironment::simple(platform, virtual_packages),
channels: channels.clone(),
channel_config: self.config.global_channel_config().clone(),
..Default::default()
};

// Solve using CommandDispatcher
let pixi_records = command_dispatcher
.solve_pixi_environment(solve_spec)
.await?;

// Force the initialization of the rayon thread pool to avoid implicit creation
// by the Installer.
LazyLock::force(&RAYON_INITIALIZE);
// Move this to a separate function to avoid code duplication
try_increase_rlimit_to_sensible();

// Install the environment
let package_cache = PackageCache::new(pixi_config::get_cache_dir()?.join(CACHED_PACKAGES));
let prefix = self.environment_prefix(env_name).await?;
let authenticated_client = self.authenticated_client()?.clone();
let result = await_in_progress(
format!(
"Creating virtual environment for {}",
env_name.fancy_display()
),
|pb| {
Installer::new()
.with_download_client(authenticated_client)
.with_execute_link_scripts(false)
.with_package_cache(package_cache)
.with_target_platform(platform)
.with_reporter(
IndicatifReporter::builder()
.with_multi_progress(global_multi_progress())
.with_placement(rattler::install::Placement::After(pb))
.with_formatter(DefaultProgressFormatter::default().with_prefix(" "))
.clear_when_done(true)
.finish(),
)
.install(prefix.root(), solved_records.records)
},
)
.await
.into_diagnostic()?;

let install_changes = get_install_changes(result.transaction);
let result = command_dispatcher
.install_pixi_environment(InstallPixiEnvironmentSpec {
name: env_name.to_string(),
records: pixi_records,
prefix: rattler_conda_types::prefix::Prefix::create(prefix.root())
.into_diagnostic()?,
target_platform: platform,
channels,
channel_config: self.config.global_channel_config().clone(),
enabled_protocols: EnabledProtocols::default(),
installed: None,
force_reinstall: Default::default(),
variants: None,
})
.await?;

let install_changes = get_install_changes(result.transaction);
Ok(EnvironmentUpdate::new(install_changes, dependencies_names))
}

Expand Down Expand Up @@ -1270,6 +1249,25 @@ impl Project {
})
.clone()
}

/// Returns the command dispatcher for this project.
fn command_dispatcher(&self) -> miette::Result<&CommandDispatcher> {
self.command_dispatcher.get_or_try_init(|| {
let multi_progress = global_multi_progress();
let anchor_pb = multi_progress.add(ProgressBar::hidden());
let cache_dirs = pixi_command_dispatcher::CacheDirs::new(
pixi_config::get_cache_dir()
.map_err(|e| miette::miette!("Failed to get cache directory: {}", e))?,
);

Ok(pixi_command_dispatcher::CommandDispatcher::builder()
.with_gateway(self.repodata_gateway()?.clone())
.with_cache_dirs(cache_dirs)
.with_reporter(TopLevelProgress::new(multi_progress, anchor_pb))
.with_root_dir(self.root.clone())
.finish())
})
}
}

impl Repodata for Project {
Expand Down
Loading