1
0
Fork 0
mirror of https://github.com/NixOS/hydra.git synced 2024-10-18 17:02:28 -04:00

redo the notify events between eval and queue-runner

This commit is contained in:
Michael Bishop 2018-09-10 18:20:44 -03:00
parent 2f39355bad
commit 73ca325d1c
3 changed files with 61 additions and 10 deletions

View file

@ -24,7 +24,7 @@ void State::queueMonitorLoop()
{ {
auto conn(dbPool.get()); auto conn(dbPool.get());
receiver buildsAdded(*conn, "builds_added"); receiver evalAdded(*conn, "eval_added");
receiver buildsRestarted(*conn, "builds_restarted"); receiver buildsRestarted(*conn, "builds_restarted");
receiver buildsCancelled(*conn, "builds_cancelled"); receiver buildsCancelled(*conn, "builds_cancelled");
receiver buildsDeleted(*conn, "builds_deleted"); receiver buildsDeleted(*conn, "builds_deleted");
@ -48,8 +48,9 @@ void State::queueMonitorLoop()
} else } else
conn->get_notifs(); conn->get_notifs();
if (auto lowestId = buildsAdded.get()) { if (auto evalIdStr = evalAdded.get()) {
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1)); unsigned int evalId = std::stoul(*evalIdStr);
getQueuedBuildsInEval(*conn, destStore, evalId);
printMsg(lvlTalkative, "got notification: new builds added to the queue"); printMsg(lvlTalkative, "got notification: new builds added to the queue");
} }
if (buildsRestarted.get()) { if (buildsRestarted.get()) {
@ -74,6 +75,53 @@ struct PreviousFailure : public std::exception {
}; };
bool State::getQueuedBuildsInEval(Connection & conn,
ref<Store> destStore, unsigned int evalId)
{
printInfo("checking the queue for builds in eval %d...", evalId);
/* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */
std::vector<BuildID> newIDs;
std::map<BuildID, Build::ptr> newBuildsByID;
std::multimap<Path, BuildID> newBuildsByPath;
{
pqxx::work txn(conn);
pqxx::result res = txn.parameterized
("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds "
"where id IN (select build from jobsetevalmembers where eval = $1) order by globalPriority desc, id")
(evalId).exec();
for (auto const & row : res) {
auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue;
if (builds_->count(id)) continue;
auto build = std::make_shared<Build>();
build->id = id;
build->drvPath = row["drvPath"].as<string>();
build->projectName = row["project"].as<string>();
build->jobsetName = row["jobset"].as<string>();
build->jobName = row["job"].as<string>();
build->maxSilentTime = row["maxsilent"].as<int>();
build->buildTimeout = row["timeout"].as<int>();
build->timestamp = row["timestamp"].as<time_t>();
build->globalPriority = row["globalPriority"].as<int>();
build->localPriority = row["priority"].as<int>();
build->jobset = createJobset(txn, build->projectName, build->jobsetName);
newIDs.push_back(id);
newBuildsByID[id] = build;
newBuildsByPath.emplace(std::make_pair(build->drvPath, id));
}
}
unsigned int lastBuildId = 0;
return finishQueuedBuilds(conn, destStore, lastBuildId, 0, newIDs, newBuildsByID, newBuildsByPath);
}
bool State::getQueuedBuilds(Connection & conn, bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId) ref<Store> destStore, unsigned int & lastBuildId)
{ {
@ -90,7 +138,7 @@ bool State::getQueuedBuilds(Connection & conn,
{ {
pqxx::work txn(conn); pqxx::work txn(conn);
auto res = txn.parameterized pqxx::result res = txn.parameterized
("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds " ("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds "
"where id > $1 and finished = 0 order by globalPriority desc, id") "where id > $1 and finished = 0 order by globalPriority desc, id")
(lastBuildId).exec(); (lastBuildId).exec();
@ -120,6 +168,10 @@ bool State::getQueuedBuilds(Connection & conn,
newBuildsByPath.emplace(std::make_pair(build->drvPath, id)); newBuildsByPath.emplace(std::make_pair(build->drvPath, id));
} }
} }
return finishQueuedBuilds(conn, destStore, lastBuildId, newLastBuildId, newIDs, newBuildsByID, newBuildsByPath);
}
bool State::finishQueuedBuilds(Connection & conn, ref<Store> destStore, unsigned int & lastBuildId, unsigned int newLastBuildId, std::vector<BuildID> newIDs, std::map<BuildID, Build::ptr> newBuildsByID, std::multimap<Path, BuildID> newBuildsByPath) {
std::set<Step::ptr> newRunnable; std::set<Step::ptr> newRunnable;
unsigned int nrAdded; unsigned int nrAdded;

View file

@ -492,6 +492,9 @@ private:
/* Check the queue for new builds. */ /* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, bool getQueuedBuilds(Connection & conn,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId); nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
bool getQueuedBuildsInEval(Connection & conn,
nix::ref<nix::Store> destStore, unsigned int evalId);
bool finishQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore, unsigned int & lastBuildId, unsigned int newLastBuildId, std::vector<BuildID> newIDs, std::map<BuildID, Build::ptr> newBuildsByID, std::multimap<nix::Path, BuildID> newBuildsByPath);
/* Handle cancellation, deletion and priority bumps. */ /* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn); void processQueueChange(Connection & conn);

View file

@ -25,7 +25,7 @@ STDERR->autoflush(1);
binmode STDERR, ":encoding(utf8)"; binmode STDERR, ":encoding(utf8)";
my $db = Hydra::Model::DB->new(); my $db = Hydra::Model::DB->new();
my $notifyAdded = $db->storage->dbh->prepare("notify builds_added, ?"); my $notifyAdded = $db->storage->dbh->prepare("notify eval_added, ?");
my $config = getHydraConfig(); my $config = getHydraConfig();
@ -744,11 +744,7 @@ sub checkJobsetWrapped {
$ev->builds->update({iscurrent => 1}); $ev->builds->update({iscurrent => 1});
# Wake up hydra-queue-runner. # Wake up hydra-queue-runner.
my $lowestId; $notifyAdded->execute($ev->id);
while (my ($id, $x) = each %buildMap) {
$lowestId = $id if $x->{new} && (!defined $lowestId || $id < $lowestId);
}
$notifyAdded->execute($lowestId) if defined $lowestId;
} else { } else {
print STDERR " created cached eval ", $ev->id, "\n"; print STDERR " created cached eval ", $ev->id, "\n";