1
0
Fork 0
mirror of https://github.com/NixOS/nix synced 2024-09-19 10:50:24 -04:00

Make Repo::flush interruptible

This commit is contained in:
Robert Hensing 2024-08-28 02:35:46 +02:00
parent c1fe3546ed
commit 976f539f7d

View file

@ -166,6 +166,45 @@ static Object peelToTreeOrBlob(git_object * obj)
return peelObject<Object>(obj, GIT_OBJECT_TREE);
}
struct PackBuilderContext {
std::exception_ptr exception;
void handleException(const char * activity, int errCode)
{
switch (errCode) {
case GIT_OK:
break;
case GIT_EUSER:
if (!exception)
panic("PackBuilderContext::handleException: user error, but exception was not set");
std::rethrow_exception(exception);
default:
throw Error("%s: %i, %s", Uncolored(activity), errCode, git_error_last()->message);
}
}
};
extern "C" {
/**
* A `git_packbuilder_progress` implementation that aborts the pack building if needed.
*/
static int packBuilderProgressCheckInterrupt(int stage, uint32_t current, uint32_t total, void *payload)
{
PackBuilderContext & args = * (PackBuilderContext *) payload;
try {
checkInterrupt();
return GIT_OK;
} catch (const std::exception & e) {
args.exception = std::current_exception();
return GIT_EUSER;
}
};
static git_packbuilder_progress PACKBUILDER_PROGRESS_CHECK_INTERRUPT = &packBuilderProgressCheckInterrupt;
} // extern "C"
struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
{
/** Location of the repository on disk. */
@ -213,42 +252,58 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
}
void flush() override {
checkInterrupt();
git_buf buf = GIT_BUF_INIT;
try {
PackBuilder packBuilder;
git_packbuilder_new(Setter(packBuilder), *this);
checkInterrupt();
git_mempack_write_thin_pack(mempack_backend, packBuilder.get());
checkInterrupt();
// TODO make git_packbuilder_write_buf() interruptible
git_packbuilder_write_buf(&buf, packBuilder.get());
checkInterrupt();
Finally _disposeBuf { [&] { git_buf_dispose(&buf); } };
PackBuilder packBuilder;
PackBuilderContext packBuilderContext;
git_packbuilder_new(Setter(packBuilder), *this);
git_packbuilder_set_callbacks(packBuilder.get(), PACKBUILDER_PROGRESS_CHECK_INTERRUPT, &packBuilderContext);
git_packbuilder_set_threads(packBuilder.get(), 0 /* autodetect */);
std::string repo_path = std::string(git_repository_path(repo.get()));
while (!repo_path.empty() && repo_path.back() == '/')
repo_path.pop_back();
std::string pack_dir_path = repo_path + "/objects/pack";
packBuilderContext.handleException(
"preparing packfile",
git_mempack_write_thin_pack(mempack_backend, packBuilder.get())
);
checkInterrupt();
packBuilderContext.handleException(
"writing packfile",
git_packbuilder_write_buf(&buf, packBuilder.get())
);
checkInterrupt();
// TODO: could the indexing be done in a separate thread?
Indexer indexer;
git_indexer_progress stats;
if (git_indexer_new(Setter(indexer), pack_dir_path.c_str(), 0, nullptr, nullptr))
throw Error("creating git packfile indexer: %s", git_error_last()->message);
// TODO: feed buf in (fairly large) chunk to make this interruptible
if (git_indexer_append(indexer.get(), buf.ptr, buf.size, &stats))
std::string repo_path = std::string(git_repository_path(repo.get()));
while (!repo_path.empty() && repo_path.back() == '/')
repo_path.pop_back();
std::string pack_dir_path = repo_path + "/objects/pack";
// TODO (performance): could the indexing be done in a separate thread?
// we'd need a more streaming variation of
// git_packbuilder_write_buf, or incur the cost of
// copying parts of the buffer to a separate thread.
// (synchronously on the git_packbuilder_write_buf thread)
Indexer indexer;
git_indexer_progress stats;
if (git_indexer_new(Setter(indexer), pack_dir_path.c_str(), 0, nullptr, nullptr))
throw Error("creating git packfile indexer: %s", git_error_last()->message);
// TODO: provide index callback for checkInterrupt() termination
// though this is about an order of magnitude faster than the packbuilder
// expect up to 1 sec latency due to uninterruptible git_indexer_append.
constexpr size_t chunkSize = 128 * 1024;
for (size_t offset = 0; offset < buf.size; offset += chunkSize) {
if (git_indexer_append(indexer.get(), buf.ptr + offset, std::min(chunkSize, buf.size - offset), &stats))
throw Error("appending to git packfile index: %s", git_error_last()->message);
checkInterrupt();
if (git_indexer_commit(indexer.get(), &stats))
throw Error("committing git packfile index: %s", git_error_last()->message);
if (git_mempack_reset(mempack_backend))
throw Error("resetting git mempack backend: %s", git_error_last()->message);
git_buf_dispose(&buf);
} catch (...) {
git_buf_dispose(&buf);
throw;
}
if (git_indexer_commit(indexer.get(), &stats))
throw Error("committing git packfile index: %s", git_error_last()->message);
if (git_mempack_reset(mempack_backend))
throw Error("resetting git mempack backend: %s", git_error_last()->message);
checkInterrupt();
}