diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 30a1d856..6824fde6 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -84,6 +84,23 @@ struct Connection : pqxx::connection }; +struct receiver : public pqxx::notification_receiver +{ + bool status = false; + receiver(pqxx::connection_base & c, const std::string & channel) + : pqxx::notification_receiver(c, channel) { } + void operator() (const string & payload, int pid) override + { + status = true; + }; + bool get() { + bool b = status; + status = false; + return b; + } +}; + + typedef unsigned int BuildID; @@ -435,26 +452,9 @@ void State::queueMonitorLoop() { auto conn(dbPool.get()); - struct receiver : public pqxx::notification_receiver - { - bool status = false; - receiver(pqxx::connection_base & c, const std::string & channel) - : pqxx::notification_receiver(c, channel) { } - void operator() (const string & payload, int pid) override - { - status = true; - }; - bool get() { - bool b = status; - status = false; - return b; - } - }; - receiver buildsAdded(*conn, "builds_added"); receiver buildsRestarted(*conn, "builds_restarted"); receiver buildsCancelled(*conn, "builds_cancelled"); - receiver dumpStatus(*conn, "dump_status"); auto store = openStore(); // FIXME: pool @@ -479,8 +479,6 @@ void State::queueMonitorLoop() removeCancelledBuilds(*conn); } - if (dumpStatus.get()) - State::dumpStatus(); } } @@ -1314,6 +1312,22 @@ void State::run() std::thread(&State::dispatcher, this).detach(); + while (true) { + try { + auto conn(dbPool.get()); + receiver dumpStatus(*conn, "dump_status"); + while (true) { + conn->await_notification(); + if (dumpStatus.get()) + State::dumpStatus(); + } + } catch (std::exception & e) { + printMsg(lvlError, format("main thread: %1%") % e.what()); + sleep(10); // probably a DB problem, so don't retry right away + } + } + + // Never reached. queueMonitorThread.join(); }