diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 862925b2..c0e559b9 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -24,7 +24,7 @@ void State::queueMonitorLoop() { auto conn(dbPool.get()); - receiver buildsAdded(*conn, "builds_added"); + receiver evalAdded(*conn, "eval_added"); receiver buildsRestarted(*conn, "builds_restarted"); receiver buildsCancelled(*conn, "builds_cancelled"); receiver buildsDeleted(*conn, "builds_deleted"); @@ -48,8 +48,9 @@ void State::queueMonitorLoop() } else conn->get_notifs(); - if (auto lowestId = buildsAdded.get()) { - lastBuildId = std::min(lastBuildId, static_cast(std::stoul(*lowestId) - 1)); + if (auto evalIdStr = evalAdded.get()) { + unsigned int evalId = std::stoul(*evalIdStr); + getQueuedBuildsInEval(*conn, destStore, evalId); printMsg(lvlTalkative, "got notification: new builds added to the queue"); } if (buildsRestarted.get()) { @@ -74,6 +75,53 @@ struct PreviousFailure : public std::exception { }; +bool State::getQueuedBuildsInEval(Connection & conn, + ref destStore, unsigned int evalId) +{ + printInfo("checking the queue for builds in eval %d...", evalId); + + /* Grab the queued builds from the database, but don't process + them yet (since we don't want a long-running transaction). */ + std::vector newIDs; + std::map newBuildsByID; + std::multimap newBuildsByPath; + + { + pqxx::work txn(conn); + + pqxx::result res = txn.parameterized + ("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds " + "where id IN (select build from jobsetevalmembers where eval = $1) order by globalPriority desc, id") + (evalId).exec(); + + for (auto const & row : res) { + auto builds_(builds.lock()); + BuildID id = row["id"].as(); + if (buildOne && id != buildOne) continue; + if (builds_->count(id)) continue; + + auto build = std::make_shared(); + build->id = id; + build->drvPath = row["drvPath"].as(); + build->projectName = row["project"].as(); + build->jobsetName = row["jobset"].as(); + build->jobName = row["job"].as(); + build->maxSilentTime = row["maxsilent"].as(); + build->buildTimeout = row["timeout"].as(); + build->timestamp = row["timestamp"].as(); + build->globalPriority = row["globalPriority"].as(); + build->localPriority = row["priority"].as(); + build->jobset = createJobset(txn, build->projectName, build->jobsetName); + + newIDs.push_back(id); + newBuildsByID[id] = build; + newBuildsByPath.emplace(std::make_pair(build->drvPath, id)); + } + } + unsigned int lastBuildId = 0; + return finishQueuedBuilds(conn, destStore, lastBuildId, 0, newIDs, newBuildsByID, newBuildsByPath); +} + bool State::getQueuedBuilds(Connection & conn, ref destStore, unsigned int & lastBuildId) { @@ -90,7 +138,7 @@ bool State::getQueuedBuilds(Connection & conn, { pqxx::work txn(conn); - auto res = txn.parameterized + pqxx::result res = txn.parameterized ("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds " "where id > $1 and finished = 0 order by globalPriority desc, id") (lastBuildId).exec(); @@ -120,6 +168,10 @@ bool State::getQueuedBuilds(Connection & conn, newBuildsByPath.emplace(std::make_pair(build->drvPath, id)); } } + return finishQueuedBuilds(conn, destStore, lastBuildId, newLastBuildId, newIDs, newBuildsByID, newBuildsByPath); +} + +bool State::finishQueuedBuilds(Connection & conn, ref destStore, unsigned int & lastBuildId, unsigned int newLastBuildId, std::vector newIDs, std::map newBuildsByID, std::multimap newBuildsByPath) { std::set newRunnable; unsigned int nrAdded; diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 0e91ab56..cf019676 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -492,6 +492,9 @@ private: /* Check the queue for new builds. */ bool getQueuedBuilds(Connection & conn, nix::ref destStore, unsigned int & lastBuildId); + bool getQueuedBuildsInEval(Connection & conn, + nix::ref destStore, unsigned int evalId); + bool finishQueuedBuilds(Connection & conn, nix::ref destStore, unsigned int & lastBuildId, unsigned int newLastBuildId, std::vector newIDs, std::map newBuildsByID, std::multimap newBuildsByPath); /* Handle cancellation, deletion and priority bumps. */ void processQueueChange(Connection & conn); diff --git a/src/script/hydra-eval-jobset b/src/script/hydra-eval-jobset index 9d375c1b..8c026c2f 100755 --- a/src/script/hydra-eval-jobset +++ b/src/script/hydra-eval-jobset @@ -25,7 +25,7 @@ STDERR->autoflush(1); binmode STDERR, ":encoding(utf8)"; my $db = Hydra::Model::DB->new(); -my $notifyAdded = $db->storage->dbh->prepare("notify builds_added, ?"); +my $notifyAdded = $db->storage->dbh->prepare("notify eval_added, ?"); my $config = getHydraConfig(); @@ -744,11 +744,7 @@ sub checkJobsetWrapped { $ev->builds->update({iscurrent => 1}); # Wake up hydra-queue-runner. - my $lowestId; - while (my ($id, $x) = each %buildMap) { - $lowestId = $id if $x->{new} && (!defined $lowestId || $id < $lowestId); - } - $notifyAdded->execute($lowestId) if defined $lowestId; + $notifyAdded->execute($ev->id); } else { print STDERR " created cached eval ", $ev->id, "\n";