From 976f539f7dd2ee6345b605def105883d16b32c60 Mon Sep 17 00:00:00 2001 From: Robert Hensing Date: Wed, 28 Aug 2024 02:35:46 +0200 Subject: [PATCH] Make Repo::flush interruptible --- src/libfetchers/git-utils.cc | 115 ++++++++++++++++++++++++++--------- 1 file changed, 85 insertions(+), 30 deletions(-) diff --git a/src/libfetchers/git-utils.cc b/src/libfetchers/git-utils.cc index 70391a287..265eb35d2 100644 --- a/src/libfetchers/git-utils.cc +++ b/src/libfetchers/git-utils.cc @@ -166,6 +166,45 @@ static Object peelToTreeOrBlob(git_object * obj) return peelObject(obj, GIT_OBJECT_TREE); } +struct PackBuilderContext { + std::exception_ptr exception; + + void handleException(const char * activity, int errCode) + { + switch (errCode) { + case GIT_OK: + break; + case GIT_EUSER: + if (!exception) + panic("PackBuilderContext::handleException: user error, but exception was not set"); + + std::rethrow_exception(exception); + default: + throw Error("%s: %i, %s", Uncolored(activity), errCode, git_error_last()->message); + } + } +}; + +extern "C" { + +/** + * A `git_packbuilder_progress` implementation that aborts the pack building if needed. + */ +static int packBuilderProgressCheckInterrupt(int stage, uint32_t current, uint32_t total, void *payload) +{ + PackBuilderContext & args = * (PackBuilderContext *) payload; + try { + checkInterrupt(); + return GIT_OK; + } catch (const std::exception & e) { + args.exception = std::current_exception(); + return GIT_EUSER; + } +}; +static git_packbuilder_progress PACKBUILDER_PROGRESS_CHECK_INTERRUPT = &packBuilderProgressCheckInterrupt; + +} // extern "C" + struct GitRepoImpl : GitRepo, std::enable_shared_from_this { /** Location of the repository on disk. */ @@ -213,42 +252,58 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this } void flush() override { + checkInterrupt(); + git_buf buf = GIT_BUF_INIT; - try { - PackBuilder packBuilder; - git_packbuilder_new(Setter(packBuilder), *this); - checkInterrupt(); - git_mempack_write_thin_pack(mempack_backend, packBuilder.get()); - checkInterrupt(); - // TODO make git_packbuilder_write_buf() interruptible - git_packbuilder_write_buf(&buf, packBuilder.get()); - checkInterrupt(); + Finally _disposeBuf { [&] { git_buf_dispose(&buf); } }; + PackBuilder packBuilder; + PackBuilderContext packBuilderContext; + git_packbuilder_new(Setter(packBuilder), *this); + git_packbuilder_set_callbacks(packBuilder.get(), PACKBUILDER_PROGRESS_CHECK_INTERRUPT, &packBuilderContext); + git_packbuilder_set_threads(packBuilder.get(), 0 /* autodetect */); - std::string repo_path = std::string(git_repository_path(repo.get())); - while (!repo_path.empty() && repo_path.back() == '/') - repo_path.pop_back(); - std::string pack_dir_path = repo_path + "/objects/pack"; + packBuilderContext.handleException( + "preparing packfile", + git_mempack_write_thin_pack(mempack_backend, packBuilder.get()) + ); + checkInterrupt(); + packBuilderContext.handleException( + "writing packfile", + git_packbuilder_write_buf(&buf, packBuilder.get()) + ); + checkInterrupt(); - // TODO: could the indexing be done in a separate thread? - Indexer indexer; - git_indexer_progress stats; - if (git_indexer_new(Setter(indexer), pack_dir_path.c_str(), 0, nullptr, nullptr)) - throw Error("creating git packfile indexer: %s", git_error_last()->message); - // TODO: feed buf in (fairly large) chunk to make this interruptible - if (git_indexer_append(indexer.get(), buf.ptr, buf.size, &stats)) + std::string repo_path = std::string(git_repository_path(repo.get())); + while (!repo_path.empty() && repo_path.back() == '/') + repo_path.pop_back(); + std::string pack_dir_path = repo_path + "/objects/pack"; + + // TODO (performance): could the indexing be done in a separate thread? + // we'd need a more streaming variation of + // git_packbuilder_write_buf, or incur the cost of + // copying parts of the buffer to a separate thread. + // (synchronously on the git_packbuilder_write_buf thread) + Indexer indexer; + git_indexer_progress stats; + if (git_indexer_new(Setter(indexer), pack_dir_path.c_str(), 0, nullptr, nullptr)) + throw Error("creating git packfile indexer: %s", git_error_last()->message); + + // TODO: provide index callback for checkInterrupt() termination + // though this is about an order of magnitude faster than the packbuilder + // expect up to 1 sec latency due to uninterruptible git_indexer_append. + constexpr size_t chunkSize = 128 * 1024; + for (size_t offset = 0; offset < buf.size; offset += chunkSize) { + if (git_indexer_append(indexer.get(), buf.ptr + offset, std::min(chunkSize, buf.size - offset), &stats)) throw Error("appending to git packfile index: %s", git_error_last()->message); checkInterrupt(); - if (git_indexer_commit(indexer.get(), &stats)) - throw Error("committing git packfile index: %s", git_error_last()->message); - - if (git_mempack_reset(mempack_backend)) - throw Error("resetting git mempack backend: %s", git_error_last()->message); - - git_buf_dispose(&buf); - } catch (...) { - git_buf_dispose(&buf); - throw; } + + if (git_indexer_commit(indexer.get(), &stats)) + throw Error("committing git packfile index: %s", git_error_last()->message); + + if (git_mempack_reset(mempack_backend)) + throw Error("resetting git mempack backend: %s", git_error_last()->message); + checkInterrupt(); }