|
| 1 | +use std::cmp::min; |
| 2 | + |
| 3 | +use pubgrub::range::Range; |
| 4 | +use rustc_hash::FxHashMap; |
| 5 | +use tokio::sync::mpsc::Sender; |
| 6 | +use tracing::{debug, trace}; |
| 7 | + |
| 8 | +use distribution_types::{DistributionMetadata, ResolvedDistRef}; |
| 9 | +use pep440_rs::Version; |
| 10 | + |
| 11 | +use crate::candidate_selector::{CandidateDist, CandidateSelector}; |
| 12 | +use crate::pubgrub::PubGrubPackage; |
| 13 | +use crate::resolver::Request; |
| 14 | +use crate::{InMemoryIndex, ResolveError, VersionsResponse}; |
| 15 | + |
| 16 | +enum BatchPrefetchStrategy { |
| 17 | + /// Go through the next versions assuming the existing selection and its constraints |
| 18 | + /// remain. |
| 19 | + Compatible { |
| 20 | + compatible: Range<Version>, |
| 21 | + previous: Version, |
| 22 | + }, |
| 23 | + /// We encounter cases (botocore) where the above doesn't work: Say we previously selected |
| 24 | + /// a==x.y.z, which depends on b==x.y.z. a==x.y.z is incompatible, but we don't know that |
| 25 | + /// yet. We just selected b==x.y.z and want to prefetch, since for all versions of a we try, |
| 26 | + /// we have to wait for the matching version of b. The exiting range gives us only one version |
| 27 | + /// of b, so the compatible strategy doesn't prefetch any version. Instead, we try the next |
| 28 | + /// heuristic where the next version of b will be x.y.(z-1) and so forth. |
| 29 | + InOrder { previous: Version }, |
| 30 | +} |
| 31 | + |
| 32 | +/// Prefetch a large number of versions if we already unsuccessfully tried many versions. |
| 33 | +/// |
| 34 | +/// This is an optimization specifically targeted at cold cache urllib3/boto3/botocore, where we |
| 35 | +/// have to fetch the metadata for a lot of versions. |
| 36 | +/// |
| 37 | +/// Note that these all heuristics that could totally prefetch lots of irrelevant versions. |
| 38 | +#[derive(Default)] |
| 39 | +pub(crate) struct BatchPrefetcher { |
| 40 | + tried_versions: FxHashMap<PubGrubPackage, usize>, |
| 41 | + last_prefetch: FxHashMap<PubGrubPackage, usize>, |
| 42 | +} |
| 43 | + |
| 44 | +impl BatchPrefetcher { |
| 45 | + /// Prefetch a large number of versions if we already unsuccessfully tried many versions. |
| 46 | + pub(crate) async fn prefetch_batches( |
| 47 | + &mut self, |
| 48 | + next: &PubGrubPackage, |
| 49 | + version: &Version, |
| 50 | + current_range: &Range<Version>, |
| 51 | + request_sink: &Sender<Request>, |
| 52 | + index: &InMemoryIndex, |
| 53 | + selector: &CandidateSelector, |
| 54 | + ) -> anyhow::Result<(), ResolveError> { |
| 55 | + let PubGrubPackage::Package(package_name, _, _) = &next else { |
| 56 | + return Ok(()); |
| 57 | + }; |
| 58 | + |
| 59 | + let (num_tried, do_prefetch) = self.should_prefetch(next); |
| 60 | + if !do_prefetch { |
| 61 | + return Ok(()); |
| 62 | + } |
| 63 | + let total_prefetch = min(num_tried, 50); |
| 64 | + |
| 65 | + // This is immediate, we already fetched the version map. |
| 66 | + let versions_response = index |
| 67 | + .packages |
| 68 | + .wait(package_name) |
| 69 | + .await |
| 70 | + .ok_or(ResolveError::Unregistered)?; |
| 71 | + |
| 72 | + let VersionsResponse::Found(ref version_map) = *versions_response else { |
| 73 | + return Ok(()); |
| 74 | + }; |
| 75 | + |
| 76 | + let mut phase = BatchPrefetchStrategy::Compatible { |
| 77 | + compatible: current_range.clone(), |
| 78 | + previous: version.clone(), |
| 79 | + }; |
| 80 | + let mut i = 0; |
| 81 | + while i < total_prefetch { |
| 82 | + i += 1; |
| 83 | + let candidate = match phase { |
| 84 | + BatchPrefetchStrategy::Compatible { |
| 85 | + compatible, |
| 86 | + previous, |
| 87 | + } => { |
| 88 | + if let Some(candidate) = |
| 89 | + selector.select_no_preference(package_name, &compatible, version_map) |
| 90 | + { |
| 91 | + let compatible = compatible.intersection( |
| 92 | + &Range::singleton(candidate.version().clone()).complement(), |
| 93 | + ); |
| 94 | + phase = BatchPrefetchStrategy::Compatible { |
| 95 | + compatible, |
| 96 | + previous: candidate.version().clone(), |
| 97 | + }; |
| 98 | + candidate |
| 99 | + } else { |
| 100 | + // We exhausted the compatible version, switch to ignoring the existing |
| 101 | + // constraints on the package and instead going through versions in order. |
| 102 | + phase = BatchPrefetchStrategy::InOrder { previous }; |
| 103 | + continue; |
| 104 | + } |
| 105 | + } |
| 106 | + BatchPrefetchStrategy::InOrder { previous } => { |
| 107 | + let range = if selector.use_highest_version(package_name) { |
| 108 | + Range::strictly_lower_than(previous) |
| 109 | + } else { |
| 110 | + Range::strictly_higher_than(previous) |
| 111 | + }; |
| 112 | + if let Some(candidate) = |
| 113 | + selector.select_no_preference(package_name, &range, version_map) |
| 114 | + { |
| 115 | + phase = BatchPrefetchStrategy::InOrder { |
| 116 | + previous: candidate.version().clone(), |
| 117 | + }; |
| 118 | + candidate |
| 119 | + } else { |
| 120 | + // Both strategies exhausted their candidates. |
| 121 | + break; |
| 122 | + } |
| 123 | + } |
| 124 | + }; |
| 125 | + |
| 126 | + let CandidateDist::Compatible(dist) = candidate.dist() else { |
| 127 | + continue; |
| 128 | + }; |
| 129 | + let dist = dist.for_resolution(); |
| 130 | + |
| 131 | + // Emit a request to fetch the metadata for this version. |
| 132 | + trace!( |
| 133 | + "Prefetching {i} ({}) {}", |
| 134 | + match phase { |
| 135 | + BatchPrefetchStrategy::Compatible { .. } => "compatible", |
| 136 | + BatchPrefetchStrategy::InOrder { .. } => "in order", |
| 137 | + }, |
| 138 | + dist |
| 139 | + ); |
| 140 | + if index.distributions.register(candidate.package_id()) { |
| 141 | + let request = match dist { |
| 142 | + ResolvedDistRef::Installable(dist) => Request::Dist(dist.clone()), |
| 143 | + ResolvedDistRef::Installed(dist) => Request::Installed(dist.clone()), |
| 144 | + }; |
| 145 | + request_sink.send(request).await?; |
| 146 | + } |
| 147 | + } |
| 148 | + |
| 149 | + debug!("Prefetching {i} {package_name} versions"); |
| 150 | + |
| 151 | + self.last_prefetch.insert(next.clone(), num_tried); |
| 152 | + Ok(()) |
| 153 | + } |
| 154 | + |
| 155 | + /// Each time we tried a version for a package, we register that here. |
| 156 | + pub(crate) fn version_tried(&mut self, package: PubGrubPackage) { |
| 157 | + *self.tried_versions.entry(package).or_default() += 1; |
| 158 | + } |
| 159 | + |
| 160 | + /// After 5, 10, 20, 40 tried versions, prefetch that many versions to start early but not |
| 161 | + /// too aggressive. Later we schedule the prefetch of 50 versions every 20 versions, this gives |
| 162 | + /// us a good buffer until we see prefetch again and is high enough to saturate the task pool. |
| 163 | + fn should_prefetch(&self, next: &PubGrubPackage) -> (usize, bool) { |
| 164 | + let num_tried = self.tried_versions.get(next).copied().unwrap_or_default(); |
| 165 | + let previous_prefetch = self.last_prefetch.get(next).copied().unwrap_or_default(); |
| 166 | + let do_prefetch = (num_tried >= 5 && previous_prefetch < 5) |
| 167 | + || (num_tried >= 10 && previous_prefetch < 10) |
| 168 | + || (num_tried >= 20 && previous_prefetch < 20) |
| 169 | + || (num_tried >= 20 && num_tried - previous_prefetch >= 20); |
| 170 | + (num_tried, do_prefetch) |
| 171 | + } |
| 172 | +} |
0 commit comments