diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 5552e91a..73f46a53 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -32,6 +32,8 @@ static void append(Strings & dst, const Strings & src) dst.insert(dst.end(), src.begin(), src.end()); } +namespace nix::build_remote { + static Strings extraStoreArgs(std::string & machine) { Strings result; @@ -107,33 +109,27 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil } -static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, - FdSource & from, FdSink & to, ServeProto::Version remoteVersion, const StorePathSet & paths, +static void copyClosureTo( + Machine::Connection & conn, + Store & destStore, + const StorePathSet & paths, bool useSubstitutes = false) { StorePathSet closure; destStore.computeFSClosure(paths, closure); - ServeProto::WriteConn wconn { - .to = to, - .version = remoteVersion, - }; - ServeProto::ReadConn rconn { - .from = from, - .version = remoteVersion, - }; /* Send the "query valid paths" command with the "lock" option enabled. This prevents a race where the remote host garbage-collect paths that are already there. Optionally, ask the remote host to substitute missing paths. */ // FIXME: substitute output pollutes our build log - to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; - ServeProto::write(destStore, wconn, closure); - to.flush(); + conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; + ServeProto::write(destStore, conn, closure); + conn.to.flush(); /* Get back the set of paths that are already valid on the remote host. */ - auto present = ServeProto::Serialise::read(destStore, rconn); + auto present = ServeProto::Serialise::read(destStore, conn); if (present.size() == closure.size()) return; @@ -145,20 +141,20 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, printMsg(lvlDebug, "sending %d missing paths", missing.size()); - std::unique_lock sendLock(sendMutex, + std::unique_lock sendLock(conn.machine->state->sendLock, std::chrono::seconds(600)); - to << ServeProto::Command::ImportPaths; - destStore.exportPaths(missing, to); - to.flush(); + conn.to << ServeProto::Command::ImportPaths; + destStore.exportPaths(missing, conn.to); + conn.to.flush(); - if (readInt(from) != 1) + if (readInt(conn.from) != 1) throw Error("remote machine failed to import closure"); } // FIXME: use Store::topoSortPaths(). -StorePaths reverseTopoSortPaths(const std::map & paths) +static StorePaths reverseTopoSortPaths(const std::map & paths) { StorePaths sorted; StorePathSet visited; @@ -186,24 +182,311 @@ StorePaths reverseTopoSortPaths(const std::map & paths return sorted; } +static std::pair openLogFile(const std::string & logDir, const StorePath & drvPath) +{ + std::string base(drvPath.to_string()); + auto logFile = logDir + "/" + std::string(base, 0, 2) + "/" + std::string(base, 2); + + createDirs(dirOf(logFile)); + + AutoCloseFD logFD = open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); + if (!logFD) throw SysError("creating log file ‘%s’", logFile); + + return {std::move(logFile), std::move(logFD)}; +} + +/** + * @param conn is not fully initialized; it is this functions job to set + * the `remoteVersion` field after the handshake is completed. + * Therefore, no `ServeProto::Serialize` functions can be used until + * that field is set. + */ +static void handshake(Machine::Connection & conn, unsigned int repeats) +{ + conn.to << SERVE_MAGIC_1 << 0x206; + conn.to.flush(); + + unsigned int magic = readInt(conn.from); + if (magic != SERVE_MAGIC_2) + throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName); + conn.remoteVersion = readInt(conn.from); + // Now `conn` is initialized. + if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200) + throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0) + throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName); +} + +static BasicDerivation sendInputs( + State & state, + Step & step, + Store & localStore, + Store & destStore, + Machine::Connection & conn, + unsigned int & overhead, + counter & nrStepsWaiting, + counter & nrStepsCopyingTo +) +{ + BasicDerivation basicDrv(*step.drv); + + for (const auto & [drvPath, node] : step.drv->inputDrvs.map) { + auto drv2 = localStore.readDerivation(drvPath); + for (auto & name : node.value) { + if (auto i = get(drv2.outputs, name)) { + auto outPath = i->path(localStore, drv2.name, name); + basicDrv.inputSrcs.insert(*outPath); + } + } + } + + /* Ensure that the inputs exist in the destination store. This is + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ + if (&localStore != &destStore) { + copyClosure(localStore, destStore, + step.drv->inputSrcs, + NoRepair, NoCheckSigs, NoSubstitute); + } + + { + auto mc1 = std::make_shared>(nrStepsWaiting); + mc1.reset(); + MaintainCount mc2(nrStepsCopyingTo); + + printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", + localStore.printStorePath(step.drvPath), conn.machine->sshName); + + auto now1 = std::chrono::steady_clock::now(); + + /* Copy the input closure. */ + if (conn.machine->isLocalhost()) { + StorePathSet closure; + destStore.computeFSClosure(basicDrv.inputSrcs, closure); + copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); + } else { + copyClosureTo(conn, destStore, basicDrv.inputSrcs, true); + } + + auto now2 = std::chrono::steady_clock::now(); + + overhead += std::chrono::duration_cast(now2 - now1).count(); + } + + return basicDrv; +} + +static BuildResult performBuild( + Machine::Connection & conn, + Store & localStore, + StorePath drvPath, + const BasicDerivation & drv, + const State::BuildOptions & options, + counter & nrStepsBuilding +) +{ + + BuildResult result; + + conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath); + writeDerivation(conn.to, localStore, drv); + conn.to << options.maxSilentTime << options.buildTimeout; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2) + conn.to << options.maxLogSize; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + conn.to + << options.repeats // == build-repeat + << options.enforceDeterminism; + } + conn.to.flush(); + + result.startTime = time(0); + + { + MaintainCount mc(nrStepsBuilding); + result.status = (BuildResult::Status)readInt(conn.from); + } + result.stopTime = time(0); + + + result.errorMsg = readString(conn.from); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + result.timesBuilt = readInt(conn.from); + result.isNonDeterministic = readInt(conn.from); + auto start = readInt(conn.from); + auto stop = readInt(conn.from); + if (start && start) { + /* Note: this represents the duration of a single + round, rather than all rounds. */ + result.startTime = start; + result.stopTime = stop; + } + } + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { + ServeProto::Serialise::read(localStore, conn); + } + + return result; +} + +static std::map queryPathInfos( + Machine::Connection & conn, + Store & localStore, + StorePathSet & outputs, + size_t & totalNarSize +) +{ + + /* Get info about each output path. */ + std::map infos; + conn.to << ServeProto::Command::QueryPathInfos; + ServeProto::write(localStore, conn, outputs); + conn.to.flush(); + while (true) { + auto storePathS = readString(conn.from); + if (storePathS == "") break; + auto deriver = readString(conn.from); // deriver + auto references = ServeProto::Serialise::read(localStore, conn); + readLongLong(conn.from); // download size + auto narSize = readLongLong(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), htSHA256); + auto ca = ContentAddress::parseOpt(readString(conn.from)); + readStrings(conn.from); // sigs + ValidPathInfo info(localStore.parseStorePath(storePathS), narHash); + assert(outputs.count(info.path)); + info.references = references; + info.narSize = narSize; + totalNarSize += info.narSize; + info.narHash = narHash; + info.ca = ca; + if (deriver != "") + info.deriver = localStore.parseStorePath(deriver); + infos.insert_or_assign(info.path, info); + } + + return infos; +} + +static void copyPathFromRemote( + Machine::Connection & conn, + NarMemberDatas & narMembers, + Store & localStore, + Store & destStore, + const ValidPathInfo & info +) +{ + /* Receive the NAR from the remote and add it to the + destination store. Meanwhile, extract all the info from the + NAR that getBuildOutput() needs. */ + auto source2 = sinkToSource([&](Sink & sink) + { + /* Note: we should only send the command to dump the store + path to the remote if the NAR is actually going to get read + by the destination store, which won't happen if this path + is already valid on the destination store. Since this + lambda function only gets executed if someone tries to read + from source2, we will send the command from here rather + than outside the lambda. */ + conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); + conn.to.flush(); + + TeeSource tee(conn.from, sink); + extractNarData(tee, localStore.printStorePath(info.path), narMembers); + }); + + destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); +} + +static void copyPathsFromRemote( + Machine::Connection & conn, + NarMemberDatas & narMembers, + Store & localStore, + Store & destStore, + const std::map & infos +) +{ + auto pathsSorted = reverseTopoSortPaths(infos); + + for (auto & path : pathsSorted) { + auto & info = infos.find(path)->second; + copyPathFromRemote(conn, narMembers, localStore, destStore, info); + } + +} + +} + +/* using namespace nix::build_remote; */ + +void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) +{ + startTime = buildResult.startTime; + stopTime = buildResult.stopTime; + timesBuilt = buildResult.timesBuilt; + errorMsg = buildResult.errorMsg; + isNonDeterministic = buildResult.isNonDeterministic; + + switch ((BuildResult::Status) buildResult.status) { + case BuildResult::Built: + stepStatus = bsSuccess; + break; + case BuildResult::Substituted: + case BuildResult::AlreadyValid: + stepStatus = bsSuccess; + isCached = true; + break; + case BuildResult::PermanentFailure: + stepStatus = bsFailed; + canCache = true; + errorMsg = ""; + break; + case BuildResult::InputRejected: + case BuildResult::OutputRejected: + stepStatus = bsFailed; + canCache = true; + break; + case BuildResult::TransientFailure: + stepStatus = bsFailed; + canRetry = true; + errorMsg = ""; + break; + case BuildResult::TimedOut: + stepStatus = bsTimedOut; + errorMsg = ""; + break; + case BuildResult::MiscFailure: + stepStatus = bsAborted; + canRetry = true; + break; + case BuildResult::LogLimitExceeded: + stepStatus = bsLogLimitExceeded; + break; + case BuildResult::NotDeterministic: + stepStatus = bsNotDeterministic; + canRetry = false; + canCache = true; + break; + default: + stepStatus = bsAborted; + break; + } + +} + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, - unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, + const BuildOptions & buildOptions, RemoteResult & result, std::shared_ptr activeStep, std::function updateStep, NarMemberDatas & narMembers) { assert(BuildResult::TimedOut == 8); - std::string base(step->drvPath.to_string()); - result.logFile = logDir + "/" + std::string(base, 0, 2) + "/" + std::string(base, 2); - AutoDelete autoDelete(result.logFile, false); - - createDirs(dirOf(result.logFile)); - - AutoCloseFD logFD = open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); - if (!logFD) throw SysError("creating log file ‘%s’", result.logFile); + auto [logFile, logFD] = build_remote::openLogFile(logDir, step->drvPath); + AutoDelete logFileDel(logFile, false); + result.logFile = logFile; nix::Path tmpDir = createTempDir(); AutoDelete tmpDirDel(tmpDir, true); @@ -214,7 +497,7 @@ void State::buildRemote(ref destStore, // FIXME: rewrite to use Store. Child child; - openConnection(machine, tmpDir, logFD.get(), child); + build_remote::openConnection(machine, tmpDir, logFD.get(), child); { auto activeStepState(activeStep->state_.lock()); @@ -234,45 +517,25 @@ void State::buildRemote(ref destStore, process. Meh. */ }); - FdSource from(child.from.get()); - FdSink to(child.to.get()); + Machine::Connection conn { + .from = child.from.get(), + .to = child.to.get(), + .machine = machine, + }; Finally updateStats([&]() { - bytesReceived += from.read; - bytesSent += to.written; + bytesReceived += conn.from.read; + bytesSent += conn.to.written; }); - /* Handshake. */ - ServeProto::Version remoteVersion; - try { - to << SERVE_MAGIC_1 << 0x206; - to.flush(); - - unsigned int magic = readInt(from); - if (magic != SERVE_MAGIC_2) - throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", machine->sshName); - remoteVersion = readInt(from); - if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200) - throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", machine->sshName); - if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0) - throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName); - + build_remote::handshake(conn, buildOptions.repeats); } catch (EndOfFile & e) { child.pid.wait(); std::string s = chomp(readFile(result.logFile)); throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s); } - ServeProto::ReadConn rconn { - .from = from, - .version = remoteVersion, - }; - ServeProto::WriteConn wconn { - .to = to, - .version = remoteVersion, - }; - { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; @@ -284,62 +547,12 @@ void State::buildRemote(ref destStore, copy the immediate sources of the derivation and the required outputs of the input derivations. */ updateStep(ssSendingInputs); + BasicDerivation resolvedDrv = build_remote::sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); - StorePathSet inputs; - BasicDerivation basicDrv(*step->drv); - - for (auto & p : step->drv->inputSrcs) - inputs.insert(p); - - for (auto & [drvPath, node] : step->drv->inputDrvs.map) { - auto drv2 = localStore->readDerivation(drvPath); - for (auto & name : node.value) { - if (auto i = get(drv2.outputs, name)) { - auto outPath = i->path(*localStore, drv2.name, name); - inputs.insert(*outPath); - basicDrv.inputSrcs.insert(*outPath); - } - } - } - - /* Ensure that the inputs exist in the destination store. This is - a no-op for regular stores, but for the binary cache store, - this will copy the inputs to the binary cache from the local - store. */ - if (localStore != std::shared_ptr(destStore)) { - copyClosure(*localStore, *destStore, - step->drv->inputSrcs, - NoRepair, NoCheckSigs, NoSubstitute); - } - - { - auto mc1 = std::make_shared>(nrStepsWaiting); - mc1.reset(); - MaintainCount mc2(nrStepsCopyingTo); - - printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", - localStore->printStorePath(step->drvPath), machine->sshName); - - auto now1 = std::chrono::steady_clock::now(); - - /* Copy the input closure. */ - if (machine->isLocalhost()) { - StorePathSet closure; - destStore->computeFSClosure(inputs, closure); - copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } else { - copyClosureTo(machine->state->sendLock, *destStore, from, to, remoteVersion, inputs, true); - } - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } - - autoDelete.cancel(); + logFileDel.cancel(); /* Truncate the log to get rid of messages about substitutions - etc. on the remote system. */ + etc. on the remote system. */ if (lseek(logFD.get(), SEEK_SET, 0) != 0) throw SysError("seeking to the start of log file ‘%s’", result.logFile); @@ -355,85 +568,17 @@ void State::buildRemote(ref destStore, updateStep(ssBuilding); - to << ServeProto::Command::BuildDerivation << localStore->printStorePath(step->drvPath); - writeDerivation(to, *localStore, basicDrv); - to << maxSilentTime << buildTimeout; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) - to << maxLogSize; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - to << repeats // == build-repeat - << step->isDeterministic; // == enforce-determinism - } - to.flush(); + BuildResult buildResult = build_remote::performBuild( + conn, + *localStore, + step->drvPath, + resolvedDrv, + buildOptions, + nrStepsBuilding + ); - result.startTime = time(0); - int res; - { - MaintainCount mc(nrStepsBuilding); - res = readInt(from); - } - result.stopTime = time(0); + result.updateWithBuildResult(buildResult); - result.errorMsg = readString(from); - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - result.timesBuilt = readInt(from); - result.isNonDeterministic = readInt(from); - auto start = readInt(from); - auto stop = readInt(from); - if (start && start) { - /* Note: this represents the duration of a single - round, rather than all rounds. */ - result.startTime = start; - result.stopTime = stop; - } - } - if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) { - ServeProto::Serialise::read(*localStore, rconn); - } - switch ((BuildResult::Status) res) { - case BuildResult::Built: - result.stepStatus = bsSuccess; - break; - case BuildResult::Substituted: - case BuildResult::AlreadyValid: - result.stepStatus = bsSuccess; - result.isCached = true; - break; - case BuildResult::PermanentFailure: - result.stepStatus = bsFailed; - result.canCache = true; - result.errorMsg = ""; - break; - case BuildResult::InputRejected: - case BuildResult::OutputRejected: - result.stepStatus = bsFailed; - result.canCache = true; - break; - case BuildResult::TransientFailure: - result.stepStatus = bsFailed; - result.canRetry = true; - result.errorMsg = ""; - break; - case BuildResult::TimedOut: - result.stepStatus = bsTimedOut; - result.errorMsg = ""; - break; - case BuildResult::MiscFailure: - result.stepStatus = bsAborted; - result.canRetry = true; - break; - case BuildResult::LogLimitExceeded: - result.stepStatus = bsLogLimitExceeded; - break; - case BuildResult::NotDeterministic: - result.stepStatus = bsNotDeterministic; - result.canRetry = false; - result.canCache = true; - break; - default: - result.stepStatus = bsAborted; - break; - } if (result.stepStatus != bsSuccess) return; result.errorMsg = ""; @@ -461,33 +606,8 @@ void State::buildRemote(ref destStore, outputs.insert(*i.second.second); } - /* Get info about each output path. */ - std::map infos; size_t totalNarSize = 0; - to << ServeProto::Command::QueryPathInfos; - ServeProto::write(*localStore, wconn, outputs); - to.flush(); - while (true) { - auto storePathS = readString(from); - if (storePathS == "") break; - auto deriver = readString(from); // deriver - auto references = ServeProto::Serialise::read(*localStore, rconn); - readLongLong(from); // download size - auto narSize = readLongLong(from); - auto narHash = Hash::parseAny(readString(from), htSHA256); - auto ca = ContentAddress::parseOpt(readString(from)); - readStrings(from); // sigs - ValidPathInfo info(localStore->parseStorePath(storePathS), narHash); - assert(outputs.count(info.path)); - info.references = references; - info.narSize = narSize; - totalNarSize += info.narSize; - info.narHash = narHash; - info.ca = ca; - if (deriver != "") - info.deriver = localStore->parseStorePath(deriver); - infos.insert_or_assign(info.path, info); - } + auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize); if (totalNarSize > maxOutputSize) { result.stepStatus = bsNarSizeLimitExceeded; @@ -498,33 +618,7 @@ void State::buildRemote(ref destStore, printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)", localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); - auto pathsSorted = reverseTopoSortPaths(infos); - - for (auto & path : pathsSorted) { - auto & info = infos.find(path)->second; - - /* Receive the NAR from the remote and add it to the - destination store. Meanwhile, extract all the info from the - NAR that getBuildOutput() needs. */ - auto source2 = sinkToSource([&](Sink & sink) - { - /* Note: we should only send the command to dump the store - path to the remote if the NAR is actually going to get read - by the destination store, which won't happen if this path - is already valid on the destination store. Since this - lambda function only gets executed if someone tries to read - from source2, we will send the command from here rather - than outside the lambda. */ - to << ServeProto::Command::DumpStorePath << localStore->printStorePath(path); - to.flush(); - - TeeSource tee(from, sink); - extractNarData(tee, localStore->printStorePath(path), narMembers); - }); - - destStore->addToStore(info, *source2, NoRepair, NoCheckSigs); - } - + build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); auto now2 = std::chrono::steady_clock::now(); result.overhead += std::chrono::duration_cast(now2 - now1).count(); diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 89aec323..307eee8e 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -98,8 +98,10 @@ State::StepResult State::doBuildStep(nix::ref destStore, it). */ BuildID buildId; std::optional buildDrvPath; - unsigned int maxSilentTime, buildTimeout; - unsigned int repeats = step->isDeterministic ? 1 : 0; + BuildOptions buildOptions; + buildOptions.repeats = step->isDeterministic ? 1 : 0; + buildOptions.maxLogSize = maxLogSize; + buildOptions.enforceDeterminism = step->isDeterministic; auto conn(dbPool.get()); @@ -134,18 +136,18 @@ State::StepResult State::doBuildStep(nix::ref destStore, { auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName)); if (i != jobsetRepeats.end()) - repeats = std::max(repeats, i->second); + buildOptions.repeats = std::max(buildOptions.repeats, i->second); } } if (!build) build = *dependents.begin(); buildId = build->id; buildDrvPath = build->drvPath; - maxSilentTime = build->maxSilentTime; - buildTimeout = build->buildTimeout; + buildOptions.maxSilentTime = build->maxSilentTime; + buildOptions.buildTimeout = build->buildTimeout; printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)", - localStore->printStorePath(step->drvPath), repeats + 1, machine->sshName, buildId, (dependents.size() - 1)); + localStore->printStorePath(step->drvPath), buildOptions.repeats + 1, machine->sshName, buildId, (dependents.size() - 1)); } if (!buildOneDone) @@ -206,7 +208,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, try { /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep, narMembers); + buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers); } catch (Error & e) { if (activeStep->state_.lock()->cancelled) { printInfo("marking step %d of build %d as cancelled", stepNr, buildId); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 55c99afc..6359063a 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -21,6 +21,7 @@ #include "store-api.hh" #include "sync.hh" #include "nar-extractor.hh" +#include "serve-protocol.hh" typedef unsigned int BuildID; @@ -78,6 +79,8 @@ struct RemoteResult { return stepStatus == bsCachedFailure ? bsFailed : stepStatus; } + + void updateWithBuildResult(const nix::BuildResult &); }; @@ -297,6 +300,32 @@ struct Machine std::regex r("^(ssh://|ssh-ng://)?localhost$"); return std::regex_search(sshName, r); } + + // A connection to a machine + struct Connection { + nix::FdSource from; + nix::FdSink to; + nix::ServeProto::Version remoteVersion; + + // Backpointer to the machine + ptr machine; + + operator nix::ServeProto::ReadConn () + { + return { + .from = from, + .version = remoteVersion, + }; + } + + operator nix::ServeProto::WriteConn () + { + return { + .to = to, + .version = remoteVersion, + }; + } + }; }; @@ -459,6 +488,12 @@ private: public: State(std::optional metricsAddrOpt); + struct BuildOptions { + unsigned int maxSilentTime, buildTimeout, repeats; + size_t maxLogSize; + bool enforceDeterminism; + }; + private: nix::MaintainCount startDbUpdate(); @@ -543,8 +578,7 @@ private: void buildRemote(nix::ref destStore, Machine::ptr machine, Step::ptr step, - unsigned int maxSilentTime, unsigned int buildTimeout, - unsigned int repeats, + const BuildOptions & buildOptions, RemoteResult & result, std::shared_ptr activeStep, std::function updateStep, NarMemberDatas & narMembers);