From e2f7734f37394097f330c4073bf7784500afdc9d Mon Sep 17 00:00:00 2001 From: Nahor Date: Wed, 2 Oct 2024 14:42:50 -0700 Subject: [PATCH] Limit the amount of parallelism in check_all Don't create more threads than there are CPU cores. --- src/app_state.rs | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/app_state.rs b/src/app_state.rs index 28226d6..ec79188 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -5,7 +5,7 @@ use std::{ io::{self, Read, Seek, StdoutLock, Write}, path::{Path, MAIN_SEPARATOR_STR}, process::{Command, Stdio}, - sync::mpsc, + sync::{atomic::AtomicUsize, mpsc, Arc}, thread, }; @@ -20,6 +20,7 @@ use crate::{ }; const STATE_FILE_NAME: &str = ".rustlings-state.txt"; +const DEFAULT_CHECK_PARALLELISM: usize = 8; #[must_use] pub enum ExercisesProgress { @@ -411,17 +412,31 @@ impl AppState { let (mut checked_count, mut results) = thread::scope(|s| { let (tx, rx) = mpsc::channel(); + let exercise_ind = Arc::new(AtomicUsize::default()); - self.exercises - .iter() - .enumerate() - .for_each(|(index, exercise)| { - let tx = tx.clone(); - let cmd_runner = &self.cmd_runner; - let _ = thread::Builder::new().spawn_scoped(s, move || { - tx.send((index, exercise.run_exercise(None, cmd_runner))) - }); + let num_core = thread::available_parallelism() + .map_or(DEFAULT_CHECK_PARALLELISM, |count| count.get()); + (0..num_core).for_each(|_| { + let tx = tx.clone(); + let exercise_ind = exercise_ind.clone(); + let this = &self; + let _ = thread::Builder::new().spawn_scoped(s, move || { + loop { + let exercise_ind = + exercise_ind.fetch_add(1, std::sync::atomic::Ordering::AcqRel); + let Some(exercise) = this.exercises.get(exercise_ind) else { + // No more exercises + break; + }; + if tx + .send((exercise_ind, exercise.run_exercise(None, &this.cmd_runner))) + .is_err() + { + break; + } + } }); + }); // Drop this `tx`, since the `rx` loop will not stop while there is // at least one tx alive (i.e. we want the loop to block only while