diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 417c5202..30a1d856 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -159,13 +160,7 @@ struct Machine unsigned int maxJobs = 1; float speedFactor = 1.0; - Sync currentJobs; - - Machine() - { - auto currentJobs_(currentJobs.lock()); - *currentJobs_ = 0; - } + std::atomic currentJobs{0}; bool supportsStep(Step::ptr step) { @@ -187,13 +182,11 @@ struct MachineReservation Machine::ptr machine; MachineReservation(Machine::ptr machine) : machine(machine) { - auto currentJobs_(machine->currentJobs.lock()); - (*currentJobs_)++; + machine->currentJobs++; } ~MachineReservation() { - auto currentJobs_(machine->currentJobs.lock()); - if (*currentJobs_ > 0) (*currentJobs_)--; + machine->currentJobs--; } }; @@ -284,8 +277,6 @@ public: void wakeDispatcher(); - MachineReservation::ptr findMachine(Step::ptr step); - void builder(Step::ptr step, MachineReservation::ptr reservation); /* Perform the given build step. Return true if the step is to be @@ -878,49 +869,98 @@ void State::dispatcher() auto sleepUntil = system_time::max(); - { - auto runnable_(runnable.lock()); - printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); + bool keepGoing; - /* FIXME: we're holding the runnable lock too long - here. This could be more efficient. */ + do { + /* Bail out when there are no slots left. */ + std::vector machinesSorted; + { + auto machines_(machines.lock()); + machinesSorted.insert(machinesSorted.end(), + machines_->begin(), machines_->end()); + } + /* Sort the machines by a combination of speed factor and + available slots. Prioritise the available machines as + follows: + + - First by load divided by speed factor, rounded to the + nearest integer. This causes fast machines to be + preferred over slow machines with similar loads. + + - Then by speed factor. + + - Finally by load. */ + sort(machinesSorted.begin(), machinesSorted.end(), + [](const Machine::ptr & a, const Machine::ptr & b) -> bool + { + float ta = roundf(a->currentJobs / a->speedFactor); + float tb = roundf(b->currentJobs / b->speedFactor); + return + ta != tb ? ta > tb : + a->speedFactor != b->speedFactor ? a->speedFactor > b->speedFactor : + a->maxJobs > b->maxJobs; + }); + + /* Find a machine with a free slot and find a step to run + on it. Once we find such a pair, we restart the outer + loop because the machine sorting will have changed. */ + keepGoing = false; system_time now = std::chrono::system_clock::now(); - for (auto i = runnable_->begin(); i != runnable_->end(); ) { - auto step = i->lock(); + for (auto & machine : machinesSorted) { + // FIXME: can we lose a wakeup if a builder exits concurrently? + if (machine->currentJobs >= machine->maxJobs) continue; - /* Delete dead steps. */ - if (!step) { - i = runnable_->erase(i); - continue; - } + auto runnable_(runnable.lock()); + printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); - /* Skip previously failed steps that aren't ready to - be retried. */ - { - auto step_(step->state.lock()); - if (step_->tries > 0 && step_->after > now) { - if (step_->after < sleepUntil) - sleepUntil = step_->after; + /* FIXME: we're holding the runnable lock too long + here. This could be more efficient. */ + + for (auto i = runnable_->begin(); i != runnable_->end(); ) { + auto step = i->lock(); + + /* Delete dead steps. */ + if (!step) { + i = runnable_->erase(i); + continue; + } + + /* Can this machine do this step? */ + if (!machine->supportsStep(step)) { ++i; continue; } + + /* Skip previously failed steps that aren't ready + to be retried. */ + { + auto step_(step->state.lock()); + if (step_->tries > 0 && step_->after > now) { + if (step_->after < sleepUntil) + sleepUntil = step_->after; + ++i; + continue; + } + } + + /* Make a slot reservation and start a thread to + do the build. */ + auto reservation = std::make_shared(machine); + i = runnable_->erase(i); + + auto builderThread = std::thread(&State::builder, this, step, reservation); + builderThread.detach(); // FIXME? + + keepGoing = true; + break; } - auto reservation = findMachine(step); - if (!reservation) { - printMsg(lvlDebug, format("cannot execute step ‘%1%’ right now") % step->drvPath); - ++i; - continue; - } - - i = runnable_->erase(i); - - auto builderThread = std::thread(&State::builder, this, step, reservation); - builderThread.detach(); // FIXME? + if (keepGoing) break; } - } + + } while (keepGoing); /* Sleep until we're woken up (either because a runnable build is added, or because a build finishes). */ @@ -944,23 +984,6 @@ void State::wakeDispatcher() } -MachineReservation::ptr State::findMachine(Step::ptr step) -{ - auto machines_(machines.lock()); - - for (auto & machine : *machines_) { - if (!machine->supportsStep(step)) continue; - { - auto currentJobs_(machine->currentJobs.lock()); - if (*currentJobs_ >= machine->maxJobs) continue; - } - return std::make_shared(machine); - } - - return 0; -} - - void State::builder(Step::ptr step, MachineReservation::ptr reservation) { bool retry = true; @@ -1274,9 +1297,8 @@ void State::dumpStatus() { auto machines_(machines.lock()); for (auto & m : *machines_) { - auto currentJobs_(m->currentJobs.lock()); printMsg(lvlError, format("machine %1%: %2%/%3% active") - % m->sshName % *currentJobs_ % m->maxJobs); + % m->sshName % m->currentJobs % m->maxJobs); } } }