From 79b96c571c751753b42dfdcdbeb99a3138690051 Mon Sep 17 00:00:00 2001 From: cyqsimon <28627918+cyqsimon@users.noreply.github.com> Date: Mon, 22 Jan 2024 21:13:36 +0800 Subject: [PATCH] Built-in glob matchers build offload This is an initial implementation --- src/syntax_mapping.rs | 5 +- src/syntax_mapping/builtin.rs | 95 ++++++++++++++++++++++++++++++++++- 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/src/syntax_mapping.rs b/src/syntax_mapping.rs index 0dac0c02d1..9859ff82d4 100644 --- a/src/syntax_mapping.rs +++ b/src/syntax_mapping.rs @@ -3,7 +3,7 @@ use std::path::Path; use globset::{Candidate, GlobBuilder, GlobMatcher}; use crate::error::Result; -use builtin::BUILTIN_MAPPINGS; +use builtin::OffloadIter; use ignored_suffixes::IgnoredSuffixes; mod builtin; @@ -91,8 +91,7 @@ impl<'a> SyntaxMapping<'a> { pub fn builtin_mappings( &self, ) -> impl Iterator)> { - BUILTIN_MAPPINGS - .iter() + OffloadIter::new() .filter_map(|(matcher, target)| matcher.as_ref().map(|glob| (glob, target))) } diff --git a/src/syntax_mapping/builtin.rs b/src/syntax_mapping/builtin.rs index 1822be5706..4b32253193 100644 --- a/src/syntax_mapping/builtin.rs +++ b/src/syntax_mapping/builtin.rs @@ -1,4 +1,4 @@ -use std::env; +use std::{env, iter::Enumerate, slice, sync::mpsc, thread}; use globset::GlobMatcher; use once_cell::sync::Lazy; @@ -89,3 +89,96 @@ enum MatcherSegment { Text(&'static str), Env(&'static str), } + +/// The maximum number of offload workers. +const MAX_OFFLOAD_WORKERS: usize = 8; +/// The minimum number of built glob matchers remaining before offload workers +/// start building the next batch. +const OFFLOAD_PREEMPT_MARGIN: usize = 2; + +/// An iterator over the builtin mappings that offloads glob matcher building to +/// worker threads. +#[derive(Debug)] +pub struct OffloadIter { + /// The iterator tracking the item returned by `next()`. + next_iter: Enumerate>, MappingTarget<'static>)>>, + + /// The index of the next item that is to be fed to workers. + /// + /// Used to determine whether the next offload batch should be triggered. + build_idx: usize, + /// The iterator tracking the next item that is to be fed to workers. + build_iter: slice::Iter<'static, (Lazy>, MappingTarget<'static>)>, + + /// Control channels for worker threads. + workers: Vec>>>, +} + +impl OffloadIter { + pub fn new() -> Self { + let worker_count = match thread::available_parallelism() { + Ok(n) => (n.get() - 1).min(MAX_OFFLOAD_WORKERS), // leave 1 for main thread + Err(_) => 0, + }; + + let workers = (0..worker_count) + .map(|_| { + let (cmd_tx, cmd_rx) = mpsc::channel(); + thread::spawn(move || loop { + match cmd_rx.recv() { + Ok(cell) => { + Lazy::force(cell); + } + Err(_) => break, // cmd_tx dropped; nothing more to do + } + }); // no need for the join handle; thread will halt when cmd_tx is dropped + cmd_tx + }) + .collect(); + + Self { + next_iter: BUILTIN_MAPPINGS.iter().enumerate(), + build_idx: 0, + build_iter: BUILTIN_MAPPINGS.iter(), + workers, + } + } +} + +impl Iterator for OffloadIter { + type Item = &'static (Lazy>, MappingTarget<'static>); + + fn next(&mut self) -> Option { + let (idx, item) = self.next_iter.next()?; + + if idx + OFFLOAD_PREEMPT_MARGIN < self.build_idx + || idx + OFFLOAD_PREEMPT_MARGIN >= BUILTIN_MAPPINGS.len() + { + // no further work needed at this point + return Some(item); + } + + // feed jobs to workers + for cmd_tx in self.workers.iter() { + match self.build_iter.next() { + Some((unbuilt, _)) => { + cmd_tx + .send(unbuilt) + .expect("Offload worker should not hang up before main thread"); + self.build_idx += 1; + } + None => { + break; + } + } + } + + // halt workers if no longer needed + if self.build_idx >= BUILTIN_MAPPINGS.len() { + // workers stop when their current job is finished and cmd_tx is dropped + self.workers.clear(); + } + + Some(item) + } +}