diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc
index ef3ca2e3..3c9a8f79 100644
--- a/src/hydra-queue-runner/build-remote.cc
+++ b/src/hydra-queue-runner/build-remote.cc
@@ -283,16 +283,43 @@ void State::buildRemote(ref<Store> destStore,
 
         /* Copy the output paths. */
         if (/* machine->sshName != "localhost" */ true) {
-            printMsg(lvlDebug, format("copying outputs of ‘%1%’ from ‘%2%’") % step->drvPath % machine->sshName);
+            MaintainCount mc(nrStepsCopyingFrom);
+
+            auto now1 = std::chrono::steady_clock::now();
+
             PathSet outputs;
             for (auto & output : step->drv.outputs)
                 outputs.insert(output.second.path);
-            MaintainCount mc(nrStepsCopyingFrom);
+
+            /* Query the size of the output paths. */
+            size_t totalNarSize = 0;
+            to << cmdQueryPathInfos << outputs;
+            to.flush();
+            while (true) {
+                if (readString(from) == "") break;
+                readString(from); // deriver
+                readStrings<PathSet>(from); // references
+                readLongLong(from); // download size
+                totalNarSize += readLongLong(from);
+            }
+
+            printMsg(lvlDebug, format("copying outputs of ‘%s’ from ‘%s’ (%d bytes)")
+                % step->drvPath % machine->sshName % totalNarSize);
+
+            /* Block until we have the required amount of memory
+               available. FIXME: only need this for binary cache
+               destination stores. */
+            auto resStart = std::chrono::steady_clock::now();
+            auto memoryReservation(memoryTokens.get(totalNarSize));
+            auto resStop = std::chrono::steady_clock::now();
+
+            auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
+            if (resMs >= 1000)
+                printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s")
+                    % resMs % totalNarSize % step->drvPath);
 
             result.accessor = destStore->getFSAccessor();
 
-            auto now1 = std::chrono::steady_clock::now();
-
             to << cmdExportPaths << 0 << outputs;
             to.flush();
             destStore->importPaths(false, from, result.accessor);
diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc
index a4b4595b..2a21e647 100644
--- a/src/hydra-queue-runner/hydra-queue-runner.cc
+++ b/src/hydra-queue-runner/hydra-queue-runner.cc
@@ -17,6 +17,7 @@ using namespace nix;
 
 
 State::State()
+    : memoryTokens(4ULL << 30) // FIXME: make this configurable
 {
     hydraData = getEnv("HYDRA_DATA");
     if (hydraData == "") throw Error("$HYDRA_DATA must be set");
@@ -567,6 +568,8 @@ void State::dumpStatus(Connection & conn, bool log)
         root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups);
         root.attr("nrDbConnections", dbPool.count());
         root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
+        root.attr("memoryTokensInUse", memoryTokens.currentUse());
+
         {
             root.attr("machines");
             JSONObject nested(out);
@@ -589,6 +592,7 @@ void State::dumpStatus(Connection & conn, bool log)
                 }
             }
         }
+
         {
             root.attr("jobsets");
             JSONObject nested(out);
@@ -600,6 +604,7 @@ void State::dumpStatus(Connection & conn, bool log)
                 nested2.attr("seconds", jobset.second->getSeconds());
             }
         }
+
         {
             root.attr("machineTypes");
             JSONObject nested(out);
diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh
index d6ad1ccb..ab70b52f 100644
--- a/src/hydra-queue-runner/state.hh
+++ b/src/hydra-queue-runner/state.hh
@@ -9,14 +9,13 @@
 
 #include "db.hh"
 #include "counter.hh"
+#include "token-server.hh"
+
+#include "derivations.hh"
 #include "pathlocks.hh"
 #include "pool.hh"
-#include "sync.hh"
-
 #include "store-api.hh"
-#include "derivations.hh"
-
-#include "binary-cache-store.hh" // FIXME
+#include "sync.hh"
 
 typedef unsigned int BuildID;
 
@@ -354,6 +353,13 @@ private:
     std::shared_ptr<nix::Store> _localStore;
     std::shared_ptr<nix::Store> _destStore;
 
+    /* Token server to prevent threads from allocating too many big
+       strings concurrently while importing NARs from the build
+       machines. When a thread imports a NAR of size N, it will first
+       acquire N memory tokens, causing it to block until that many
+       tokens are available. */
+    nix::TokenServer memoryTokens;
+
 public:
     State();
 
diff --git a/src/hydra-queue-runner/token-server.hh b/src/hydra-queue-runner/token-server.hh
index d4f5f843..cdd03b5e 100644
--- a/src/hydra-queue-runner/token-server.hh
+++ b/src/hydra-queue-runner/token-server.hh
@@ -3,21 +3,28 @@
 #include <atomic>
 
 #include "sync.hh"
+#include "types.hh"
+
+namespace nix {
+
+MakeError(NoTokens, Error)
 
 /* This class hands out tokens. There are only ‘maxTokens’ tokens
-   available. Calling get() will return a Token object, representing
-   ownership of a token. If no token is available, get() will sleep
-   until another thread returns a token. */
+   available. Calling get(N) will return a Token object, representing
+   ownership of N tokens. If the requested number of tokens is
+   unavailable, get() will sleep until other threads have returned
+   enough tokens. A request for more than ‘maxTokens’ tokens can
+   never be satisfied, so it throws NoTokens instead of sleeping. */
 
 class TokenServer
 {
-    unsigned int maxTokens;
+    const size_t maxTokens;
 
-    Sync<unsigned int> curTokens{0};
+    Sync<size_t> inUse{0};
     std::condition_variable wakeup;
 
 public:
-    TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }
+    TokenServer(size_t maxTokens) : maxTokens(maxTokens) { }
 
     class Token
     {
@@ -25,19 +32,26 @@ public:
 
         TokenServer * ts;
 
+        size_t tokens;
+
         bool acquired = false;
 
-        Token(TokenServer * ts, unsigned int timeout) : ts(ts)
+        Token(TokenServer * ts, size_t tokens, unsigned int timeout)
+            : ts(ts), tokens(tokens)
         {
-            auto curTokens(ts->curTokens.lock());
-            while (*curTokens >= ts->maxTokens)
+            /* A request larger than the pool could never be satisfied,
+               so fail fast instead of sleeping forever. */
+            if (tokens > ts->maxTokens)
+                throw NoTokens(format("requesting more tokens (%d) than exist (%d)") % tokens % ts->maxTokens);
+            auto inUse(ts->inUse.lock());
+            while (*inUse + tokens > ts->maxTokens)
                 if (timeout) {
-                    if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout),
-                            [&]() { return *curTokens < ts->maxTokens; }))
+                    if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout),
+                            [&]() { return *inUse + tokens <= ts->maxTokens; }))
                         return;
                 } else
-                    curTokens.wait(ts->wakeup);
-            (*curTokens)++;
+                    inUse.wait(ts->wakeup);
+            *inUse += tokens;
             acquired = true;
         }
 
@@ -50,9 +64,11 @@ public:
         {
             if (!ts || !acquired) return;
             {
-                auto curTokens(ts->curTokens.lock());
-                assert(*curTokens);
-                (*curTokens)--;
+                auto inUse(ts->inUse.lock());
+                assert(*inUse >= tokens);
+                *inUse -= tokens;
             }
-            ts->wakeup.notify_one();
+            /* Waiters may need different amounts, so wake them all and
+               let each one recheck whether its request now fits. */
+            ts->wakeup.notify_all();
         }
@@ -60,8 +76,17 @@ public:
         bool operator ()() { return acquired; }
     };
 
-    Token get(unsigned int timeout = 0)
+    Token get(size_t tokens = 1, unsigned int timeout = 0)
     {
-        return Token(this, timeout);
+        return Token(this, tokens, timeout);
+    }
+
+    /* Return the number of tokens currently handed out. */
+    size_t currentUse()
+    {
+        auto inUse_(inUse.lock());
+        return *inUse_;
     }
 };
+
+}