Introduce error recovery mechanisms with retry logic and circuit breaker integration.
- Added `ErrorRecovery.cc` and `ErrorRecovery.h` for retry and circuit breaker implementations.
- Enhanced swap file handling with retries on transient errors (e.g., ENOSPC, EDQUOT) using exponential backoff.
- Integrated a circuit breaker into SwapManager to gracefully handle repeated failures, prevent system overload, and enable automatic recovery.
- Updated `DEVELOPER_GUIDE.md` with comprehensive documentation on error recovery patterns and graceful degradation strategies.
- Refined fsync, temp file creation, and swap file logic with retry-on-failure mechanisms for improved resilience.
This commit is contained in:
108
Swap.cc
108
Swap.cc
@@ -2,6 +2,7 @@
|
||||
#include "Buffer.h"
|
||||
#include "ErrorHandler.h"
|
||||
#include "SyscallWrappers.h"
|
||||
#include "ErrorRecovery.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
@@ -613,10 +614,19 @@ SwapManager::open_ctx(JournalCtx &ctx, const std::string &path, std::string &err
|
||||
#ifdef O_CLOEXEC
|
||||
flags |= O_CLOEXEC;
|
||||
#endif
|
||||
int fd = kte::syscall::Open(path.c_str(), flags, 0600);
|
||||
if (fd < 0) {
|
||||
int saved_errno = errno;
|
||||
err = "Failed to open swap file '" + path + "': " + std::strerror(saved_errno);
|
||||
|
||||
// Retry on transient errors (ENOSPC, EDQUOT, EBUSY, etc.)
|
||||
int fd = -1;
|
||||
auto open_fn = [&]() -> bool {
|
||||
fd = kte::syscall::Open(path.c_str(), flags, 0600);
|
||||
return fd >= 0;
|
||||
};
|
||||
|
||||
if (!RetryOnTransientError(open_fn, RetryPolicy::Aggressive(), err)) {
|
||||
if (fd < 0) {
|
||||
int saved_errno = errno;
|
||||
err = "Failed to open swap file '" + path + "': " + std::strerror(saved_errno) + err;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
// Ensure permissions even if file already existed.
|
||||
@@ -636,10 +646,20 @@ SwapManager::open_ctx(JournalCtx &ctx, const std::string &path, std::string &err
|
||||
#ifdef O_CLOEXEC
|
||||
tflags |= O_CLOEXEC;
|
||||
#endif
|
||||
fd = kte::syscall::Open(path.c_str(), tflags, 0600);
|
||||
if (fd < 0) {
|
||||
int saved_errno = errno;
|
||||
err = "Failed to reopen swap file for truncation '" + path + "': " + std::strerror(saved_errno);
|
||||
|
||||
// Retry on transient errors for truncation open
|
||||
fd = -1;
|
||||
auto reopen_fn = [&]() -> bool {
|
||||
fd = kte::syscall::Open(path.c_str(), tflags, 0600);
|
||||
return fd >= 0;
|
||||
};
|
||||
|
||||
if (!RetryOnTransientError(reopen_fn, RetryPolicy::Aggressive(), err)) {
|
||||
if (fd < 0) {
|
||||
int saved_errno = errno;
|
||||
err = "Failed to reopen swap file for truncation '" + path + "': " + std::strerror(
|
||||
saved_errno) + err;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
(void) kte::syscall::Fchmod(fd, 0600);
|
||||
@@ -705,10 +725,19 @@ SwapManager::compact_to_checkpoint(JournalCtx &ctx, const std::vector<std::uint8
|
||||
#ifdef O_CLOEXEC
|
||||
flags |= O_CLOEXEC;
|
||||
#endif
|
||||
int tfd = kte::syscall::Open(tmp_path.c_str(), flags, 0600);
|
||||
if (tfd < 0) {
|
||||
int saved_errno = errno;
|
||||
err = "Failed to open temp swap file '" + tmp_path + "': " + std::strerror(saved_errno);
|
||||
|
||||
// Retry on transient errors for temp file creation
|
||||
int tfd = -1;
|
||||
auto open_tmp_fn = [&]() -> bool {
|
||||
tfd = kte::syscall::Open(tmp_path.c_str(), flags, 0600);
|
||||
return tfd >= 0;
|
||||
};
|
||||
|
||||
if (!RetryOnTransientError(open_tmp_fn, RetryPolicy::Aggressive(), err)) {
|
||||
if (tfd < 0) {
|
||||
int saved_errno = errno;
|
||||
err = "Failed to open temp swap file '" + tmp_path + "': " + std::strerror(saved_errno) + err;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
(void) kte::syscall::Fchmod(tfd, 0600);
|
||||
@@ -1062,6 +1091,34 @@ SwapManager::process_one(const Pending &p)
|
||||
if (!p.buf)
|
||||
return;
|
||||
|
||||
// Check circuit breaker before processing
|
||||
bool circuit_open = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
if (!circuit_breaker_.AllowRequest()) {
|
||||
circuit_open = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (circuit_open) {
|
||||
// Circuit is open - graceful degradation: skip swap write
|
||||
// This prevents repeated failures from overwhelming the system
|
||||
// Swap recording will resume when circuit closes
|
||||
static std::atomic<std::uint64_t> last_warning_ns{0};
|
||||
const std::uint64_t now = now_ns();
|
||||
const std::uint64_t last = last_warning_ns.load();
|
||||
// Log warning at most once per 60 seconds to avoid spam
|
||||
if (now - last > 60000000000ULL) {
|
||||
last_warning_ns.store(now);
|
||||
ErrorHandler::Instance().Warning("SwapManager",
|
||||
"Swap operations temporarily disabled due to repeated failures (circuit breaker open)",
|
||||
p.buf && !p.buf->Filename().empty()
|
||||
? p.buf->Filename()
|
||||
: "<unnamed>");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
Buffer &buf = *p.buf;
|
||||
|
||||
@@ -1084,10 +1141,18 @@ SwapManager::process_one(const Pending &p)
|
||||
std::string open_err;
|
||||
if (!open_ctx(*ctxp, path, open_err)) {
|
||||
report_error(open_err, p.buf);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
circuit_breaker_.RecordFailure();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (p.payload.size() > 0xFFFFFFu) {
|
||||
report_error("Payload too large: " + std::to_string(p.payload.size()) + " bytes", p.buf);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
circuit_breaker_.RecordFailure();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1123,6 +1188,10 @@ SwapManager::process_one(const Pending &p)
|
||||
if (!ok) {
|
||||
int err = errno;
|
||||
report_error("Failed to write swap record to '" + path + "': " + std::strerror(err), p.buf);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
circuit_breaker_.RecordFailure();
|
||||
}
|
||||
return;
|
||||
}
|
||||
ctxp->approx_size_bytes += static_cast<std::uint64_t>(rec.size());
|
||||
@@ -1138,12 +1207,27 @@ SwapManager::process_one(const Pending &p)
|
||||
std::string compact_err;
|
||||
if (!compact_to_checkpoint(*ctxp, rec, compact_err)) {
|
||||
report_error(compact_err, p.buf);
|
||||
// Note: compaction failure is not fatal, don't record circuit breaker failure
|
||||
}
|
||||
}
|
||||
|
||||
// Record success for circuit breaker
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
circuit_breaker_.RecordSuccess();
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
report_error(std::string("Exception in process_one: ") + e.what(), p.buf);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
circuit_breaker_.RecordFailure();
|
||||
}
|
||||
} catch (...) {
|
||||
report_error("Unknown exception in process_one", p.buf);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
circuit_breaker_.RecordFailure();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user