diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index ef3326cd6..6079eae7b 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -402,6 +402,9 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto pathInfo = [&]() { // NB: FramedSource must be out of scope before logger->stopWork(); + // FIXME: this means that if there is an error + // half-way through, the client will keep sending + // data, since we haven't sent it the error yet. auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr); FramedSource source(conn.from); FileSerialisationMethod dumpMethod; diff --git a/src/libstore/remote-store-connection.hh b/src/libstore/remote-store-connection.hh index 405120ee9..513bd6838 100644 --- a/src/libstore/remote-store-connection.hh +++ b/src/libstore/remote-store-connection.hh @@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle RemoteStore::Connection & operator * () { return *handle; } RemoteStore::Connection * operator -> () { return &*handle; } - void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); + void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); void withFramedSink(std::function fun); }; diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 555936c18..69bbc64fc 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle() } } -void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush) +void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block) { - handle->processStderr(&daemonException, sink, source, flush); + handle->processStderr(&daemonException, sink, source, flush, block); } @@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::functionto.flush(); - std::exception_ptr ex; - - /* Handle log messages / exceptions from the remote on a separate - thread. */ - std::thread stderrThread([&]() { - try { - ReceiveInterrupts receiveInterrupts; - processStderr(nullptr, nullptr, false); - } catch (...) { - ex = std::current_exception(); - } - }); - - Finally joinStderrThread([&]() - { - if (stderrThread.joinable()) { - stderrThread.join(); - if (ex) { - try { - std::rethrow_exception(ex); - } catch (...) { - ignoreException(); - } - } - } - }); - - { - FramedSink sink((*this)->to, ex); + FramedSink sink((*this)->to, [&]() { + /* Periodically process stderr messages and exceptions + from the daemon. */ + processStderr(nullptr, nullptr, false, false); + }); fun(sink); sink.flush(); } - stderrThread.join(); - if (ex) - std::rethrow_exception(ex); + processStderr(nullptr, nullptr, false); } } diff --git a/src/libstore/worker-protocol-connection.cc b/src/libstore/worker-protocol-connection.cc index a47dbb689..ae434c7f0 100644 --- a/src/libstore/worker-protocol-connection.cc +++ b/src/libstore/worker-protocol-connection.cc @@ -32,7 +32,8 @@ static Logger::Fields readFields(Source & from) return fields; } -std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush) +std::exception_ptr +WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block) { if (flush) to.flush(); @@ -41,6 +42,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink while (true) { + if (!block && !from.hasData()) + break; + auto msg = readNum(from); if (msg == STDERR_WRITE) { @@ -95,8 +99,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink logger->result(act, type, fields); } - else if (msg == STDERR_LAST) + else if (msg == STDERR_LAST) { + assert(block); break; + } else throw Error("got unknown message type %x from Nix daemon", msg); @@ -130,9 +136,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink } } -void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush) +void WorkerProto::BasicClientConnection::processStderr( + bool * daemonException, Sink * sink, Source * source, bool flush, bool block) { - auto ex = processStderrReturn(sink, source, flush); + auto ex = processStderrReturn(sink, source, flush, block); if (ex) { *daemonException = true; std::rethrow_exception(ex); diff --git a/src/libstore/worker-protocol-connection.hh b/src/libstore/worker-protocol-connection.hh index 9c96195b5..9665067dd 100644 --- a/src/libstore/worker-protocol-connection.hh +++ b/src/libstore/worker-protocol-connection.hh @@ -70,9 +70,10 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection virtual void closeWrite() = 0; - std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true); + std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); - void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true); + void + processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); /** * Establishes connection, negotiating version. diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index 5352a436b..8a57858f5 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -10,6 +10,8 @@ #ifdef _WIN32 # include # include "windows-error.hh" +#else +# include #endif @@ -158,6 +160,29 @@ bool FdSource::good() } +bool FdSource::hasData() +{ + if (BufferedSource::hasData()) return true; + + while (true) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 0; + + auto n = select(fd + 1, &fds, nullptr, nullptr, &timeout); + if (n < 0) { + if (errno == EINTR) continue; + throw SysError("polling file descriptor"); + } + return FD_ISSET(fd, &fds); + } +} + + size_t StringSource::read(char * data, size_t len) { if (pos == s.size()) throw EndOfFile("end of string reached"); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index e9f3e3a4a..4bb1a3e4b 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -104,6 +104,9 @@ struct BufferedSource : Source size_t read(char * data, size_t len) override; + /** + * Return true if the buffer is not empty. + */ bool hasData(); protected: @@ -162,6 +165,13 @@ struct FdSource : BufferedSource FdSource & operator=(FdSource && s) = default; bool good() override; + + /** + * Return true if the buffer is not empty after a non-blocking + * read. + */ + bool hasData(); + protected: size_t readUnbuffered(char * data, size_t len) override; private: @@ -522,15 +532,16 @@ struct FramedSource : Source /** * Write as chunks in the format expected by FramedSource. * - * The exception_ptr reference can be used to terminate the stream when you - * detect that an error has occurred on the remote end. + * The `checkError` function can be used to terminate the stream when you + * detect that an error has occurred. It does so by throwing an exception. */ struct FramedSink : nix::BufferedSink { BufferedSink & to; - std::exception_ptr & ex; + std::function checkError; - FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex) + FramedSink(BufferedSink & to, std::function && checkError) + : to(to), checkError(checkError) { } ~FramedSink() @@ -545,13 +556,9 @@ struct FramedSink : nix::BufferedSink void writeUnbuffered(std::string_view data) override { - /* Don't send more data if the remote has - encountered an error. */ - if (ex) { - auto ex2 = ex; - ex = nullptr; - std::rethrow_exception(ex2); - } + /* Don't send more data if an error has occured. */ + checkError(); + to << data.size(); to(data); };